replace homemade threadpool framework with rxjava

This commit is contained in:
Athou
2023-04-29 22:48:30 +02:00
parent 15f93b198c
commit 05ae4eb529
20 changed files with 296 additions and 578 deletions

View File

@@ -0,0 +1,14 @@
package com.commafeed.backend.feed;
import java.util.List;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import lombok.Value;
@Value
public class FeedAndEntries {
Feed feed;
List<FeedEntry> entries;
}

View File

@@ -1,124 +0,0 @@
package com.commafeed.backend.feed;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.hibernate.SessionFactory;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.model.Feed;
@Singleton
public class FeedQueues {
private final SessionFactory sessionFactory;
private final FeedDAO feedDAO;
private final CommaFeedConfiguration config;
private final BlockingDeque<FeedRefreshContext> queue = new LinkedBlockingDeque<>();
private final Meter refill;
@Inject
public FeedQueues(SessionFactory sessionFactory, FeedDAO feedDAO, CommaFeedConfiguration config, MetricRegistry metrics) {
this.sessionFactory = sessionFactory;
this.config = config;
this.feedDAO = feedDAO;
this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill"));
metrics.register(MetricRegistry.name(getClass(), "queue"), (Gauge<Integer>) queue::size);
}
/**
* take a feed from the refresh queue
*/
public synchronized FeedRefreshContext take() {
FeedRefreshContext context = queue.poll();
if (context != null) {
return context;
}
refill();
try {
// try to get something from the queue
// if the queue is empty, wait a bit
// polling the queue instead of sleeping gives us the opportunity to process a feed immediately if it was added manually with
// add()
return queue.poll(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException("interrupted while waiting for a feed in the queue", e);
}
}
/**
* add a feed to the refresh queue
*/
public void add(Feed feed, boolean urgent) {
if (isFeedAlreadyQueued(feed)) {
return;
}
FeedRefreshContext context = new FeedRefreshContext(feed, urgent);
if (urgent) {
queue.addFirst(context);
} else {
queue.addLast(context);
}
}
/**
* refills the refresh queue
*/
private void refill() {
refill.mark();
// add feeds that are up to refresh from the database
int batchSize = Math.min(100, 3 * config.getApplicationSettings().getBackgroundThreads());
List<Feed> feeds = UnitOfWork.call(sessionFactory, () -> {
List<Feed> list = feedDAO.findNextUpdatable(batchSize, getLastLoginThreshold());
// set the disabledDate as we use it in feedDAO.findNextUpdatable() to decide what to refresh next
Date nextRefreshDate = DateUtils.addMinutes(new Date(), config.getApplicationSettings().getRefreshIntervalMinutes());
list.forEach(f -> f.setDisabledUntil(nextRefreshDate));
feedDAO.saveOrUpdate(list);
return list;
});
feeds.forEach(f -> add(f, false));
}
public void giveBack(Feed feed) {
String normalized = FeedUtils.normalizeURL(feed.getUrl());
feed.setNormalizedUrl(normalized);
feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized));
feed.setLastUpdated(new Date());
UnitOfWork.run(sessionFactory, () -> feedDAO.saveOrUpdate(feed));
// we just finished updating the feed, remove it from the queue
queue.removeIf(c -> isFeedAlreadyQueued(c.getFeed()));
}
private Date getLastLoginThreshold() {
if (config.getApplicationSettings().getHeavyLoad()) {
return DateUtils.addDays(new Date(), -30);
} else {
return null;
}
}
private boolean isFeedAlreadyQueued(Feed feed) {
return queue.stream().anyMatch(c -> c.getFeed().getId().equals(feed.getId()));
}
}

View File

@@ -1,22 +0,0 @@
package com.commafeed.backend.feed;
import java.util.List;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class FeedRefreshContext {
private Feed feed;
private List<FeedEntry> entries;
private boolean urgent;
public FeedRefreshContext(Feed feed, boolean isUrgent) {
this.feed = feed;
this.urgent = isUrgent;
}
}

View File

