From f54e41897df9058d42a3e6b87287e9793886d592 Mon Sep 17 00:00:00 2001 From: Athou Date: Sat, 22 Jun 2013 20:06:09 +0200 Subject: [PATCH] manually queue feeds for refresh (#316) --- .../backend/feeds/FeedRefreshTaskGiver.java | 8 +++--- .../backend/feeds/FeedRefreshUpdater.java | 20 +++++++++----- .../com/commafeed/backend/model/Feed.java | 16 ++++++++++++ .../frontend/rest/resources/FeedREST.java | 26 +++++++++---------- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java index fa04a2d0..302630bc 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java @@ -49,10 +49,8 @@ public class FeedRefreshTaskGiver { Date threshold = DateUtils.addMinutes(now, heavyLoad ? -10 : -1); if (feed.getLastUpdated() == null || feed.getLastUpdated().before(threshold)) { - feed.setEtagHeader(null); - feed.setLastModifiedHeader(null); + addQueue.add(feed); } - addQueue.add(feed); } public synchronized Feed take() { @@ -77,10 +75,10 @@ public class FeedRefreshTaskGiver { int size = addQueue.size(); for (int i = 0; i < size; i++) { - feeds.add(addQueue.poll()); + feeds.add(0, addQueue.poll()); } - Map map = Maps.newHashMap(); + Map map = Maps.newLinkedHashMap(); for (Feed f : feeds) { f.setLastUpdated(now); map.put(f.getId(), f); diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index be450af6..1ac2dda3 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -4,8 +4,7 @@ import java.util.Calendar; import java.util.Collection; import java.util.Date; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -65,8 +64,8 @@ public class FeedRefreshUpdater { FeedEntryDAO feedEntryDAO; private ThreadPoolExecutor pool; - private BlockingQueue queue; private Striped locks; + private LinkedBlockingDeque queue; @PostConstruct public void init() { @@ -76,15 +75,20 @@ public class FeedRefreshUpdater { locks = Striped.lazyWeakLock(threads * 100000); pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, - queue = new ArrayBlockingQueue(500 * threads)); + queue = new LinkedBlockingDeque(500 * threads)); pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { log.debug("Thread queue full, waiting..."); try { - e.getQueue().put(r); + Task task = (Task) r; + if (task.getFeed().isUrgent()) { + queue.putFirst(r); + } else { + queue.put(r); + } } catch (InterruptedException e1) { - log.error("Interrupted while waiting for queue.", e); + log.error("Interrupted while waiting for queue.", e1); } } }); @@ -136,6 +140,10 @@ public class FeedRefreshUpdater { metricsBean.feedUpdated(); taskGiver.giveBack(feed); } + + public Feed getFeed() { + return feed; + } } private boolean updateEntry(final Feed feed, final FeedEntry entry, diff --git a/src/main/java/com/commafeed/backend/model/Feed.java b/src/main/java/com/commafeed/backend/model/Feed.java index 1393b930..924b2759 100644 --- a/src/main/java/com/commafeed/backend/model/Feed.java +++ b/src/main/java/com/commafeed/backend/model/Feed.java @@ -12,6 +12,7 @@ import javax.persistence.OneToMany; import javax.persistence.Table; import javax.persistence.Temporal; import javax.persistence.TemporalType; +import javax.persistence.Transient; import org.hibernate.annotations.Cache; import org.hibernate.annotations.CacheConcurrencyStrategy; @@ -131,6 +132,13 @@ public class Feed extends AbstractModel { @Temporal(TemporalType.TIMESTAMP) private Date pushLastPing; + /** + * Denotes a feed that needs to be refreshed before others. Currently used + * when a feed is queued manually for refresh. Not persisted. + */ + @Transient + private boolean urgent; + public Feed() { } @@ -299,4 +307,12 @@ public class Feed extends AbstractModel { this.pushTopicHash = pushTopicHash; } + public boolean isUrgent() { + return urgent; + } + + public void setUrgent(boolean urgent) { + this.urgent = urgent; + } + } diff --git a/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java b/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java index f9ae3809..f1957e9b 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/FeedREST.java @@ -209,20 +209,20 @@ public class FeedREST extends AbstractResourceREST { @POST @ApiOperation(value = "Queue a feed for refresh", notes = "Manually add a feed to the refresh queue") public Response queueForRefresh(@ApiParam(value = "Feed id") IDRequest req) { - // TODO evaluate if this is needed - - // Preconditions.checkNotNull(req); - // Preconditions.checkNotNull(req.getId()); - // - // FeedSubscription sub = feedSubscriptionDAO.findById(getUser(), - // req.getId()); - // if (sub != null) { - // taskGiver.add(sub.getFeed()); - // return Response.ok(Status.OK).build(); - // } - // return Response.ok(Status.NOT_FOUND).build(); - return Response.ok("Disabled for now").build(); + Preconditions.checkNotNull(req); + Preconditions.checkNotNull(req.getId()); + + FeedSubscription sub = feedSubscriptionDAO.findById(getUser(), + req.getId()); + if (sub != null) { + Feed feed = sub.getFeed(); + feed.setUrgent(true); + taskGiver.add(feed); + return Response.ok(Status.OK).build(); + } + return Response.ok(Status.NOT_FOUND).build(); + } @Path("/mark")