From d3dd084dd74f0d65bc5a5854c266e1f6a58dbc96 Mon Sep 17 00:00:00 2001 From: Athou Date: Sun, 30 Jun 2013 12:18:24 +0200 Subject: [PATCH] better refresh algorithm using queues instead of synchronous database call --- .../com/commafeed/backend/StartupBean.java | 57 +----------- .../com/commafeed/backend/dao/FeedDAO.java | 16 ++-- .../backend/feeds/FeedRefreshExecutor.java | 77 ++++++++++++++++ .../backend/feeds/FeedRefreshTaskGiver.java | 91 +++++++++++++++---- .../backend/feeds/FeedRefreshUpdater.java | 65 +++---------- .../backend/feeds/FeedRefreshWorker.java | 79 ++++++++-------- .../frontend/rest/resources/AbstractREST.java | 4 + .../frontend/rest/resources/AdminREST.java | 5 +- 8 files changed, 225 insertions(+), 169 deletions(-) create mode 100644 src/main/java/com/commafeed/backend/feeds/FeedRefreshExecutor.java diff --git a/src/main/java/com/commafeed/backend/StartupBean.java b/src/main/java/com/commafeed/backend/StartupBean.java index 705d94b9..9499d4e3 100644 --- a/src/main/java/com/commafeed/backend/StartupBean.java +++ b/src/main/java/com/commafeed/backend/StartupBean.java @@ -5,28 +5,20 @@ import java.io.InputStreamReader; import java.util.Arrays; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.ejb.ConcurrencyManagement; import javax.ejb.ConcurrencyManagementType; import javax.ejb.Singleton; import javax.ejb.Startup; -import javax.enterprise.inject.Instance; import javax.inject.Inject; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.mutable.MutableBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.commafeed.backend.dao.FeedCategoryDAO; -import com.commafeed.backend.dao.FeedDAO; -import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.dao.UserDAO; -import com.commafeed.backend.feeds.FeedRefreshWorker; +import com.commafeed.backend.feeds.FeedRefreshTaskGiver; import com.commafeed.backend.model.ApplicationSettings; import com.commafeed.backend.model.UserRole.Role; import com.commafeed.backend.services.ApplicationSettingsService; @@ -45,15 +37,6 @@ public class StartupBean { @Inject DatabaseUpdater databaseUpdater; - @Inject - FeedDAO feedDAO; - - @Inject - FeedCategoryDAO feedCategoryDAO; - - @Inject - FeedSubscriptionDAO feedSubscriptionDAO; - @Inject UserDAO userDAO; @@ -61,17 +44,14 @@ public class StartupBean { UserService userService; @Inject - ApplicationSettingsService applicationSettingsService; + FeedRefreshTaskGiver taskGiver; @Inject - Instance workers; + ApplicationSettingsService applicationSettingsService; private long startupTime; private Map supportedLanguages = Maps.newHashMap(); - private ExecutorService executor; - private MutableBoolean running = new MutableBoolean(true); - @PostConstruct private void init() { @@ -84,23 +64,7 @@ public class StartupBean { applicationSettingsService.applyLogLevel(); initSupportedLanguages(); - - ApplicationSettings settings = applicationSettingsService.get(); - int threads = settings.getBackgroundThreads(); - log.info("Starting {} background threads", threads); - - executor = Executors.newFixedThreadPool(Math.max(threads, 1)); - for (int i = 0; i < threads; i++) { - final int threadId = i; - executor.execute(new Runnable() { - @Override - public void run() { - FeedRefreshWorker worker = workers.get(); - worker.start(running, "Thread " + threadId); - } - }); - } - + taskGiver.start(); } private void initSupportedLanguages() { @@ -145,17 +109,4 @@ public class StartupBean { public Map getSupportedLanguages() { return supportedLanguages; } - - @PreDestroy - public void shutdown() { - running.setValue(false); - executor.shutdownNow(); - while (!executor.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - log.error("interrupted while waiting for threads to finish."); - } - } - } } diff --git a/src/main/java/com/commafeed/backend/dao/FeedDAO.java b/src/main/java/com/commafeed/backend/dao/FeedDAO.java index 2a17d1d4..dd93e76a 100644 --- a/src/main/java/com/commafeed/backend/dao/FeedDAO.java +++ b/src/main/java/com/commafeed/backend/dao/FeedDAO.java @@ -13,7 +13,6 @@ import javax.persistence.criteria.SetJoin; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.time.DateUtils; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedSubscription; @@ -25,42 +24,41 @@ import com.google.common.collect.Lists; @Stateless public class FeedDAO extends GenericDAO { - private List getUpdatablePredicates(Root root) { - Date now = new Date(); + private List getUpdatablePredicates(Root root, Date threshold) { Predicate hasSubscriptions = builder.isNotEmpty(root .get(Feed_.subscriptions)); Predicate neverUpdated = builder.isNull(root.get(Feed_.lastUpdated)); Predicate updatedBeforeThreshold = builder.lessThan( - root.get(Feed_.lastUpdated), DateUtils.addMinutes(now, -10)); + root.get(Feed_.lastUpdated), threshold); Predicate disabledDateIsNull = builder.isNull(root .get(Feed_.disabledUntil)); Predicate disabledDateIsInPast = builder.lessThan( - root.get(Feed_.disabledUntil), now); + root.get(Feed_.disabledUntil), new Date()); return Lists.newArrayList(hasSubscriptions, builder.or(neverUpdated, updatedBeforeThreshold), builder.or(disabledDateIsNull, disabledDateIsInPast)); } - public Long getUpdatableCount() { + public Long getUpdatableCount(Date threshold) { CriteriaQuery query = builder.createQuery(Long.class); Root root = query.from(getType()); query.select(builder.count(root)); - query.where(getUpdatablePredicates(root).toArray(new Predicate[0])); + query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0])); TypedQuery q = em.createQuery(query); return q.getSingleResult(); } - public List findNextUpdatable(int count) { + public List findNextUpdatable(int count, Date threshold) { CriteriaQuery query = builder.createQuery(getType()); Root root = query.from(getType()); - query.where(getUpdatablePredicates(root).toArray(new Predicate[0])); + query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0])); query.orderBy(builder.asc(root.get(Feed_.lastUpdated))); diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshExecutor.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshExecutor.java new file mode 100644 index 00000000..94630cfb --- /dev/null +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshExecutor.java @@ -0,0 +1,77 @@ +package com.commafeed.backend.feeds; + +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FeedRefreshExecutor { + + private static Logger log = LoggerFactory + .getLogger(FeedRefreshExecutor.class); + + private String poolName; + private ThreadPoolExecutor pool; + private LinkedBlockingDeque queue; + + public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity) { + this.poolName = poolName; + pool = new ThreadPoolExecutor(threads, threads, 0, + TimeUnit.MILLISECONDS, + queue = new LinkedBlockingDeque(queueCapacity) { + private static final long serialVersionUID = 1L; + + @Override + public boolean offer(Runnable r) { + Task task = (Task) r; + if (task.isUrgent()) { + return offerFirst(r); + } else { + return offerLast(r); + } + } + }); + pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + log.debug("{} thread queue full, waiting...", poolName); + try { + Task task = (Task) r; + if (task.isUrgent()) { + queue.putFirst(r); + } else { + queue.put(r); + } + } catch (InterruptedException e1) { + log.error(poolName + " interrupted while waiting for queue.", e1); + } + } + }); + } + + public void execute(Task task) { + pool.execute(task); + } + + public int getQueueSize() { + return queue.size(); + } + + public static interface Task extends Runnable { + boolean isUrgent(); + } + + public void shutdown() { + pool.shutdownNow(); + while (!pool.isTerminated()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("{} interrupted while waiting for threads to finish.", poolName); + } + } + } +} diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java index d8c0b5d9..5c035393 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java @@ -4,12 +4,15 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import javax.inject.Singleton; -import org.apache.commons.lang3.time.DateUtils; +import org.apache.commons.lang.time.DateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +23,7 @@ import com.commafeed.backend.services.ApplicationSettingsService; import com.google.api.client.util.Maps; import com.google.common.collect.Queues; -@Singleton +@ApplicationScoped public class FeedRefreshTaskGiver { protected static final Logger log = LoggerFactory.getLogger(FeedRefreshTaskGiver.class); @@ -34,47 +37,103 @@ public class FeedRefreshTaskGiver { @Inject MetricsBean metricsBean; + @Inject + FeedRefreshWorker worker; + private int backgroundThreads; private Queue addQueue = Queues.newConcurrentLinkedQueue(); private Queue takeQueue = Queues.newConcurrentLinkedQueue(); private Queue giveBackQueue = Queues.newConcurrentLinkedQueue(); + private ExecutorService executor; + @PostConstruct public void init() { backgroundThreads = applicationSettingsService.get() .getBackgroundThreads(); + executor = Executors.newFixedThreadPool(1); } - public void add(Feed feed) { - Date now = new Date(); - boolean heavyLoad = applicationSettingsService.get().isHeavyLoad(); - Date threshold = DateUtils.addMinutes(now, heavyLoad ? -10 : -1); - if (feed.getLastUpdated() == null - || feed.getLastUpdated().before(threshold)) { - addQueue.add(feed); + @PreDestroy + public void shutdown() { + executor.shutdownNow(); + while (!executor.isTerminated()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + log.error("interrupted while waiting for threads to finish."); + } } } - public synchronized Feed take() { + public void start() { + try { + // sleeping for a little while, let everything settle + Thread.sleep(5000); + } catch (InterruptedException e) { + log.error("interrupted while sleeping"); + } + log.info("starting feed refresh task giver"); + + executor.execute(new Runnable() { + @Override + public void run() { + while (!executor.isShutdown()) { + try { + Feed feed = take(); + if (feed != null) { + metricsBean.feedRefreshed(); + worker.updateFeed(feed); + } else { + log.debug("nothing to do, sleeping for 15s"); + try { + Thread.sleep(15000); + } catch (InterruptedException e) { + log.error("interrupted while sleeping"); + } + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + }); + } + + private Feed take() { Feed feed = takeQueue.poll(); if (feed == null) { refill(); feed = takeQueue.poll(); } - - if (feed != null) { - metricsBean.feedRefreshed(); - } return feed; } + public Long getUpdatableCount() { + return feedDAO.getUpdatableCount(getThreshold()); + } + + private Date getThreshold() { + boolean heavyLoad = applicationSettingsService.get().isHeavyLoad(); + Date threshold = DateUtils.addMinutes(new Date(), heavyLoad ? -15 : -5); + return threshold; + } + + public void add(Feed feed) { + Date threshold = getThreshold(); + if (feed.getLastUpdated() == null + || feed.getLastUpdated().before(threshold)) { + addQueue.add(feed); + } + } + private void refill() { Date now = new Date(); int count = 3 * backgroundThreads; - List feeds = feedDAO.findNextUpdatable(count); + List feeds = feedDAO.findNextUpdatable(count, getThreshold()); int size = addQueue.size(); for (int i = 0; i < size; i++) { diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index f7ce0b32..4c282ec5 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -3,16 +3,13 @@ package com.commafeed.backend.feeds; import java.util.Collection; import java.util.Date; import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; -import javax.inject.Singleton; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.time.DateUtils; @@ -24,6 +21,7 @@ import com.commafeed.backend.cache.CacheService; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.dao.FeedEntryDAO; import com.commafeed.backend.dao.FeedSubscriptionDAO; +import com.commafeed.backend.feeds.FeedRefreshExecutor.Task; import com.commafeed.backend.model.ApplicationSettings; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; @@ -34,7 +32,7 @@ import com.commafeed.backend.services.FeedUpdateService; import com.google.api.client.util.Lists; import com.google.common.util.concurrent.Striped; -@Singleton +@ApplicationScoped public class FeedRefreshUpdater { protected static Logger log = LoggerFactory @@ -67,71 +65,33 @@ public class FeedRefreshUpdater { @Inject CacheService cache; - private ThreadPoolExecutor pool; + private FeedRefreshExecutor pool; private Striped locks; - private LinkedBlockingDeque queue; @PostConstruct public void init() { ApplicationSettings settings = applicationSettingsService.get(); int threads = Math.max(settings.getDatabaseUpdateThreads(), 1); log.info("Creating database pool with {} threads", threads); + pool = new FeedRefreshExecutor("FeedRefreshUpdater", threads, 500 * threads); locks = Striped.lazyWeakLock(threads * 100000); - pool = new ThreadPoolExecutor(threads, threads, 0, - TimeUnit.MILLISECONDS, - queue = new LinkedBlockingDeque(500 * threads) { - private static final long serialVersionUID = 1L; - - @Override - public boolean offer(Runnable r) { - Task task = (Task) r; - if (task.getFeed().isUrgent()) { - return offerFirst(r); - } else { - return offerLast(r); - } - } - }); - pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { - log.debug("Thread queue full, waiting..."); - try { - Task task = (Task) r; - if (task.getFeed().isUrgent()) { - queue.putFirst(r); - } else { - queue.put(r); - } - } catch (InterruptedException e1) { - log.error("Interrupted while waiting for queue.", e1); - } - } - }); } @PreDestroy public void shutdown() { - pool.shutdownNow(); - while (!pool.isTerminated()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - log.error("interrupted while waiting for threads to finish."); - } - } + pool.shutdown(); } public void updateFeed(Feed feed, Collection entries) { - pool.execute(new Task(feed, entries)); + pool.execute(new EntryTask(feed, entries)); } - private class Task implements Runnable { + private class EntryTask implements Task { private Feed feed; private Collection entries; - public Task(Feed feed, Collection entries) { + public EntryTask(Feed feed, Collection entries) { this.feed = feed; this.entries = entries; } @@ -174,8 +134,9 @@ public class FeedRefreshUpdater { taskGiver.giveBack(feed); } - public Feed getFeed() { - return feed; + @Override + public boolean isUrgent() { + return feed.isUrgent(); } } @@ -221,7 +182,7 @@ public class FeedRefreshUpdater { } public int getQueueSize() { - return queue.size(); + return pool.getQueueSize(); } } diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index 6e86f07f..bb2af26a 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -3,21 +3,26 @@ package com.commafeed.backend.feeds; import java.util.Date; import java.util.List; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.mutable.MutableBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.commafeed.backend.HttpGetter.NotModifiedException; import com.commafeed.backend.MetricsBean; import com.commafeed.backend.dao.FeedEntryDAO; +import com.commafeed.backend.feeds.FeedRefreshExecutor.Task; +import com.commafeed.backend.model.ApplicationSettings; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.services.ApplicationSettingsService; import com.sun.syndication.io.FeedException; +@ApplicationScoped public class FeedRefreshWorker { private static Logger log = LoggerFactory @@ -41,40 +46,45 @@ public class FeedRefreshWorker { @Inject FeedEntryDAO feedEntryDAO; - public void start(MutableBoolean running, String threadName) { - log.info("{} starting", threadName); + private FeedRefreshExecutor pool; - try { - // sleeping before starting, let everything settle - Thread.sleep(5000); - } catch (InterruptedException e) { - log.error(threadName + e.getMessage(), e); + @PostConstruct + private void init() { + ApplicationSettings settings = applicationSettingsService.get(); + int threads = settings.getBackgroundThreads(); + log.info("Creating refresh worker pool with {} threads", threads); + pool = new FeedRefreshExecutor("FeedRefreshUpdater", threads, 20 * threads); + } + + @PreDestroy + public void shutdown() { + pool.shutdown(); + } + + public void updateFeed(Feed feed) { + pool.execute(new FeedTask(feed)); + } + + public int getQueueSize(){ + return pool.getQueueSize(); + } + + private class FeedTask implements Task { + + private Feed feed; + + public FeedTask(Feed feed) { + this.feed = feed; } - while (running.isTrue()) { - Feed feed = null; - try { - feed = getNextFeed(); - if (feed != null) { - log.debug("refreshing " + feed.getUrl()); - update(feed); - } else { - log.debug("sleeping"); - metricsBean.threadWaited(); - Thread.sleep(15000); - } - } catch (InterruptedException e) { - log.info(threadName + " interrupted"); - return; - } catch (Exception e) { - String feedUrl = "feed is null"; - if (feed != null) { - feedUrl = feed.getUrl(); - } - log.error( - threadName + " (" + feedUrl + ") : " + e.getMessage(), - e); - } + @Override + public void run() { + update(feed); + } + + @Override + public boolean isUrgent() { + return feed.isUrgent(); } } @@ -167,9 +177,4 @@ public class FeedRefreshWorker { feed.setPushTopicHash(DigestUtils.sha1Hex(topic)); } } - - private Feed getNextFeed() { - return taskGiver.take(); - } - } diff --git a/src/main/java/com/commafeed/frontend/rest/resources/AbstractREST.java b/src/main/java/com/commafeed/frontend/rest/resources/AbstractREST.java index 42ad3972..101d4ca6 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/AbstractREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/AbstractREST.java @@ -40,6 +40,7 @@ import com.commafeed.backend.feeds.FaviconFetcher; import com.commafeed.backend.feeds.FeedFetcher; import com.commafeed.backend.feeds.FeedRefreshTaskGiver; import com.commafeed.backend.feeds.FeedRefreshUpdater; +import com.commafeed.backend.feeds.FeedRefreshWorker; import com.commafeed.backend.feeds.OPMLExporter; import com.commafeed.backend.feeds.OPMLImporter; import com.commafeed.backend.model.User; @@ -120,6 +121,9 @@ public abstract class AbstractREST { @Inject FeedRefreshTaskGiver taskGiver; + @Inject + FeedRefreshWorker feedRefreshWorker; + @Inject FeedRefreshUpdater feedRefreshUpdater; diff --git a/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java b/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java index 7ef3576e..6c98f0fa 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java @@ -185,9 +185,10 @@ public class AdminREST extends AbstractResourceREST { map.put("lastMinute", metricsBean.getLastMinute()); map.put("lastHour", metricsBean.getLastHour()); if (backlog) { - map.put("backlog", feedDAO.getUpdatableCount()); + map.put("backlog", taskGiver.getUpdatableCount()); } - map.put("queue", feedRefreshUpdater.getQueueSize()); + map.put("http_queue", feedRefreshWorker.getQueueSize()); + map.put("database_queue", feedRefreshUpdater.getQueueSize()); map.put("cache", metricsBean.getCacheStats()); return Response.ok(map).build();