@@ -1,99 +0,0 @@
package com.commafeed.backend.feed;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.extern.slf4j.Slf4j;
/**
* Wraps a {@link ThreadPoolExecutor} instance. Blocks when queue is full instead of rejecting the task. Allow priority queueing by using
* {@link Task} instead of {@link Runnable}
*
*/
@Slf4j
public class FeedRefreshExecutor {
private String poolName;
private ThreadPoolExecutor pool;
private LinkedBlockingDeque<Runnable> queue;
public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity, MetricRegistry metrics) {
log.info("Creating pool {} with {} threads", poolName, threads);
this.poolName = poolName;
pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queue = new LinkedBlockingDeque<Runnable>(queueCapacity) {
private static final long serialVersionUID = 1L;
@Override
public boolean offer(Runnable r) {
Task task = (Task) r;
if (task.isUrgent()) {
return offerFirst(r);
} else {
return offerLast(r);
}
}
}) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
log.error("thread from pool {} threw a runtime exception", poolName, t);
}
}
};
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.debug("{} thread queue full, waiting...", poolName);
try {
Task task = (Task) r;
if (task.isUrgent()) {
queue.putFirst(r);
} else {
queue.put(r);
}
} catch (InterruptedException e1) {
log.error(poolName + " interrupted while waiting for queue.", e1);
}
}
});
metrics.register(MetricRegistry.name(getClass(), poolName, "active"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return pool.getActiveCount();
}
});
metrics.register(MetricRegistry.name(getClass(), poolName, "pending"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.size();
}
});
}
public void execute(Task task) {
pool.execute(task);
}
public void shutdown() {
pool.shutdownNow();
while (!pool.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("{} interrupted while waiting for threads to finish.", poolName);
}
}
}
public interface Task extends Runnable {
boolean isUrgent();
}
}

View File

