diff --git a/commafeed-client/src/pages/admin/MetricsPage.tsx b/commafeed-client/src/pages/admin/MetricsPage.tsx index bea5c3dc..7bcc664b 100644 --- a/commafeed-client/src/pages/admin/MetricsPage.tsx +++ b/commafeed-client/src/pages/admin/MetricsPage.tsx @@ -1,7 +1,6 @@ -import { Accordion, Box, Tabs } from "@mantine/core" +import { Accordion, Tabs } from "@mantine/core" import { client } from "app/client" import { Loader } from "components/Loader" -import { Gauge } from "components/metrics/Gauge" import { Meter } from "components/metrics/Meter" import { MetricAccordionItem } from "components/metrics/MetricAccordionItem" import { Timer } from "components/metrics/Timer" @@ -9,26 +8,18 @@ import { useAsync } from "react-async-hook" import { TbChartAreaLine, TbClock } from "react-icons/tb" const shownMeters: { [key: string]: string } = { - "com.commafeed.backend.feed.FeedQueues.refill": "Refresh queue refill rate", - "com.commafeed.backend.feed.FeedRefreshTaskGiver.feedRefreshed": "Feed refreshed", - "com.commafeed.backend.feed.FeedRefreshUpdater.feedUpdated": "Feed updated", - "com.commafeed.backend.feed.FeedRefreshUpdater.entryCacheHit": "Entry cache hit", - "com.commafeed.backend.feed.FeedRefreshUpdater.entryCacheMiss": "Entry cache miss", -} - -const shownGauges: { [key: string]: string } = { - "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-updater.active": "Feed Updater active", - "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-updater.pending": "Feed Updater queued", - "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-worker.active": "Feed Worker active", - "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-worker.pending": "Feed Worker queued", - "com.commafeed.backend.feed.FeedQueues.queue": "Feed Refresh queue size", + "com.commafeed.backend.service.FeedRefreshFlowService.refill": "Feed queue refill rate", + "com.commafeed.backend.feed.FeedRefreshWorker.feedFetched": "Feed fetching rate", + "com.commafeed.backend.feed.FeedRefreshUpdater.feedUpdated": "Feed update rate", + "com.commafeed.backend.feed.FeedRefreshUpdater.entryCacheHit": "Entry cache hit rate", + "com.commafeed.backend.feed.FeedRefreshUpdater.entryCacheMiss": "Entry cache miss rate", } export function MetricsPage() { const query = useAsync(() => client.admin.getMetrics(), []) if (!query.result) return - const { meters, gauges, timers } = query.result.data + const { meters, timers } = query.result.data return ( @@ -48,15 +39,6 @@ export function MetricsPage() { ))} - - - {Object.keys(shownGauges).map(g => ( - - {shownGauges[g]}  - - - ))} - diff --git a/commafeed-server/pom.xml b/commafeed-server/pom.xml index 82ddda2e..3e8e1f57 100644 --- a/commafeed-server/pom.xml +++ b/commafeed-server/pom.xml @@ -301,6 +301,12 @@ 1.1.1 + + io.reactivex.rxjava3 + rxjava + 3.1.6 + + javax.xml.bind jaxb-api diff --git a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java index ed562fac..01e37d87 100644 --- a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java +++ b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java @@ -18,9 +18,6 @@ import javax.websocket.server.ServerEndpointConfig; import org.hibernate.cfg.AvailableSettings; import com.codahale.metrics.json.MetricsModule; -import com.commafeed.backend.feed.FeedRefreshTaskGiver; -import com.commafeed.backend.feed.FeedRefreshUpdater; -import com.commafeed.backend.feed.FeedRefreshWorker; import com.commafeed.backend.model.AbstractModel; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedCategory; @@ -32,7 +29,8 @@ import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; import com.commafeed.backend.model.UserRole; import com.commafeed.backend.model.UserSettings; -import com.commafeed.backend.service.StartupService; +import com.commafeed.backend.service.DatabaseStartupService; +import com.commafeed.backend.service.FeedRefreshEngine; import com.commafeed.backend.service.UserService; import com.commafeed.backend.task.ScheduledTask; import com.commafeed.frontend.auth.SecurityCheckFactoryProvider; @@ -195,12 +193,10 @@ public class CommaFeedApplication extends Application { } // database init/changelogs - environment.lifecycle().manage(injector.getInstance(StartupService.class)); + environment.lifecycle().manage(injector.getInstance(DatabaseStartupService.class)); - // background feed fetching - environment.lifecycle().manage(injector.getInstance(FeedRefreshTaskGiver.class)); - environment.lifecycle().manage(injector.getInstance(FeedRefreshWorker.class)); - environment.lifecycle().manage(injector.getInstance(FeedRefreshUpdater.class)); + // start feed fetching engine + environment.lifecycle().manage(injector.getInstance(FeedRefreshEngine.class)); // prevent caching index.html, so that the webapp is always up to date environment.servlets() diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedAndEntries.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedAndEntries.java new file mode 100644 index 00000000..20d77df0 --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedAndEntries.java @@ -0,0 +1,14 @@ +package com.commafeed.backend.feed; + +import java.util.List; + +import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedEntry; + +import lombok.Value; + +@Value +public class FeedAndEntries { + Feed feed; + List entries; +} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java deleted file mode 100644 index e571386a..00000000 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java +++ /dev/null @@ -1,124 +0,0 @@ -package com.commafeed.backend.feed; - -import java.util.Date; -import java.util.List; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.time.DateUtils; -import org.hibernate.SessionFactory; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.commafeed.CommaFeedConfiguration; -import com.commafeed.backend.dao.FeedDAO; -import com.commafeed.backend.dao.UnitOfWork; -import com.commafeed.backend.model.Feed; - -@Singleton -public class FeedQueues { - - private final SessionFactory sessionFactory; - private final FeedDAO feedDAO; - private final CommaFeedConfiguration config; - private final BlockingDeque queue = new LinkedBlockingDeque<>(); - private final Meter refill; - - @Inject - public FeedQueues(SessionFactory sessionFactory, FeedDAO feedDAO, CommaFeedConfiguration config, MetricRegistry metrics) { - this.sessionFactory = sessionFactory; - this.config = config; - this.feedDAO = feedDAO; - this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill")); - - metrics.register(MetricRegistry.name(getClass(), "queue"), (Gauge) queue::size); - } - - /** - * take a feed from the refresh queue - */ - public synchronized FeedRefreshContext take() { - FeedRefreshContext context = queue.poll(); - if (context != null) { - return context; - } - - refill(); - try { - // try to get something from the queue - // if the queue is empty, wait a bit - // polling the queue instead of sleeping gives us the opportunity to process a feed immediately if it was added manually with - // add() - return queue.poll(15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException("interrupted while waiting for a feed in the queue", e); - } - } - - /** - * add a feed to the refresh queue - */ - public void add(Feed feed, boolean urgent) { - if (isFeedAlreadyQueued(feed)) { - return; - } - - FeedRefreshContext context = new FeedRefreshContext(feed, urgent); - if (urgent) { - queue.addFirst(context); - } else { - queue.addLast(context); - } - } - - /** - * refills the refresh queue - */ - private void refill() { - refill.mark(); - - // add feeds that are up to refresh from the database - int batchSize = Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads()); - List feeds = UnitOfWork.call(sessionFactory, () -> { - List list = feedDAO.findNextUpdatable(batchSize, getLastLoginThreshold()); - - // set the disabledDate as we use it in feedDAO.findNextUpdatable() to decide what to refresh next - Date nextRefreshDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes()); - list.forEach(f -> f.setDisabledUntil(nextRefreshDate)); - feedDAO.saveOrUpdate(list); - - return list; - }); - - feeds.forEach(f -> add(f, false)); - } - - public void giveBack(Feed feed) { - String normalized = FeedUtils.normalizeURL(feed.getUrl()); - feed.setNormalizedUrl(normalized); - feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized)); - feed.setLastUpdated(new Date()); - UnitOfWork.run(sessionFactory, () -> feedDAO.saveOrUpdate(feed)); - - // we just finished updating the feed, remove it from the queue - queue.removeIf(c -> isFeedAlreadyQueued(c.getFeed())); - } - - private Date getLastLoginThreshold() { - if (config.getApplicationSettings().getHeavyLoad()) { - return DateUtils.addDays(new Date(), -30); - } else { - return null; - } - } - - private boolean isFeedAlreadyQueued(Feed feed) { - return queue.stream().anyMatch(c -> c.getFeed().getId().equals(feed.getId())); - } -} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshContext.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshContext.java deleted file mode 100644 index bf1fcde1..00000000 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshContext.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.commafeed.backend.feed; - -import java.util.List; - -import com.commafeed.backend.model.Feed; -import com.commafeed.backend.model.FeedEntry; - -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class FeedRefreshContext { - private Feed feed; - private List entries; - private boolean urgent; - - public FeedRefreshContext(Feed feed, boolean isUrgent) { - this.feed = feed; - this.urgent = isUrgent; - } -} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshExecutor.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshExecutor.java deleted file mode 100644 index dabd2d6c..00000000 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshExecutor.java +++ /dev/null @@ -1,99 +0,0 @@ -package com.commafeed.backend.feed; - -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.MetricRegistry; - -import lombok.extern.slf4j.Slf4j; - -/** - * Wraps a {@link ThreadPoolExecutor} instance. Blocks when queue is full instead of rejecting the task. Allow priority queueing by using - * {@link Task} instead of {@link Runnable} - * - */ -@Slf4j -public class FeedRefreshExecutor { - - private String poolName; - private ThreadPoolExecutor pool; - private LinkedBlockingDeque queue; - - public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity, MetricRegistry metrics) { - log.info("Creating pool {} with {} threads", poolName, threads); - this.poolName = poolName; - pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queue = new LinkedBlockingDeque(queueCapacity) { - private static final long serialVersionUID = 1L; - - @Override - public boolean offer(Runnable r) { - Task task = (Task) r; - if (task.isUrgent()) { - return offerFirst(r); - } else { - return offerLast(r); - } - } - }) { - @Override - protected void afterExecute(Runnable r, Throwable t) { - if (t != null) { - log.error("thread from pool {} threw a runtime exception", poolName, t); - } - } - }; - pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - log.debug("{} thread queue full, waiting...", poolName); - try { - Task task = (Task) r; - if (task.isUrgent()) { - queue.putFirst(r); - } else { - queue.put(r); - } - } catch (InterruptedException e1) { - log.error(poolName + " interrupted while waiting for queue.", e1); - } - } - }); - - metrics.register(MetricRegistry.name(getClass(), poolName, "active"), new Gauge() { - @Override - public Integer getValue() { - return pool.getActiveCount(); - } - }); - - metrics.register(MetricRegistry.name(getClass(), poolName, "pending"), new Gauge() { - @Override - public Integer getValue() { - return queue.size(); - } - }); - } - - public void execute(Task task) { - pool.execute(task); - } - - public void shutdown() { - pool.shutdownNow(); - while (!pool.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - log.error("{} interrupted while waiting for threads to finish.", poolName); - } - } - } - - public interface Task extends Runnable { - boolean isUrgent(); - } - -} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java deleted file mode 100644 index 7c985fbc..00000000 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.commafeed.backend.feed; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; -import com.commafeed.CommaFeedConfiguration; -import com.commafeed.backend.dao.FeedDAO; - -import io.dropwizard.lifecycle.Managed; -import lombok.extern.slf4j.Slf4j; - -/** - * Infinite loop fetching feeds from @FeedQueues and queuing them to the {@link FeedRefreshWorker} pool. - * - */ -@Slf4j -@Singleton -public class FeedRefreshTaskGiver implements Managed { - - private final FeedQueues queues; - private final FeedRefreshWorker worker; - - private final ExecutorService executor; - - private final Meter feedRefreshed; - - @Inject - public FeedRefreshTaskGiver(FeedQueues queues, FeedDAO feedDAO, FeedRefreshWorker worker, CommaFeedConfiguration config, - MetricRegistry metrics) { - this.queues = queues; - this.worker = worker; - - executor = Executors.newFixedThreadPool(1); - feedRefreshed = metrics.meter(MetricRegistry.name(getClass(), "feedRefreshed")); - } - - @Override - public void stop() { - log.info("shutting down feed refresh task giver"); - executor.shutdownNow(); - while (!executor.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - log.error("interrupted while waiting for threads to finish."); - } - } - } - - @Override - public void start() { - log.info("starting feed refresh task giver"); - executor.execute(new Runnable() { - @Override - public void run() { - while (!executor.isShutdown()) { - try { - FeedRefreshContext context = queues.take(); - if (context != null) { - feedRefreshed.mark(); - worker.updateFeed(context); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - } - }); - } -} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java index c23479ea..6c67e815 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java @@ -20,17 +20,16 @@ import org.hibernate.SessionFactory; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.commafeed.CommaFeedConfiguration; -import com.commafeed.CommaFeedConfiguration.ApplicationSettings; import com.commafeed.backend.cache.CacheService; import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.dao.UnitOfWork; -import com.commafeed.backend.feed.FeedRefreshExecutor.Task; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedEntryContent; import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; -import com.commafeed.backend.service.FeedUpdateService; +import com.commafeed.backend.service.FeedEntryService; +import com.commafeed.backend.service.FeedService; import com.commafeed.backend.service.PubSubService; import com.commafeed.frontend.ws.WebSocketMessageBuilder; import com.commafeed.frontend.ws.WebSocketSessions; @@ -45,15 +44,14 @@ import lombok.extern.slf4j.Slf4j; public class FeedRefreshUpdater implements Managed { private final SessionFactory sessionFactory; - private final FeedUpdateService feedUpdateService; + private final FeedService feedService; + private final FeedEntryService feedEntryService; private final PubSubService pubSubService; - private final FeedQueues queues; private final CommaFeedConfiguration config; private final FeedSubscriptionDAO feedSubscriptionDAO; private final CacheService cache; private final WebSocketSessions webSocketSessions; - private final FeedRefreshExecutor pool; private final Striped locks; private final Meter entryCacheMiss; @@ -62,22 +60,19 @@ public class FeedRefreshUpdater implements Managed { private final Meter entryInserted; @Inject - public FeedRefreshUpdater(SessionFactory sessionFactory, FeedUpdateService feedUpdateService, PubSubService pubSubService, - FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO, + public FeedRefreshUpdater(SessionFactory sessionFactory, FeedService feedService, FeedEntryService feedEntryService, + PubSubService pubSubService, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO, CacheService cache, WebSocketSessions webSocketSessions) { this.sessionFactory = sessionFactory; - this.feedUpdateService = feedUpdateService; + this.feedService = feedService; + this.feedEntryService = feedEntryService; this.pubSubService = pubSubService; - this.queues = queues; this.config = config; this.feedSubscriptionDAO = feedSubscriptionDAO; this.cache = cache; this.webSocketSessions = webSocketSessions; - ApplicationSettings settings = config.getApplicationSettings(); - int threads = Math.max(settings.getDatabaseUpdateThreads(), 1); - pool = new FeedRefreshExecutor("feed-refresh-updater", threads, Math.min(50 * threads, 1000), metrics); - locks = Striped.lazyWeakLock(threads * 100000); + locks = Striped.lazyWeakLock(100000); entryCacheMiss = metrics.meter(MetricRegistry.name(getClass(), "entryCacheMiss")); entryCacheHit = metrics.meter(MetricRegistry.name(getClass(), "entryCacheHit")); @@ -85,20 +80,6 @@ public class FeedRefreshUpdater implements Managed { entryInserted = metrics.meter(MetricRegistry.name(getClass(), "entryInserted")); } - @Override - public void start() throws Exception { - } - - @Override - public void stop() throws Exception { - log.info("shutting down feed refresh updater"); - pool.shutdown(); - } - - public void updateFeed(FeedRefreshContext context) { - pool.execute(new EntryTask(context)); - } - private AddEntryResult addEntry(final Feed feed, final FeedEntry entry, final List subscriptions) { boolean processed = false; boolean inserted = false; @@ -123,7 +104,7 @@ public class FeedRefreshUpdater implements Managed { locked2 = lock2.tryLock(1, TimeUnit.MINUTES); if (locked1 && locked2) { processed = true; - inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions)); + inserted = UnitOfWork.call(sessionFactory, () -> feedEntryService.addEntry(feed, entry, subscriptions)); if (inserted) { entryInserted.mark(); } @@ -166,76 +147,63 @@ public class FeedRefreshUpdater implements Managed { } } - private class EntryTask implements Task { + public boolean update(Feed feed, List entries) { + boolean processed = true; + boolean insertedAtLeastOneEntry = false; - private final FeedRefreshContext context; + if (!entries.isEmpty()) { + List lastEntries = cache.getLastEntries(feed); + List currentEntries = new ArrayList<>(); - public EntryTask(FeedRefreshContext context) { - this.context = context; - } - - @Override - public void run() { - boolean processed = true; - boolean insertedAtLeastOneEntry = false; - - final Feed feed = context.getFeed(); - List entries = context.getEntries(); - if (entries.isEmpty()) { - feed.setMessage("Feed has no entries"); - } else { - List lastEntries = cache.getLastEntries(feed); - List currentEntries = new ArrayList<>(); - - List subscriptions = null; - for (FeedEntry entry : entries) { - String cacheKey = cache.buildUniqueEntryKey(feed, entry); - if (!lastEntries.contains(cacheKey)) { - log.debug("cache miss for {}", entry.getUrl()); - if (subscriptions == null) { - subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed)); - } - AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions); - processed &= addEntryResult.processed; - insertedAtLeastOneEntry |= addEntryResult.inserted; - - entryCacheMiss.mark(); - } else { - log.debug("cache hit for {}", entry.getUrl()); - entryCacheHit.mark(); + List subscriptions = null; + for (FeedEntry entry : entries) { + String cacheKey = cache.buildUniqueEntryKey(feed, entry); + if (!lastEntries.contains(cacheKey)) { + log.debug("cache miss for {}", entry.getUrl()); + if (subscriptions == null) { + subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed)); } + AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions); + processed &= addEntryResult.processed; + insertedAtLeastOneEntry |= addEntryResult.inserted; - currentEntries.add(cacheKey); + entryCacheMiss.mark(); + } else { + log.debug("cache hit for {}", entry.getUrl()); + entryCacheHit.mark(); } - cache.setLastEntries(feed, currentEntries); - if (subscriptions == null) { - feed.setMessage("No new entries found"); - } else if (insertedAtLeastOneEntry) { - List users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList()); - cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); - cache.invalidateUserRootCategory(users.toArray(new User[0])); + currentEntries.add(cacheKey); + } + cache.setLastEntries(feed, currentEntries); - // notify over websocket - subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub))); - } - } + if (subscriptions == null) { + feed.setMessage("No new entries found"); + } else if (insertedAtLeastOneEntry) { + List users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList()); + cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); + cache.invalidateUserRootCategory(users.toArray(new User[0])); - if (config.getApplicationSettings().getPubsubhubbub()) { - handlePubSub(feed); - } - if (!processed) { - // requeue asap - feed.setDisabledUntil(new Date(0)); + // notify over websocket + subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub))); } + } + + if (Boolean.TRUE.equals(config.getApplicationSettings().getPubsubhubbub())) { + handlePubSub(feed); + } + if (!processed) { + // requeue asap + feed.setDisabledUntil(new Date(0)); + } + + if (insertedAtLeastOneEntry) { feedUpdated.mark(); - queues.giveBack(feed); } - @Override - public boolean isUrgent() { - return context.isUrgent(); - } + UnitOfWork.run(sessionFactory, () -> feedService.save(feed)); + + return processed; } @AllArgsConstructor diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshWorker.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshWorker.java index b1a88885..4a208b05 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshWorker.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshWorker.java @@ -1,5 +1,6 @@ package com.commafeed.backend.feed; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -10,58 +11,38 @@ import javax.inject.Singleton; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.commafeed.CommaFeedConfiguration; import com.commafeed.backend.HttpGetter.NotModifiedException; -import com.commafeed.backend.feed.FeedRefreshExecutor.Task; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; -import io.dropwizard.lifecycle.Managed; import lombok.extern.slf4j.Slf4j; /** * Calls {@link FeedFetcher} and handles its outcome - * */ @Slf4j @Singleton -public class FeedRefreshWorker implements Managed { +public class FeedRefreshWorker { - private final FeedRefreshUpdater feedRefreshUpdater; private final FeedRefreshIntervalCalculator refreshIntervalCalculator; private final FeedFetcher fetcher; - private final FeedQueues queues; private final CommaFeedConfiguration config; - private final FeedRefreshExecutor pool; + private final Meter feedFetched; @Inject - public FeedRefreshWorker(FeedRefreshUpdater feedRefreshUpdater, FeedRefreshIntervalCalculator refreshIntervalCalculator, - FeedFetcher fetcher, FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics) { - this.feedRefreshUpdater = feedRefreshUpdater; + public FeedRefreshWorker(FeedRefreshIntervalCalculator refreshIntervalCalculator, FeedFetcher fetcher, CommaFeedConfiguration config, + MetricRegistry metrics) { this.refreshIntervalCalculator = refreshIntervalCalculator; this.fetcher = fetcher; this.config = config; - this.queues = queues; - int threads = config.getApplicationSettings().getBackgroundThreads(); - pool = new FeedRefreshExecutor("feed-refresh-worker", threads, Math.min(20 * threads, 1000), metrics); + this.feedFetched = metrics.meter(MetricRegistry.name(getClass(), "feedFetched")); + } - @Override - public void start() throws Exception { - } - - @Override - public void stop() throws Exception { - pool.shutdown(); - } - - public void updateFeed(FeedRefreshContext context) { - pool.execute(new FeedTask(context)); - } - - private void update(FeedRefreshContext context) { - Feed feed = context.getFeed(); + public FeedAndEntries update(Feed feed) { try { String url = Optional.ofNullable(feed.getUrlAfterRedirect()).orElse(feed.getUrl()); FetchedFeed fetchedFeed = fetcher.fetch(url, false, feed.getLastModifiedHeader(), feed.getEtagHeader(), @@ -92,9 +73,8 @@ public class FeedRefreshWorker implements Managed { feed.setDisabledUntil(refreshIntervalCalculator.onFetchSuccess(fetchedFeed)); handlePubSub(feed, fetchedFeed.getFeed()); - context.setEntries(entries); - feedRefreshUpdater.updateFeed(context); + return new FeedAndEntries(feed, entries); } catch (NotModifiedException e) { log.debug("Feed not modified : {} - {}", feed.getUrl(), e.getMessage()); @@ -110,7 +90,7 @@ public class FeedRefreshWorker implements Managed { feed.setEtagHeader(e.getNewEtagHeader()); } - queues.giveBack(feed); + return new FeedAndEntries(feed, Collections.emptyList()); } catch (Exception e) { String message = "Unable to refresh feed " + feed.getUrl() + " : " + e.getMessage(); log.debug(e.getClass().getName() + " " + message, e); @@ -119,7 +99,9 @@ public class FeedRefreshWorker implements Managed { feed.setMessage(message); feed.setDisabledUntil(refreshIntervalCalculator.onFetchError(feed)); - queues.giveBack(feed); + return new FeedAndEntries(feed, Collections.emptyList()); + } finally { + feedFetched.mark(); } } @@ -145,22 +127,4 @@ public class FeedRefreshWorker implements Managed { } } - private class FeedTask implements Task { - - private final FeedRefreshContext context; - - public FeedTask(FeedRefreshContext context) { - this.context = context; - } - - @Override - public void run() { - update(context); - } - - @Override - public boolean isUrgent() { - return context.isUrgent(); - } - } } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/StartupService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/DatabaseStartupService.java similarity index 94% rename from commafeed-server/src/main/java/com/commafeed/backend/service/StartupService.java rename to commafeed-server/src/main/java/com/commafeed/backend/service/DatabaseStartupService.java index 3c9209a4..df645aab 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/StartupService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/DatabaseStartupService.java @@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequiredArgsConstructor(onConstructor = @__({ @Inject })) @Singleton -public class StartupService implements Managed { +public class DatabaseStartupService implements Managed { private final SessionFactory sessionFactory; private final UserDAO userDAO; diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java index 7a01b7b4..47b9c815 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedEntryService.java @@ -7,18 +7,24 @@ import java.util.List; import javax.inject.Inject; import javax.inject.Singleton; +import org.apache.commons.codec.digest.DigestUtils; + import com.commafeed.backend.cache.CacheService; import com.commafeed.backend.dao.FeedEntryDAO; import com.commafeed.backend.dao.FeedEntryStatusDAO; import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.feed.FeedEntryKeyword; +import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; +import com.commafeed.backend.model.FeedEntryContent; import com.commafeed.backend.model.FeedEntryStatus; import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +@Slf4j @RequiredArgsConstructor(onConstructor = @__({ @Inject })) @Singleton public class FeedEntryService { @@ -26,8 +32,45 @@ public class FeedEntryService { private final FeedSubscriptionDAO feedSubscriptionDAO; private final FeedEntryDAO feedEntryDAO; private final FeedEntryStatusDAO feedEntryStatusDAO; + private final FeedEntryContentService feedEntryContentService; + private final FeedEntryFilteringService feedEntryFilteringService; private final CacheService cache; + /** + * this is NOT thread-safe + */ + public boolean addEntry(Feed feed, FeedEntry entry, List subscriptions) { + + Long existing = feedEntryDAO.findExisting(entry.getGuid(), feed); + if (existing != null) { + return false; + } + + FeedEntryContent content = feedEntryContentService.findOrCreate(entry.getContent(), feed.getLink()); + entry.setGuidHash(DigestUtils.sha1Hex(entry.getGuid())); + entry.setContent(content); + entry.setInserted(new Date()); + entry.setFeed(feed); + feedEntryDAO.saveOrUpdate(entry); + + // if filter does not match the entry, mark it as read + for (FeedSubscription sub : subscriptions) { + boolean matches = true; + try { + matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), entry); + } catch (FeedEntryFilteringService.FeedEntryFilterException e) { + log.error("could not evaluate filter {}", sub.getFilter(), e); + } + if (!matches) { + FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, entry); + status.setRead(true); + feedEntryStatusDAO.saveOrUpdate(status); + } + } + + return true; + } + public void markEntry(User user, Long entryId, boolean read) { FeedEntry entry = feedEntryDAO.findById(entryId); diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedRefreshEngine.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedRefreshEngine.java new file mode 100644 index 00000000..d92a7484 --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedRefreshEngine.java @@ -0,0 +1,120 @@ +package com.commafeed.backend.service; + +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; +import javax.inject.Singleton; + +import org.apache.commons.lang3.time.DateUtils; +import org.hibernate.SessionFactory; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.commafeed.CommaFeedConfiguration; +import com.commafeed.backend.dao.FeedDAO; +import com.commafeed.backend.dao.UnitOfWork; +import com.commafeed.backend.feed.FeedRefreshUpdater; +import com.commafeed.backend.feed.FeedRefreshWorker; +import com.commafeed.backend.model.Feed; + +import io.dropwizard.lifecycle.Managed; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Singleton +public class FeedRefreshEngine implements Managed { + + private final SessionFactory sessionFactory; + private final FeedDAO feedDAO; + private final FeedRefreshWorker worker; + private final FeedRefreshUpdater updater; + private final CommaFeedConfiguration config; + private final Meter refill; + + private final PublishProcessor priorityQueue; + private Disposable flow; + + @Inject + public FeedRefreshEngine(SessionFactory sessionFactory, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater, + CommaFeedConfiguration config, MetricRegistry metrics) { + this.sessionFactory = sessionFactory; + this.feedDAO = feedDAO; + this.worker = worker; + this.updater = updater; + this.config = config; + this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill")); + this.priorityQueue = PublishProcessor.create(); + } + + @Override + public void start() throws Exception { + Flowable database = Flowable.fromCallable(() -> findNextUpdatableFeeds(getBatchSize(), getLastLoginThreshold())) + .onErrorResumeNext(e -> { + log.error("error while fetching next updatable feeds", e); + return Flowable.empty(); + }) + // repeat query 15s after the flowable has been emptied + // https://github.com/ReactiveX/RxJava/issues/448#issuecomment-233244964 + .repeatWhen(o -> o.concatMap(v -> Flowable.timer(15, TimeUnit.SECONDS))) + .flatMap(Flowable::fromIterable); + Flowable source = Flowable.merge(priorityQueue, database); + + this.flow = source.subscribeOn(Schedulers.io()) + // feed fetching + .parallel(config.getApplicationSettings().getBackgroundThreads()) + .runOn(Schedulers.io()) + .flatMap(f -> Flowable.fromCallable(() -> worker.update(f)).onErrorResumeNext(e -> { + log.error("error while fetching feed", e); + return Flowable.empty(); + })) + .sequential() + // database updating + .parallel(config.getApplicationSettings().getDatabaseUpdateThreads()) + .runOn(Schedulers.io()) + .flatMap(fae -> Flowable.fromCallable(() -> updater.update(fae.getFeed(), fae.getEntries())).onErrorResumeNext(e -> { + log.error("error while updating database", e); + return Flowable.empty(); + })) + .sequential() + // end flow + .subscribe(); + } + + public void refreshImmediately(Feed feed) { + priorityQueue.onNext(feed); + } + + private List findNextUpdatableFeeds(int max, Date lastLoginThreshold) { + refill.mark(); + + return UnitOfWork.call(sessionFactory, () -> { + List list = feedDAO.findNextUpdatable(max, lastLoginThreshold); + + // set the disabledDate as we use it in feedDAO.findNextUpdatable() to decide what to refresh next + Date nextRefreshDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes()); + list.forEach(f -> f.setDisabledUntil(nextRefreshDate)); + feedDAO.saveOrUpdate(list); + + return list; + }); + } + + private int getBatchSize() { + return Math.min(Flowable.bufferSize(), 3 * config.getApplicationSettings().getBackgroundThreads()); + } + + private Date getLastLoginThreshold() { + return Boolean.TRUE.equals(config.getApplicationSettings().getHeavyLoad()) ? DateUtils.addDays(new Date(), -30) : null; + } + + @Override + public void stop() throws Exception { + flow.dispose(); + } +} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedService.java index 2ee312af..99cf3c66 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedService.java @@ -50,6 +50,14 @@ public class FeedService { return feed; } + public void save(Feed feed) { + String normalized = FeedUtils.normalizeURL(feed.getUrl()); + feed.setNormalizedUrl(normalized); + feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized)); + feed.setLastUpdated(new Date()); + feedDAO.saveOrUpdate(feed); + } + public Favicon fetchFavicon(Feed feed) { Favicon icon = null; diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java index 263f5341..78bc936b 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java @@ -14,7 +14,6 @@ import com.commafeed.backend.cache.CacheService; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.dao.FeedEntryStatusDAO; import com.commafeed.backend.dao.FeedSubscriptionDAO; -import com.commafeed.backend.feed.FeedQueues; import com.commafeed.backend.feed.FeedUtils; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedCategory; @@ -35,7 +34,7 @@ public class FeedSubscriptionService { private final FeedEntryStatusDAO feedEntryStatusDAO; private final FeedSubscriptionDAO feedSubscriptionDAO; private final FeedService feedService; - private final FeedQueues queues; + private final FeedRefreshEngine feedRefreshEngine; private final CacheService cache; private final CommaFeedConfiguration config; @@ -76,7 +75,7 @@ public class FeedSubscriptionService { sub.setTitle(FeedUtils.truncate(title, 128)); feedSubscriptionDAO.saveOrUpdate(sub); - queues.add(feed, true); + feedRefreshEngine.refreshImmediately(feed); cache.invalidateUserRootCategory(user); return sub.getId(); } @@ -96,7 +95,7 @@ public class FeedSubscriptionService { List subs = feedSubscriptionDAO.findAll(user); for (FeedSubscription sub : subs) { Feed feed = sub.getFeed(); - queues.add(feed, true); + feedRefreshEngine.refreshImmediately(feed); } } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedUpdateService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedUpdateService.java deleted file mode 100644 index 24ab8fa6..00000000 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedUpdateService.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.commafeed.backend.service; - -import java.util.Date; -import java.util.List; - -import javax.inject.Inject; -import javax.inject.Singleton; - -import org.apache.commons.codec.digest.DigestUtils; - -import com.commafeed.backend.dao.FeedEntryDAO; -import com.commafeed.backend.dao.FeedEntryStatusDAO; -import com.commafeed.backend.model.Feed; -import com.commafeed.backend.model.FeedEntry; -import com.commafeed.backend.model.FeedEntryContent; -import com.commafeed.backend.model.FeedEntryStatus; -import com.commafeed.backend.model.FeedSubscription; -import com.commafeed.backend.service.FeedEntryFilteringService.FeedEntryFilterException; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@RequiredArgsConstructor(onConstructor = @__({ @Inject })) -@Singleton -public class FeedUpdateService { - - private final FeedEntryDAO feedEntryDAO; - private final FeedEntryStatusDAO feedEntryStatusDAO; - private final FeedEntryContentService feedEntryContentService; - private final FeedEntryFilteringService feedEntryFilteringService; - - /** - * this is NOT thread-safe - */ - public boolean addEntry(Feed feed, FeedEntry entry, List subscriptions) { - - Long existing = feedEntryDAO.findExisting(entry.getGuid(), feed); - if (existing != null) { - return false; - } - - FeedEntryContent content = feedEntryContentService.findOrCreate(entry.getContent(), feed.getLink()); - entry.setGuidHash(DigestUtils.sha1Hex(entry.getGuid())); - entry.setContent(content); - entry.setInserted(new Date()); - entry.setFeed(feed); - feedEntryDAO.saveOrUpdate(entry); - - // if filter does not match the entry, mark it as read - for (FeedSubscription sub : subscriptions) { - boolean matches = true; - try { - matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), entry); - } catch (FeedEntryFilterException e) { - log.error("could not evaluate filter {}", sub.getFilter(), e); - } - if (!matches) { - FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, entry); - status.setRead(true); - feedEntryStatusDAO.saveOrUpdate(status); - } - } - - return true; - } -} diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/PubSubService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/PubSubService.java index dd803c87..4fbe87ab 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/PubSubService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/PubSubService.java @@ -17,10 +17,11 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; +import org.hibernate.SessionFactory; import com.commafeed.CommaFeedConfiguration; import com.commafeed.backend.HttpGetter; -import com.commafeed.backend.feed.FeedQueues; +import com.commafeed.backend.dao.UnitOfWork; import com.commafeed.backend.feed.FeedUtils; import com.commafeed.backend.model.Feed; import com.commafeed.frontend.resource.PubSubHubbubCallbackREST; @@ -38,7 +39,8 @@ import lombok.extern.slf4j.Slf4j; public class PubSubService { private final CommaFeedConfiguration config; - private final FeedQueues queues; + private final FeedService feedService; + private final SessionFactory sessionFactory; public void subscribe(Feed feed) { String hub = feed.getPushHub(); @@ -73,7 +75,7 @@ public class PubSubService { if (code == 400 && StringUtils.contains(message, pushpressError)) { String[] tokens = message.split(" "); feed.setPushTopic(tokens[tokens.length - 1]); - queues.giveBack(feed); + UnitOfWork.run(sessionFactory, () -> feedService.save(feed)); log.debug("handled pushpress subfeed {} : {}", topic, feed.getPushTopic()); } else { throw new Exception( diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/resource/FeedREST.java b/commafeed-server/src/main/java/com/commafeed/frontend/resource/FeedREST.java index a2d6ab01..dfc48f54 100644 --- a/commafeed-server/src/main/java/com/commafeed/frontend/resource/FeedREST.java +++ b/commafeed-server/src/main/java/com/commafeed/frontend/resource/FeedREST.java @@ -45,7 +45,6 @@ import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.favicon.AbstractFaviconFetcher.Favicon; import com.commafeed.backend.feed.FeedEntryKeyword; import com.commafeed.backend.feed.FeedFetcher; -import com.commafeed.backend.feed.FeedQueues; import com.commafeed.backend.feed.FeedUtils; import com.commafeed.backend.feed.FetchedFeed; import com.commafeed.backend.model.Feed; @@ -62,6 +61,7 @@ import com.commafeed.backend.opml.OPMLImporter; import com.commafeed.backend.service.FeedEntryFilteringService; import com.commafeed.backend.service.FeedEntryFilteringService.FeedEntryFilterException; import com.commafeed.backend.service.FeedEntryService; +import com.commafeed.backend.service.FeedRefreshEngine; import com.commafeed.backend.service.FeedService; import com.commafeed.backend.service.FeedSubscriptionService; import com.commafeed.frontend.auth.SecurityCheck; @@ -109,7 +109,7 @@ public class FeedREST { private final FeedEntryService feedEntryService; private final FeedSubscriptionService feedSubscriptionService; private final FeedEntryFilteringService feedEntryFilteringService; - private final FeedQueues queues; + private final FeedRefreshEngine feedRefreshEngine; private final OPMLImporter opmlImporter; private final OPMLExporter opmlExporter; private final CacheService cache; @@ -303,7 +303,7 @@ public class FeedREST { FeedSubscription sub = feedSubscriptionDAO.findById(user, req.getId()); if (sub != null) { Feed feed = sub.getFeed(); - queues.add(feed, true); + feedRefreshEngine.refreshImmediately(feed); return Response.ok().build(); } return Response.ok(Status.NOT_FOUND).build(); diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/resource/PubSubHubbubCallbackREST.java b/commafeed-server/src/main/java/com/commafeed/frontend/resource/PubSubHubbubCallbackREST.java index dcce17b1..34d2845a 100644 --- a/commafeed-server/src/main/java/com/commafeed/frontend/resource/PubSubHubbubCallbackREST.java +++ b/commafeed-server/src/main/java/com/commafeed/frontend/resource/PubSubHubbubCallbackREST.java @@ -26,9 +26,9 @@ import com.codahale.metrics.annotation.Timed; import com.commafeed.CommaFeedConfiguration; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.feed.FeedParser; -import com.commafeed.backend.feed.FeedQueues; import com.commafeed.backend.feed.FetchedFeed; import com.commafeed.backend.model.Feed; +import com.commafeed.backend.service.FeedRefreshEngine; import com.google.common.base.Preconditions; import io.dropwizard.hibernate.UnitOfWork; @@ -46,7 +46,7 @@ public class PubSubHubbubCallbackREST { private final FeedDAO feedDAO; private final FeedParser parser; - private final FeedQueues queues; + private final FeedRefreshEngine feedRefreshEngine; private final CommaFeedConfiguration config; private final MetricRegistry metricRegistry; @@ -114,7 +114,7 @@ public class PubSubHubbubCallbackREST { for (Feed feed : feeds) { log.debug("pushing content to queue for {}", feed.getUrl()); - queues.add(feed, false); + feedRefreshEngine.refreshImmediately(feed); } metricRegistry.meter(MetricRegistry.name(getClass(), "pushReceived")).mark(); diff --git a/commafeed-server/src/test/java/com/commafeed/backend/service/PubSubServiceTest.java b/commafeed-server/src/test/java/com/commafeed/backend/service/PubSubServiceTest.java index 9ccb9a4f..1b69f0c3 100644 --- a/commafeed-server/src/test/java/com/commafeed/backend/service/PubSubServiceTest.java +++ b/commafeed-server/src/test/java/com/commafeed/backend/service/PubSubServiceTest.java @@ -1,6 +1,7 @@ package com.commafeed.backend.service; import org.apache.http.HttpHeaders; +import org.hibernate.SessionFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -15,7 +16,6 @@ import org.mockserver.model.HttpResponse; import org.mockserver.model.MediaType; import com.commafeed.CommaFeedConfiguration; -import com.commafeed.backend.feed.FeedQueues; import com.commafeed.backend.model.Feed; @ExtendWith(MockServerExtension.class) @@ -25,7 +25,10 @@ class PubSubServiceTest { private CommaFeedConfiguration config; @Mock - private FeedQueues queues; + private FeedService feedService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private SessionFactory sessionFactory; @Mock private Feed feed; @@ -40,7 +43,7 @@ class PubSubServiceTest { this.client = client; this.client.reset(); - this.underTest = new PubSubService(config, queues); + this.underTest = new PubSubService(config, feedService, sessionFactory); Integer port = client.getPort(); String hubUrl = String.format("http://localhost:%s/hub", port); @@ -69,7 +72,7 @@ class PubSubServiceTest { .withMethod("POST") .withPath("/hub")); Mockito.verify(feed, Mockito.never()).setPushTopic(Mockito.anyString()); - Mockito.verifyNoInteractions(queues); + Mockito.verifyNoInteractions(feedService); } @Test @@ -83,7 +86,7 @@ class PubSubServiceTest { // Assert Mockito.verify(feed).setPushTopic(Mockito.anyString()); - Mockito.verify(queues).giveBack(feed); + Mockito.verify(feedService).save(feed); } @Test @@ -96,7 +99,7 @@ class PubSubServiceTest { // Assert Mockito.verify(feed, Mockito.never()).setPushTopic(Mockito.anyString()); - Mockito.verifyNoInteractions(queues); + Mockito.verifyNoInteractions(feedService); } } \ No newline at end of file