From 15b7d685fee4cea3913dae981597f01e1b2daf8b Mon Sep 17 00:00:00 2001 From: Athou Date: Wed, 22 May 2013 00:07:13 +0200 Subject: [PATCH] wip: allow parallel update of feeds --- .../java/com/commafeed/backend/LockMap.java | 69 +++++++++++ .../com/commafeed/backend/MetricsBean.java | 12 +- .../commafeed/backend/dao/FeedEntryDAO.java | 18 +-- .../backend/feeds/FeedRefreshTaskGiver.java | 31 ++--- .../backend/feeds/FeedRefreshUpdater.java | 47 ++++---- .../backend/feeds/FeedRefreshWorker.java | 13 +- .../backend/services/FeedUpdateService.java | 111 +++++++++--------- 7 files changed, 177 insertions(+), 124 deletions(-) create mode 100644 src/main/java/com/commafeed/backend/LockMap.java diff --git a/src/main/java/com/commafeed/backend/LockMap.java b/src/main/java/com/commafeed/backend/LockMap.java new file mode 100644 index 00000000..4fef65c2 --- /dev/null +++ b/src/main/java/com/commafeed/backend/LockMap.java @@ -0,0 +1,69 @@ +package com.commafeed.backend; + +import java.lang.ref.WeakReference; +import java.util.WeakHashMap; + +// A map that creates and stores lock objects for arbitrary keys values. +// Lock objects which are no longer referenced are automatically released during garbage collection. +// Author: Christian d'Heureuse, www.source-code.biz +// Based on IdMutexProvider by McDowell, http://illegalargumentexception.blogspot.ch/2008/04/java-synchronizing-on-transient-id.html +// See also http://stackoverflow.com/questions/5639870/simple-java-name-based-locks +public class LockMap { + + private WeakHashMap, WeakReference>> map; + + public LockMap() { + map = new WeakHashMap, WeakReference>>(); + } + + // Returns a lock object for the specified key. + public synchronized Object get(KEY key) { + if (key == null) { + throw new NullPointerException(); + } + KeyWrapper newKeyWrapper = new KeyWrapper(key); + WeakReference> ref = map.get(newKeyWrapper); + KeyWrapper oldKeyWrapper = (ref == null) ? null : ref.get(); + if (oldKeyWrapper != null) { + return oldKeyWrapper; + } + map.put(newKeyWrapper, + new WeakReference>(newKeyWrapper)); + return newKeyWrapper; + } + + // Returns the number of used entries in the map. + public synchronized int size() { + return map.size(); + } + + // KeyWrapper wraps a key value and is used in three ways: + // - as the key for the internal WeakHashMap + // - as the value for the internal WeakHashMap, additionally wrapped in a + // WeakReference + // - as the lock object associated to the key + private static class KeyWrapper { + private KEY key; + private int hashCode; + + public KeyWrapper(KEY key) { + this.key = key; + hashCode = key.hashCode(); + } + + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof KeyWrapper) { + return ((KeyWrapper) obj).key.equals(key); + } + return false; + } + + public int hashCode() { + return hashCode; + } + } + +} // end class LockMap \ No newline at end of file diff --git a/src/main/java/com/commafeed/backend/MetricsBean.java b/src/main/java/com/commafeed/backend/MetricsBean.java index 5f46efc4..226d8721 100644 --- a/src/main/java/com/commafeed/backend/MetricsBean.java +++ b/src/main/java/com/commafeed/backend/MetricsBean.java @@ -33,13 +33,17 @@ public class MetricsBean { thisHour.feedsRefreshed++; } - public void feedUpdated(int entriesCount, int statusesCount) { + public void feedUpdated() { thisHour.feedsUpdated++; thisMinute.feedsUpdated++; - thisHour.entriesInserted += entriesCount; - thisMinute.entriesInserted += entriesCount; - + } + + public void entryUpdated(int statusesCount) { + + thisHour.entriesInserted++; + thisMinute.entriesInserted++; + thisHour.statusesInserted += statusesCount; thisMinute.statusesInserted += statusesCount; } diff --git a/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java b/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java index 420ccfee..f0ffaf16 100644 --- a/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java +++ b/src/main/java/com/commafeed/backend/dao/FeedEntryDAO.java @@ -14,30 +14,20 @@ import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; import com.commafeed.backend.model.FeedEntry_; import com.commafeed.backend.model.Feed_; -import com.google.api.client.util.Lists; import com.uaihebert.model.EasyCriteria; @Stateless public class FeedEntryDAO extends GenericDAO { - public List findByGuids(List guids) { - List hashes = Lists.newArrayList(); - for (String guid : guids) { - hashes.add(DigestUtils.sha1Hex(guid)); - } + public List findByGuid(String guid) { + String hash = DigestUtils.sha1Hex(guid); EasyCriteria criteria = createCriteria(); criteria.setDistinctTrue(); - criteria.andStringIn(FeedEntry_.guidHash.getName(), hashes); + criteria.andEquals(FeedEntry_.guidHash.getName(), hash); criteria.leftJoinFetch(FeedEntry_.feeds.getName()); - List list = Lists.newArrayList(); - for (FeedEntry entry : criteria.getResultList()) { - if (guids.contains(entry.getGuid())) { - list.add(entry); - } - } - return list; + return criteria.getResultList(); } public List findByFeed(Feed feed, int offset, int limit) { diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java index c98e1d83..a42edd84 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshTaskGiver.java @@ -6,10 +6,8 @@ import java.util.List; import java.util.Queue; import javax.annotation.PostConstruct; -import javax.ejb.Lock; -import javax.ejb.LockType; -import javax.ejb.Singleton; import javax.inject.Inject; +import javax.inject.Singleton; import org.apache.commons.lang3.time.DateUtils; @@ -34,7 +32,8 @@ public class FeedRefreshTaskGiver { private int backgroundThreads; private Queue addQueue = Queues.newConcurrentLinkedQueue(); - private Queue queue = Queues.newConcurrentLinkedQueue(); + private Queue takeQueue = Queues.newConcurrentLinkedQueue(); + private Queue giveBackQueue = Queues.newConcurrentLinkedQueue(); @PostConstruct public void init() { @@ -42,7 +41,6 @@ public class FeedRefreshTaskGiver { .getBackgroundThreads(); } - @Lock(LockType.WRITE) public void add(Feed feed) { Date now = Calendar.getInstance().getTime(); boolean heavyLoad = applicationSettingsService.get().isHeavyLoad(); @@ -52,31 +50,34 @@ public class FeedRefreshTaskGiver { feed.setEtagHeader(null); feed.setLastModifiedHeader(null); } - addQueue.add(feed); } - @Lock(LockType.WRITE) - public Feed take() { - Feed feed = queue.poll(); + public synchronized Feed take() { + Feed feed = takeQueue.poll(); if (feed == null) { int count = Math.min(100, 5 * backgroundThreads); List feeds = feedDAO.findNextUpdatable(count); - int addQueueSize = queue.size(); - for (int i = 0; i < addQueueSize; i++) { - feeds.add(addQueue.poll()); - } + feeds.addAll(addQueue); for (Feed f : feeds) { - queue.add(f); + takeQueue.add(f); f.setLastUpdated(Calendar.getInstance().getTime()); } + + feeds.addAll(giveBackQueue); + feedDAO.update(feeds); - feed = queue.poll(); + + feed = takeQueue.poll(); } metricsBean.feedRefreshed(); return feed; } + public void giveBack(Feed feed) { + giveBackQueue.add(feed); + } + } diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index 306827f5..1def0d7c 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -1,69 +1,68 @@ package com.commafeed.backend.feeds; import java.util.Collection; +import java.util.List; -import javax.ejb.Asynchronous; -import javax.ejb.Stateless; import javax.inject.Inject; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.commafeed.backend.LockMap; import com.commafeed.backend.dao.FeedDAO; +import com.commafeed.backend.dao.FeedSubscriptionDAO; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; -import com.commafeed.backend.model.FeedEntryContent; import com.commafeed.backend.model.FeedPushInfo; +import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.pubsubhubbub.SubscriptionHandler; import com.commafeed.backend.services.ApplicationSettingsService; import com.commafeed.backend.services.FeedUpdateService; -@Stateless public class FeedRefreshUpdater { protected static Logger log = LoggerFactory .getLogger(FeedRefreshUpdater.class); + private static LockMap lockMap = new LockMap(); + @Inject FeedUpdateService feedUpdateService; @Inject SubscriptionHandler handler; + @Inject + FeedRefreshTaskGiver taskGiver; + @Inject FeedDAO feedDAO; @Inject ApplicationSettingsService applicationSettingsService; - @Asynchronous - public void updateEntries(Feed feed, Collection entries) { - if (CollectionUtils.isNotEmpty(entries)) { + @Inject + FeedSubscriptionDAO feedSubscriptionDAO; + + public void updateFeed(Feed feed, Collection entries) { + taskGiver.giveBack(feed); + if (entries != null) { + List subscriptions = feedSubscriptionDAO + .findByFeed(feed); for (FeedEntry entry : entries) { - handleEntry(feed, entry); + updateEntry(feed, entry, subscriptions); } - feedUpdateService.updateEntries(feed, entries); } - feedDAO.update(feed); + if (applicationSettingsService.get().isPubsubhubbub()) { handlePubSub(feed); } } - private void handleEntry(Feed feed, FeedEntry entry) { - String baseUri = feed.getLink(); - FeedEntryContent content = entry.getContent(); - - content.setContent(FeedUtils.handleContent(content.getContent(), - baseUri)); - String title = FeedUtils.handleContent(content.getTitle(), baseUri); - if (title != null) { - content.setTitle(title.substring(0, Math.min(2048, title.length()))); - } - String author = entry.getAuthor(); - if (author != null) { - entry.setAuthor(author.substring(0, Math.min(128, author.length()))); + private void updateEntry(Feed feed, FeedEntry entry, + List subscriptions) { + synchronized (lockMap.get(entry.getGuid())) { + feedUpdateService.updateEntry(feed, entry, subscriptions); } } diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index 1239648d..ffc55a8e 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -5,12 +5,6 @@ import java.util.Collection; import java.util.Date; import javax.inject.Inject; -import javax.jms.JMSException; -import javax.transaction.HeuristicMixedException; -import javax.transaction.HeuristicRollbackException; -import javax.transaction.NotSupportedException; -import javax.transaction.RollbackException; -import javax.transaction.SystemException; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang3.StringUtils; @@ -77,10 +71,7 @@ public class FeedRefreshWorker { } } - private void update(Feed feed) throws NotSupportedException, - SystemException, SecurityException, IllegalStateException, - RollbackException, HeuristicMixedException, - HeuristicRollbackException, JMSException { + private void update(Feed feed) { FetchedFeed fetchedFeed = null; Collection entries = null; @@ -130,7 +121,7 @@ public class FeedRefreshWorker { feed.setMessage(message); feed.setDisabledUntil(disabledUntil); - feedRefreshUpdater.updateEntries(feed, entries); + feedRefreshUpdater.updateFeed(feed, entries); } diff --git a/src/main/java/com/commafeed/backend/services/FeedUpdateService.java b/src/main/java/com/commafeed/backend/services/FeedUpdateService.java index 0ac7b4b2..ea4cee91 100644 --- a/src/main/java/com/commafeed/backend/services/FeedUpdateService.java +++ b/src/main/java/com/commafeed/backend/services/FeedUpdateService.java @@ -1,15 +1,10 @@ package com.commafeed.backend.services; -import java.util.ArrayList; import java.util.Calendar; -import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.Set; -import javax.ejb.AccessTimeout; -import javax.ejb.Lock; -import javax.ejb.LockType; -import javax.ejb.Singleton; +import javax.ejb.Stateless; import javax.inject.Inject; import org.apache.commons.lang.ObjectUtils; @@ -19,13 +14,15 @@ import com.commafeed.backend.MetricsBean; import com.commafeed.backend.dao.FeedEntryDAO; import com.commafeed.backend.dao.FeedEntryStatusDAO; import com.commafeed.backend.dao.FeedSubscriptionDAO; +import com.commafeed.backend.feeds.FeedUtils; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; +import com.commafeed.backend.model.FeedEntryContent; import com.commafeed.backend.model.FeedEntryStatus; import com.commafeed.backend.model.FeedSubscription; import com.google.common.collect.Lists; -@Singleton +@Stateless public class FeedUpdateService { @Inject @@ -40,74 +37,76 @@ public class FeedUpdateService { @Inject MetricsBean metricsBean; - @Lock(LockType.WRITE) - @AccessTimeout(value = 5, unit = TimeUnit.MINUTES) - public void updateEntries(Feed feed, Collection entries) { + public void updateEntry(Feed feed, FeedEntry entry, + List subscriptions) { - List existingEntries = getExistingEntries(entries); - List subscriptions = feedSubscriptionDAO - .findByFeed(feed); + FeedEntry foundEntry = findEntry( + feedEntryDAO.findByGuid(entry.getGuid()), entry); - List entryUpdateList = Lists.newArrayList(); - List statusUpdateList = Lists.newArrayList(); - for (FeedEntry entry : entries) { + if (foundEntry == null) { + handleEntry(feed, entry); + entry.setInserted(Calendar.getInstance().getTime()); + entry.getFeeds().add(feed); - FeedEntry foundEntry = findEntry(existingEntries, entry); + foundEntry = entry; + } else { - if (foundEntry == null) { - entry.setInserted(Calendar.getInstance().getTime()); - entry.getFeeds().add(feed); - entryUpdateList.add(entry); - } else { - boolean foundFeed = false; - for (Feed existingFeed : foundEntry.getFeeds()) { - if (ObjectUtils.equals(existingFeed.getId(), feed.getId())) { - foundFeed = true; - break; - } - } - - if (!foundFeed) { - foundEntry.getFeeds().add(feed); - entryUpdateList.add(foundEntry); - } + if (!findFeed(foundEntry.getFeeds(), feed)) { + foundEntry.getFeeds().add(feed); } } - for (FeedEntry entry : entryUpdateList) { + + if (foundEntry != null) { + List statusUpdateList = Lists.newArrayList(); for (FeedSubscription sub : subscriptions) { FeedEntryStatus status = new FeedEntryStatus(); - status.setEntry(entry); + status.setEntry(foundEntry); status.setSubscription(sub); statusUpdateList.add(status); } + feedEntryDAO.saveOrUpdate(foundEntry); + feedEntryStatusDAO.saveOrUpdate(statusUpdateList); + metricsBean.entryUpdated(statusUpdateList.size()); } - - feedEntryDAO.saveOrUpdate(entryUpdateList); - feedEntryStatusDAO.saveOrUpdate(statusUpdateList); - metricsBean - .feedUpdated(entryUpdateList.size(), statusUpdateList.size()); } private FeedEntry findEntry(List existingEntries, FeedEntry entry) { - FeedEntry foundEntry = null; - for (FeedEntry existingEntry : existingEntries) { - if (StringUtils.equals(entry.getGuid(), existingEntry.getGuid()) - && StringUtils.equals(entry.getUrl(), - existingEntry.getUrl())) { - foundEntry = existingEntry; + FeedEntry found = null; + for (FeedEntry existing : existingEntries) { + if (StringUtils.equals(entry.getGuid(), existing.getGuid()) + && StringUtils.equals(entry.getUrl(), existing.getUrl())) { + found = existing; break; } } - return foundEntry; + return found; } - private List getExistingEntries(Collection entries) { - List guids = Lists.newArrayList(); - for (FeedEntry entry : entries) { - guids.add(entry.getGuid()); + private boolean findFeed(Set feeds, Feed feed) { + boolean found = false; + for (Feed existingFeed : feeds) { + if (ObjectUtils.equals(existingFeed.getId(), feed.getId())) { + found = true; + break; + } } - List existingEntries = guids.isEmpty() ? new ArrayList() - : feedEntryDAO.findByGuids(guids); - return existingEntries; + return found; } + + private void handleEntry(Feed feed, FeedEntry entry) { + String baseUri = feed.getLink(); + FeedEntryContent content = entry.getContent(); + + content.setContent(FeedUtils.handleContent(content.getContent(), + baseUri)); + String title = FeedUtils.handleContent(content.getTitle(), baseUri); + if (title != null) { + content.setTitle(title.substring(0, Math.min(2048, title.length()))); + } + String author = entry.getAuthor(); + if (author != null) { + entry.setAuthor(author.substring(0, Math.min(128, author.length()))); + } + } + }