@@ -1,75 +0,0 @@
package com.commafeed.backend.feed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.inject.Inject;
import javax.inject.Singleton;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.FeedDAO;
import io.dropwizard.lifecycle.Managed;
import lombok.extern.slf4j.Slf4j;
/**
* Infinite loop fetching feeds from @FeedQueues and queuing them to the {@link FeedRefreshWorker} pool.
*
*/
@Slf4j
@Singleton
public class FeedRefreshTaskGiver implements Managed {
private final FeedQueues queues;
private final FeedRefreshWorker worker;
private final ExecutorService executor;
private final Meter feedRefreshed;
@Inject
public FeedRefreshTaskGiver(FeedQueues queues, FeedDAO feedDAO, FeedRefreshWorker worker, CommaFeedConfiguration config,
MetricRegistry metrics) {
this.queues = queues;
this.worker = worker;
executor = Executors.newFixedThreadPool(1);
feedRefreshed = metrics.meter(MetricRegistry.name(getClass(), "feedRefreshed"));
}
@Override
public void stop() {
log.info("shutting down feed refresh task giver");
executor.shutdownNow();
while (!executor.isTerminated()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("interrupted while waiting for threads to finish.");
}
}
}
@Override
public void start() {
log.info("starting feed refresh task giver");
executor.execute(new Runnable() {
@Override
public void run() {
while (!executor.isShutdown()) {
try {
FeedRefreshContext context = queues.take();
if (context != null) {
feedRefreshed.mark();
worker.updateFeed(context);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
});
}
}

View File

@@ -20,17 +20,16 @@ import org.hibernate.SessionFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.CommaFeedConfiguration.ApplicationSettings;
import com.commafeed.backend.cache.CacheService;
import com.commafeed.backend.dao.FeedSubscriptionDAO;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.feed.FeedRefreshExecutor.Task;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedEntryContent;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.commafeed.backend.service.FeedUpdateService;
import com.commafeed.backend.service.FeedEntryService;
import com.commafeed.backend.service.FeedService;
import com.commafeed.backend.service.PubSubService;
import com.commafeed.frontend.ws.WebSocketMessageBuilder;
import com.commafeed.frontend.ws.WebSocketSessions;
@@ -45,15 +44,14 @@ import lombok.extern.slf4j.Slf4j;
public class FeedRefreshUpdater implements Managed {
private final SessionFactory sessionFactory;
private final FeedUpdateService feedUpdateService;
private final FeedService feedService;
private final FeedEntryService feedEntryService;
private final PubSubService pubSubService;
private final FeedQueues queues;
private final CommaFeedConfiguration config;
private final FeedSubscriptionDAO feedSubscriptionDAO;
private final CacheService cache;
private final WebSocketSessions webSocketSessions;
private final FeedRefreshExecutor pool;
private final Striped<Lock> locks;
private final Meter entryCacheMiss;
@@ -62,22 +60,19 @@ public class FeedRefreshUpdater implements Managed {
private final Meter entryInserted;
@Inject
public FeedRefreshUpdater(SessionFactory sessionFactory, FeedUpdateService feedUpdateService, PubSubService pubSubService,
FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO,
public FeedRefreshUpdater(SessionFactory sessionFactory, FeedService feedService, FeedEntryService feedEntryService,
PubSubService pubSubService, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO,
CacheService cache, WebSocketSessions webSocketSessions) {
this.sessionFactory = sessionFactory;
this.feedUpdateService = feedUpdateService;
this.feedService = feedService;
this.feedEntryService = feedEntryService;
this.pubSubService = pubSubService;
this.queues = queues;
this.config = config;
this.feedSubscriptionDAO = feedSubscriptionDAO;
this.cache = cache;
this.webSocketSessions = webSocketSessions;
ApplicationSettings settings = config.getApplicationSettings();
int threads = Math.max(settings.getDatabaseUpdateThreads(), 1);
pool = new FeedRefreshExecutor("feed-refresh-updater", threads, Math.min(50 * threads, 1000), metrics);
locks = Striped.lazyWeakLock(threads * 100000);
locks = Striped.lazyWeakLock(100000);
entryCacheMiss = metrics.meter(MetricRegistry.name(getClass(), "entryCacheMiss"));
entryCacheHit = metrics.meter(MetricRegistry.name(getClass(), "entryCacheHit"));
@@ -85,20 +80,6 @@ public class FeedRefreshUpdater implements Managed {
entryInserted = metrics.meter(MetricRegistry.name(getClass(), "entryInserted"));
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
log.info("shutting down feed refresh updater");
pool.shutdown();
}
public void updateFeed(FeedRefreshContext context) {
pool.execute(new EntryTask(context));
}
private AddEntryResult addEntry(final Feed feed, final FeedEntry entry, final List<FeedSubscription> subscriptions) {
boolean processed = false;
boolean inserted = false;
@@ -123,7 +104,7 @@ public class FeedRefreshUpdater implements Managed {
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
if (locked1 && locked2) {
processed = true;
inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions));
inserted = UnitOfWork.call(sessionFactory, () -> feedEntryService.addEntry(feed, entry, subscriptions));
if (inserted) {
entryInserted.mark();
}
@@ -166,76 +147,63 @@ public class FeedRefreshUpdater implements Managed {
}
}
private class EntryTask implements Task {
public boolean update(Feed feed, List<FeedEntry> entries) {
boolean processed = true;
boolean insertedAtLeastOneEntry = false;
private final FeedRefreshContext context;
if (!entries.isEmpty()) {
List<String> lastEntries = cache.getLastEntries(feed);
List<String> currentEntries = new ArrayList<>();
public EntryTask(FeedRefreshContext context) {
this.context = context;
}
@Override
public void run() {
boolean processed = true;
boolean insertedAtLeastOneEntry = false;
final Feed feed = context.getFeed();
List<FeedEntry> entries = context.getEntries();
if (entries.isEmpty()) {
feed.setMessage("Feed has no entries");
} else {
List<String> lastEntries = cache.getLastEntries(feed);
List<String> currentEntries = new ArrayList<>();
List<FeedSubscription> subscriptions = null;
for (FeedEntry entry : entries) {
String cacheKey = cache.buildUniqueEntryKey(feed, entry);
if (!lastEntries.contains(cacheKey)) {
log.debug("cache miss for {}", entry.getUrl());
if (subscriptions == null) {
subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed));
}
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
processed &= addEntryResult.processed;
insertedAtLeastOneEntry |= addEntryResult.inserted;
entryCacheMiss.mark();
} else {
log.debug("cache hit for {}", entry.getUrl());
entryCacheHit.mark();
List<FeedSubscription> subscriptions = null;
for (FeedEntry entry : entries) {
String cacheKey = cache.buildUniqueEntryKey(feed, entry);
if (!lastEntries.contains(cacheKey)) {
log.debug("cache miss for {}", entry.getUrl());
if (subscriptions == null) {
subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed));
}
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
processed &= addEntryResult.processed;
insertedAtLeastOneEntry |= addEntryResult.inserted;
currentEntries.add(cacheKey);
entryCacheMiss.mark();
} else {
log.debug("cache hit for {}", entry.getUrl());
entryCacheHit.mark();
}
cache.setLastEntries(feed, currentEntries);
if (subscriptions == null) {
feed.setMessage("No new entries found");
} else if (insertedAtLeastOneEntry) {
List<User> users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList());
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
cache.invalidateUserRootCategory(users.toArray(new User[0]));
currentEntries.add(cacheKey);
}
cache.setLastEntries(feed, currentEntries);
// notify over websocket
subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub)));
}
}
if (subscriptions == null) {
feed.setMessage("No new entries found");
} else if (insertedAtLeastOneEntry) {
List<User> users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList());
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
cache.invalidateUserRootCategory(users.toArray(new User[0]));
if (config.getApplicationSettings().getPubsubhubbub()) {
handlePubSub(feed);
}
if (!processed) {
// requeue asap
feed.setDisabledUntil(new Date(0));
// notify over websocket
subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub)));
}
}
if (Boolean.TRUE.equals(config.getApplicationSettings().getPubsubhubbub())) {
handlePubSub(feed);
}
if (!processed) {
// requeue asap
feed.setDisabledUntil(new Date(0));
}
if (insertedAtLeastOneEntry) {
feedUpdated.mark();
queues.giveBack(feed);
}
@Override
public boolean isUrgent() {
return context.isUrgent();
}
UnitOfWork.run(sessionFactory, () -> feedService.save(feed));
return processed;
}
@AllArgsConstructor

