From f20a772603ca93ddd70cfdfb136f18149d0abefa Mon Sep 17 00:00:00 2001 From: Athou Date: Wed, 22 May 2013 21:57:53 +0200 Subject: [PATCH] update database asynchronously --- .../backend/feeds/FeedRefreshUpdater.java | 57 ++++++++++++++++--- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index 221f2750..9bf33279 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -2,7 +2,11 @@ package com.commafeed.backend.feeds; import java.util.Collection; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.inject.Singleton; @@ -12,6 +16,7 @@ import org.slf4j.LoggerFactory; import com.commafeed.backend.LockMap; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.dao.FeedSubscriptionDAO; +import com.commafeed.backend.model.ApplicationSettings; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedPushInfo; @@ -46,19 +51,53 @@ public class FeedRefreshUpdater { @Inject FeedSubscriptionDAO feedSubscriptionDAO; - public void updateFeed(Feed feed, Collection entries) { - if (entries != null) { - List subscriptions = feedSubscriptionDAO - .findByFeed(feed); - for (FeedEntry entry : entries) { - updateEntry(feed, entry, subscriptions); + private ThreadPoolExecutor pool; + + @PostConstruct + public void init() { + ApplicationSettings settings = applicationSettingsService.get(); + int threads = Math.max(settings.getBackgroundThreads(), 1); + pool = new ThreadPoolExecutor(threads, threads, 0, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + super.rejectedExecution(r, e); + log.info("Thread queue full, executing in own thread."); } + }); + } + + public void updateFeed(Feed feed, Collection entries) { + pool.execute(new Task(feed, entries)); + } + + private class Task implements Runnable { + + private Feed feed; + private Collection entries; + + public Task(Feed feed, Collection entries) { + this.feed = feed; + this.entries = entries; } - if (applicationSettingsService.get().isPubsubhubbub()) { - handlePubSub(feed); + @Override + public void run() { + if (entries != null) { + List subscriptions = feedSubscriptionDAO + .findByFeed(feed); + for (FeedEntry entry : entries) { + updateEntry(feed, entry, subscriptions); + } + } + + if (applicationSettingsService.get().isPubsubhubbub()) { + handlePubSub(feed); + } + taskGiver.giveBack(feed); } - taskGiver.giveBack(feed); + } private void updateEntry(Feed feed, FeedEntry entry,