mirror of
https://github.com/Athou/commafeed.git
synced 2026-03-21 21:37:29 +00:00
pass a context object around instead of creating transient fields in model objects
This commit is contained in:
@@ -205,7 +205,6 @@ public class FeedEntryStatusDAO extends GenericDAO<FeedEntryStatus> {
|
||||
for (Map<String, Object> map : list) {
|
||||
FeedEntryStatus status = (FeedEntryStatus) map.get(ALIAS_STATUS);
|
||||
FeedEntry entry = (FeedEntry) map.get(ALIAS_ENTRY);
|
||||
entry.setSubscription(sub);
|
||||
|
||||
status = handleStatus(status, sub, entry);
|
||||
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
package com.commafeed.backend.feeds;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
|
||||
public class FeedRefreshContext {
|
||||
private Feed feed;
|
||||
private List<FeedEntry> entries;
|
||||
private boolean isUrgent;
|
||||
|
||||
public FeedRefreshContext(Feed feed, boolean isUrgent) {
|
||||
this.feed = feed;
|
||||
this.isUrgent = isUrgent;
|
||||
}
|
||||
|
||||
public Feed getFeed() {
|
||||
return feed;
|
||||
}
|
||||
|
||||
public void setFeed(Feed feed) {
|
||||
this.feed = feed;
|
||||
}
|
||||
|
||||
public boolean isUrgent() {
|
||||
return isUrgent;
|
||||
}
|
||||
|
||||
public void setUrgent(boolean isUrgent) {
|
||||
this.isUrgent = isUrgent;
|
||||
}
|
||||
|
||||
public List<FeedEntry> getEntries() {
|
||||
return entries;
|
||||
}
|
||||
|
||||
public void setEntries(List<FeedEntry> entries) {
|
||||
this.entries = entries;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -49,8 +49,8 @@ public class FeedRefreshTaskGiver {
|
||||
|
||||
private int backgroundThreads;
|
||||
|
||||
private Queue<Feed> addQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<Feed> takeQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<FeedRefreshContext> addQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<FeedRefreshContext> takeQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<Feed> giveBackQueue = Queues.newConcurrentLinkedQueue();
|
||||
|
||||
private ExecutorService executor;
|
||||
@@ -87,10 +87,10 @@ public class FeedRefreshTaskGiver {
|
||||
public void run() {
|
||||
while (!executor.isShutdown()) {
|
||||
try {
|
||||
Feed feed = take();
|
||||
if (feed != null) {
|
||||
FeedRefreshContext context = take();
|
||||
if (context != null) {
|
||||
metricsBean.feedRefreshed();
|
||||
worker.updateFeed(feed);
|
||||
worker.updateFeed(context);
|
||||
} else {
|
||||
log.debug("nothing to do, sleeping for 15s");
|
||||
metricsBean.threadWaited();
|
||||
@@ -111,14 +111,14 @@ public class FeedRefreshTaskGiver {
|
||||
/**
|
||||
* take a feed from the refresh queue
|
||||
*/
|
||||
private Feed take() {
|
||||
Feed feed = takeQueue.poll();
|
||||
private FeedRefreshContext take() {
|
||||
FeedRefreshContext context = takeQueue.poll();
|
||||
|
||||
if (feed == null) {
|
||||
if (context == null) {
|
||||
refill();
|
||||
feed = takeQueue.poll();
|
||||
context = takeQueue.poll();
|
||||
}
|
||||
return feed;
|
||||
return context;
|
||||
}
|
||||
|
||||
public Long getUpdatableCount() {
|
||||
@@ -128,10 +128,10 @@ public class FeedRefreshTaskGiver {
|
||||
/**
|
||||
* add a feed to the refresh queue
|
||||
*/
|
||||
public void add(Feed feed) {
|
||||
public void add(Feed feed, boolean urgent) {
|
||||
int refreshInterval = applicationSettingsService.get().getRefreshIntervalMinutes();
|
||||
if (feed.getLastUpdated() == null || feed.getLastUpdated().before(DateUtils.addMinutes(new Date(), -1 * refreshInterval))) {
|
||||
addQueue.add(feed);
|
||||
addQueue.add(new FeedRefreshContext(feed, urgent));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,26 +142,28 @@ public class FeedRefreshTaskGiver {
|
||||
int count = Math.min(300, 3 * backgroundThreads);
|
||||
|
||||
// first, get feeds that are up to refresh from the database
|
||||
List<Feed> feeds = null;
|
||||
if (applicationSettingsService.get().isCrawlingPaused()) {
|
||||
feeds = Lists.newArrayList();
|
||||
} else {
|
||||
feeds = feedDAO.findNextUpdatable(count);
|
||||
List<FeedRefreshContext> contexts = Lists.newArrayList();
|
||||
if (!applicationSettingsService.get().isCrawlingPaused()) {
|
||||
List<Feed> feeds = feedDAO.findNextUpdatable(count);
|
||||
for (Feed feed : feeds) {
|
||||
contexts.add(new FeedRefreshContext(feed, false));
|
||||
}
|
||||
}
|
||||
|
||||
// then, add to those the feeds we got from the add() method. We add them at the beginning of the list as they probably have a
|
||||
// higher priority
|
||||
int size = addQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
feeds.add(0, addQueue.poll());
|
||||
contexts.add(0, addQueue.poll());
|
||||
}
|
||||
|
||||
// set the disabledDate to now as we use the disabledDate in feedDAO to decide what to refresh next. We also use a map to remove
|
||||
// duplicates.
|
||||
Map<Long, Feed> map = Maps.newLinkedHashMap();
|
||||
for (Feed f : feeds) {
|
||||
f.setDisabledUntil(new Date());
|
||||
map.put(f.getId(), f);
|
||||
Map<Long, FeedRefreshContext> map = Maps.newLinkedHashMap();
|
||||
for (FeedRefreshContext context : contexts) {
|
||||
Feed feed = context.getFeed();
|
||||
feed.setDisabledUntil(new Date());
|
||||
map.put(feed.getId(), context);
|
||||
}
|
||||
|
||||
// refill the queue
|
||||
@@ -170,12 +172,16 @@ public class FeedRefreshTaskGiver {
|
||||
// add feeds from the giveBack queue to the map, overriding duplicates
|
||||
size = giveBackQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
Feed f = giveBackQueue.poll();
|
||||
map.put(f.getId(), f);
|
||||
Feed feed = giveBackQueue.poll();
|
||||
map.put(feed.getId(), new FeedRefreshContext(feed, false));
|
||||
}
|
||||
|
||||
// update all feeds in the database
|
||||
feedDAO.saveOrUpdate(map.values());
|
||||
List<Feed> feeds = Lists.newArrayList();
|
||||
for (FeedRefreshContext context : map.values()) {
|
||||
feeds.add(context.getFeed());
|
||||
}
|
||||
feedDAO.saveOrUpdate(feeds);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.commafeed.backend.feeds;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@@ -30,6 +29,7 @@ import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
import com.commafeed.backend.model.FeedEntryContent;
|
||||
import com.commafeed.backend.model.FeedSubscription;
|
||||
import com.commafeed.backend.model.User;
|
||||
import com.commafeed.backend.pubsubhubbub.SubscriptionHandler;
|
||||
import com.commafeed.backend.services.ApplicationSettingsService;
|
||||
import com.commafeed.backend.services.FeedUpdateService;
|
||||
@@ -84,23 +84,23 @@ public class FeedRefreshUpdater {
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
public void updateFeed(Feed feed, Collection<FeedEntry> entries) {
|
||||
pool.execute(new EntryTask(feed, entries));
|
||||
public void updateFeed(FeedRefreshContext context) {
|
||||
pool.execute(new EntryTask(context));
|
||||
}
|
||||
|
||||
private class EntryTask implements Task {
|
||||
|
||||
private Feed feed;
|
||||
private Collection<FeedEntry> entries;
|
||||
private FeedRefreshContext context;
|
||||
|
||||
public EntryTask(Feed feed, Collection<FeedEntry> entries) {
|
||||
this.feed = feed;
|
||||
this.entries = entries;
|
||||
public EntryTask(FeedRefreshContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
boolean ok = true;
|
||||
Feed feed = context.getFeed();
|
||||
List<FeedEntry> entries = context.getEntries();
|
||||
if (entries.isEmpty() == false) {
|
||||
|
||||
List<String> lastEntries = cache.getLastEntries(feed);
|
||||
@@ -138,7 +138,7 @@ public class FeedRefreshUpdater {
|
||||
|
||||
@Override
|
||||
public boolean isUrgent() {
|
||||
return feed.isUrgent();
|
||||
return context.isUrgent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -163,7 +163,14 @@ public class FeedRefreshUpdater {
|
||||
locked1 = lock1.tryLock(1, TimeUnit.MINUTES);
|
||||
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
|
||||
if (locked1 && locked2) {
|
||||
feedUpdateService.updateEntry(feed, entry, subscriptions);
|
||||
feedUpdateService.updateEntry(feed, entry);
|
||||
List<User> users = Lists.newArrayList();
|
||||
for (FeedSubscription sub : subscriptions) {
|
||||
users.add(sub.getUser());
|
||||
}
|
||||
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
|
||||
cache.invalidateUserRootCategory(users.toArray(new User[0]));
|
||||
metricsBean.entryInserted();
|
||||
success = true;
|
||||
} else {
|
||||
log.error("lock timeout for " + feed.getUrl() + " - " + key1);
|
||||
|
||||
@@ -14,8 +14,6 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.commafeed.backend.HttpGetter.NotModifiedException;
|
||||
import com.commafeed.backend.MetricsBean;
|
||||
import com.commafeed.backend.dao.FeedEntryDAO;
|
||||
import com.commafeed.backend.feeds.FeedRefreshExecutor.Task;
|
||||
import com.commafeed.backend.model.ApplicationSettings;
|
||||
import com.commafeed.backend.model.Feed;
|
||||
@@ -43,12 +41,6 @@ public class FeedRefreshWorker {
|
||||
@Inject
|
||||
ApplicationSettingsService applicationSettingsService;
|
||||
|
||||
@Inject
|
||||
MetricsBean metricsBean;
|
||||
|
||||
@Inject
|
||||
FeedEntryDAO feedEntryDAO;
|
||||
|
||||
private FeedRefreshExecutor pool;
|
||||
|
||||
@PostConstruct
|
||||
@@ -63,8 +55,8 @@ public class FeedRefreshWorker {
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
public void updateFeed(Feed feed) {
|
||||
pool.execute(new FeedTask(feed));
|
||||
public void updateFeed(FeedRefreshContext context) {
|
||||
pool.execute(new FeedTask(context));
|
||||
}
|
||||
|
||||
public int getQueueSize() {
|
||||
@@ -77,24 +69,25 @@ public class FeedRefreshWorker {
|
||||
|
||||
private class FeedTask implements Task {
|
||||
|
||||
private Feed feed;
|
||||
private FeedRefreshContext context;
|
||||
|
||||
public FeedTask(Feed feed) {
|
||||
this.feed = feed;
|
||||
public FeedTask(FeedRefreshContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
update(feed);
|
||||
update(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUrgent() {
|
||||
return feed.isUrgent();
|
||||
return context.isUrgent();
|
||||
}
|
||||
}
|
||||
|
||||
private void update(Feed feed) {
|
||||
private void update(FeedRefreshContext context) {
|
||||
Feed feed = context.getFeed();
|
||||
int refreshInterval = applicationSettingsService.get().getRefreshIntervalMinutes();
|
||||
Date disabledUntil = DateUtils.addMinutes(new Date(), refreshInterval);
|
||||
try {
|
||||
@@ -122,7 +115,8 @@ public class FeedRefreshWorker {
|
||||
feed.setDisabledUntil(disabledUntil);
|
||||
|
||||
handlePubSub(feed, fetchedFeed.getFeed());
|
||||
feedRefreshUpdater.updateFeed(feed, entries);
|
||||
context.setEntries(entries);
|
||||
feedRefreshUpdater.updateFeed(context);
|
||||
|
||||
} catch (NotModifiedException e) {
|
||||
log.debug("Feed not modified : {} - {}", feed.getUrl(), e.getMessage());
|
||||
|
||||
@@ -11,7 +11,6 @@ import javax.persistence.OneToMany;
|
||||
import javax.persistence.Table;
|
||||
import javax.persistence.Temporal;
|
||||
import javax.persistence.TemporalType;
|
||||
import javax.persistence.Transient;
|
||||
|
||||
import org.hibernate.annotations.Cache;
|
||||
import org.hibernate.annotations.CacheConcurrencyStrategy;
|
||||
@@ -126,12 +125,6 @@ public class Feed extends AbstractModel {
|
||||
@Temporal(TemporalType.TIMESTAMP)
|
||||
private Date pushLastPing;
|
||||
|
||||
/**
|
||||
* Denotes a feed that needs to be refreshed before others. Currently used when a feed is queued manually for refresh. Not persisted.
|
||||
*/
|
||||
@Transient
|
||||
private boolean urgent;
|
||||
|
||||
public Feed() {
|
||||
|
||||
}
|
||||
@@ -276,14 +269,6 @@ public class Feed extends AbstractModel {
|
||||
this.pushTopicHash = pushTopicHash;
|
||||
}
|
||||
|
||||
public boolean isUrgent() {
|
||||
return urgent;
|
||||
}
|
||||
|
||||
public void setUrgent(boolean urgent) {
|
||||
this.urgent = urgent;
|
||||
}
|
||||
|
||||
public String getNormalizedUrl() {
|
||||
return normalizedUrl;
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import javax.persistence.OneToOne;
|
||||
import javax.persistence.Table;
|
||||
import javax.persistence.Temporal;
|
||||
import javax.persistence.TemporalType;
|
||||
import javax.persistence.Transient;
|
||||
|
||||
import org.hibernate.annotations.Cache;
|
||||
import org.hibernate.annotations.CacheConcurrencyStrategy;
|
||||
@@ -52,12 +51,6 @@ public class FeedEntry extends AbstractModel {
|
||||
@OneToMany(mappedBy = "entry", cascade = CascadeType.REMOVE)
|
||||
private Set<FeedEntryStatus> statuses;
|
||||
|
||||
/**
|
||||
* useful placeholder for the subscription, not persisted
|
||||
*/
|
||||
@Transient
|
||||
private FeedSubscription subscription;
|
||||
|
||||
public String getGuid() {
|
||||
return guid;
|
||||
}
|
||||
@@ -121,13 +114,4 @@ public class FeedEntry extends AbstractModel {
|
||||
public void setFeed(Feed feed) {
|
||||
this.feed = feed;
|
||||
}
|
||||
|
||||
public FeedSubscription getSubscription() {
|
||||
return subscription;
|
||||
}
|
||||
|
||||
public void setSubscription(FeedSubscription subscription) {
|
||||
this.subscription = subscription;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -79,7 +79,7 @@ public class FeedSubscriptionService {
|
||||
sub.setTitle(FeedUtils.truncate(title, 128));
|
||||
feedSubscriptionDAO.saveOrUpdate(sub);
|
||||
|
||||
taskGiver.add(feed);
|
||||
taskGiver.add(feed, false);
|
||||
cache.invalidateUserRootCategory(user);
|
||||
return feed;
|
||||
}
|
||||
|
||||
@@ -1,55 +1,30 @@
|
||||
package com.commafeed.backend.services;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import javax.ejb.Stateless;
|
||||
import javax.inject.Inject;
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.PersistenceContext;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
|
||||
import com.commafeed.backend.MetricsBean;
|
||||
import com.commafeed.backend.cache.CacheService;
|
||||
import com.commafeed.backend.dao.FeedEntryDAO;
|
||||
import com.commafeed.backend.dao.FeedEntryStatusDAO;
|
||||
import com.commafeed.backend.dao.FeedSubscriptionDAO;
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
import com.commafeed.backend.model.FeedEntryContent;
|
||||
import com.commafeed.backend.model.FeedSubscription;
|
||||
import com.commafeed.backend.model.User;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
@Stateless
|
||||
public class FeedUpdateService {
|
||||
|
||||
@PersistenceContext
|
||||
protected EntityManager em;
|
||||
|
||||
@Inject
|
||||
FeedSubscriptionDAO feedSubscriptionDAO;
|
||||
|
||||
@Inject
|
||||
FeedEntryDAO feedEntryDAO;
|
||||
|
||||
@Inject
|
||||
FeedEntryStatusDAO feedEntryStatusDAO;
|
||||
|
||||
@Inject
|
||||
MetricsBean metricsBean;
|
||||
|
||||
@Inject
|
||||
CacheService cache;
|
||||
|
||||
@Inject
|
||||
FeedEntryContentService feedEntryContentService;
|
||||
|
||||
/**
|
||||
* this is NOT thread-safe
|
||||
*/
|
||||
public void updateEntry(Feed feed, FeedEntry entry, List<FeedSubscription> subscriptions) {
|
||||
public void updateEntry(Feed feed, FeedEntry entry) {
|
||||
|
||||
Long existing = feedEntryDAO.findExisting(entry.getGuid(), feed.getId());
|
||||
if (existing != null) {
|
||||
@@ -62,14 +37,6 @@ public class FeedUpdateService {
|
||||
entry.setInserted(new Date());
|
||||
entry.setFeed(feed);
|
||||
|
||||
List<User> users = Lists.newArrayList();
|
||||
for (FeedSubscription sub : subscriptions) {
|
||||
User user = sub.getUser();
|
||||
users.add(user);
|
||||
}
|
||||
feedEntryDAO.saveOrUpdate(entry);
|
||||
metricsBean.entryInserted();
|
||||
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
|
||||
cache.invalidateUserRootCategory(users.toArray(new User[0]));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -255,8 +255,7 @@ public class FeedREST extends AbstractResourceREST {
|
||||
List<FeedSubscription> subs = feedSubscriptionDAO.findAll(getUser());
|
||||
for (FeedSubscription sub : subs) {
|
||||
Feed feed = sub.getFeed();
|
||||
feed.setUrgent(true);
|
||||
taskGiver.add(feed);
|
||||
taskGiver.add(feed, true);
|
||||
}
|
||||
return Response.ok(Status.OK).build();
|
||||
}
|
||||
@@ -272,8 +271,7 @@ public class FeedREST extends AbstractResourceREST {
|
||||
FeedSubscription sub = feedSubscriptionDAO.findById(getUser(), req.getId());
|
||||
if (sub != null) {
|
||||
Feed feed = sub.getFeed();
|
||||
feed.setUrgent(true);
|
||||
taskGiver.add(feed);
|
||||
taskGiver.add(feed, true);
|
||||
return Response.ok(Status.OK).build();
|
||||
}
|
||||
return Response.ok(Status.NOT_FOUND).build();
|
||||
|
||||
@@ -114,7 +114,7 @@ public class PubSubHubbubCallbackREST {
|
||||
|
||||
for (Feed feed : feeds) {
|
||||
log.debug("pushing content to queue for {}", feed.getUrl());
|
||||
taskGiver.add(feed);
|
||||
taskGiver.add(feed, false);
|
||||
}
|
||||
metricsBean.pushReceived(feeds.size());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user