remove rxjava as its too magic and very hard to master

This commit is contained in:
Athou
2023-05-22 15:26:20 +02:00
parent 85ebcd6407
commit e84a1289e3
4 changed files with 147 additions and 48 deletions

View File

@@ -294,12 +294,6 @@
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.6</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>

View File

@@ -42,6 +42,10 @@ public class FeedDAO extends GenericDAO<Feed> {
return query.orderBy(feed.disabledUntil.asc()).limit(count).fetch();
}
public void setDisabledUntil(List<Long> feedIds, Date date) {
updateQuery(feed).set(feed.disabledUntil, date).where(feed.id.in(feedIds)).execute();
}
public Feed findByUrl(String normalizedUrl) {
List<Feed> feeds = query().selectFrom(feed).where(feed.normalizedUrlHash.eq(DigestUtils.sha1Hex(normalizedUrl))).fetch();
Feed feed = Iterables.getFirst(feeds, null);

View File

@@ -6,8 +6,10 @@ import org.hibernate.SessionFactory;
import org.hibernate.annotations.QueryHints;
import com.commafeed.backend.model.AbstractModel;
import com.querydsl.core.types.EntityPath;
import com.querydsl.jpa.impl.JPAQuery;
import com.querydsl.jpa.impl.JPAQueryFactory;
import com.querydsl.jpa.impl.JPAUpdateClause;
import io.dropwizard.hibernate.AbstractDAO;
@@ -24,6 +26,10 @@ public abstract class GenericDAO<T extends AbstractModel> extends AbstractDAO<T>
return factory;
}
protected JPAUpdateClause updateQuery(EntityPath<T> entityPath) {
return new JPAUpdateClause(currentSession(), entityPath);
}
public void saveOrUpdate(T model) {
persist(model);
}

View File

