merge push infos into feeds

This commit is contained in:
Athou
2013-06-05 21:50:26 +02:00
parent 7943c8e1e6
commit dbe7b48a04
9 changed files with 55 additions and 187 deletions

View File

@@ -7,7 +7,6 @@ import java.util.List;
import javax.ejb.Stateless;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.JoinType;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
@@ -60,7 +59,6 @@ public class FeedDAO extends GenericDAO<Feed> {
Root<Feed> root = query.from(getType());
query.where(getUpdatablePredicates(root).toArray(new Predicate[0]));
root.fetch(Feed_.pushInfo, JoinType.LEFT);
query.orderBy(builder.asc(root.get(Feed_.lastUpdated)));
@@ -88,4 +86,8 @@ public class FeedDAO extends GenericDAO<Feed> {
criteria.setMaxResults(limit);
return criteria.getSingleResult();
}
public List<Feed> findByTopic(String topic) {
return findByField(Feed_.pushTopic, topic);
}
}

View File

@@ -1,29 +0,0 @@
package com.commafeed.backend.dao;
import java.util.List;
import javax.ejb.Stateless;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.model.FeedPushInfo_;
@Stateless
public class FeedPushInfoDAO extends GenericDAO<FeedPushInfo> {
public List<FeedPushInfo> findByTopic(String topic, boolean includeFeed) {
CriteriaQuery<FeedPushInfo> query = builder.createQuery(getType());
Root<FeedPushInfo> root = query.from(getType());
if (includeFeed) {
root.fetch(FeedPushInfo_.feed);
}
query.where(builder.equal(root.get(FeedPushInfo_.topic), topic));
TypedQuery<FeedPushInfo> q = em.createQuery(query);
return q.getResultList();
}
}

View File

