From 5bba598b2e9c6e436b55685d7b7fe065443848cf Mon Sep 17 00:00:00 2001 From: Athou Date: Tue, 23 Apr 2013 20:05:08 +0200 Subject: [PATCH] allows http threads to continue their job by asynchronously queuing database updates --- .openshift/config/standalone.xml | 12 ++- .../backend/feeds/FeedRefreshUpdater.java | 77 +++++++++++++++++++ .../backend/feeds/FeedRefreshWorker.java | 48 +++++++++--- src/main/tomee/conf/tomee.xml | 11 ++- 4 files changed, 129 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java diff --git a/.openshift/config/standalone.xml b/.openshift/config/standalone.xml index d2746e83..587d8206 100644 --- a/.openshift/config/standalone.xml +++ b/.openshift/config/standalone.xml @@ -303,8 +303,8 @@ - true - true + false + false false 102400 2 @@ -359,6 +359,7 @@ + @@ -377,6 +379,12 @@ + + + + + + diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java new file mode 100644 index 00000000..0140a78d --- /dev/null +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -0,0 +1,77 @@ +package com.commafeed.backend.feeds; + +import java.io.Serializable; +import java.util.Collection; + +import javax.ejb.ActivationConfigProperty; +import javax.ejb.MessageDriven; +import javax.inject.Inject; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.commafeed.backend.dao.FeedDAO; +import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedEntry; +import com.commafeed.backend.services.FeedUpdateService; + +@MessageDriven(name = "FeedRefreshUpdater", activationConfig = { + @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"), + @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/refreshQueue") }) +public class FeedRefreshUpdater implements MessageListener { + + private static Logger log = LoggerFactory + .getLogger(FeedRefreshUpdater.class); + + @Inject + FeedDAO feedDAO; + + @Inject + FeedUpdateService feedUpdateService; + + @Override + public void onMessage(Message message) { + if (message instanceof ObjectMessage) { + ObjectMessage objectMessage = (ObjectMessage) message; + try { + FeedRefreshTask task = (FeedRefreshTask) objectMessage + .getObject(); + + if (task.getEntries() != null) { + feedUpdateService.updateEntries(task.getFeed(), + task.getEntries()); + } + feedDAO.update(task.getFeed()); + } catch (JMSException e) { + log.error(e.getMessage(), e); + } + } + } + + public static class FeedRefreshTask implements Serializable { + + private static final long serialVersionUID = 1L; + + private Feed feed; + private Collection entries; + + public FeedRefreshTask(Feed feed, Collection entries) { + this.feed = feed; + this.entries = entries; + } + + public Feed getFeed() { + return feed; + } + + public Collection getEntries() { + return entries; + } + + } + +} diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index bd93205e..be47c259 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -1,9 +1,19 @@ package com.commafeed.backend.feeds; import java.util.Calendar; +import java.util.Collection; import java.util.Date; +import javax.annotation.Resource; import javax.inject.Inject; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.NotSupportedException; @@ -16,24 +26,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.commafeed.backend.HttpGetter.NotModifiedException; -import com.commafeed.backend.dao.FeedDAO; +import com.commafeed.backend.feeds.FeedRefreshUpdater.FeedRefreshTask; import com.commafeed.backend.model.Feed; -import com.commafeed.backend.services.FeedUpdateService; +import com.commafeed.backend.model.FeedEntry; public class FeedRefreshWorker { private static Logger log = LoggerFactory .getLogger(FeedRefreshWorker.class); + @Resource(name = "jms/refreshQueue") + private Queue queue; + + @Resource + private ConnectionFactory connectionFactory; + @Inject FeedFetcher fetcher; - @Inject - FeedDAO feedDAO; - - @Inject - FeedUpdateService feedUpdateService; - @Inject FeedRefreshTaskGiver taskGiver; @@ -68,7 +78,7 @@ public class FeedRefreshWorker { private void update(Feed feed) throws NotSupportedException, SystemException, SecurityException, IllegalStateException, RollbackException, HeuristicMixedException, - HeuristicRollbackException { + HeuristicRollbackException, JMSException { String message = null; int errorCount = 0; @@ -115,16 +125,32 @@ public class FeedRefreshWorker { feed.setMessage(message); feed.setDisabledUntil(disabledUntil); + Collection entries = null; if (fetchedFeed != null) { feed.setLink(fetchedFeed.getLink()); feed.setLastModifiedHeader(fetchedFeed.getLastModifiedHeader()); feed.setEtagHeader(fetchedFeed.getEtagHeader()); - feedUpdateService.updateEntries(feed, fetchedFeed.getEntries()); + entries = fetchedFeed.getEntries(); } - feedDAO.update(feed); + FeedRefreshTask task = new FeedRefreshTask(feed, entries); + send(task); } + private void send(FeedRefreshTask task) throws JMSException { + Connection connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.setDisableMessageID(true); + producer.setDisableMessageTimestamp(true); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + ObjectMessage message = session.createObjectMessage(task); + producer.send(message); + connection.close(); + } + private Feed getNextFeed() { return taskGiver.take(); } diff --git a/src/main/tomee/conf/tomee.xml b/src/main/tomee/conf/tomee.xml index b53a4034..0a0dbc52 100644 --- a/src/main/tomee/conf/tomee.xml +++ b/src/main/tomee/conf/tomee.xml @@ -9,11 +9,10 @@ MaxActive 50 --> - + +