forked from Archives/Athou_commafeed
handle background threads ourselves, leave the stateless pool intact
This commit is contained in:
@@ -2,8 +2,6 @@ package com.commafeed.backend;
|
|||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
@@ -11,12 +9,16 @@ import javax.ejb.ConcurrencyManagement;
|
|||||||
import javax.ejb.ConcurrencyManagementType;
|
import javax.ejb.ConcurrencyManagementType;
|
||||||
import javax.ejb.Singleton;
|
import javax.ejb.Singleton;
|
||||||
import javax.ejb.Startup;
|
import javax.ejb.Startup;
|
||||||
|
import javax.enterprise.inject.Instance;
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
|
|
||||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import scala.actors.threadpool.ExecutorService;
|
||||||
|
import scala.actors.threadpool.Executors;
|
||||||
|
|
||||||
import com.commafeed.backend.dao.FeedCategoryDAO;
|
import com.commafeed.backend.dao.FeedCategoryDAO;
|
||||||
import com.commafeed.backend.dao.FeedDAO;
|
import com.commafeed.backend.dao.FeedDAO;
|
||||||
import com.commafeed.backend.dao.FeedSubscriptionDAO;
|
import com.commafeed.backend.dao.FeedSubscriptionDAO;
|
||||||
@@ -26,7 +28,6 @@ import com.commafeed.backend.model.ApplicationSettings;
|
|||||||
import com.commafeed.backend.model.UserRole.Role;
|
import com.commafeed.backend.model.UserRole.Role;
|
||||||
import com.commafeed.backend.services.ApplicationSettingsService;
|
import com.commafeed.backend.services.ApplicationSettingsService;
|
||||||
import com.commafeed.backend.services.UserService;
|
import com.commafeed.backend.services.UserService;
|
||||||
import com.google.api.client.util.Lists;
|
|
||||||
|
|
||||||
@Startup
|
@Startup
|
||||||
@Singleton
|
@Singleton
|
||||||
@@ -56,12 +57,11 @@ public class StartupBean {
|
|||||||
ApplicationSettingsService applicationSettingsService;
|
ApplicationSettingsService applicationSettingsService;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
FeedRefreshWorker worker;
|
Instance<FeedRefreshWorker> workers;
|
||||||
|
|
||||||
private List<Future<Void>> threads = Lists.newArrayList();
|
|
||||||
|
|
||||||
private long startupTime;
|
private long startupTime;
|
||||||
|
|
||||||
|
private ExecutorService executor;
|
||||||
private MutableBoolean running = new MutableBoolean(true);
|
private MutableBoolean running = new MutableBoolean(true);
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
@@ -72,11 +72,19 @@ public class StartupBean {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ApplicationSettings settings = applicationSettingsService.get();
|
ApplicationSettings settings = applicationSettingsService.get();
|
||||||
log.info("Starting {} background threads",
|
int threads = settings.getBackgroundThreads();
|
||||||
settings.getBackgroundThreads());
|
log.info("Starting {} background threads", threads);
|
||||||
for (int i = 0; i < settings.getBackgroundThreads(); i++) {
|
|
||||||
Future<Void> thread = worker.start(running, "Thread " + i);
|
executor = Executors.newFixedThreadPool(threads);
|
||||||
threads.add(thread);
|
for (int i = 0; i < threads; i++) {
|
||||||
|
final int threadId = i;
|
||||||
|
executor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
FeedRefreshWorker worker = workers.get();
|
||||||
|
worker.start(running, "Thread " + threadId);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -96,6 +104,14 @@ public class StartupBean {
|
|||||||
@PreDestroy
|
@PreDestroy
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
running.setValue(false);
|
running.setValue(false);
|
||||||
|
executor.shutdownNow();
|
||||||
|
while (!executor.isTerminated()) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("interrupted while waiting for threads to finish.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,7 @@ package com.commafeed.backend.feeds;
|
|||||||
|
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import javax.ejb.AsyncResult;
|
|
||||||
import javax.ejb.Asynchronous;
|
|
||||||
import javax.ejb.Stateless;
|
|
||||||
import javax.ejb.TransactionAttribute;
|
|
||||||
import javax.ejb.TransactionAttributeType;
|
|
||||||
import javax.inject.Inject;
|
import javax.inject.Inject;
|
||||||
import javax.transaction.HeuristicMixedException;
|
import javax.transaction.HeuristicMixedException;
|
||||||
import javax.transaction.HeuristicRollbackException;
|
import javax.transaction.HeuristicRollbackException;
|
||||||
@@ -26,8 +20,6 @@ import com.commafeed.backend.dao.FeedDAO;
|
|||||||
import com.commafeed.backend.model.Feed;
|
import com.commafeed.backend.model.Feed;
|
||||||
import com.commafeed.backend.services.FeedUpdateService;
|
import com.commafeed.backend.services.FeedUpdateService;
|
||||||
|
|
||||||
@Stateless
|
|
||||||
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
|
|
||||||
public class FeedRefreshWorker {
|
public class FeedRefreshWorker {
|
||||||
|
|
||||||
private static Logger log = LoggerFactory
|
private static Logger log = LoggerFactory
|
||||||
@@ -45,8 +37,7 @@ public class FeedRefreshWorker {
|
|||||||
@Inject
|
@Inject
|
||||||
FeedRefreshTaskGiver taskGiver;
|
FeedRefreshTaskGiver taskGiver;
|
||||||
|
|
||||||
@Asynchronous
|
public void start(MutableBoolean running, String threadName) {
|
||||||
public Future<Void> start(MutableBoolean running, String threadName) {
|
|
||||||
log.info("{} starting", threadName);
|
log.info("{} starting", threadName);
|
||||||
while (running.isTrue()) {
|
while (running.isTrue()) {
|
||||||
Feed feed = null;
|
Feed feed = null;
|
||||||
@@ -59,6 +50,9 @@ public class FeedRefreshWorker {
|
|||||||
log.debug("sleeping");
|
log.debug("sleeping");
|
||||||
Thread.sleep(15000);
|
Thread.sleep(15000);
|
||||||
}
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.info(threadName + " interrupted");
|
||||||
|
return;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String feedUrl = "feed is null";
|
String feedUrl = "feed is null";
|
||||||
if (feed != null) {
|
if (feed != null) {
|
||||||
@@ -69,7 +63,6 @@ public class FeedRefreshWorker {
|
|||||||
e);
|
e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new AsyncResult<Void>(null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void update(Feed feed) throws NotSupportedException,
|
private void update(Feed feed) throws NotSupportedException,
|
||||||
|
|||||||
@@ -46,5 +46,5 @@
|
|||||||
# javax.persistence.jtaDataSource =
|
# javax.persistence.jtaDataSource =
|
||||||
# javax.persistence.nonJtaDataSource =
|
# javax.persistence.nonJtaDataSource =
|
||||||
|
|
||||||
AsynchronousPool.CorePoolSize = 1
|
#AsynchronousPool.CorePoolSize = 1
|
||||||
AsynchronousPool.MaximumPoolSize = 100
|
#AsynchronousPool.MaximumPoolSize = 100
|
||||||
@@ -9,9 +9,11 @@
|
|||||||
MaxActive 50
|
MaxActive 50
|
||||||
</Resource>
|
</Resource>
|
||||||
-->
|
-->
|
||||||
|
<!--
|
||||||
<Container id="CommaFeedStateless" type="STATELESS">
|
<Container id="CommaFeedStateless" type="STATELESS">
|
||||||
MaxSize=50
|
MaxSize=50
|
||||||
PoolSize=50
|
PoolSize=50
|
||||||
StrictPooling=true
|
StrictPooling=true
|
||||||
</Container>
|
</Container>
|
||||||
|
-->
|
||||||
</tomee>
|
</tomee>
|
||||||
|
|||||||
Reference in New Issue
Block a user