diff --git a/src/main/java/com/commafeed/backend/HttpGetter.java b/src/main/java/com/commafeed/backend/HttpGetter.java index 51515e66..1f3dbce8 100644 --- a/src/main/java/com/commafeed/backend/HttpGetter.java +++ b/src/main/java/com/commafeed/backend/HttpGetter.java @@ -86,6 +86,7 @@ public class HttpGetter { HttpClient client = newClient(); try { HttpGet httpget = new HttpGet(url); + httpget.addHeader(HttpHeaders.ACCEPT_LANGUAGE, "en"); httpget.addHeader(HttpHeaders.PRAGMA, "No-cache"); httpget.addHeader(HttpHeaders.CACHE_CONTROL, "no-cache"); httpget.addHeader(HttpHeaders.USER_AGENT, diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index 28a075fd..c6b88a8a 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -14,6 +14,7 @@ import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.inject.Singleton; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,10 +67,10 @@ public class FeedRefreshUpdater { ApplicationSettings settings = applicationSettingsService.get(); int threads = Math.max(settings.getDatabaseUpdateThreads(), 1); log.info("Creating database pool with {} threads", threads); - locks = Striped.lazyWeakLock(threads); + locks = Striped.lazyWeakLock(threads * 1000); pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, - queue = new ArrayBlockingQueue(100 * threads)); + queue = new ArrayBlockingQueue(500 * threads)); pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { @@ -111,35 +112,48 @@ public class FeedRefreshUpdater { @Override public void run() { + boolean ok = true; if (entries != null) { List subscriptions = feedSubscriptionDAO .findByFeed(feed); for (FeedEntry entry : entries) { - updateEntry(feed, entry, subscriptions); + ok &= updateEntry(feed, entry, subscriptions); } } if (applicationSettingsService.get().isPubsubhubbub()) { handlePubSub(feed); } + if (!ok) { + feed.setDisabledUntil(null); + } metricsBean.feedUpdated(); taskGiver.giveBack(feed); } } - private void updateEntry(final Feed feed, final FeedEntry entry, + private boolean updateEntry(final Feed feed, final FeedEntry entry, final List subscriptions) { - Lock lock = locks.get(entry.getGuid()); + String key = StringUtils.trimToEmpty(entry.getGuid() + entry.getUrl()); + Lock lock = locks.get(key); + boolean locked = false; try { - lock.tryLock(1, TimeUnit.MINUTES); - feedUpdateService.updateEntry(feed, entry, subscriptions); + locked = lock.tryLock(1, TimeUnit.MINUTES); + if (locked) { + feedUpdateService.updateEntry(feed, entry, subscriptions); + } else { + log.error("lock timeout for " + feed.getUrl() + " - " + key); + } } catch (InterruptedException e) { log.error("interrupted while waiting for lock for " + feed.getUrl() + " : " + e.getMessage(), e); } finally { - lock.unlock(); + if (locked) { + lock.unlock(); + } } + return locked; } private void handlePubSub(final Feed feed) { diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index 8156e915..8188476f 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -85,19 +85,15 @@ public class FeedRefreshWorker { private void update(Feed feed) { - FetchedFeed fetchedFeed = null; - List entries = null; - - String message = null; - int errorCount = 0; - Date disabledUntil = null; try { - fetchedFeed = fetcher.fetch(feed.getUrl(), false, + FetchedFeed fetchedFeed = fetcher.fetch(feed.getUrl(), false, feed.getLastModifiedHeader(), feed.getEtagHeader()); // stops here if NotModifiedException or any other exception is // thrown - entries = fetchedFeed.getEntries(); + List entries = fetchedFeed.getEntries(); + + Date disabledUntil = null; if (applicationSettingsService.get().isHeavyLoad()) { disabledUntil = FeedUtils.buildDisabledUntil( fetchedFeed.getPublishedDate(), entries); @@ -109,7 +105,12 @@ public class FeedRefreshWorker { .getLastModifiedHeader()); feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader()); + feed.setErrorCount(0); + feed.setMessage(null); + feed.setDisabledUntil(disabledUntil); + handlePubSub(feed, fetchedFeed); + feedRefreshUpdater.updateFeed(feed, entries); } catch (NotModifiedException e) { log.debug("Feed not modified (304) : " + feed.getUrl()); @@ -121,9 +122,8 @@ public class FeedRefreshWorker { feed.setDisabledUntil(FeedUtils.buildDisabledUntil(publishedDate, feedEntries)); taskGiver.giveBack(feed); - return; } catch (Exception e) { - message = "Unable to refresh feed " + feed.getUrl() + " : " + String message = "Unable to refresh feed " + feed.getUrl() + " : " + e.getMessage(); if (e instanceof FeedException) { log.debug(e.getClass().getName() + " " + message); @@ -131,16 +131,13 @@ public class FeedRefreshWorker { log.debug(e.getClass().getName() + " " + message); } - errorCount = feed.getErrorCount() + 1; - disabledUntil = FeedUtils.buildDisabledUntil(errorCount); + feed.setErrorCount(feed.getErrorCount() + 1); + feed.setMessage(message); + feed.setDisabledUntil(FeedUtils.buildDisabledUntil(feed + .getErrorCount())); + + taskGiver.giveBack(feed); } - - feed.setErrorCount(errorCount); - feed.setMessage(message); - feed.setDisabledUntil(disabledUntil); - - feedRefreshUpdater.updateFeed(feed, entries); - } private void handlePubSub(Feed feed, FetchedFeed fetchedFeed) { diff --git a/src/main/tomee/conf/tomee.xml b/src/main/tomee/conf/tomee.xml index 26eed45f..6f9b9382 100644 --- a/src/main/tomee/conf/tomee.xml +++ b/src/main/tomee/conf/tomee.xml @@ -37,7 +37,7 @@ StrictPooling=false - InstanceLimit 50 + InstanceLimi=50 ResourceAdapter=JMSCommaFeedAdapter