wait for tasks to complete when shutting down

This commit is contained in:
Athou
2026-03-04 12:45:09 +01:00
parent 231551d743
commit 78c017ddaf
5 changed files with 67 additions and 9 deletions

View File

@@ -10,7 +10,9 @@ import com.commafeed.security.password.PasswordConstraintValidator;
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent; import io.quarkus.runtime.StartupEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Singleton @Singleton
@RequiredArgsConstructor @RequiredArgsConstructor
public class CommaFeedApplication { public class CommaFeedApplication {
@@ -27,6 +29,8 @@ public class CommaFeedApplication {
} }
public void stop(@Observes ShutdownEvent ev) { public void stop(@Observes ShutdownEvent ev) {
log.info("shutting down...");
feedRefreshEngine.stop(); feedRefreshEngine.stop();
taskScheduler.stop(); taskScheduler.stop();
} }

View File

@@ -92,6 +92,12 @@ public interface CommaFeedConfiguration {
@ConfigDocSection @ConfigDocSection
Websocket websocket(); Websocket websocket();
/**
* Duration to wait for the feed refresh engine and the task scheduler to stop when the application is shutting down.
*/
@WithDefault("2s")
Duration shutdownTimeout();
interface HttpClient { interface HttpClient {
/** /**
* User-Agent string that will be used by the http client, leave empty for the default one. * User-Agent string that will be used by the http client, leave empty for the default one.

View File

@@ -24,6 +24,7 @@ import com.commafeed.backend.model.AbstractModel;
import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.FeedSubscription;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -197,12 +198,12 @@ public class FeedRefreshEngine {
} }
public void stop() { public void stop() {
this.feedProcessingLoopExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.feedProcessingLoopExecutor, config.shutdownTimeout());
this.refillLoopExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.refillLoopExecutor, config.shutdownTimeout());
this.refillExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.refillExecutor, config.shutdownTimeout());
this.workerExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.workerExecutor, config.shutdownTimeout());
this.databaseUpdaterExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.databaseUpdaterExecutor, config.shutdownTimeout());
this.notifierExecutor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(this.notifierExecutor, config.shutdownTimeout());
} }
/** /**

View File

@@ -54,10 +54,20 @@ public class DatabaseCleaningService {
int deleted; int deleted;
long entriesTotal = 0; long entriesTotal = 0;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of feeds without subscriptions");
return;
}
List<Feed> feeds = unitOfWork.call(() -> feedDAO.findWithoutSubscriptions(1)); List<Feed> feeds = unitOfWork.call(() -> feedDAO.findWithoutSubscriptions(1));
for (Feed feed : feeds) { for (Feed feed : feeds) {
long entriesDeleted; long entriesDeleted;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of feeds without subscriptions");
return;
}
entriesDeleted = unitOfWork.call(() -> feedEntryDAO.delete(feed.getId(), batchSize)); entriesDeleted = unitOfWork.call(() -> feedEntryDAO.delete(feed.getId(), batchSize));
entriesDeletedMeter.mark(entriesDeleted); entriesDeletedMeter.mark(entriesDeleted);
entriesTotal += entriesDeleted; entriesTotal += entriesDeleted;
@@ -76,6 +86,11 @@ public class DatabaseCleaningService {
long total = 0; long total = 0;
long deleted; long deleted;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of contents without entries");
return;
}
deleted = unitOfWork.call(() -> feedEntryContentDAO.deleteWithoutEntries(batchSize)); deleted = unitOfWork.call(() -> feedEntryContentDAO.deleteWithoutEntries(batchSize));
total += deleted; total += deleted;
log.debug("removed {} contents without entries", total); log.debug("removed {} contents without entries", total);
@@ -87,6 +102,11 @@ public class DatabaseCleaningService {
log.info("cleaning entries exceeding feed capacity"); log.info("cleaning entries exceeding feed capacity");
long total = 0; long total = 0;
while (true) { while (true) {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of entries exceeding feed capacity");
return;
}
List<FeedCapacity> feeds = unitOfWork List<FeedCapacity> feeds = unitOfWork
.call(() -> feedEntryDAO.findFeedsExceedingCapacity(maxFeedCapacity, batchSize, keepStarredEntries)); .call(() -> feedEntryDAO.findFeedsExceedingCapacity(maxFeedCapacity, batchSize, keepStarredEntries));
if (feeds.isEmpty()) { if (feeds.isEmpty()) {
@@ -97,6 +117,11 @@ public class DatabaseCleaningService {
long remaining = feed.capacity() - maxFeedCapacity; long remaining = feed.capacity() - maxFeedCapacity;
int deleted; int deleted;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of entries exceeding feed capacity");
return;
}
final long rem = remaining; final long rem = remaining;
deleted = unitOfWork.call(() -> feedEntryDAO.deleteOldEntries(feed.id(), Math.min(batchSize, rem), keepStarredEntries)); deleted = unitOfWork.call(() -> feedEntryDAO.deleteOldEntries(feed.id(), Math.min(batchSize, rem), keepStarredEntries));
entriesDeletedMeter.mark(deleted); entriesDeletedMeter.mark(deleted);
@@ -114,6 +139,11 @@ public class DatabaseCleaningService {
long total = 0; long total = 0;
long deleted; long deleted;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of old entries");
return;
}
deleted = unitOfWork.call(() -> feedEntryDAO.deleteEntriesOlderThan(olderThan, batchSize, keepStarredEntries)); deleted = unitOfWork.call(() -> feedEntryDAO.deleteEntriesOlderThan(olderThan, batchSize, keepStarredEntries));
entriesDeletedMeter.mark(deleted); entriesDeletedMeter.mark(deleted);
total += deleted; total += deleted;
@@ -127,6 +157,11 @@ public class DatabaseCleaningService {
long total = 0; long total = 0;
long deleted; long deleted;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping cleanup of old read statuses");
return;
}
deleted = unitOfWork.call(() -> feedEntryStatusDAO.deleteOldStatuses(olderThan, batchSize)); deleted = unitOfWork.call(() -> feedEntryStatusDAO.deleteOldStatuses(olderThan, batchSize));
total += deleted; total += deleted;
log.debug("removed {} old read statuses", total); log.debug("removed {} old read statuses", total);
@@ -139,6 +174,11 @@ public class DatabaseCleaningService {
long total = 0; long total = 0;
long marked; long marked;
do { do {
if (Thread.currentThread().isInterrupted()) {
log.info("interrupted, stopping marking entries as read");
return;
}
marked = unitOfWork.call(() -> feedEntryStatusDAO.autoMarkAsRead(batchSize)); marked = unitOfWork.call(() -> feedEntryStatusDAO.autoMarkAsRead(batchSize));
total += marked; total += marked;
log.debug("marked {} entries as read", total); log.debug("marked {} entries as read", total);

View File

@@ -6,16 +6,23 @@ import java.util.concurrent.ScheduledExecutorService;
import jakarta.inject.Singleton; import jakarta.inject.Singleton;
import io.quarkus.arc.All; import com.commafeed.CommaFeedConfiguration;
import com.google.common.util.concurrent.MoreExecutors;
import io.quarkus.arc.All;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Singleton @Singleton
public class TaskScheduler { public class TaskScheduler {
private final List<ScheduledTask> tasks; private final List<ScheduledTask> tasks;
private final CommaFeedConfiguration config;
private final ScheduledExecutorService executor; private final ScheduledExecutorService executor;
public TaskScheduler(@All List<ScheduledTask> tasks) { public TaskScheduler(@All List<ScheduledTask> tasks, CommaFeedConfiguration config) {
this.tasks = tasks; this.tasks = tasks;
this.config = config;
this.executor = Executors.newScheduledThreadPool(tasks.size()); this.executor = Executors.newScheduledThreadPool(tasks.size());
} }
@@ -24,6 +31,6 @@ public class TaskScheduler {
} }
public void stop() { public void stop() {
executor.shutdownNow(); MoreExecutors.shutdownAndAwaitTermination(executor, config.shutdownTimeout());
} }
} }