mirror of
https://github.com/Athou/commafeed.git
synced 2026-03-21 21:37:29 +00:00
better refresh algorithm using queues instead of synchronous database call
This commit is contained in:
@@ -5,28 +5,20 @@ import java.io.InputStreamReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.ejb.ConcurrencyManagement;
|
||||
import javax.ejb.ConcurrencyManagementType;
|
||||
import javax.ejb.Singleton;
|
||||
import javax.ejb.Startup;
|
||||
import javax.enterprise.inject.Instance;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.commafeed.backend.dao.FeedCategoryDAO;
|
||||
import com.commafeed.backend.dao.FeedDAO;
|
||||
import com.commafeed.backend.dao.FeedSubscriptionDAO;
|
||||
import com.commafeed.backend.dao.UserDAO;
|
||||
import com.commafeed.backend.feeds.FeedRefreshWorker;
|
||||
import com.commafeed.backend.feeds.FeedRefreshTaskGiver;
|
||||
import com.commafeed.backend.model.ApplicationSettings;
|
||||
import com.commafeed.backend.model.UserRole.Role;
|
||||
import com.commafeed.backend.services.ApplicationSettingsService;
|
||||
@@ -45,15 +37,6 @@ public class StartupBean {
|
||||
@Inject
|
||||
DatabaseUpdater databaseUpdater;
|
||||
|
||||
@Inject
|
||||
FeedDAO feedDAO;
|
||||
|
||||
@Inject
|
||||
FeedCategoryDAO feedCategoryDAO;
|
||||
|
||||
@Inject
|
||||
FeedSubscriptionDAO feedSubscriptionDAO;
|
||||
|
||||
@Inject
|
||||
UserDAO userDAO;
|
||||
|
||||
@@ -61,17 +44,14 @@ public class StartupBean {
|
||||
UserService userService;
|
||||
|
||||
@Inject
|
||||
ApplicationSettingsService applicationSettingsService;
|
||||
FeedRefreshTaskGiver taskGiver;
|
||||
|
||||
@Inject
|
||||
Instance<FeedRefreshWorker> workers;
|
||||
ApplicationSettingsService applicationSettingsService;
|
||||
|
||||
private long startupTime;
|
||||
private Map<String, String> supportedLanguages = Maps.newHashMap();
|
||||
|
||||
private ExecutorService executor;
|
||||
private MutableBoolean running = new MutableBoolean(true);
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
|
||||
@@ -84,23 +64,7 @@ public class StartupBean {
|
||||
applicationSettingsService.applyLogLevel();
|
||||
|
||||
initSupportedLanguages();
|
||||
|
||||
ApplicationSettings settings = applicationSettingsService.get();
|
||||
int threads = settings.getBackgroundThreads();
|
||||
log.info("Starting {} background threads", threads);
|
||||
|
||||
executor = Executors.newFixedThreadPool(Math.max(threads, 1));
|
||||
for (int i = 0; i < threads; i++) {
|
||||
final int threadId = i;
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
FeedRefreshWorker worker = workers.get();
|
||||
worker.start(running, "Thread " + threadId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
taskGiver.start();
|
||||
}
|
||||
|
||||
private void initSupportedLanguages() {
|
||||
@@ -145,17 +109,4 @@ public class StartupBean {
|
||||
public Map<String, String> getSupportedLanguages() {
|
||||
return supportedLanguages;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
running.setValue(false);
|
||||
executor.shutdownNow();
|
||||
while (!executor.isTerminated()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("interrupted while waiting for threads to finish.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ import javax.persistence.criteria.SetJoin;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedSubscription;
|
||||
@@ -25,42 +24,41 @@ import com.google.common.collect.Lists;
|
||||
@Stateless
|
||||
public class FeedDAO extends GenericDAO<Feed> {
|
||||
|
||||
private List<Predicate> getUpdatablePredicates(Root<Feed> root) {
|
||||
Date now = new Date();
|
||||
private List<Predicate> getUpdatablePredicates(Root<Feed> root, Date threshold) {
|
||||
|
||||
Predicate hasSubscriptions = builder.isNotEmpty(root
|
||||
.get(Feed_.subscriptions));
|
||||
|
||||
Predicate neverUpdated = builder.isNull(root.get(Feed_.lastUpdated));
|
||||
Predicate updatedBeforeThreshold = builder.lessThan(
|
||||
root.get(Feed_.lastUpdated), DateUtils.addMinutes(now, -10));
|
||||
root.get(Feed_.lastUpdated), threshold);
|
||||
|
||||
Predicate disabledDateIsNull = builder.isNull(root
|
||||
.get(Feed_.disabledUntil));
|
||||
Predicate disabledDateIsInPast = builder.lessThan(
|
||||
root.get(Feed_.disabledUntil), now);
|
||||
root.get(Feed_.disabledUntil), new Date());
|
||||
|
||||
return Lists.newArrayList(hasSubscriptions,
|
||||
builder.or(neverUpdated, updatedBeforeThreshold),
|
||||
builder.or(disabledDateIsNull, disabledDateIsInPast));
|
||||
}
|
||||
|
||||
public Long getUpdatableCount() {
|
||||
public Long getUpdatableCount(Date threshold) {
|
||||
CriteriaQuery<Long> query = builder.createQuery(Long.class);
|
||||
Root<Feed> root = query.from(getType());
|
||||
|
||||
query.select(builder.count(root));
|
||||
query.where(getUpdatablePredicates(root).toArray(new Predicate[0]));
|
||||
query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0]));
|
||||
|
||||
TypedQuery<Long> q = em.createQuery(query);
|
||||
return q.getSingleResult();
|
||||
}
|
||||
|
||||
public List<Feed> findNextUpdatable(int count) {
|
||||
public List<Feed> findNextUpdatable(int count, Date threshold) {
|
||||
CriteriaQuery<Feed> query = builder.createQuery(getType());
|
||||
Root<Feed> root = query.from(getType());
|
||||
|
||||
query.where(getUpdatablePredicates(root).toArray(new Predicate[0]));
|
||||
query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0]));
|
||||
|
||||
query.orderBy(builder.asc(root.get(Feed_.lastUpdated)));
|
||||
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
package com.commafeed.backend.feeds;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class FeedRefreshExecutor {
|
||||
|
||||
private static Logger log = LoggerFactory
|
||||
.getLogger(FeedRefreshExecutor.class);
|
||||
|
||||
private String poolName;
|
||||
private ThreadPoolExecutor pool;
|
||||
private LinkedBlockingDeque<Runnable> queue;
|
||||
|
||||
public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity) {
|
||||
this.poolName = poolName;
|
||||
pool = new ThreadPoolExecutor(threads, threads, 0,
|
||||
TimeUnit.MILLISECONDS,
|
||||
queue = new LinkedBlockingDeque<Runnable>(queueCapacity) {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public boolean offer(Runnable r) {
|
||||
Task task = (Task) r;
|
||||
if (task.isUrgent()) {
|
||||
return offerFirst(r);
|
||||
} else {
|
||||
return offerLast(r);
|
||||
}
|
||||
}
|
||||
});
|
||||
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
||||
log.debug("{} thread queue full, waiting...", poolName);
|
||||
try {
|
||||
Task task = (Task) r;
|
||||
if (task.isUrgent()) {
|
||||
queue.putFirst(r);
|
||||
} else {
|
||||
queue.put(r);
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
log.error(poolName + " interrupted while waiting for queue.", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void execute(Task task) {
|
||||
pool.execute(task);
|
||||
}
|
||||
|
||||
public int getQueueSize() {
|
||||
return queue.size();
|
||||
}
|
||||
|
||||
public static interface Task extends Runnable {
|
||||
boolean isUrgent();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
pool.shutdownNow();
|
||||
while (!pool.isTerminated()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("{} interrupted while waiting for threads to finish.", poolName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,12 +4,15 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
import org.apache.commons.lang.time.DateUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -20,7 +23,7 @@ import com.commafeed.backend.services.ApplicationSettingsService;
|
||||
import com.google.api.client.util.Maps;
|
||||
import com.google.common.collect.Queues;
|
||||
|
||||
@Singleton
|
||||
@ApplicationScoped
|
||||
public class FeedRefreshTaskGiver {
|
||||
|
||||
protected static final Logger log = LoggerFactory.getLogger(FeedRefreshTaskGiver.class);
|
||||
@@ -34,47 +37,103 @@ public class FeedRefreshTaskGiver {
|
||||
@Inject
|
||||
MetricsBean metricsBean;
|
||||
|
||||
@Inject
|
||||
FeedRefreshWorker worker;
|
||||
|
||||
private int backgroundThreads;
|
||||
|
||||
private Queue<Feed> addQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<Feed> takeQueue = Queues.newConcurrentLinkedQueue();
|
||||
private Queue<Feed> giveBackQueue = Queues.newConcurrentLinkedQueue();
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
backgroundThreads = applicationSettingsService.get()
|
||||
.getBackgroundThreads();
|
||||
executor = Executors.newFixedThreadPool(1);
|
||||
}
|
||||
|
||||
public void add(Feed feed) {
|
||||
Date now = new Date();
|
||||
boolean heavyLoad = applicationSettingsService.get().isHeavyLoad();
|
||||
Date threshold = DateUtils.addMinutes(now, heavyLoad ? -10 : -1);
|
||||
if (feed.getLastUpdated() == null
|
||||
|| feed.getLastUpdated().before(threshold)) {
|
||||
addQueue.add(feed);
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
executor.shutdownNow();
|
||||
while (!executor.isTerminated()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("interrupted while waiting for threads to finish.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Feed take() {
|
||||
public void start() {
|
||||
try {
|
||||
// sleeping for a little while, let everything settle
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("interrupted while sleeping");
|
||||
}
|
||||
log.info("starting feed refresh task giver");
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!executor.isShutdown()) {
|
||||
try {
|
||||
Feed feed = take();
|
||||
if (feed != null) {
|
||||
metricsBean.feedRefreshed();
|
||||
worker.updateFeed(feed);
|
||||
} else {
|
||||
log.debug("nothing to do, sleeping for 15s");
|
||||
try {
|
||||
Thread.sleep(15000);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("interrupted while sleeping");
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Feed take() {
|
||||
Feed feed = takeQueue.poll();
|
||||
|
||||
if (feed == null) {
|
||||
refill();
|
||||
feed = takeQueue.poll();
|
||||
}
|
||||
|
||||
if (feed != null) {
|
||||
metricsBean.feedRefreshed();
|
||||
}
|
||||
return feed;
|
||||
}
|
||||
|
||||
public Long getUpdatableCount() {
|
||||
return feedDAO.getUpdatableCount(getThreshold());
|
||||
}
|
||||
|
||||
private Date getThreshold() {
|
||||
boolean heavyLoad = applicationSettingsService.get().isHeavyLoad();
|
||||
Date threshold = DateUtils.addMinutes(new Date(), heavyLoad ? -15 : -5);
|
||||
return threshold;
|
||||
}
|
||||
|
||||
public void add(Feed feed) {
|
||||
Date threshold = getThreshold();
|
||||
if (feed.getLastUpdated() == null
|
||||
|| feed.getLastUpdated().before(threshold)) {
|
||||
addQueue.add(feed);
|
||||
}
|
||||
}
|
||||
|
||||
private void refill() {
|
||||
Date now = new Date();
|
||||
|
||||
int count = 3 * backgroundThreads;
|
||||
List<Feed> feeds = feedDAO.findNextUpdatable(count);
|
||||
List<Feed> feeds = feedDAO.findNextUpdatable(count, getThreshold());
|
||||
|
||||
int size = addQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
||||
@@ -3,16 +3,13 @@ package com.commafeed.backend.feeds;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang3.time.DateUtils;
|
||||
@@ -24,6 +21,7 @@ import com.commafeed.backend.cache.CacheService;
|
||||
import com.commafeed.backend.dao.FeedDAO;
|
||||
import com.commafeed.backend.dao.FeedEntryDAO;
|
||||
import com.commafeed.backend.dao.FeedSubscriptionDAO;
|
||||
import com.commafeed.backend.feeds.FeedRefreshExecutor.Task;
|
||||
import com.commafeed.backend.model.ApplicationSettings;
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
@@ -34,7 +32,7 @@ import com.commafeed.backend.services.FeedUpdateService;
|
||||
import com.google.api.client.util.Lists;
|
||||
import com.google.common.util.concurrent.Striped;
|
||||
|
||||
@Singleton
|
||||
@ApplicationScoped
|
||||
public class FeedRefreshUpdater {
|
||||
|
||||
protected static Logger log = LoggerFactory
|
||||
@@ -67,71 +65,33 @@ public class FeedRefreshUpdater {
|
||||
@Inject
|
||||
CacheService cache;
|
||||
|
||||
private ThreadPoolExecutor pool;
|
||||
private FeedRefreshExecutor pool;
|
||||
private Striped<Lock> locks;
|
||||
private LinkedBlockingDeque<Runnable> queue;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
ApplicationSettings settings = applicationSettingsService.get();
|
||||
int threads = Math.max(settings.getDatabaseUpdateThreads(), 1);
|
||||
log.info("Creating database pool with {} threads", threads);
|
||||
pool = new FeedRefreshExecutor("FeedRefreshUpdater", threads, 500 * threads);
|
||||
locks = Striped.lazyWeakLock(threads * 100000);
|
||||
pool = new ThreadPoolExecutor(threads, threads, 0,
|
||||
TimeUnit.MILLISECONDS,
|
||||
queue = new LinkedBlockingDeque<Runnable>(500 * threads) {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@Override
|
||||
public boolean offer(Runnable r) {
|
||||
Task task = (Task) r;
|
||||
if (task.getFeed().isUrgent()) {
|
||||
return offerFirst(r);
|
||||
} else {
|
||||
return offerLast(r);
|
||||
}
|
||||
}
|
||||
});
|
||||
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
|
||||
@Override
|
||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
||||
log.debug("Thread queue full, waiting...");
|
||||
try {
|
||||
Task task = (Task) r;
|
||||
if (task.getFeed().isUrgent()) {
|
||||
queue.putFirst(r);
|
||||
} else {
|
||||
queue.put(r);
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
log.error("Interrupted while waiting for queue.", e1);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
pool.shutdownNow();
|
||||
while (!pool.isTerminated()) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("interrupted while waiting for threads to finish.");
|
||||
}
|
||||
}
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
public void updateFeed(Feed feed, Collection<FeedEntry> entries) {
|
||||
pool.execute(new Task(feed, entries));
|
||||
pool.execute(new EntryTask(feed, entries));
|
||||
}
|
||||
|
||||
private class Task implements Runnable {
|
||||
private class EntryTask implements Task {
|
||||
|
||||
private Feed feed;
|
||||
private Collection<FeedEntry> entries;
|
||||
|
||||
public Task(Feed feed, Collection<FeedEntry> entries) {
|
||||
public EntryTask(Feed feed, Collection<FeedEntry> entries) {
|
||||
this.feed = feed;
|
||||
this.entries = entries;
|
||||
}
|
||||
@@ -174,8 +134,9 @@ public class FeedRefreshUpdater {
|
||||
taskGiver.giveBack(feed);
|
||||
}
|
||||
|
||||
public Feed getFeed() {
|
||||
return feed;
|
||||
@Override
|
||||
public boolean isUrgent() {
|
||||
return feed.isUrgent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,7 +182,7 @@ public class FeedRefreshUpdater {
|
||||
}
|
||||
|
||||
public int getQueueSize() {
|
||||
return queue.size();
|
||||
return pool.getQueueSize();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,21 +3,26 @@ package com.commafeed.backend.feeds;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.enterprise.context.ApplicationScoped;
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.commafeed.backend.HttpGetter.NotModifiedException;
|
||||
import com.commafeed.backend.MetricsBean;
|
||||
import com.commafeed.backend.dao.FeedEntryDAO;
|
||||
import com.commafeed.backend.feeds.FeedRefreshExecutor.Task;
|
||||
import com.commafeed.backend.model.ApplicationSettings;
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
import com.commafeed.backend.services.ApplicationSettingsService;
|
||||
import com.sun.syndication.io.FeedException;
|
||||
|
||||
@ApplicationScoped
|
||||
public class FeedRefreshWorker {
|
||||
|
||||
private static Logger log = LoggerFactory
|
||||
@@ -41,40 +46,45 @@ public class FeedRefreshWorker {
|
||||
@Inject
|
||||
FeedEntryDAO feedEntryDAO;
|
||||
|
||||
public void start(MutableBoolean running, String threadName) {
|
||||
log.info("{} starting", threadName);
|
||||
private FeedRefreshExecutor pool;
|
||||
|
||||
try {
|
||||
// sleeping before starting, let everything settle
|
||||
Thread.sleep(5000);
|
||||
} catch (InterruptedException e) {
|
||||
log.error(threadName + e.getMessage(), e);
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
ApplicationSettings settings = applicationSettingsService.get();
|
||||
int threads = settings.getBackgroundThreads();
|
||||
log.info("Creating refresh worker pool with {} threads", threads);
|
||||
pool = new FeedRefreshExecutor("FeedRefreshUpdater", threads, 20 * threads);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
pool.shutdown();
|
||||
}
|
||||
|
||||
public void updateFeed(Feed feed) {
|
||||
pool.execute(new FeedTask(feed));
|
||||
}
|
||||
|
||||
public int getQueueSize(){
|
||||
return pool.getQueueSize();
|
||||
}
|
||||
|
||||
private class FeedTask implements Task {
|
||||
|
||||
private Feed feed;
|
||||
|
||||
public FeedTask(Feed feed) {
|
||||
this.feed = feed;
|
||||
}
|
||||
|
||||
while (running.isTrue()) {
|
||||
Feed feed = null;
|
||||
try {
|
||||
feed = getNextFeed();
|
||||
if (feed != null) {
|
||||
log.debug("refreshing " + feed.getUrl());
|
||||
update(feed);
|
||||
} else {
|
||||
log.debug("sleeping");
|
||||
metricsBean.threadWaited();
|
||||
Thread.sleep(15000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
log.info(threadName + " interrupted");
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
String feedUrl = "feed is null";
|
||||
if (feed != null) {
|
||||
feedUrl = feed.getUrl();
|
||||
}
|
||||
log.error(
|
||||
threadName + " (" + feedUrl + ") : " + e.getMessage(),
|
||||
e);
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
update(feed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isUrgent() {
|
||||
return feed.isUrgent();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -167,9 +177,4 @@ public class FeedRefreshWorker {
|
||||
feed.setPushTopicHash(DigestUtils.sha1Hex(topic));
|
||||
}
|
||||
}
|
||||
|
||||
private Feed getNextFeed() {
|
||||
return taskGiver.take();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ import com.commafeed.backend.feeds.FaviconFetcher;
|
||||
import com.commafeed.backend.feeds.FeedFetcher;
|
||||
import com.commafeed.backend.feeds.FeedRefreshTaskGiver;
|
||||
import com.commafeed.backend.feeds.FeedRefreshUpdater;
|
||||
import com.commafeed.backend.feeds.FeedRefreshWorker;
|
||||
import com.commafeed.backend.feeds.OPMLExporter;
|
||||
import com.commafeed.backend.feeds.OPMLImporter;
|
||||
import com.commafeed.backend.model.User;
|
||||
@@ -120,6 +121,9 @@ public abstract class AbstractREST {
|
||||
@Inject
|
||||
FeedRefreshTaskGiver taskGiver;
|
||||
|
||||
@Inject
|
||||
FeedRefreshWorker feedRefreshWorker;
|
||||
|
||||
@Inject
|
||||
FeedRefreshUpdater feedRefreshUpdater;
|
||||
|
||||
|
||||
@@ -185,9 +185,10 @@ public class AdminREST extends AbstractResourceREST {
|
||||
map.put("lastMinute", metricsBean.getLastMinute());
|
||||
map.put("lastHour", metricsBean.getLastHour());
|
||||
if (backlog) {
|
||||
map.put("backlog", feedDAO.getUpdatableCount());
|
||||
map.put("backlog", taskGiver.getUpdatableCount());
|
||||
}
|
||||
map.put("queue", feedRefreshUpdater.getQueueSize());
|
||||
map.put("http_queue", feedRefreshWorker.getQueueSize());
|
||||
map.put("database_queue", feedRefreshUpdater.getQueueSize());
|
||||
map.put("cache", metricsBean.getCacheStats());
|
||||
|
||||
return Response.ok(map).build();
|
||||
|
||||
Reference in New Issue
Block a user