diff --git a/commafeed-server/pom.xml b/commafeed-server/pom.xml
index 4e77c0a1..5ac82d2a 100644
--- a/commafeed-server/pom.xml
+++ b/commafeed-server/pom.xml
@@ -294,12 +294,6 @@
			<version>1.1.1</version>
-		<dependency>
-			<groupId>io.reactivex.rxjava3</groupId>
-			<artifactId>rxjava</artifactId>
-			<version>3.1.6</version>
-		</dependency>
-
			<groupId>javax.xml.bind</groupId>
			<artifactId>jaxb-api</artifactId>
diff --git a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java
index 4757cfb7..ca21ec31 100644
--- a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java
+++ b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java
@@ -42,6 +42,10 @@ public class FeedDAO extends GenericDAO<Feed> {
return query.orderBy(feed.disabledUntil.asc()).limit(count).fetch();
}
+ public void setDisabledUntil(List<Long> feedIds, Date date) {
+ updateQuery(feed).set(feed.disabledUntil, date).where(feed.id.in(feedIds)).execute();
+ }
+
public Feed findByUrl(String normalizedUrl) {
List<Feed> feeds = query().selectFrom(feed).where(feed.normalizedUrlHash.eq(DigestUtils.sha1Hex(normalizedUrl))).fetch();
Feed feed = Iterables.getFirst(feeds, null);
diff --git a/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java b/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java
index 4096dff6..4cd8608e 100644
--- a/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java
+++ b/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java
@@ -6,8 +6,10 @@ import org.hibernate.SessionFactory;
import org.hibernate.annotations.QueryHints;
import com.commafeed.backend.model.AbstractModel;
+import com.querydsl.core.types.EntityPath;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
+import com.querydsl.jpa.impl.JPAUpdateClause;
import io.dropwizard.hibernate.AbstractDAO;
@@ -24,6 +26,10 @@ public abstract class GenericDAO<T extends AbstractModel> extends AbstractDAO<T>
return factory;
}
+ protected JPAUpdateClause updateQuery(EntityPath<?> entityPath) {
+ return new JPAUpdateClause(currentSession(), entityPath);
+ }
+
public void saveOrUpdate(T model) {
persist(model);
}
diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java
index 0a331468..8b01ecf7 100644
--- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java
+++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java
@@ -2,7 +2,15 @@ package com.commafeed.backend.feed;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -15,13 +23,10 @@ import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.UnitOfWork;
+import com.commafeed.backend.model.AbstractModel;
import com.commafeed.backend.model.Feed;
import io.dropwizard.lifecycle.Managed;
-import io.reactivex.rxjava3.core.Flowable;
-import io.reactivex.rxjava3.disposables.Disposable;
-import io.reactivex.rxjava3.processors.PublishProcessor;
-import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -35,8 +40,13 @@ public class FeedRefreshEngine implements Managed {
private final CommaFeedConfiguration config;
private final Meter refill;
- private final PublishProcessor<Feed> priorityQueue;
- private Disposable flow;
+ private final BlockingDeque<Feed> queue;
+
+ private final ExecutorService feedProcessingLoopExecutor;
+ private final ExecutorService refillLoopExecutor;
+ private final ExecutorService refillExecutor;
+ private final ExecutorService workerExecutor;
+ private final ExecutorService databaseUpdaterExecutor;
@Inject
public FeedRefreshEngine(SessionFactory sessionFactory, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater,
@@ -47,54 +57,106 @@ public class FeedRefreshEngine implements Managed {
this.updater = updater;
this.config = config;
this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill"));
- this.priorityQueue = PublishProcessor.create();
+
+ this.queue = new LinkedBlockingDeque<>();
+
+ this.feedProcessingLoopExecutor = Executors.newSingleThreadExecutor();
+ this.refillLoopExecutor = Executors.newSingleThreadExecutor();
+ this.refillExecutor = newDiscardingSingleThreadExecutorService();
+ this.workerExecutor = newBlockingExecutorService(config.getApplicationSettings().getBackgroundThreads());
+ this.databaseUpdaterExecutor = newBlockingExecutorService(config.getApplicationSettings().getDatabaseUpdateThreads());
}
@Override
public void start() {
- Flowable<Feed> database = Flowable.fromCallable(() -> findNextUpdatableFeeds(getBatchSize(), getLastLoginThreshold()))
- .onErrorResumeNext(e -> {
- log.error("error while fetching next updatable feeds", e);
- return Flowable.empty();
- })
- // repeat query 15s after the flowable has been emptied
- // https://github.com/ReactiveX/RxJava/issues/448#issuecomment-233244964
- .repeatWhen(o -> o.concatMap(v -> Flowable.timer(15, TimeUnit.SECONDS)))
- .flatMap(Flowable::fromIterable);
- Flowable<Feed> source = Flowable.merge(priorityQueue, database);
+ startFeedProcessingLoop();
+ startRefillLoop();
+ }
- this.flow = source.subscribeOn(Schedulers.io())
- // feed fetching
- .parallel(config.getApplicationSettings().getBackgroundThreads())
- .runOn(Schedulers.io())
- .flatMap(f -> Flowable.fromCallable(() -> worker.update(f)).onErrorResumeNext(e -> {
- log.error("error while fetching feed", e);
- return Flowable.empty();
- }))
- .sequential()
- // database updating
- .parallel(config.getApplicationSettings().getDatabaseUpdateThreads())
- .runOn(Schedulers.io())
- .flatMap(fae -> Flowable.fromCallable(() -> updater.update(fae.getFeed(), fae.getEntries())).onErrorResumeNext(e -> {
- log.error("error while updating database", e);
- return Flowable.empty();
- }))
- .sequential()
- // end flow
- .subscribe();
+ private void startFeedProcessingLoop() {
+ // take a feed from the queue, process it, rinse, repeat
+ feedProcessingLoopExecutor.submit(() -> {
+ while (!feedProcessingLoopExecutor.isShutdown()) {
+ try {
+ // take() is blocking until a feed is available from the queue
+ Feed feed = queue.take();
+
+ // send the feed to be processed
+ processFeedAsync(feed);
+
+ // we removed a feed from the queue, try to refill it as it may now be empty
+ if (queue.isEmpty()) {
+ refillQueueAsync();
+ }
+ } catch (InterruptedException e) {
+ log.debug("interrupted while waiting for a feed in the queue");
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
+ }
+
+ private void startRefillLoop() {
+ // refill the queue at regular intervals if it's empty
+ refillLoopExecutor.submit(() -> {
+ while (!refillLoopExecutor.isShutdown()) {
+ try {
+ if (queue.isEmpty()) {
+ refillQueueAsync();
+ }
+
+ TimeUnit.SECONDS.sleep(15);
+ } catch (InterruptedException e) {
+ log.debug("interrupted while sleeping");
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
}
public void refreshImmediately(Feed feed) {
- priorityQueue.onNext(feed);
+ queue.addFirst(feed);
}
- private List<Feed> findNextUpdatableFeeds(int max, Date lastLoginThreshold) {
- refill.mark();
- return UnitOfWork.call(sessionFactory, () -> feedDAO.findNextUpdatable(max, lastLoginThreshold));
+ private void refillQueueAsync() {
+ CompletableFuture.runAsync(() -> {
+ if (queue.isEmpty()) {
+ refill.mark();
+ queue.addAll(getNextUpdatableFeeds(getBatchSize()));
+ }
+ }, refillExecutor).whenComplete((data, ex) -> {
+ if (ex != null) {
+ log.error("error while refilling the queue", ex);
+ }
+ });
+ }
+
+ private void processFeedAsync(Feed feed) {
+ CompletableFuture.supplyAsync(() -> worker.update(feed), workerExecutor)
+ .thenApplyAsync(r -> updater.update(r.getFeed(), r.getEntries()), databaseUpdaterExecutor)
+ .whenComplete((data, ex) -> {
+ if (ex != null) {
+ log.error("error while processing feed {}", feed.getUrl(), ex);
+ }
+ });
+ }
+
+ private List<Feed> getNextUpdatableFeeds(int max) {
+ return UnitOfWork.call(sessionFactory, () -> {
+ List<Feed> feeds = feedDAO.findNextUpdatable(max, getLastLoginThreshold());
+ // update disabledUntil to prevent feeds from being returned again by feedDAO.findNextUpdatable()
+ Date nextUpdateDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes());
+ feedDAO.setDisabledUntil(feeds.stream().map(AbstractModel::getId).collect(Collectors.toList()), nextUpdateDate);
+ return feeds;
+ });
}
private int getBatchSize() {
- return Math.min(Flowable.bufferSize(), 3 * config.getApplicationSettings().getBackgroundThreads());
+ return Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads());
}
private Date getLastLoginThreshold() {
@@ -103,6 +165,39 @@ public class FeedRefreshEngine implements Managed {
@Override
public void stop() {
- flow.dispose();
+ this.feedProcessingLoopExecutor.shutdownNow();
+ this.refillLoopExecutor.shutdownNow();
+ this.refillExecutor.shutdownNow();
+ this.workerExecutor.shutdownNow();
+ this.databaseUpdaterExecutor.shutdownNow();
+ }
+
+ /**
+ * returns an ExecutorService with a single thread that discards tasks if a task is already running
+ */
+ private ExecutorService newDiscardingSingleThreadExecutorService() {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
+ pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ return pool;
+ }
+
+ /**
+ * returns an ExecutorService that blocks submissions until a thread is available
+ */
+ private ExecutorService newBlockingExecutorService(int threads) {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
+ pool.setRejectedExecutionHandler((r, e) -> {
+ if (e.isShutdown()) {
+ return;
+ }
+
+ try {
+ e.getQueue().put(r);
+ } catch (InterruptedException ex) {
+ log.debug("interrupted while waiting for a slot in the queue.", ex);
+ Thread.currentThread().interrupt();
+ }
+ });
+ return pool;
}
}