forked from Archives/Athou_commafeed
refactored the way we handle feed refresh queue
This commit is contained in:
@@ -39,18 +39,12 @@ public class FeedDAO extends GenericDAO<Feed> {
|
||||
public List<Feed> feeds;
|
||||
}
|
||||
|
||||
private List<Predicate> getUpdatablePredicates(Root<Feed> root, Date threshold) {
|
||||
private Predicate getUpdatablePredicate(Root<Feed> root, Date threshold) {
|
||||
|
||||
Predicate hasSubscriptions = builder.isNotEmpty(root.get(Feed_.subscriptions));
|
||||
Predicate isNull = builder.isNull(root.get(Feed_.disabledUntil));
|
||||
Predicate lessThan = builder.lessThan(root.get(Feed_.disabledUntil), threshold);
|
||||
|
||||
Predicate neverUpdated = builder.isNull(root.get(Feed_.lastUpdated));
|
||||
Predicate updatedBeforeThreshold = builder.lessThan(root.get(Feed_.lastUpdated), threshold);
|
||||
|
||||
Predicate disabledDateIsNull = builder.isNull(root.get(Feed_.disabledUntil));
|
||||
Predicate disabledDateIsInPast = builder.lessThan(root.get(Feed_.disabledUntil), new Date());
|
||||
|
||||
return Lists.newArrayList(hasSubscriptions, builder.or(neverUpdated, updatedBeforeThreshold),
|
||||
builder.or(disabledDateIsNull, disabledDateIsInPast));
|
||||
return builder.or(isNull, lessThan);
|
||||
}
|
||||
|
||||
public Long getUpdatableCount(Date threshold) {
|
||||
@@ -58,7 +52,7 @@ public class FeedDAO extends GenericDAO<Feed> {
|
||||
Root<Feed> root = query.from(getType());
|
||||
|
||||
query.select(builder.count(root));
|
||||
query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0]));
|
||||
query.where(getUpdatablePredicate(root, threshold));
|
||||
|
||||
TypedQuery<Long> q = em.createQuery(query);
|
||||
return q.getSingleResult();
|
||||
@@ -68,9 +62,8 @@ public class FeedDAO extends GenericDAO<Feed> {
|
||||
CriteriaQuery<Feed> query = builder.createQuery(getType());
|
||||
Root<Feed> root = query.from(getType());
|
||||
|
||||
query.where(getUpdatablePredicates(root, threshold).toArray(new Predicate[0]));
|
||||
|
||||
query.orderBy(builder.asc(root.get(Feed_.lastUpdated)));
|
||||
query.where(getUpdatablePredicate(root, threshold));
|
||||
query.orderBy(builder.asc(root.get(Feed_.disabledUntil)));
|
||||
|
||||
TypedQuery<Feed> q = em.createQuery(query);
|
||||
q.setMaxResults(count);
|
||||
|
||||
@@ -49,7 +49,6 @@ public class FeedParser {
|
||||
FetchedFeed fetchedFeed = new FetchedFeed();
|
||||
Feed feed = fetchedFeed.getFeed();
|
||||
List<FeedEntry> entries = fetchedFeed.getEntries();
|
||||
feed.setLastUpdated(new Date());
|
||||
|
||||
try {
|
||||
String encoding = FeedUtils.guessEncoding(xml);
|
||||
|
||||
@@ -129,14 +129,12 @@ public class FeedRefreshTaskGiver {
|
||||
|
||||
public void add(Feed feed) {
|
||||
Date threshold = getThreshold();
|
||||
if (feed.getLastUpdated() == null || feed.getLastUpdated().before(threshold)) {
|
||||
if (feed.getDisabledUntil() == null || feed.getDisabledUntil().before(threshold)) {
|
||||
addQueue.add(feed);
|
||||
}
|
||||
}
|
||||
|
||||
private void refill() {
|
||||
Date now = new Date();
|
||||
|
||||
int count = Math.min(100, 3 * backgroundThreads);
|
||||
List<Feed> feeds = null;
|
||||
if (applicationSettingsService.get().isCrawlingPaused()) {
|
||||
@@ -152,7 +150,7 @@ public class FeedRefreshTaskGiver {
|
||||
|
||||
Map<Long, Feed> map = Maps.newLinkedHashMap();
|
||||
for (Feed f : feeds) {
|
||||
f.setLastUpdated(now);
|
||||
f.setDisabledUntil(new Date());
|
||||
map.put(f.getId(), f);
|
||||
}
|
||||
takeQueue.addAll(map.values());
|
||||
@@ -160,7 +158,6 @@ public class FeedRefreshTaskGiver {
|
||||
size = giveBackQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
Feed f = giveBackQueue.poll();
|
||||
f.setLastUpdated(now);
|
||||
map.put(f.getId(), f);
|
||||
}
|
||||
|
||||
@@ -171,6 +168,7 @@ public class FeedRefreshTaskGiver {
|
||||
String normalized = FeedUtils.normalizeURL(feed.getUrl());
|
||||
feed.setNormalizedUrl(normalized);
|
||||
feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized));
|
||||
feed.setLastUpdated(new Date());
|
||||
giveBackQueue.add(feed);
|
||||
}
|
||||
|
||||
|
||||
@@ -129,7 +129,8 @@ public class FeedRefreshUpdater {
|
||||
handlePubSub(feed);
|
||||
}
|
||||
if (!ok) {
|
||||
feed.setDisabledUntil(null);
|
||||
// requeue asap
|
||||
feed.setDisabledUntil(new Date(0));
|
||||
}
|
||||
metricsBean.feedUpdated();
|
||||
taskGiver.giveBack(feed);
|
||||
|
||||
@@ -95,7 +95,6 @@ public class FeedRefreshWorker {
|
||||
}
|
||||
|
||||
private void update(Feed feed) {
|
||||
Date now = new Date();
|
||||
try {
|
||||
FetchedFeed fetchedFeed = fetcher.fetch(feed.getUrl(), false, feed.getLastModifiedHeader(), feed.getEtagHeader(),
|
||||
feed.getLastPublishedDate(), feed.getLastContentHash());
|
||||
@@ -103,13 +102,12 @@ public class FeedRefreshWorker {
|
||||
// thrown
|
||||
List<FeedEntry> entries = fetchedFeed.getEntries();
|
||||
|
||||
Date disabledUntil = null;
|
||||
Date disabledUntil = new Date();
|
||||
if (applicationSettingsService.get().isHeavyLoad()) {
|
||||
disabledUntil = FeedUtils.buildDisabledUntil(fetchedFeed.getFeed().getLastEntryDate(), fetchedFeed.getFeed()
|
||||
.getAverageEntryInterval());
|
||||
}
|
||||
|
||||
feed.setLastUpdateSuccess(now);
|
||||
feed.setLink(fetchedFeed.getFeed().getLink());
|
||||
feed.setLastModifiedHeader(fetchedFeed.getFeed().getLastModifiedHeader());
|
||||
feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader());
|
||||
|
||||
@@ -62,12 +62,6 @@ public class Feed extends AbstractModel {
|
||||
@Temporal(TemporalType.TIMESTAMP)
|
||||
private Date lastEntryDate;
|
||||
|
||||
/**
|
||||
* Last time we successfully refreshed the feed
|
||||
*/
|
||||
@Temporal(TemporalType.TIMESTAMP)
|
||||
private Date lastUpdateSuccess;
|
||||
|
||||
/**
|
||||
* error message while retrieving the feed
|
||||
*/
|
||||
@@ -229,14 +223,6 @@ public class Feed extends AbstractModel {
|
||||
this.etagHeader = etagHeader;
|
||||
}
|
||||
|
||||
public Date getLastUpdateSuccess() {
|
||||
return lastUpdateSuccess;
|
||||
}
|
||||
|
||||
public void setLastUpdateSuccess(Date lastUpdateSuccess) {
|
||||
this.lastUpdateSuccess = lastUpdateSuccess;
|
||||
}
|
||||
|
||||
public String getPushHub() {
|
||||
return pushHub;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.commafeed.backend.services;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import javax.ejb.Lock;
|
||||
import javax.ejb.LockType;
|
||||
import javax.ejb.Singleton;
|
||||
@@ -31,6 +33,7 @@ public class FeedService {
|
||||
feed.setUrlHash(DigestUtils.sha1Hex(url));
|
||||
feed.setNormalizedUrl(normalized);
|
||||
feed.setNormalizedUrlHash(DigestUtils.sha1Hex(normalized));
|
||||
feed.setDisabledUntil(new Date(0));
|
||||
feedDAO.saveOrUpdate(feed);
|
||||
}
|
||||
return feed;
|
||||
|
||||
@@ -31,7 +31,7 @@
|
||||
<createIndex tableName="FEEDENTRYCONTENTS" indexName="content_hash_index">
|
||||
<column name="contentHash" />
|
||||
</createIndex>
|
||||
<dropColumn tableName="FEEDENTRIES" columnName="author"/>
|
||||
<dropColumn tableName="FEEDENTRIES" columnName="author" />
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="athou" id="add-new-index">
|
||||
@@ -60,14 +60,14 @@
|
||||
<column name="lastContentHash" valueComputed="null"></column>
|
||||
</update>
|
||||
</changeSet>
|
||||
|
||||
|
||||
<changeSet author="athou" id="revamp-status-indexes">
|
||||
<createIndex tableName="FEEDENTRYSTATUSES" indexName="user_starred_updated">
|
||||
<column name="user_id"></column>
|
||||
<column name="starred"></column>
|
||||
<column name="entryUpdated"></column>
|
||||
</createIndex>
|
||||
|
||||
|
||||
<createIndex tableName="FEEDENTRYSTATUSES" indexName="sub_index">
|
||||
<column name="subscription_id"></column>
|
||||
</createIndex>
|
||||
@@ -77,19 +77,19 @@
|
||||
<dropIndex tableName="FEEDENTRYSTATUSES" indexName="user_read_sub_index" />
|
||||
<dropIndex tableName="FEEDENTRYSTATUSES" indexName="user_entry_index" />
|
||||
</changeSet>
|
||||
|
||||
|
||||
<changeSet author="athou" id="revamp-entries-indexes">
|
||||
<dropIndex tableName="FEEDENTRIES" indexName="updated_index" />
|
||||
<dropIndex tableName="FEEDENTRIES" indexName="inserted_index" />
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="athou" id="add-starred-index-for-cleanup">
|
||||
|
||||
<changeSet author="athou" id="add-starred-index-for-cleanup">
|
||||
<createIndex tableName="FEEDENTRYSTATUSES" indexName="insterted_starred_index">
|
||||
<column name="entryInserted"></column>
|
||||
<column name="starred"></column>
|
||||
</createIndex>
|
||||
</changeSet>
|
||||
|
||||
|
||||
<changeSet author="athou" id="add-title-hashes">
|
||||
<addColumn tableName="FEEDENTRYCONTENTS">
|
||||
<column name="titleHash" type="VARCHAR(40)" />
|
||||
@@ -100,5 +100,10 @@
|
||||
</createIndex>
|
||||
<dropIndex tableName="FEEDENTRYCONTENTS" indexName="content_hash_index" />
|
||||
</changeSet>
|
||||
|
||||
|
||||
<changeSet author="athou" id="drop-last-update-success">
|
||||
<dropIndex tableName="FEEDS" indexName="disabled_lastupdated_index" />
|
||||
<dropColumn tableName="FEEDS" columnName="lastUpdateSuccess" />
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
||||
Reference in New Issue
Block a user