diff --git a/src/main/java/com/commafeed/backend/dao/FeedEntryStatusDAO.java b/src/main/java/com/commafeed/backend/dao/FeedEntryStatusDAO.java index a4189435..098b20ce 100644 --- a/src/main/java/com/commafeed/backend/dao/FeedEntryStatusDAO.java +++ b/src/main/java/com/commafeed/backend/dao/FeedEntryStatusDAO.java @@ -205,7 +205,6 @@ public class FeedEntryStatusDAO extends GenericDAO { for (Map map : list) { FeedEntryStatus status = (FeedEntryStatus) map.get(ALIAS_STATUS); FeedEntry entry = (FeedEntry) map.get(ALIAS_ENTRY); - entry.setSubscription(sub); status = handleStatus(status, sub, entry); diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshContext.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshContext.java new file mode 100644 index 00000000..155ecbca --- /dev/null +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshContext.java @@ -0,0 +1,42 @@ +package com.commafeed.backend.feeds; + +import java.util.List; + +import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedEntry; + +public class FeedRefreshContext { + private Feed feed; + private List entries; + private boolean isUrgent; + + public FeedRefreshContext(Feed feed, boolean isUrgent) { + this.feed = feed; + this.isUrgent = isUrgent; + } + + public Feed getFeed() { + return feed; + } + + public void setFeed(Feed feed) { + this.feed = feed; + } + + public boolean isUrgent() { + return isUrgent; + } + + public void setUrgent(boolean isUrgent) { + this.isUrgent = isUrgent; + } + + public List getEntries() { + return entries; + } + + public void setEntries(List entries) { + this.entries = entries; + } + +} diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java index e80d194f..e2ac9872 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java @@ -49,8 +49,8 @@ public class FeedRefreshTaskGiver { private int backgroundThreads; - private Queue addQueue = Queues.newConcurrentLinkedQueue(); - private Queue takeQueue = Queues.newConcurrentLinkedQueue(); + private Queue addQueue = Queues.newConcurrentLinkedQueue(); + private Queue takeQueue = Queues.newConcurrentLinkedQueue(); private Queue giveBackQueue = Queues.newConcurrentLinkedQueue(); private ExecutorService executor; @@ -87,10 +87,10 @@ public class FeedRefreshTaskGiver { public void run() { while (!executor.isShutdown()) { try { - Feed feed = take(); - if (feed != null) { + FeedRefreshContext context = take(); + if (context != null) { metricsBean.feedRefreshed(); - worker.updateFeed(feed); + worker.updateFeed(context); } else { log.debug("nothing to do, sleeping for 15s"); metricsBean.threadWaited(); @@ -111,14 +111,14 @@ public class FeedRefreshTaskGiver { /** * take a feed from the refresh queue */ - private Feed take() { - Feed feed = takeQueue.poll(); + private FeedRefreshContext take() { + FeedRefreshContext context = takeQueue.poll(); - if (feed == null) { + if (context == null) { refill(); - feed = takeQueue.poll(); + context = takeQueue.poll(); } - return feed; + return context; } public Long getUpdatableCount() { @@ -128,10 +128,10 @@ public class FeedRefreshTaskGiver { /** * add a feed to the refresh queue */ - public void add(Feed feed) { + public void add(Feed feed, boolean urgent) { int refreshInterval = applicationSettingsService.get().getRefreshIntervalMinutes(); if (feed.getLastUpdated() == null || feed.getLastUpdated().before(DateUtils.addMinutes(new Date(), -1 * refreshInterval))) { - addQueue.add(feed); + addQueue.add(new FeedRefreshContext(feed, urgent)); } } @@ -142,26 +142,28 @@ public class FeedRefreshTaskGiver { int count = Math.min(300, 3 * backgroundThreads); // first, get feeds that are up to refresh from the database - List feeds = null; - if (applicationSettingsService.get().isCrawlingPaused()) { - feeds = Lists.newArrayList(); - } else { - feeds = feedDAO.findNextUpdatable(count); + List contexts = Lists.newArrayList(); + if (!applicationSettingsService.get().isCrawlingPaused()) { + List feeds = feedDAO.findNextUpdatable(count); + for (Feed feed : feeds) { + contexts.add(new FeedRefreshContext(feed, false)); + } } // then, add to those the feeds we got from the add() method. We add them at the beginning of the list as they probably have a // higher priority int size = addQueue.size(); for (int i = 0; i < size; i++) { - feeds.add(0, addQueue.poll()); + contexts.add(0, addQueue.poll()); } // set the disabledDate to now as we use the disabledDate in feedDAO to decide what to refresh next. We also use a map to remove // duplicates. - Map map = Maps.newLinkedHashMap(); - for (Feed f : feeds) { - f.setDisabledUntil(new Date()); - map.put(f.getId(), f); + Map map = Maps.newLinkedHashMap(); + for (FeedRefreshContext context : contexts) { + Feed feed = context.getFeed(); + feed.setDisabledUntil(new Date()); + map.put(feed.getId(), context); } // refill the queue @@ -170,12 +172,16 @@ public class FeedRefreshTaskGiver { // add feeds from the giveBack queue to the map, overriding duplicates size = giveBackQueue.size(); for (int i = 0; i < size; i++) { - Feed f = giveBackQueue.poll(); - map.put(f.getId(), f); + Feed feed = giveBackQueue.poll(); + map.put(feed.getId(), new FeedRefreshContext(feed, false)); } // update all feeds in the database - feedDAO.saveOrUpdate(map.values()); + List feeds = Lists.newArrayList(); + for (FeedRefreshContext context : map.values()) { + feeds.add(context.getFeed()); + } + feedDAO.saveOrUpdate(feeds); } /** diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index 2ca810ff..59cd1549 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -1,7 +1,6 @@ package com.commafeed.backend.feeds; import java.util.Arrays; -import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -30,6 +29,7 @@ import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedEntryContent; import com.commafeed.backend.model.FeedSubscription; +import com.commafeed.backend.model.User; import com.commafeed.backend.pubsubhubbub.SubscriptionHandler; import com.commafeed.backend.services.ApplicationSettingsService; import com.commafeed.backend.services.FeedUpdateService; @@ -84,23 +84,23 @@ public class FeedRefreshUpdater { pool.shutdown(); } - public void updateFeed(Feed feed, Collection entries) { - pool.execute(new EntryTask(feed, entries)); + public void updateFeed(FeedRefreshContext context) { + pool.execute(new EntryTask(context)); } private class EntryTask implements Task { - private Feed feed; - private Collection entries; + private FeedRefreshContext context; - public EntryTask(Feed feed, Collection entries) { - this.feed = feed; - this.entries = entries; + public EntryTask(FeedRefreshContext context) { + this.context = context; } @Override public void run() { boolean ok = true; + Feed feed = context.getFeed(); + List entries = context.getEntries(); if (entries.isEmpty() == false) { List lastEntries = cache.getLastEntries(feed); @@ -138,7 +138,7 @@ public class FeedRefreshUpdater { @Override public boolean isUrgent() { - return feed.isUrgent(); + return context.isUrgent(); } } @@ -163,7 +163,14 @@ public class FeedRefreshUpdater { locked1 = lock1.tryLock(1, TimeUnit.MINUTES); locked2 = lock2.tryLock(1, TimeUnit.MINUTES); if (locked1 && locked2) { - feedUpdateService.updateEntry(feed, entry, subscriptions); + feedUpdateService.updateEntry(feed, entry); + List users = Lists.newArrayList(); + for (FeedSubscription sub : subscriptions) { + users.add(sub.getUser()); + } + cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); + cache.invalidateUserRootCategory(users.toArray(new User[0])); + metricsBean.entryInserted(); success = true; } else { log.error("lock timeout for " + feed.getUrl() + " - " + key1); diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index 41a24fb8..53219fdf 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -14,8 +14,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.commafeed.backend.HttpGetter.NotModifiedException; -import com.commafeed.backend.MetricsBean; -import com.commafeed.backend.dao.FeedEntryDAO; import com.commafeed.backend.feeds.FeedRefreshExecutor.Task; import com.commafeed.backend.model.ApplicationSettings; import com.commafeed.backend.model.Feed; @@ -43,12 +41,6 @@ public class FeedRefreshWorker { @Inject ApplicationSettingsService applicationSettingsService; - @Inject - MetricsBean metricsBean; - - @Inject - FeedEntryDAO feedEntryDAO; - private FeedRefreshExecutor pool; @PostConstruct @@ -63,8 +55,8 @@ public class FeedRefreshWorker { pool.shutdown(); } - public void updateFeed(Feed feed) { - pool.execute(new FeedTask(feed)); + public void updateFeed(FeedRefreshContext context) { + pool.execute(new FeedTask(context)); } public int getQueueSize() { @@ -77,24 +69,25 @@ public class FeedRefreshWorker { private class FeedTask implements Task { - private Feed feed; + private FeedRefreshContext context; - public FeedTask(Feed feed) { - this.feed = feed; + public FeedTask(FeedRefreshContext context) { + this.context = context; } @Override public void run() { - update(feed); + update(context); } @Override public boolean isUrgent() { - return feed.isUrgent(); + return context.isUrgent(); } } - private void update(Feed feed) { + private void update(FeedRefreshContext context) { + Feed feed = context.getFeed(); int refreshInterval = applicationSettingsService.get().getRefreshIntervalMinutes(); Date disabledUntil = DateUtils.addMinutes(new Date(), refreshInterval); try { @@ -122,7 +115,8 @@ public class FeedRefreshWorker { feed.setDisabledUntil(disabledUntil); handlePubSub(feed, fetchedFeed.getFeed()); - feedRefreshUpdater.updateFeed(feed, entries); + context.setEntries(entries); + feedRefreshUpdater.updateFeed(context); } catch (NotModifiedException e) { log.debug("Feed not modified : {} - {}", feed.getUrl(), e.getMessage()); diff --git a/src/main/java/com/commafeed/backend/model/Feed.java b/src/main/java/com/commafeed/backend/model/Feed.java index e5771b71..1860b904 100644 --- a/src/main/java/com/commafeed/backend/model/Feed.java +++ b/src/main/java/com/commafeed/backend/model/Feed.java @@ -11,7 +11,6 @@ import javax.persistence.OneToMany; import javax.persistence.Table; import javax.persistence.Temporal; import javax.persistence.TemporalType; -import javax.persistence.Transient; import org.hibernate.annotations.Cache; import org.hibernate.annotations.CacheConcurrencyStrategy; @@ -126,12 +125,6 @@ public class Feed extends AbstractModel { @Temporal(TemporalType.TIMESTAMP) private Date pushLastPing; - /** - * Denotes a feed that needs to be refreshed before others. Currently used when a feed is queued manually for refresh. Not persisted. - */ - @Transient - private boolean urgent; - public Feed() { } @@ -276,14 +269,6 @@ public class Feed extends AbstractModel { this.pushTopicHash = pushTopicHash; } - public boolean isUrgent() { - return urgent; - } - - public void setUrgent(boolean urgent) { - this.urgent = urgent; - } - public String getNormalizedUrl() { return normalizedUrl; } diff --git a/src/main/java/com/commafeed/backend/model/FeedEntry.java b/src/main/java/com/commafeed/backend/model/FeedEntry.java index eec59d2c..0394a8cf 100644 --- a/src/main/java/com/commafeed/backend/model/FeedEntry.java +++ b/src/main/java/com/commafeed/backend/model/FeedEntry.java @@ -15,7 +15,6 @@ import javax.persistence.OneToOne; import javax.persistence.Table; import javax.persistence.Temporal; import javax.persistence.TemporalType; -import javax.persistence.Transient; import org.hibernate.annotations.Cache; import org.hibernate.annotations.CacheConcurrencyStrategy; @@ -52,12 +51,6 @@ public class FeedEntry extends AbstractModel { @OneToMany(mappedBy = "entry", cascade = CascadeType.REMOVE) private Set statuses; - /** - * useful placeholder for the subscription, not persisted - */ - @Transient - private FeedSubscription subscription; - public String getGuid() { return guid; } @@ -121,13 +114,4 @@ public class FeedEntry extends AbstractModel { public void setFeed(Feed feed) { this.feed = feed; } - - public FeedSubscription getSubscription() { - return subscription; - } - - public void setSubscription(FeedSubscription subscription) { - this.subscription = subscription; - } - } diff --git a/src/main/java/com/commafeed/backend/services/FeedSubscriptionService.java b/src/main/java/com/commafeed/backend/services/FeedSubscriptionService.java index 81ebf654..c321dab9 100644 --- a/src/main/java/com/commafeed/backend/services/FeedSubscriptionService.java +++ b/src/main/java/com/commafeed/backend/services/FeedSubscriptionService.java @@ -79,7 +79,7 @@ public class FeedSubscriptionService { sub.setTitle(FeedUtils.truncate(title, 128)); feedSubscriptionDAO.saveOrUpdate(sub); - taskGiver.add(feed); + taskGiver.add(feed, false); cache.invalidateUserRootCategory(user); return feed; } diff --git a/src/main/java/com/commafeed/backend/services/FeedUpdateService.java b/src/main/java/com/commafeed/backend/services/FeedUpdateService.java index c07b869d..e1cc0d4a 100644 --- a/src/main/java/com/commafeed/backend/services/FeedUpdateService.java +++ b/src/main/java/com/commafeed/backend/services/FeedUpdateService.java @@ -1,55 +1,30 @@ package com.commafeed.backend.services; import java.util.Date; -import java.util.List; import javax.ejb.Stateless; import javax.inject.Inject; -import javax.persistence.EntityManager; -import javax.persistence.PersistenceContext; import org.apache.commons.codec.digest.DigestUtils; -import com.commafeed.backend.MetricsBean; -import com.commafeed.backend.cache.CacheService; import com.commafeed.backend.dao.FeedEntryDAO; -import com.commafeed.backend.dao.FeedEntryStatusDAO; -import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedEntryContent; -import com.commafeed.backend.model.FeedSubscription; -import com.commafeed.backend.model.User; -import com.google.common.collect.Lists; @Stateless public class FeedUpdateService { - @PersistenceContext - protected EntityManager em; - - @Inject - FeedSubscriptionDAO feedSubscriptionDAO; - @Inject FeedEntryDAO feedEntryDAO; - @Inject - FeedEntryStatusDAO feedEntryStatusDAO; - - @Inject - MetricsBean metricsBean; - - @Inject - CacheService cache; - @Inject FeedEntryContentService feedEntryContentService; /** * this is NOT thread-safe */ - public void updateEntry(Feed feed, FeedEntry entry, List subscriptions) { + public void updateEntry(Feed feed, FeedEntry entry) { Long existing = feedEntryDAO.findExisting(entry.getGuid(), feed.getId()); if (existing != null) { @@ -62,14 +37,6 @@ public class FeedUpdateService { entry.setInserted(new Date()); entry.setFeed(feed); - List users = Lists.newArrayList(); - for (FeedSubscription sub : subscriptions) { - User user = sub.getUser(); - users.add(user); - } feedEntryDAO.saveOrUpdate(entry); - metricsBean.entryInserted(); - cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); - cache.invalidateUserRootCategory(users.toArray(new User[0])); } } diff --git a/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java b/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java index 83786a9c..00b666fb 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java @@ -255,8 +255,7 @@ public class FeedREST extends AbstractResourceREST { List subs = feedSubscriptionDAO.findAll(getUser()); for (FeedSubscription sub : subs) { Feed feed = sub.getFeed(); - feed.setUrgent(true); - taskGiver.add(feed); + taskGiver.add(feed, true); } return Response.ok(Status.OK).build(); } @@ -272,8 +271,7 @@ public class FeedREST extends AbstractResourceREST { FeedSubscription sub = feedSubscriptionDAO.findById(getUser(), req.getId()); if (sub != null) { Feed feed = sub.getFeed(); - feed.setUrgent(true); - taskGiver.add(feed); + taskGiver.add(feed, true); return Response.ok(Status.OK).build(); } return Response.ok(Status.NOT_FOUND).build(); diff --git a/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java b/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java index 6c5c0e67..ea3ec2b6 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java @@ -114,7 +114,7 @@ public class PubSubHubbubCallbackREST { for (Feed feed : feeds) { log.debug("pushing content to queue for {}", feed.getUrl()); - taskGiver.add(feed); + taskGiver.add(feed, false); } metricsBean.pushReceived(feeds.size());