manually queue feeds for refresh (#316)

This commit is contained in:
Athou
2013-06-22 20:06:09 +02:00
parent ddfca39d61
commit f54e41897d
4 changed files with 46 additions and 24 deletions

View File

@@ -49,10 +49,8 @@ public class FeedRefreshTaskGiver {
Date threshold = DateUtils.addMinutes(now, heavyLoad ? -10 : -1); Date threshold = DateUtils.addMinutes(now, heavyLoad ? -10 : -1);
if (feed.getLastUpdated() == null if (feed.getLastUpdated() == null
|| feed.getLastUpdated().before(threshold)) { || feed.getLastUpdated().before(threshold)) {
feed.setEtagHeader(null); addQueue.add(feed);
feed.setLastModifiedHeader(null);
} }
addQueue.add(feed);
} }
public synchronized Feed take() { public synchronized Feed take() {
@@ -77,10 +75,10 @@ public class FeedRefreshTaskGiver {
int size = addQueue.size(); int size = addQueue.size();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
feeds.add(addQueue.poll()); feeds.add(0, addQueue.poll());
} }
Map<Long, Feed> map = Maps.newHashMap(); Map<Long, Feed> map = Maps.newLinkedHashMap();
for (Feed f : feeds) { for (Feed f : feeds) {
f.setLastUpdated(now); f.setLastUpdated(now);
map.put(f.getId(), f); map.put(f.getId(), f);

View File

@@ -4,8 +4,7 @@ import java.util.Calendar;
import java.util.Collection; import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@@ -65,8 +64,8 @@ public class FeedRefreshUpdater {
FeedEntryDAO feedEntryDAO; FeedEntryDAO feedEntryDAO;
private ThreadPoolExecutor pool; private ThreadPoolExecutor pool;
private BlockingQueue<Runnable> queue;
private Striped<Lock> locks; private Striped<Lock> locks;
private LinkedBlockingDeque<Runnable> queue;
@PostConstruct @PostConstruct
public void init() { public void init() {
@@ -76,15 +75,20 @@ public class FeedRefreshUpdater {
locks = Striped.lazyWeakLock(threads * 100000); locks = Striped.lazyWeakLock(threads * 100000);
pool = new ThreadPoolExecutor(threads, threads, 0, pool = new ThreadPoolExecutor(threads, threads, 0,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
queue = new ArrayBlockingQueue<Runnable>(500 * threads)); queue = new LinkedBlockingDeque<Runnable>(500 * threads));
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.debug("Thread queue full, waiting..."); log.debug("Thread queue full, waiting...");
try { try {
e.getQueue().put(r); Task task = (Task) r;
if (task.getFeed().isUrgent()) {
queue.putFirst(r);
} else {
queue.put(r);
}
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
log.error("Interrupted while waiting for queue.", e); log.error("Interrupted while waiting for queue.", e1);
} }
} }
}); });
@@ -136,6 +140,10 @@ public class FeedRefreshUpdater {
metricsBean.feedUpdated(); metricsBean.feedUpdated();
taskGiver.giveBack(feed); taskGiver.giveBack(feed);
} }
public Feed getFeed() {
return feed;
}
} }
private boolean updateEntry(final Feed feed, final FeedEntry entry, private boolean updateEntry(final Feed feed, final FeedEntry entry,

View File

@@ -12,6 +12,7 @@ import javax.persistence.OneToMany;
import javax.persistence.Table; import javax.persistence.Table;
import javax.persistence.Temporal; import javax.persistence.Temporal;
import javax.persistence.TemporalType; import javax.persistence.TemporalType;
import javax.persistence.Transient;
import org.hibernate.annotations.Cache; import org.hibernate.annotations.Cache;
import org.hibernate.annotations.CacheConcurrencyStrategy; import org.hibernate.annotations.CacheConcurrencyStrategy;
@@ -131,6 +132,13 @@ public class Feed extends AbstractModel {
@Temporal(TemporalType.TIMESTAMP) @Temporal(TemporalType.TIMESTAMP)
private Date pushLastPing; private Date pushLastPing;
/**
* Denotes a feed that needs to be refreshed before others. Currently used
* when a feed is queued manually for refresh. Not persisted.
*/
@Transient
private boolean urgent;
public Feed() { public Feed() {
} }
@@ -299,4 +307,12 @@ public class Feed extends AbstractModel {
this.pushTopicHash = pushTopicHash; this.pushTopicHash = pushTopicHash;
} }
public boolean isUrgent() {
return urgent;
}
public void setUrgent(boolean urgent) {
this.urgent = urgent;
}
} }

View File

@@ -209,20 +209,20 @@ public class FeedREST extends AbstractResourceREST {
@POST @POST
@ApiOperation(value = "Queue a feed for refresh", notes = "Manually add a feed to the refresh queue") @ApiOperation(value = "Queue a feed for refresh", notes = "Manually add a feed to the refresh queue")
public Response queueForRefresh(@ApiParam(value = "Feed id") IDRequest req) { public Response queueForRefresh(@ApiParam(value = "Feed id") IDRequest req) {
// TODO evaluate if this is needed
// Preconditions.checkNotNull(req);
// Preconditions.checkNotNull(req.getId());
//
// FeedSubscription sub = feedSubscriptionDAO.findById(getUser(),
// req.getId());
// if (sub != null) {
// taskGiver.add(sub.getFeed());
// return Response.ok(Status.OK).build();
// }
// return Response.ok(Status.NOT_FOUND).build();
return Response.ok("Disabled for now").build(); Preconditions.checkNotNull(req);
Preconditions.checkNotNull(req.getId());
FeedSubscription sub = feedSubscriptionDAO.findById(getUser(),
req.getId());
if (sub != null) {
Feed feed = sub.getFeed();
feed.setUrgent(true);
taskGiver.add(feed);
return Response.ok(Status.OK).build();
}
return Response.ok(Status.NOT_FOUND).build();
} }
@Path("/mark") @Path("/mark")