@@ -2,7 +2,15 @@ package com.commafeed.backend.feed;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -15,13 +23,10 @@ import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.model.AbstractModel;
import com.commafeed.backend.model.Feed;
import io.dropwizard.lifecycle.Managed;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -35,8 +40,13 @@ public class FeedRefreshEngine implements Managed {
private final CommaFeedConfiguration config;
private final Meter refill;
private final PublishProcessor<Feed> priorityQueue;
private Disposable flow;
private final BlockingDeque<Feed> queue;
private final ExecutorService feedProcessingLoopExecutor;
private final ExecutorService refillLoopExecutor;
private final ExecutorService refillExecutor;
private final ExecutorService workerExecutor;
private final ExecutorService databaseUpdaterExecutor;
@Inject
public FeedRefreshEngine(SessionFactory sessionFactory, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater,
@@ -47,54 +57,106 @@ public class FeedRefreshEngine implements Managed {
this.updater = updater;
this.config = config;
this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill"));
this.priorityQueue = PublishProcessor.create();
this.queue = new LinkedBlockingDeque<>();
this.feedProcessingLoopExecutor = Executors.newSingleThreadExecutor();
this.refillLoopExecutor = Executors.newSingleThreadExecutor();
this.refillExecutor = newDiscardingSingleThreadExecutorService();
this.workerExecutor = newBlockingExecutorService(config.getApplicationSettings().getBackgroundThreads());
this.databaseUpdaterExecutor = newBlockingExecutorService(config.getApplicationSettings().getDatabaseUpdateThreads());
}
@Override
public void start() {
Flowable<Feed> database = Flowable.fromCallable(() -> findNextUpdatableFeeds(getBatchSize(), getLastLoginThreshold()))
.onErrorResumeNext(e -> {
log.error("error while fetching next updatable feeds", e);
return Flowable.empty();
})
// repeat query 15s after the flowable has been emptied
// https://github.com/ReactiveX/RxJava/issues/448#issuecomment-233244964
.repeatWhen(o -> o.concatMap(v -> Flowable.timer(15, TimeUnit.SECONDS)))
.flatMap(Flowable::fromIterable);
Flowable<Feed> source = Flowable.merge(priorityQueue, database);
startFeedProcessingLoop();
startRefillLoop();
}
this.flow = source.subscribeOn(Schedulers.io())
// feed fetching
.parallel(config.getApplicationSettings().getBackgroundThreads())
.runOn(Schedulers.io())
.flatMap(f -> Flowable.fromCallable(() -> worker.update(f)).onErrorResumeNext(e -> {
log.error("error while fetching feed", e);
return Flowable.empty();
}))
.sequential()
// database updating
.parallel(config.getApplicationSettings().getDatabaseUpdateThreads())
.runOn(Schedulers.io())
.flatMap(fae -> Flowable.fromCallable(() -> updater.update(fae.getFeed(), fae.getEntries())).onErrorResumeNext(e -> {
log.error("error while updating database", e);
return Flowable.empty();
}))
.sequential()
// end flow
.subscribe();
private void startFeedProcessingLoop() {
// take a feed from the queue, process it, rince, repeat
feedProcessingLoopExecutor.submit(() -> {
while (!feedProcessingLoopExecutor.isShutdown()) {
try {
// take() is blocking until a feed is available from the queue
Feed feed = queue.take();
// send the feed to be processed
processFeedAsync(feed);
// we removed a feed from the queue, try to refill it as it may now be empty
if (queue.isEmpty()) {
refillQueueAsync();
}
} catch (InterruptedException e) {
log.debug("interrupted while waiting for a feed in the queue");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
private void startRefillLoop() {
// refill the queue at regular intervals if it's empty
refillLoopExecutor.submit(() -> {
while (!refillLoopExecutor.isShutdown()) {
try {
if (queue.isEmpty()) {
refillQueueAsync();
}
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
log.debug("interrupted while sleeping");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
});
}
public void refreshImmediately(Feed feed) {
priorityQueue.onNext(feed);
queue.addFirst(feed);
}
private List<Feed> findNextUpdatableFeeds(int max, Date lastLoginThreshold) {
refill.mark();
return UnitOfWork.call(sessionFactory, () -> feedDAO.findNextUpdatable(max, lastLoginThreshold));
private void refillQueueAsync() {
CompletableFuture.runAsync(() -> {
if (queue.isEmpty()) {
refill.mark();
queue.addAll(getNextUpdatableFeeds(getBatchSize()));
}
}, refillExecutor).whenComplete((data, ex) -> {
if (ex != null) {
log.error("error while refilling the queue", ex);
}
});
}
private void processFeedAsync(Feed feed) {
CompletableFuture.supplyAsync(() -> worker.update(feed), workerExecutor)
.thenApplyAsync(r -> updater.update(r.getFeed(), r.getEntries()), databaseUpdaterExecutor)
.whenComplete((data, ex) -> {
if (ex != null) {
log.error("error while processing feed {}", feed.getUrl(), ex);
}
});
}
private List<Feed> getNextUpdatableFeeds(int max) {
return UnitOfWork.call(sessionFactory, () -> {
List<Feed> feeds = feedDAO.findNextUpdatable(max, getLastLoginThreshold());
// update disabledUntil to prevent feeds from being returned again by feedDAO.findNextUpdatable()
Date nextUpdateDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes());
feedDAO.setDisabledUntil(feeds.stream().map(AbstractModel::getId).collect(Collectors.toList()), nextUpdateDate);
return feeds;
});
}
private int getBatchSize() {
return Math.min(Flowable.bufferSize(), 3 * config.getApplicationSettings().getBackgroundThreads());
return Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads());
}
private Date getLastLoginThreshold() {
@@ -103,6 +165,39 @@ public class FeedRefreshEngine implements Managed {
@Override
public void stop() {
flow.dispose();
this.feedProcessingLoopExecutor.shutdownNow();
this.refillLoopExecutor.shutdownNow();
this.refillExecutor.shutdownNow();
this.workerExecutor.shutdownNow();
this.databaseUpdaterExecutor.shutdownNow();
}
/**
* returns an ExecutorService with a single thread that discards tasks if a task is already running
*/
private ExecutorService newDiscardingSingleThreadExecutorService() {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return pool;
}
/**
* returns an ExecutorService that blocks submissions until a thread is available
*/
private ExecutorService newBlockingExecutorService(int threads) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
pool.setRejectedExecutionHandler((r, e) -> {
if (e.isShutdown()) {
return;
}
try {
e.getQueue().put(r);
} catch (InterruptedException ex) {
log.debug("interrupted while waiting for a slot in the queue.", ex);
Thread.currentThread().interrupt();
}
});
return pool;
}
}