stop the application between tests to make sure that there are no active transactions when we truncate the tables

This commit is contained in:
Athou
2026-03-04 13:49:44 +01:00
parent 42d1db5fc3
commit eaa5bc896e
5 changed files with 40 additions and 24 deletions

View File

@@ -22,6 +22,8 @@ public class CommaFeedApplication {
private final CommaFeedConfiguration config; private final CommaFeedConfiguration config;
public void start(@Observes StartupEvent ev) { public void start(@Observes StartupEvent ev) {
log.info("starting up...");
PasswordConstraintValidator.setMinimumPasswordLength(config.users().minimumPasswordLength()); PasswordConstraintValidator.setMinimumPasswordLength(config.users().minimumPasswordLength());
feedRefreshEngine.start(); feedRefreshEngine.start();

View File

@@ -42,12 +42,12 @@ public class FeedRefreshEngine {
private final BlockingDeque<Feed> queue; private final BlockingDeque<Feed> queue;
private final ExecutorService feedProcessingLoopExecutor; private ExecutorService feedProcessingLoopExecutor;
private final ExecutorService refillLoopExecutor; private ExecutorService refillLoopExecutor;
private final ExecutorService refillExecutor; private ThreadPoolExecutor refillExecutor;
private final ThreadPoolExecutor workerExecutor; private ThreadPoolExecutor workerExecutor;
private final ThreadPoolExecutor databaseUpdaterExecutor; private ThreadPoolExecutor databaseUpdaterExecutor;
private final ThreadPoolExecutor notifierExecutor; private ThreadPoolExecutor notifierExecutor;
public FeedRefreshEngine(UnitOfWork unitOfWork, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater, public FeedRefreshEngine(UnitOfWork unitOfWork, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater,
FeedUpdateNotifier notifier, CommaFeedConfiguration config, MetricRegistry metrics) { FeedUpdateNotifier notifier, CommaFeedConfiguration config, MetricRegistry metrics) {
@@ -61,6 +61,15 @@ public class FeedRefreshEngine {
this.queue = new LinkedBlockingDeque<>(); this.queue = new LinkedBlockingDeque<>();
metrics.register(MetricRegistry.name(getClass(), "queue", "size"), (Gauge<Integer>) queue::size);
metrics.register(MetricRegistry.name(getClass(), "worker", "active"), (Gauge<Integer>) () -> workerExecutor.getActiveCount());
metrics.register(MetricRegistry.name(getClass(), "updater", "active"),
(Gauge<Integer>) () -> databaseUpdaterExecutor.getActiveCount());
metrics.register(MetricRegistry.name(getClass(), "notifier", "active"), (Gauge<Integer>) () -> notifierExecutor.getActiveCount());
metrics.register(MetricRegistry.name(getClass(), "notifier", "queue"), (Gauge<Integer>) () -> notifierExecutor.getQueue().size());
}
private void createExecutors() {
this.feedProcessingLoopExecutor = Executors.newSingleThreadExecutor(); this.feedProcessingLoopExecutor = Executors.newSingleThreadExecutor();
this.refillLoopExecutor = Executors.newSingleThreadExecutor(); this.refillLoopExecutor = Executors.newSingleThreadExecutor();
this.refillExecutor = newDiscardingSingleThreadExecutorService(); this.refillExecutor = newDiscardingSingleThreadExecutorService();
@@ -68,15 +77,10 @@ public class FeedRefreshEngine {
this.databaseUpdaterExecutor = newBlockingExecutorService(config.feedRefresh().databaseThreads()); this.databaseUpdaterExecutor = newBlockingExecutorService(config.feedRefresh().databaseThreads());
this.notifierExecutor = newDiscardingExecutorService(config.pushNotifications().threads(), this.notifierExecutor = newDiscardingExecutorService(config.pushNotifications().threads(),
config.pushNotifications().queueCapacity()); config.pushNotifications().queueCapacity());
metrics.register(MetricRegistry.name(getClass(), "queue", "size"), (Gauge<Integer>) queue::size);
metrics.register(MetricRegistry.name(getClass(), "worker", "active"), (Gauge<Integer>) workerExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "updater", "active"), (Gauge<Integer>) databaseUpdaterExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "notifier", "active"), (Gauge<Integer>) notifierExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "notifier", "queue"), (Gauge<Integer>) () -> notifierExecutor.getQueue().size());
} }
public void start() { public void start() {
createExecutors();
startFeedProcessingLoop(); startFeedProcessingLoop();
startRefillLoop(); startRefillLoop();
} }
@@ -204,6 +208,8 @@ public class FeedRefreshEngine {
MoreExecutors.shutdownAndAwaitTermination(this.workerExecutor, config.shutdownTimeout()); MoreExecutors.shutdownAndAwaitTermination(this.workerExecutor, config.shutdownTimeout());
MoreExecutors.shutdownAndAwaitTermination(this.databaseUpdaterExecutor, config.shutdownTimeout()); MoreExecutors.shutdownAndAwaitTermination(this.databaseUpdaterExecutor, config.shutdownTimeout());
MoreExecutors.shutdownAndAwaitTermination(this.notifierExecutor, config.shutdownTimeout()); MoreExecutors.shutdownAndAwaitTermination(this.notifierExecutor, config.shutdownTimeout());
queue.clear();
} }
/** /**

View File

@@ -23,8 +23,8 @@ public abstract class ScheduledTask {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
}; };
log.info("registering task {} for execution every {} {}, starting in {} {}", getClass().getSimpleName(), getPeriod(), getTimeUnit(), log.debug("registering task {} for execution every {} {}, starting in {} {}", getClass().getSimpleName(), getPeriod(),
getInitialDelay(), getTimeUnit()); getTimeUnit(), getInitialDelay(), getTimeUnit());
executor.scheduleWithFixedDelay(runnable, getInitialDelay(), getPeriod(), getTimeUnit()); executor.scheduleWithFixedDelay(runnable, getInitialDelay(), getPeriod(), getTimeUnit());
} }
} }

View File

@@ -18,16 +18,17 @@ public class TaskScheduler {
private final List<ScheduledTask> tasks; private final List<ScheduledTask> tasks;
private final CommaFeedConfiguration config; private final CommaFeedConfiguration config;
private final ScheduledExecutorService executor;
private ScheduledExecutorService executor;
public TaskScheduler(@All List<ScheduledTask> tasks, CommaFeedConfiguration config) { public TaskScheduler(@All List<ScheduledTask> tasks, CommaFeedConfiguration config) {
this.tasks = tasks; this.tasks = tasks;
this.config = config; this.config = config;
this.executor = Executors.newScheduledThreadPool(tasks.size());
} }
public void start() { public void start() {
tasks.forEach(task -> task.register(executor)); this.executor = Executors.newScheduledThreadPool(tasks.size());
this.tasks.forEach(task -> task.register(executor));
} }
public void stop() { public void stop() {

View File

@@ -6,6 +6,8 @@ import jakarta.persistence.EntityManager;
import org.hibernate.Session; import org.hibernate.Session;
import org.kohsuke.MetaInfServices; import org.kohsuke.MetaInfServices;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.junit.callback.QuarkusTestBeforeEachCallback; import io.quarkus.test.junit.callback.QuarkusTestBeforeEachCallback;
import io.quarkus.test.junit.callback.QuarkusTestMethodContext; import io.quarkus.test.junit.callback.QuarkusTestMethodContext;
@@ -17,12 +19,17 @@ public class DatabaseReset implements QuarkusTestBeforeEachCallback {
@Override @Override
public void beforeEach(QuarkusTestMethodContext context) { public void beforeEach(QuarkusTestMethodContext context) {
CDI.current() // stop the application to make sure that there are no active transactions when we truncate the tables
.select(EntityManager.class) getBean(CommaFeedApplication.class).stop(new ShutdownEvent());
.get()
.unwrap(Session.class) // truncate all tables so that we have a clean slate for the next test
.getSessionFactory() getBean(EntityManager.class).unwrap(Session.class).getSessionFactory().getSchemaManager().truncateMappedObjects();
.getSchemaManager()
.truncateMappedObjects(); // restart the application
getBean(CommaFeedApplication.class).start(new StartupEvent());
}
private static <T> T getBean(Class<T> clazz) {
return CDI.current().select(clazz).get();
} }
} }