wip: allow parallel update of feeds

This commit is contained in:
Athou
2013-05-22 00:07:13 +02:00
parent f75baf5cff
commit 15b7d685fe
7 changed files with 177 additions and 124 deletions

View File

@@ -0,0 +1,69 @@
package com.commafeed.backend;
import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
// A map that creates and stores lock objects for arbitrary keys values.
// Lock objects which are no longer referenced are automatically released during garbage collection.
// Author: Christian d'Heureuse, www.source-code.biz
// Based on IdMutexProvider by McDowell, http://illegalargumentexception.blogspot.ch/2008/04/java-synchronizing-on-transient-id.html
// See also http://stackoverflow.com/questions/5639870/simple-java-name-based-locks
public class LockMap<KEY> {
private WeakHashMap<KeyWrapper<KEY>, WeakReference<KeyWrapper<KEY>>> map;
public LockMap() {
map = new WeakHashMap<KeyWrapper<KEY>, WeakReference<KeyWrapper<KEY>>>();
}
// Returns a lock object for the specified key.
public synchronized Object get(KEY key) {
if (key == null) {
throw new NullPointerException();
}
KeyWrapper<KEY> newKeyWrapper = new KeyWrapper<KEY>(key);
WeakReference<KeyWrapper<KEY>> ref = map.get(newKeyWrapper);
KeyWrapper<KEY> oldKeyWrapper = (ref == null) ? null : ref.get();
if (oldKeyWrapper != null) {
return oldKeyWrapper;
}
map.put(newKeyWrapper,
new WeakReference<KeyWrapper<KEY>>(newKeyWrapper));
return newKeyWrapper;
}
// Returns the number of used entries in the map.
public synchronized int size() {
return map.size();
}
// KeyWrapper wraps a key value and is used in three ways:
// - as the key for the internal WeakHashMap
// - as the value for the internal WeakHashMap, additionally wrapped in a
// WeakReference
// - as the lock object associated to the key
private static class KeyWrapper<KEY> {
private KEY key;
private int hashCode;
public KeyWrapper(KEY key) {
this.key = key;
hashCode = key.hashCode();
}
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof KeyWrapper) {
return ((KeyWrapper<?>) obj).key.equals(key);
}
return false;
}
public int hashCode() {
return hashCode;
}
}
} // end class LockMap

View File

