Merge pull request #2 from Athou/master

update to new version
This commit is contained in:
Zhe LIN
2013-05-27 17:52:54 -07:00
4 changed files with 40 additions and 28 deletions

View File

@@ -86,6 +86,7 @@ public class HttpGetter {
HttpClient client = newClient(); HttpClient client = newClient();
try { try {
HttpGet httpget = new HttpGet(url); HttpGet httpget = new HttpGet(url);
httpget.addHeader(HttpHeaders.ACCEPT_LANGUAGE, "en");
httpget.addHeader(HttpHeaders.PRAGMA, "No-cache"); httpget.addHeader(HttpHeaders.PRAGMA, "No-cache");
httpget.addHeader(HttpHeaders.CACHE_CONTROL, "no-cache"); httpget.addHeader(HttpHeaders.CACHE_CONTROL, "no-cache");
httpget.addHeader(HttpHeaders.USER_AGENT, httpget.addHeader(HttpHeaders.USER_AGENT,

View File

@@ -14,6 +14,7 @@ import javax.annotation.PreDestroy;
import javax.inject.Inject; import javax.inject.Inject;
import javax.inject.Singleton; import javax.inject.Singleton;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -66,10 +67,10 @@ public class FeedRefreshUpdater {
ApplicationSettings settings = applicationSettingsService.get(); ApplicationSettings settings = applicationSettingsService.get();
int threads = Math.max(settings.getDatabaseUpdateThreads(), 1); int threads = Math.max(settings.getDatabaseUpdateThreads(), 1);
log.info("Creating database pool with {} threads", threads); log.info("Creating database pool with {} threads", threads);
locks = Striped.lazyWeakLock(threads); locks = Striped.lazyWeakLock(threads * 1000);
pool = new ThreadPoolExecutor(threads, threads, 0, pool = new ThreadPoolExecutor(threads, threads, 0,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
queue = new ArrayBlockingQueue<Runnable>(100 * threads)); queue = new ArrayBlockingQueue<Runnable>(500 * threads));
pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
@@ -111,35 +112,48 @@ public class FeedRefreshUpdater {
@Override @Override
public void run() { public void run() {
boolean ok = true;
if (entries != null) { if (entries != null) {
List<FeedSubscription> subscriptions = feedSubscriptionDAO List<FeedSubscription> subscriptions = feedSubscriptionDAO
.findByFeed(feed); .findByFeed(feed);
for (FeedEntry entry : entries) { for (FeedEntry entry : entries) {
updateEntry(feed, entry, subscriptions); ok &= updateEntry(feed, entry, subscriptions);
} }
} }
if (applicationSettingsService.get().isPubsubhubbub()) { if (applicationSettingsService.get().isPubsubhubbub()) {
handlePubSub(feed); handlePubSub(feed);
} }
if (!ok) {
feed.setDisabledUntil(null);
}
metricsBean.feedUpdated(); metricsBean.feedUpdated();
taskGiver.giveBack(feed); taskGiver.giveBack(feed);
} }
} }
private void updateEntry(final Feed feed, final FeedEntry entry, private boolean updateEntry(final Feed feed, final FeedEntry entry,
final List<FeedSubscription> subscriptions) { final List<FeedSubscription> subscriptions) {
Lock lock = locks.get(entry.getGuid()); String key = StringUtils.trimToEmpty(entry.getGuid() + entry.getUrl());
Lock lock = locks.get(key);
boolean locked = false;
try { try {
lock.tryLock(1, TimeUnit.MINUTES); locked = lock.tryLock(1, TimeUnit.MINUTES);
feedUpdateService.updateEntry(feed, entry, subscriptions); if (locked) {
feedUpdateService.updateEntry(feed, entry, subscriptions);
} else {
log.error("lock timeout for " + feed.getUrl() + " - " + key);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("interrupted while waiting for lock for " + feed.getUrl() log.error("interrupted while waiting for lock for " + feed.getUrl()
+ " : " + e.getMessage(), e); + " : " + e.getMessage(), e);
} finally { } finally {
lock.unlock(); if (locked) {
lock.unlock();
}
} }
return locked;
} }
private void handlePubSub(final Feed feed) { private void handlePubSub(final Feed feed) {

View File

@@ -85,19 +85,15 @@ public class FeedRefreshWorker {
private void update(Feed feed) { private void update(Feed feed) {
FetchedFeed fetchedFeed = null;
List<FeedEntry> entries = null;
String message = null;
int errorCount = 0;
Date disabledUntil = null;
try { try {
fetchedFeed = fetcher.fetch(feed.getUrl(), false, FetchedFeed fetchedFeed = fetcher.fetch(feed.getUrl(), false,
feed.getLastModifiedHeader(), feed.getEtagHeader()); feed.getLastModifiedHeader(), feed.getEtagHeader());
// stops here if NotModifiedException or any other exception is // stops here if NotModifiedException or any other exception is
// thrown // thrown
entries = fetchedFeed.getEntries(); List<FeedEntry> entries = fetchedFeed.getEntries();
Date disabledUntil = null;
if (applicationSettingsService.get().isHeavyLoad()) { if (applicationSettingsService.get().isHeavyLoad()) {
disabledUntil = FeedUtils.buildDisabledUntil( disabledUntil = FeedUtils.buildDisabledUntil(
fetchedFeed.getPublishedDate(), entries); fetchedFeed.getPublishedDate(), entries);
@@ -109,7 +105,12 @@ public class FeedRefreshWorker {
.getLastModifiedHeader()); .getLastModifiedHeader());
feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader()); feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader());
feed.setErrorCount(0);
feed.setMessage(null);
feed.setDisabledUntil(disabledUntil);
handlePubSub(feed, fetchedFeed); handlePubSub(feed, fetchedFeed);
feedRefreshUpdater.updateFeed(feed, entries);
} catch (NotModifiedException e) { } catch (NotModifiedException e) {
log.debug("Feed not modified (304) : " + feed.getUrl()); log.debug("Feed not modified (304) : " + feed.getUrl());
@@ -121,9 +122,8 @@ public class FeedRefreshWorker {
feed.setDisabledUntil(FeedUtils.buildDisabledUntil(publishedDate, feed.setDisabledUntil(FeedUtils.buildDisabledUntil(publishedDate,
feedEntries)); feedEntries));
taskGiver.giveBack(feed); taskGiver.giveBack(feed);
return;
} catch (Exception e) { } catch (Exception e) {
message = "Unable to refresh feed " + feed.getUrl() + " : " String message = "Unable to refresh feed " + feed.getUrl() + " : "
+ e.getMessage(); + e.getMessage();
if (e instanceof FeedException) { if (e instanceof FeedException) {
log.debug(e.getClass().getName() + " " + message); log.debug(e.getClass().getName() + " " + message);
@@ -131,16 +131,13 @@ public class FeedRefreshWorker {
log.debug(e.getClass().getName() + " " + message); log.debug(e.getClass().getName() + " " + message);
} }
errorCount = feed.getErrorCount() + 1; feed.setErrorCount(feed.getErrorCount() + 1);
disabledUntil = FeedUtils.buildDisabledUntil(errorCount); feed.setMessage(message);
feed.setDisabledUntil(FeedUtils.buildDisabledUntil(feed
.getErrorCount()));
taskGiver.giveBack(feed);
} }
feed.setErrorCount(errorCount);
feed.setMessage(message);
feed.setDisabledUntil(disabledUntil);
feedRefreshUpdater.updateFeed(feed, entries);
} }
private void handlePubSub(Feed feed, FetchedFeed fetchedFeed) { private void handlePubSub(Feed feed, FetchedFeed fetchedFeed) {

View File

@@ -37,7 +37,7 @@
StrictPooling=false StrictPooling=false
</Container> </Container>
<Container id="CommaFeedMessage" type="MESSAGE"> <Container id="CommaFeedMessage" type="MESSAGE">
InstanceLimit 50 InstanceLimi=50
ResourceAdapter=JMSCommaFeedAdapter ResourceAdapter=JMSCommaFeedAdapter
</Container> </Container>
<Resource id="JMSCommaFeedAdapter" type="ActiveMQResourceAdapter"> <Resource id="JMSCommaFeedAdapter" type="ActiveMQResourceAdapter">