@@ -28,7 +28,6 @@ import com.commafeed.backend.dao.FeedSubscriptionDAO;
import com.commafeed.backend.model.ApplicationSettings;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.pubsubhubbub.SubscriptionHandler;
import com.commafeed.backend.services.ApplicationSettingsService;
@@ -163,9 +162,8 @@ public class FeedRefreshUpdater {
}
private void handlePubSub(final Feed feed) {
FeedPushInfo info = feed.getPushInfo();
if (info != null) {
Date lastPing = info.getLastPing();
if (feed.getPushHub() != null && feed.getPushTopic() != null) {
Date lastPing = feed.getPushLastPing();
Date now = Calendar.getInstance().getTime();
if (lastPing == null || lastPing.before(DateUtils.addDays(now, -3))) {
new Thread() {

View File

@@ -15,9 +15,7 @@ import com.commafeed.backend.MetricsBean;
import com.commafeed.backend.dao.FeedEntryDAO;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.services.ApplicationSettingsService;
import com.commafeed.backend.services.FeedPushInfoService;
import com.sun.syndication.io.FeedException;
public class FeedRefreshWorker {
@@ -37,9 +35,6 @@ public class FeedRefreshWorker {
@Inject
ApplicationSettingsService applicationSettingsService;
@Inject
FeedPushInfoService feedPushInfoService;
@Inject
MetricsBean metricsBean;
@@ -166,11 +161,8 @@ public class FeedRefreshWorker {
topic = "http://" + topic;
}
log.debug("feed {} has pubsub info: {}", feed.getUrl(), topic);
FeedPushInfo info = feed.getPushInfo();
if (info == null) {
info = feedPushInfoService.findOrCreate(feed, hub, topic);
}
feed.setPushInfo(info);
feed.setPushHub(hub);
feed.setPushTopic(topic);
}
}

View File

@@ -6,10 +6,8 @@ import java.util.Set;
import javax.persistence.Cacheable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.ManyToMany;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
@@ -73,15 +71,22 @@ public class Feed extends AbstractModel {
@Column(length = 255)
private String etagHeader;
@OneToOne(fetch = FetchType.LAZY, mappedBy = "feed")
private FeedPushInfo pushInfo;
@ManyToMany(mappedBy = "feeds")
private Set<FeedEntry> entries = Sets.newHashSet();
@OneToMany(mappedBy = "feed")
private Set<FeedSubscription> subscriptions;
@Column(length = 2048)
private String pushHub;
@Column(length = 2048)
@Index(name = "topic_index")
private String pushTopic;
@Temporal(TemporalType.TIMESTAMP)
private Date pushLastPing;
public Feed() {
}
@@ -186,12 +191,28 @@ public class Feed extends AbstractModel {
this.lastUpdateSuccess = lastUpdateSuccess;
}
public FeedPushInfo getPushInfo() {
return pushInfo;
public String getPushHub() {
return pushHub;
}
public void setPushInfo(FeedPushInfo pushInfo) {
this.pushInfo = pushInfo;
public void setPushHub(String pushHub) {
this.pushHub = pushHub;
}
public String getPushTopic() {
return pushTopic;
}
public void setPushTopic(String pushTopic) {
this.pushTopic = pushTopic;
}
public Date getPushLastPing() {
return pushLastPing;
}
public void setPushLastPing(Date pushLastPing) {
this.pushLastPing = pushLastPing;
}
}

View File

@@ -1,72 +0,0 @@
package com.commafeed.backend.model;
import java.util.Date;
import javax.persistence.Cacheable;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import org.hibernate.annotations.Cache;
import org.hibernate.annotations.CacheConcurrencyStrategy;
import org.hibernate.annotations.Index;
@Entity
@Table(name = "FEEDPUSHINFOS")
@SuppressWarnings("serial")
@Cacheable
@Cache(usage = CacheConcurrencyStrategy.READ_WRITE)
public class FeedPushInfo extends AbstractModel {
@JoinColumn(unique = true)
@OneToOne(fetch = FetchType.LAZY)
private Feed feed;
@Column(length = 2048, nullable = false)
@Index(name = "topic_index")
private String topic;
@Column(length = 2048, nullable = false)
private String hub;
@Temporal(TemporalType.TIMESTAMP)
private Date lastPing;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Feed getFeed() {
return feed;
}
public void setFeed(Feed feed) {
this.feed = feed;
}
public String getHub() {
return hub;
}
public void setHub(String hub) {
this.hub = hub;
}
public Date getLastPing() {
return lastPing;
}
public void setLastPing(Date lastPing) {
this.lastPing = lastPing;
}
}

View File

@@ -18,10 +18,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.HttpGetter;
import com.commafeed.backend.dao.FeedPushInfoDAO;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.feeds.FeedUtils;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.services.ApplicationSettingsService;
import com.google.common.collect.Lists;
@@ -34,12 +33,11 @@ public class SubscriptionHandler {
ApplicationSettingsService applicationSettingsService;
@Inject
FeedPushInfoDAO feedPushInfoDAO;
FeedDAO feedDAO;
public void subscribe(Feed feed) {
FeedPushInfo info = feed.getPushInfo();
String hub = info.getHub();
String topic = info.getTopic();
String hub = feed.getPushHub();
String topic = feed.getPushTopic();
String publicUrl = FeedUtils
.removeTrailingSlash(applicationSettingsService.get()
.getPublicUrl());
@@ -73,10 +71,10 @@ public class SubscriptionHandler {
if (code == 400
&& StringUtils.contains(message, pushpressError)) {
String[] tokens = message.split(" ");
info.setTopic(tokens[tokens.length - 1]);
feedPushInfoDAO.update(info);
feed.setPushTopic(tokens[tokens.length - 1]);
feedDAO.update(feed);
log.debug("handled pushpress subfeed {} : {}", topic,
info.getTopic());
feed.getPushTopic());
} else {
throw new Exception("Unexpected response code: " + code
+ " " + response.getStatusLine().getReasonPhrase()

View File

@@ -1,39 +0,0 @@
package com.commafeed.backend.services;
import java.util.List;
import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ejb.Singleton;
import javax.inject.Inject;
import com.commafeed.backend.dao.FeedPushInfoDAO;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.model.FeedPushInfo_;
@Singleton
public class FeedPushInfoService {
@Inject
FeedPushInfoDAO feedPushInfoDAO;
@Lock(LockType.WRITE)
public FeedPushInfo findOrCreate(Feed feed, String hub, String topic) {
FeedPushInfo info = null;
List<FeedPushInfo> infos = feedPushInfoDAO.findByField(
FeedPushInfo_.feed, feed);
if (infos.isEmpty()) {
info = new FeedPushInfo();
info.setFeed(feed);
info.setHub(hub);
info.setTopic(topic);
feedPushInfoDAO.save(info);
} else {
info = infos.get(0);
}
return info;
}
}

View File

@@ -21,12 +21,11 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.dao.FeedPushInfoDAO;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.feeds.FeedParser;
import com.commafeed.backend.feeds.FeedRefreshTaskGiver;
import com.commafeed.backend.feeds.FetchedFeed;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedPushInfo;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
@Path("/push")
@@ -39,7 +38,7 @@ public class PubSubHubbubCallbackREST {
HttpServletRequest request;
@Inject
FeedPushInfoDAO feedPushInfoDAO;
FeedDAO feedDAO;
@Inject
FeedParser parser;
@@ -60,15 +59,15 @@ public class PubSubHubbubCallbackREST {
log.debug("confirmation callback received for {}", topic);
List<FeedPushInfo> infos = feedPushInfoDAO.findByTopic(topic, false);
List<Feed> feeds = feedDAO.findByTopic(topic);
if (infos.isEmpty() == false) {
for (FeedPushInfo info : infos) {
if (feeds.isEmpty()) {
for (Feed feed : feeds) {
log.debug("activated push notifications for {}",
info.getTopic());
info.setLastPing(Calendar.getInstance().getTime());
feed.getPushTopic());
feed.setPushLastPing(Calendar.getInstance().getTime());
}
feedPushInfoDAO.update(infos);
feedDAO.update(feeds);
return Response.ok(challenge).build();
} else {
log.debug("rejecting callback: no push info for {}", topic);
@@ -86,10 +85,8 @@ public class PubSubHubbubCallbackREST {
String topic = fetchedFeed.getTopic();
if (topic != null) {
log.debug("content callback received for {}", topic);
List<FeedPushInfo> infos = feedPushInfoDAO.findByTopic(topic,
true);
for (FeedPushInfo info : infos) {
Feed feed = info.getFeed();
List<Feed> feeds = feedDAO.findByTopic(topic);
for (Feed feed : feeds) {
log.debug("pushing content to queue for {}", feed.getUrl());
taskGiver.add(feed);
}