From 33e3f7ea3cdce9d62481698d84a565fe5c3879e6 Mon Sep 17 00:00:00 2001 From: Athou Date: Mon, 16 Jan 2023 16:19:51 +0100 Subject: [PATCH] feeds added manually to the queue now refresh immediately instead of waiting up to 15s (#1036) --- .../src/pages/admin/MetricsPage.tsx | 4 +- .../commafeed/backend/feed/FeedQueues.java | 131 +++++++----------- .../backend/feed/FeedRefreshTaskGiver.java | 10 -- 3 files changed, 50 insertions(+), 95 deletions(-) diff --git a/commafeed-client/src/pages/admin/MetricsPage.tsx b/commafeed-client/src/pages/admin/MetricsPage.tsx index 15b024d2..bea5c3dc 100644 --- a/commafeed-client/src/pages/admin/MetricsPage.tsx +++ b/commafeed-client/src/pages/admin/MetricsPage.tsx @@ -21,9 +21,7 @@ const shownGauges: { [key: string]: string } = { "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-updater.pending": "Feed Updater queued", "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-worker.active": "Feed Worker active", "com.commafeed.backend.feed.FeedRefreshExecutor.feed-refresh-worker.pending": "Feed Worker queued", - "com.commafeed.backend.feed.FeedQueues.addQueue": "Task Giver Add Queue", - "com.commafeed.backend.feed.FeedQueues.takeQueue": "Task Giver Take Queue", - "com.commafeed.backend.feed.FeedQueues.giveBackQueue": "Task Giver Give Back Queue", + "com.commafeed.backend.feed.FeedQueues.queue": "Feed Refresh queue size", } export function MetricsPage() { diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java index b85ac4db..e571386a 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedQueues.java @@ -1,13 +1,10 @@ package com.commafeed.backend.feed; -import java.util.ArrayList; import java.util.Date; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.inject.Singleton; @@ -27,123 +24,90 @@ import com.commafeed.backend.model.Feed; @Singleton public class FeedQueues { - private SessionFactory sessionFactory; + private final SessionFactory sessionFactory; private final FeedDAO feedDAO; private final CommaFeedConfiguration config; - - private Queue addQueue = new ConcurrentLinkedQueue<>(); - private Queue takeQueue = new ConcurrentLinkedQueue<>(); - private Queue giveBackQueue = new ConcurrentLinkedQueue<>(); - - private Meter refill; + private final BlockingDeque queue = new LinkedBlockingDeque<>(); + private final Meter refill; @Inject public FeedQueues(SessionFactory sessionFactory, FeedDAO feedDAO, CommaFeedConfiguration config, MetricRegistry metrics) { this.sessionFactory = sessionFactory; this.config = config; this.feedDAO = feedDAO; + this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill")); - refill = metrics.meter(MetricRegistry.name(getClass(), "refill")); - metrics.register(MetricRegistry.name(getClass(), "addQueue"), new Gauge() { - @Override - public Integer getValue() { - return addQueue.size(); - } - }); - metrics.register(MetricRegistry.name(getClass(), "takeQueue"), new Gauge() { - @Override - public Integer getValue() { - return takeQueue.size(); - } - }); - metrics.register(MetricRegistry.name(getClass(), "giveBackQueue"), new Gauge() { - @Override - public Integer getValue() { - return giveBackQueue.size(); - } - }); + metrics.register(MetricRegistry.name(getClass(), "queue"), (Gauge) queue::size); } /** * take a feed from the refresh queue */ public synchronized FeedRefreshContext take() { - FeedRefreshContext context = takeQueue.poll(); - - if (context == null) { - refill(); - context = takeQueue.poll(); + FeedRefreshContext context = queue.poll(); + if (context != null) { + return context; + } + + refill(); + try { + // try to get something from the queue + // if the queue is empty, wait a bit + // polling the queue instead of sleeping gives us the opportunity to process a feed immediately if it was added manually with + // add() + return queue.poll(15, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("interrupted while waiting for a feed in the queue", e); } - return context; } /** * add a feed to the refresh queue */ public void add(Feed feed, boolean urgent) { - boolean alreadyQueued = addQueue.stream().anyMatch(c -> c.getFeed().getId().equals(feed.getId())); - if (!alreadyQueued) { - addQueue.add(new FeedRefreshContext(feed, urgent)); + if (isFeedAlreadyQueued(feed)) { + return; + } + + FeedRefreshContext context = new FeedRefreshContext(feed, urgent); + if (urgent) { + queue.addFirst(context); + } else { + queue.addLast(context); } } /** - * refills the refresh queue and empties the giveBack queue while at it + * refills the refresh queue */ private void refill() { refill.mark(); - List contexts = new ArrayList<>(); - int batchSize = Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads()); - - // add feeds we got from the add() method - int addQueueSize = addQueue.size(); - for (int i = 0; i < Math.min(batchSize, addQueueSize); i++) { - contexts.add(addQueue.poll()); - } - // add feeds that are up to refresh from the database - int count = batchSize - contexts.size(); - if (count > 0) { - List feeds = UnitOfWork.call(sessionFactory, () -> feedDAO.findNextUpdatable(count, getLastLoginThreshold())); - for (Feed feed : feeds) { - contexts.add(new FeedRefreshContext(feed, false)); - } - } + int batchSize = Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads()); + List feeds = UnitOfWork.call(sessionFactory, () -> { + List list = feedDAO.findNextUpdatable(batchSize, getLastLoginThreshold()); - // set the disabledDate as we use it in feedDAO to decide what to refresh next. We also use a map to remove - // duplicates. - Map map = new LinkedHashMap<>(); - for (FeedRefreshContext context : contexts) { - Feed feed = context.getFeed(); - feed.setDisabledUntil(DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes())); - map.put(feed.getId(), context); - } + // set the disabledDate as we use it in feedDAO.findNextUpdatable() to decide what to refresh next + Date nextRefreshDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes()); + list.forEach(f -> f.setDisabledUntil(nextRefreshDate)); + feedDAO.saveOrUpdate(list); - // refill the queue - takeQueue.addAll(map.values()); + return list; + }); - // add feeds from the giveBack queue to the map, overriding duplicates - int giveBackQueueSize = giveBackQueue.size(); - for (int i = 0; i < giveBackQueueSize; i++) { - Feed feed = giveBackQueue.poll(); - map.put(feed.getId(), new FeedRefreshContext(feed, false)); - } - - // update all feeds in the database - List feeds = map.values().stream().map(FeedRefreshContext::getFeed).collect(Collectors.toList()); - UnitOfWork.run(sessionFactory, () -> feedDAO.saveOrUpdate(feeds)); + feeds.forEach(f -> add(f, false)); } - /** - * give a feed back, updating it to the database during the next refill() - */ public void giveBack(Feed feed) { String normalized = FeedUtils.normalizeURL(feed.getUrl()); feed.setNormalizedUrl(normalized); feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized)); feed.setLastUpdated(new Date()); - giveBackQueue.add(feed); + UnitOfWork.run(sessionFactory, () -> feedDAO.saveOrUpdate(feed)); + + // we just finished updating the feed, remove it from the queue + queue.removeIf(c -> isFeedAlreadyQueued(c.getFeed())); } private Date getLastLoginThreshold() { @@ -154,4 +118,7 @@ public class FeedQueues { } } + private boolean isFeedAlreadyQueued(Feed feed) { + return queue.stream().anyMatch(c -> c.getFeed().getId().equals(feed.getId())); + } } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java index c8726e10..7c985fbc 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshTaskGiver.java @@ -28,7 +28,6 @@ public class FeedRefreshTaskGiver implements Managed { private final ExecutorService executor; private final Meter feedRefreshed; - private final Meter threadWaited; @Inject public FeedRefreshTaskGiver(FeedQueues queues, FeedDAO feedDAO, FeedRefreshWorker worker, CommaFeedConfiguration config, @@ -38,7 +37,6 @@ public class FeedRefreshTaskGiver implements Managed { executor = Executors.newFixedThreadPool(1); feedRefreshed = metrics.meter(MetricRegistry.name(getClass(), "feedRefreshed")); - threadWaited = metrics.meter(MetricRegistry.name(getClass(), "threadWaited")); } @Override @@ -66,14 +64,6 @@ public class FeedRefreshTaskGiver implements Managed { if (context != null) { feedRefreshed.mark(); worker.updateFeed(context); - } else { - log.debug("nothing to do, sleeping for 15s"); - threadWaited.mark(); - try { - Thread.sleep(15000); - } catch (InterruptedException e) { - log.debug("interrupted while sleeping"); - } } } catch (Exception e) { log.error(e.getMessage(), e);