diff --git a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java index 9cecaf31..545e21a0 100644 --- a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java +++ b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java @@ -10,7 +10,9 @@ import com.commafeed.security.password.PasswordConstraintValidator; import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +@Slf4j @Singleton @RequiredArgsConstructor public class CommaFeedApplication { @@ -27,6 +29,8 @@ public class CommaFeedApplication { } public void stop(@Observes ShutdownEvent ev) { + log.info("shutting down..."); + feedRefreshEngine.stop(); taskScheduler.stop(); } diff --git a/commafeed-server/src/main/java/com/commafeed/CommaFeedConfiguration.java b/commafeed-server/src/main/java/com/commafeed/CommaFeedConfiguration.java index a000ca92..d4ae4cfc 100644 --- a/commafeed-server/src/main/java/com/commafeed/CommaFeedConfiguration.java +++ b/commafeed-server/src/main/java/com/commafeed/CommaFeedConfiguration.java @@ -92,6 +92,12 @@ public interface CommaFeedConfiguration { @ConfigDocSection Websocket websocket(); + /** + * Duration to wait for the feed refresh engine and the task scheduler to stop when the application is shutting down. + */ + @WithDefault("2s") + Duration shutdownTimeout(); + interface HttpClient { /** * User-Agent string that will be used by the http client, leave empty for the default one. diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java index 42b7f18c..36498277 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshEngine.java @@ -24,6 +24,7 @@ import com.commafeed.backend.model.AbstractModel; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedSubscription; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; @@ -197,12 +198,12 @@ public class FeedRefreshEngine { } public void stop() { - this.feedProcessingLoopExecutor.shutdownNow(); - this.refillLoopExecutor.shutdownNow(); - this.refillExecutor.shutdownNow(); - this.workerExecutor.shutdownNow(); - this.databaseUpdaterExecutor.shutdownNow(); - this.notifierExecutor.shutdownNow(); + MoreExecutors.shutdownAndAwaitTermination(this.feedProcessingLoopExecutor, config.shutdownTimeout()); + MoreExecutors.shutdownAndAwaitTermination(this.refillLoopExecutor, config.shutdownTimeout()); + MoreExecutors.shutdownAndAwaitTermination(this.refillExecutor, config.shutdownTimeout()); + MoreExecutors.shutdownAndAwaitTermination(this.workerExecutor, config.shutdownTimeout()); + MoreExecutors.shutdownAndAwaitTermination(this.databaseUpdaterExecutor, config.shutdownTimeout()); + MoreExecutors.shutdownAndAwaitTermination(this.notifierExecutor, config.shutdownTimeout()); } /** diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/db/DatabaseCleaningService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/db/DatabaseCleaningService.java index 63098966..822bbe02 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/db/DatabaseCleaningService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/db/DatabaseCleaningService.java @@ -54,10 +54,20 @@ public class DatabaseCleaningService { int deleted; long entriesTotal = 0; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of feeds without subscriptions"); + return; + } + List feeds = unitOfWork.call(() -> feedDAO.findWithoutSubscriptions(1)); for (Feed feed : feeds) { long entriesDeleted; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of feeds without subscriptions"); + return; + } + entriesDeleted = unitOfWork.call(() -> feedEntryDAO.delete(feed.getId(), batchSize)); entriesDeletedMeter.mark(entriesDeleted); entriesTotal += entriesDeleted; @@ -76,6 +86,11 @@ public class DatabaseCleaningService { long total = 0; long deleted; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of contents without entries"); + return; + } + deleted = unitOfWork.call(() -> feedEntryContentDAO.deleteWithoutEntries(batchSize)); total += deleted; log.debug("removed {} contents without entries", total); @@ -87,6 +102,11 @@ public class DatabaseCleaningService { log.info("cleaning entries exceeding feed capacity"); long total = 0; while (true) { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of entries exceeding feed capacity"); + return; + } + List feeds = unitOfWork .call(() -> feedEntryDAO.findFeedsExceedingCapacity(maxFeedCapacity, batchSize, keepStarredEntries)); if (feeds.isEmpty()) { @@ -97,6 +117,11 @@ public class DatabaseCleaningService { long remaining = feed.capacity() - maxFeedCapacity; int deleted; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of entries exceeding feed capacity"); + return; + } + final long rem = remaining; deleted = unitOfWork.call(() -> feedEntryDAO.deleteOldEntries(feed.id(), Math.min(batchSize, rem), keepStarredEntries)); entriesDeletedMeter.mark(deleted); @@ -114,6 +139,11 @@ public class DatabaseCleaningService { long total = 0; long deleted; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of old entries"); + return; + } + deleted = unitOfWork.call(() -> feedEntryDAO.deleteEntriesOlderThan(olderThan, batchSize, keepStarredEntries)); entriesDeletedMeter.mark(deleted); total += deleted; @@ -127,6 +157,11 @@ public class DatabaseCleaningService { long total = 0; long deleted; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping cleanup of old read statuses"); + return; + } + deleted = unitOfWork.call(() -> feedEntryStatusDAO.deleteOldStatuses(olderThan, batchSize)); total += deleted; log.debug("removed {} old read statuses", total); @@ -139,6 +174,11 @@ public class DatabaseCleaningService { long total = 0; long marked; do { + if (Thread.currentThread().isInterrupted()) { + log.info("interrupted, stopping marking entries as read"); + return; + } + marked = unitOfWork.call(() -> feedEntryStatusDAO.autoMarkAsRead(batchSize)); total += marked; log.debug("marked {} entries as read", total); diff --git a/commafeed-server/src/main/java/com/commafeed/backend/task/TaskScheduler.java b/commafeed-server/src/main/java/com/commafeed/backend/task/TaskScheduler.java index aad6d2f6..f63c5cb9 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/task/TaskScheduler.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/task/TaskScheduler.java @@ -6,16 +6,23 @@ import java.util.concurrent.ScheduledExecutorService; import jakarta.inject.Singleton; -import io.quarkus.arc.All; +import com.commafeed.CommaFeedConfiguration; +import com.google.common.util.concurrent.MoreExecutors; +import io.quarkus.arc.All; +import lombok.extern.slf4j.Slf4j; + +@Slf4j @Singleton public class TaskScheduler { private final List tasks; + private final CommaFeedConfiguration config; private final ScheduledExecutorService executor; - public TaskScheduler(@All List tasks) { + public TaskScheduler(@All List tasks, CommaFeedConfiguration config) { this.tasks = tasks; + this.config = config; this.executor = Executors.newScheduledThreadPool(tasks.size()); } @@ -24,6 +31,6 @@ public class TaskScheduler { } public void stop() { - executor.shutdownNow(); + MoreExecutors.shutdownAndAwaitTermination(executor, config.shutdownTimeout()); } }