View File

@@ -1,5 +1,6 @@
package com.commafeed.backend.feed;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -10,58 +11,38 @@ import javax.inject.Singleton;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.HttpGetter.NotModifiedException;
import com.commafeed.backend.feed.FeedRefreshExecutor.Task;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import io.dropwizard.lifecycle.Managed;
import lombok.extern.slf4j.Slf4j;
/**
* Calls {@link FeedFetcher} and handles its outcome
*
*/
@Slf4j
@Singleton
public class FeedRefreshWorker implements Managed {
public class FeedRefreshWorker {
private final FeedRefreshUpdater feedRefreshUpdater;
private final FeedRefreshIntervalCalculator refreshIntervalCalculator;
private final FeedFetcher fetcher;
private final FeedQueues queues;
private final CommaFeedConfiguration config;
private final FeedRefreshExecutor pool;
private final Meter feedFetched;
@Inject
public FeedRefreshWorker(FeedRefreshUpdater feedRefreshUpdater, FeedRefreshIntervalCalculator refreshIntervalCalculator,
FeedFetcher fetcher, FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics) {
this.feedRefreshUpdater = feedRefreshUpdater;
public FeedRefreshWorker(FeedRefreshIntervalCalculator refreshIntervalCalculator, FeedFetcher fetcher, CommaFeedConfiguration config,
MetricRegistry metrics) {
this.refreshIntervalCalculator = refreshIntervalCalculator;
this.fetcher = fetcher;
this.config = config;
this.queues = queues;
int threads = config.getApplicationSettings().getBackgroundThreads();
pool = new FeedRefreshExecutor("feed-refresh-worker", threads, Math.min(20 * threads, 1000), metrics);
this.feedFetched = metrics.meter(MetricRegistry.name(getClass(), "feedFetched"));
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
pool.shutdown();
}
public void updateFeed(FeedRefreshContext context) {
pool.execute(new FeedTask(context));
}
private void update(FeedRefreshContext context) {
Feed feed = context.getFeed();
public FeedAndEntries update(Feed feed) {
try {
String url = Optional.ofNullable(feed.getUrlAfterRedirect()).orElse(feed.getUrl());
FetchedFeed fetchedFeed = fetcher.fetch(url, false, feed.getLastModifiedHeader(), feed.getEtagHeader(),
@@ -92,9 +73,8 @@ public class FeedRefreshWorker implements Managed {
feed.setDisabledUntil(refreshIntervalCalculator.onFetchSuccess(fetchedFeed));
handlePubSub(feed, fetchedFeed.getFeed());
context.setEntries(entries);
feedRefreshUpdater.updateFeed(context);
return new FeedAndEntries(feed, entries);
} catch (NotModifiedException e) {
log.debug("Feed not modified : {} - {}", feed.getUrl(), e.getMessage());
@@ -110,7 +90,7 @@ public class FeedRefreshWorker implements Managed {
feed.setEtagHeader(e.getNewEtagHeader());
}
queues.giveBack(feed);
return new FeedAndEntries(feed, Collections.emptyList());
} catch (Exception e) {
String message = "Unable to refresh feed " + feed.getUrl() + " : " + e.getMessage();
log.debug(e.getClass().getName() + " " + message, e);
@@ -119,7 +99,9 @@ public class FeedRefreshWorker implements Managed {
feed.setMessage(message);
feed.setDisabledUntil(refreshIntervalCalculator.onFetchError(feed));
queues.giveBack(feed);
return new FeedAndEntries(feed, Collections.emptyList());
} finally {
feedFetched.mark();
}
}
@@ -145,22 +127,4 @@ public class FeedRefreshWorker implements Managed {
}
}
private class FeedTask implements Task {
private final FeedRefreshContext context;
public FeedTask(FeedRefreshContext context) {
this.context = context;
}
@Override
public void run() {
update(context);
}
@Override
public boolean isUrgent() {
return context.isUrgent();
}
}
}