From e84a1289e3df39c65ca3a5b4206ae4085592a556 Mon Sep 17 00:00:00 2001 From: Athou Date: Mon, 22 May 2023 15:26:20 +0200 Subject: [PATCH] remove rxjava as its too magic and very hard to master --- commafeed-server/pom.xml | 6 - .../com/commafeed/backend/dao/FeedDAO.java | 4 + .../com/commafeed/backend/dao/GenericDAO.java | 6 + .../backend/feed/FeedRefreshEngine.java | 179 ++++++++++++++---- 4 files changed, 147 insertions(+), 48 deletions(-) diff --git a/commafeed-server/pom.xml b/commafeed-server/pom.xml index 4e77c0a1..5ac82d2a 100644 --- a/commafeed-server/pom.xml +++ b/commafeed-server/pom.xml @@ -294,12 +294,6 @@ 1.1.1 - - io.reactivex.rxjava3 - rxjava - 3.1.6 - - javax.xml.bind jaxb-api diff --git a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java index 4757cfb7..ca21ec31 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/dao/FeedDAO.java @@ -42,6 +42,10 @@ public class FeedDAO extends GenericDAO { return query.orderBy(feed.disabledUntil.asc()).limit(count).fetch(); } + public void setDisabledUntil(List feedIds, Date date) { + updateQuery(feed).set(feed.disabledUntil, date).where(feed.id.in(feedIds)).execute(); + } + public Feed findByUrl(String normalizedUrl) { List feeds = query().selectFrom(feed).where(feed.normalizedUrlHash.eq(DigestUtils.sha1Hex(normalizedUrl))).fetch(); Feed feed = Iterables.getFirst(feeds, null); diff --git a/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java b/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java index 4096dff6..4cd8608e 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/dao/GenericDAO.java @@ -6,8 +6,10 @@ import org.hibernate.SessionFactory; import org.hibernate.annotations.QueryHints; import com.commafeed.backend.model.AbstractModel; +import com.querydsl.core.types.EntityPath; import com.querydsl.jpa.impl.JPAQuery; import com.querydsl.jpa.impl.JPAQueryFactory; +import com.querydsl.jpa.impl.JPAUpdateClause; import io.dropwizard.hibernate.AbstractDAO; @@ -24,6 +26,10 @@ public abstract class GenericDAO extends AbstractDAO return factory; } + protected JPAUpdateClause updateQuery(EntityPath entityPath) { + return new JPAUpdateClause(currentSession(), entityPath); + } + public void saveOrUpdate(T model) { persist(model); } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java index 0a331468..8b01ecf7 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java @@ -2,7 +2,15 @@ package com.commafeed.backend.feed; import java.util.Date; import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.inject.Inject; import javax.inject.Singleton; @@ -15,13 +23,10 @@ import com.codahale.metrics.MetricRegistry; import com.commafeed.CommaFeedConfiguration; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.dao.UnitOfWork; +import com.commafeed.backend.model.AbstractModel; import com.commafeed.backend.model.Feed; import io.dropwizard.lifecycle.Managed; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.processors.PublishProcessor; -import io.reactivex.rxjava3.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -35,8 +40,13 @@ public class FeedRefreshEngine implements Managed { private final CommaFeedConfiguration config; private final Meter refill; - private final PublishProcessor priorityQueue; - private Disposable flow; + private final BlockingDeque queue; + + private final ExecutorService feedProcessingLoopExecutor; + private final ExecutorService refillLoopExecutor; + private final ExecutorService refillExecutor; + private final ExecutorService workerExecutor; + private final ExecutorService databaseUpdaterExecutor; @Inject public FeedRefreshEngine(SessionFactory sessionFactory, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater, @@ -47,54 +57,106 @@ public class FeedRefreshEngine implements Managed { this.updater = updater; this.config = config; this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill")); - this.priorityQueue = PublishProcessor.create(); + + this.queue = new LinkedBlockingDeque<>(); + + this.feedProcessingLoopExecutor = Executors.newSingleThreadExecutor(); + this.refillLoopExecutor = Executors.newSingleThreadExecutor(); + this.refillExecutor = newDiscardingSingleThreadExecutorService(); + this.workerExecutor = newBlockingExecutorService(config.getApplicationSettings().getBackgroundThreads()); + this.databaseUpdaterExecutor = newBlockingExecutorService(config.getApplicationSettings().getDatabaseUpdateThreads()); } @Override public void start() { - Flowable database = Flowable.fromCallable(() -> findNextUpdatableFeeds(getBatchSize(), getLastLoginThreshold())) - .onErrorResumeNext(e -> { - log.error("error while fetching next updatable feeds", e); - return Flowable.empty(); - }) - // repeat query 15s after the flowable has been emptied - // https://github.com/ReactiveX/RxJava/issues/448#issuecomment-233244964 - .repeatWhen(o -> o.concatMap(v -> Flowable.timer(15, TimeUnit.SECONDS))) - .flatMap(Flowable::fromIterable); - Flowable source = Flowable.merge(priorityQueue, database); + startFeedProcessingLoop(); + startRefillLoop(); + } - this.flow = source.subscribeOn(Schedulers.io()) - // feed fetching - .parallel(config.getApplicationSettings().getBackgroundThreads()) - .runOn(Schedulers.io()) - .flatMap(f -> Flowable.fromCallable(() -> worker.update(f)).onErrorResumeNext(e -> { - log.error("error while fetching feed", e); - return Flowable.empty(); - })) - .sequential() - // database updating - .parallel(config.getApplicationSettings().getDatabaseUpdateThreads()) - .runOn(Schedulers.io()) - .flatMap(fae -> Flowable.fromCallable(() -> updater.update(fae.getFeed(), fae.getEntries())).onErrorResumeNext(e -> { - log.error("error while updating database", e); - return Flowable.empty(); - })) - .sequential() - // end flow - .subscribe(); + private void startFeedProcessingLoop() { + // take a feed from the queue, process it, rince, repeat + feedProcessingLoopExecutor.submit(() -> { + while (!feedProcessingLoopExecutor.isShutdown()) { + try { + // take() is blocking until a feed is available from the queue + Feed feed = queue.take(); + + // send the feed to be processed + processFeedAsync(feed); + + // we removed a feed from the queue, try to refill it as it may now be empty + if (queue.isEmpty()) { + refillQueueAsync(); + } + } catch (InterruptedException e) { + log.debug("interrupted while waiting for a feed in the queue"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + }); + } + + private void startRefillLoop() { + // refill the queue at regular intervals if it's empty + refillLoopExecutor.submit(() -> { + while (!refillLoopExecutor.isShutdown()) { + try { + if (queue.isEmpty()) { + refillQueueAsync(); + } + + TimeUnit.SECONDS.sleep(15); + } catch (InterruptedException e) { + log.debug("interrupted while sleeping"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + }); } public void refreshImmediately(Feed feed) { - priorityQueue.onNext(feed); + queue.addFirst(feed); } - private List findNextUpdatableFeeds(int max, Date lastLoginThreshold) { - refill.mark(); - return UnitOfWork.call(sessionFactory, () -> feedDAO.findNextUpdatable(max, lastLoginThreshold)); + private void refillQueueAsync() { + CompletableFuture.runAsync(() -> { + if (queue.isEmpty()) { + refill.mark(); + queue.addAll(getNextUpdatableFeeds(getBatchSize())); + } + }, refillExecutor).whenComplete((data, ex) -> { + if (ex != null) { + log.error("error while refilling the queue", ex); + } + }); + } + + private void processFeedAsync(Feed feed) { + CompletableFuture.supplyAsync(() -> worker.update(feed), workerExecutor) + .thenApplyAsync(r -> updater.update(r.getFeed(), r.getEntries()), databaseUpdaterExecutor) + .whenComplete((data, ex) -> { + if (ex != null) { + log.error("error while processing feed {}", feed.getUrl(), ex); + } + }); + } + + private List getNextUpdatableFeeds(int max) { + return UnitOfWork.call(sessionFactory, () -> { + List feeds = feedDAO.findNextUpdatable(max, getLastLoginThreshold()); + // update disabledUntil to prevent feeds from being returned again by feedDAO.findNextUpdatable() + Date nextUpdateDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes()); + feedDAO.setDisabledUntil(feeds.stream().map(AbstractModel::getId).collect(Collectors.toList()), nextUpdateDate); + return feeds; + }); } private int getBatchSize() { - return Math.min(Flowable.bufferSize(), 3 * config.getApplicationSettings().getBackgroundThreads()); + return Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads()); } private Date getLastLoginThreshold() { @@ -103,6 +165,39 @@ public class FeedRefreshEngine implements Managed { @Override public void stop() { - flow.dispose(); + this.feedProcessingLoopExecutor.shutdownNow(); + this.refillLoopExecutor.shutdownNow(); + this.refillExecutor.shutdownNow(); + this.workerExecutor.shutdownNow(); + this.databaseUpdaterExecutor.shutdownNow(); + } + + /** + * returns an ExecutorService with a single thread that discards tasks if a task is already running + */ + private ExecutorService newDiscardingSingleThreadExecutorService() { + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); + pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + return pool; + } + + /** + * returns an ExecutorService that blocks submissions until a thread is available + */ + private ExecutorService newBlockingExecutorService(int threads) { + ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); + pool.setRejectedExecutionHandler((r, e) -> { + if (e.isShutdown()) { + return; + } + + try { + e.getQueue().put(r); + } catch (InterruptedException ex) { + log.debug("interrupted while waiting for a slot in the queue.", ex); + Thread.currentThread().interrupt(); + } + }); + return pool; } }