@@ -33,13 +33,17 @@ public class MetricsBean {
thisHour.feedsRefreshed++;
}
public void feedUpdated(int entriesCount, int statusesCount) {
public void feedUpdated() {
thisHour.feedsUpdated++;
thisMinute.feedsUpdated++;
thisHour.entriesInserted += entriesCount;
thisMinute.entriesInserted += entriesCount;
}
public void entryUpdated(int statusesCount) {
thisHour.entriesInserted++;
thisMinute.entriesInserted++;
thisHour.statusesInserted += statusesCount;
thisMinute.statusesInserted += statusesCount;
}

View File

@@ -14,30 +14,20 @@ import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedEntry_;
import com.commafeed.backend.model.Feed_;
import com.google.api.client.util.Lists;
import com.uaihebert.model.EasyCriteria;
@Stateless
public class FeedEntryDAO extends GenericDAO<FeedEntry> {
public List<FeedEntry> findByGuids(List<String> guids) {
List<String> hashes = Lists.newArrayList();
for (String guid : guids) {
hashes.add(DigestUtils.sha1Hex(guid));
}
public List<FeedEntry> findByGuid(String guid) {
String hash = DigestUtils.sha1Hex(guid);
EasyCriteria<FeedEntry> criteria = createCriteria();
criteria.setDistinctTrue();
criteria.andStringIn(FeedEntry_.guidHash.getName(), hashes);
criteria.andEquals(FeedEntry_.guidHash.getName(), hash);
criteria.leftJoinFetch(FeedEntry_.feeds.getName());
List<FeedEntry> list = Lists.newArrayList();
for (FeedEntry entry : criteria.getResultList()) {
if (guids.contains(entry.getGuid())) {
list.add(entry);
}
}
return list;
return criteria.getResultList();
}
public List<FeedEntry> findByFeed(Feed feed, int offset, int limit) {

View File

@@ -6,10 +6,8 @@ import java.util.List;
import java.util.Queue;
import javax.annotation.PostConstruct;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.time.DateUtils;
@@ -34,7 +32,8 @@ public class FeedRefreshTaskGiver {
private int backgroundThreads;
private Queue<Feed> addQueue = Queues.newConcurrentLinkedQueue();
private Queue<Feed> queue = Queues.newConcurrentLinkedQueue();
private Queue<Feed> takeQueue = Queues.newConcurrentLinkedQueue();
private Queue<Feed> giveBackQueue = Queues.newConcurrentLinkedQueue();
@PostConstruct
public void init() {
@@ -42,7 +41,6 @@ public class FeedRefreshTaskGiver {
.getBackgroundThreads();
}
@Lock(LockType.WRITE)
public void add(Feed feed) {
Date now = Calendar.getInstance().getTime();
boolean heavyLoad = applicationSettingsService.get().isHeavyLoad();
@@ -52,31 +50,34 @@ public class FeedRefreshTaskGiver {
feed.setEtagHeader(null);
feed.setLastModifiedHeader(null);
}
addQueue.add(feed);
}
@Lock(LockType.WRITE)
public Feed take() {
Feed feed = queue.poll();
public synchronized Feed take() {
Feed feed = takeQueue.poll();
if (feed == null) {
int count = Math.min(100, 5 * backgroundThreads);
List<Feed> feeds = feedDAO.findNextUpdatable(count);
int addQueueSize = queue.size();
for (int i = 0; i < addQueueSize; i++) {
feeds.add(addQueue.poll());
}
feeds.addAll(addQueue);
for (Feed f : feeds) {
queue.add(f);
takeQueue.add(f);
f.setLastUpdated(Calendar.getInstance().getTime());
}
feeds.addAll(giveBackQueue);
feedDAO.update(feeds);
feed = queue.poll();
feed = takeQueue.poll();
}
metricsBean.feedRefreshed();
return feed;
}
public void giveBack(Feed feed) {
giveBackQueue.add(feed);
}
}

View File

@@ -1,69 +1,68 @@
package com.commafeed.backend.feeds;
import java.util.Collection;
import java.util.List;
import javax.ejb.Asynchronous;
import javax.ejb.Stateless;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.LockMap;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.FeedSubscriptionDAO;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedEntryContent;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.pubsubhubbub.SubscriptionHandler;
import com.commafeed.backend.services.ApplicationSettingsService;
import com.commafeed.backend.services.FeedUpdateService;
@Stateless
public class FeedRefreshUpdater {
protected static Logger log = LoggerFactory
.getLogger(FeedRefreshUpdater.class);
private static LockMap<String> lockMap = new LockMap<String>();
@Inject
FeedUpdateService feedUpdateService;
@Inject
SubscriptionHandler handler;
@Inject
FeedRefreshTaskGiver taskGiver;
@Inject
FeedDAO feedDAO;
@Inject
ApplicationSettingsService applicationSettingsService;
@Asynchronous
public void updateEntries(Feed feed, Collection<FeedEntry> entries) {
if (CollectionUtils.isNotEmpty(entries)) {
@Inject
FeedSubscriptionDAO feedSubscriptionDAO;
public void updateFeed(Feed feed, Collection<FeedEntry> entries) {
taskGiver.giveBack(feed);
if (entries != null) {
List<FeedSubscription> subscriptions = feedSubscriptionDAO
.findByFeed(feed);
for (FeedEntry entry : entries) {
handleEntry(feed, entry);
updateEntry(feed, entry, subscriptions);
}
feedUpdateService.updateEntries(feed, entries);
}
feedDAO.update(feed);
if (applicationSettingsService.get().isPubsubhubbub()) {
handlePubSub(feed);
}
}
private void handleEntry(Feed feed, FeedEntry entry) {
String baseUri = feed.getLink();
FeedEntryContent content = entry.getContent();
content.setContent(FeedUtils.handleContent(content.getContent(),
baseUri));
String title = FeedUtils.handleContent(content.getTitle(), baseUri);
if (title != null) {
content.setTitle(title.substring(0, Math.min(2048, title.length())));
}
String author = entry.getAuthor();
if (author != null) {
entry.setAuthor(author.substring(0, Math.min(128, author.length())));
private void updateEntry(Feed feed, FeedEntry entry,
List<FeedSubscription> subscriptions) {
synchronized (lockMap.get(entry.getGuid())) {
feedUpdateService.updateEntry(feed, entry, subscriptions);
}
}

View File

@@ -5,12 +5,6 @@ import java.util.Collection;
import java.util.Date;
import javax.inject.Inject;
import javax.jms.JMSException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.StringUtils;
@@ -77,10 +71,7 @@ public class FeedRefreshWorker {
}
}
private void update(Feed feed) throws NotSupportedException,
SystemException, SecurityException, IllegalStateException,
RollbackException, HeuristicMixedException,
HeuristicRollbackException, JMSException {
private void update(Feed feed) {
FetchedFeed fetchedFeed = null;
Collection<FeedEntry> entries = null;
@@ -130,7 +121,7 @@ public class FeedRefreshWorker {
feed.setMessage(message);
feed.setDisabledUntil(disabledUntil);
feedRefreshUpdater.updateEntries(feed, entries);
feedRefreshUpdater.updateFeed(feed, entries);
}

View File

@@ -1,15 +1,10 @@
package com.commafeed.backend.services;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import javax.ejb.AccessTimeout;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.ejb.Stateless;
import javax.inject.Inject;
import org.apache.commons.lang.ObjectUtils;
@@ -19,13 +14,15 @@ import com.commafeed.backend.MetricsBean;
import com.commafeed.backend.dao.FeedEntryDAO;
import com.commafeed.backend.dao.FeedEntryStatusDAO;
import com.commafeed.backend.dao.FeedSubscriptionDAO;
import com.commafeed.backend.feeds.FeedUtils;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedEntryContent;
import com.commafeed.backend.model.FeedEntryStatus;
import com.commafeed.backend.model.FeedSubscription;
import com.google.common.collect.Lists;
@Singleton
@Stateless
public class FeedUpdateService {
@Inject
@@ -40,74 +37,76 @@ public class FeedUpdateService {
@Inject
MetricsBean metricsBean;
@Lock(LockType.WRITE)
@AccessTimeout(value = 5, unit = TimeUnit.MINUTES)
public void updateEntries(Feed feed, Collection<FeedEntry> entries) {
public void updateEntry(Feed feed, FeedEntry entry,
List<FeedSubscription> subscriptions) {
List<FeedEntry> existingEntries = getExistingEntries(entries);
List<FeedSubscription> subscriptions = feedSubscriptionDAO
.findByFeed(feed);
FeedEntry foundEntry = findEntry(
feedEntryDAO.findByGuid(entry.getGuid()), entry);
List<FeedEntry> entryUpdateList = Lists.newArrayList();
List<FeedEntryStatus> statusUpdateList = Lists.newArrayList();
for (FeedEntry entry : entries) {
if (foundEntry == null) {
handleEntry(feed, entry);
entry.setInserted(Calendar.getInstance().getTime());
entry.getFeeds().add(feed);
FeedEntry foundEntry = findEntry(existingEntries, entry);
foundEntry = entry;
} else {
if (foundEntry == null) {
entry.setInserted(Calendar.getInstance().getTime());
entry.getFeeds().add(feed);
entryUpdateList.add(entry);
} else {
boolean foundFeed = false;
for (Feed existingFeed : foundEntry.getFeeds()) {
if (ObjectUtils.equals(existingFeed.getId(), feed.getId())) {
foundFeed = true;
break;
}
}
if (!foundFeed) {
foundEntry.getFeeds().add(feed);
entryUpdateList.add(foundEntry);
}
if (!findFeed(foundEntry.getFeeds(), feed)) {
foundEntry.getFeeds().add(feed);
}
}
for (FeedEntry entry : entryUpdateList) {
if (foundEntry != null) {
List<FeedEntryStatus> statusUpdateList = Lists.newArrayList();
for (FeedSubscription sub : subscriptions) {
FeedEntryStatus status = new FeedEntryStatus();
status.setEntry(entry);
status.setEntry(foundEntry);
status.setSubscription(sub);
statusUpdateList.add(status);
}
feedEntryDAO.saveOrUpdate(foundEntry);
feedEntryStatusDAO.saveOrUpdate(statusUpdateList);
metricsBean.entryUpdated(statusUpdateList.size());
}
feedEntryDAO.saveOrUpdate(entryUpdateList);
feedEntryStatusDAO.saveOrUpdate(statusUpdateList);
metricsBean
.feedUpdated(entryUpdateList.size(), statusUpdateList.size());
}
private FeedEntry findEntry(List<FeedEntry> existingEntries, FeedEntry entry) {
FeedEntry foundEntry = null;
for (FeedEntry existingEntry : existingEntries) {
if (StringUtils.equals(entry.getGuid(), existingEntry.getGuid())
&& StringUtils.equals(entry.getUrl(),
existingEntry.getUrl())) {
foundEntry = existingEntry;
FeedEntry found = null;
for (FeedEntry existing : existingEntries) {
if (StringUtils.equals(entry.getGuid(), existing.getGuid())
&& StringUtils.equals(entry.getUrl(), existing.getUrl())) {
found = existing;
break;
}
}
return foundEntry;
return found;
}
private List<FeedEntry> getExistingEntries(Collection<FeedEntry> entries) {
List<String> guids = Lists.newArrayList();
for (FeedEntry entry : entries) {
guids.add(entry.getGuid());
private boolean findFeed(Set<Feed> feeds, Feed feed) {
boolean found = false;
for (Feed existingFeed : feeds) {
if (ObjectUtils.equals(existingFeed.getId(), feed.getId())) {
found = true;
break;
}
}
List<FeedEntry> existingEntries = guids.isEmpty() ? new ArrayList<FeedEntry>()
: feedEntryDAO.findByGuids(guids);
return existingEntries;
return found;
}
private void handleEntry(Feed feed, FeedEntry entry) {
String baseUri = feed.getLink();
FeedEntryContent content = entry.getContent();
content.setContent(FeedUtils.handleContent(content.getContent(),
baseUri));
String title = FeedUtils.handleContent(content.getTitle(), baseUri);
if (title != null) {
content.setTitle(title.substring(0, Math.min(2048, title.length())));
}
String author = entry.getAuthor();
if (author != null) {
entry.setAuthor(author.substring(0, Math.min(128, author.length())));
}
}
}