initial pubsubhubbub support (#44)

This commit is contained in:
Athou
2013-05-20 14:06:09 +02:00
parent badc830535
commit c88d3021b8
14 changed files with 389 additions and 7 deletions

View File

@@ -179,7 +179,7 @@ public class HttpGetter {
}
private static HttpClient newClient() {
public static HttpClient newClient() {
DefaultHttpClient client = new DefaultHttpClient();
SSLSocketFactory ssf = new SSLSocketFactory(SSL_CONTEXT, VERIFIER);

View File

@@ -7,6 +7,7 @@ import java.util.List;
import javax.ejb.Stateless;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.JoinType;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
@@ -25,6 +26,7 @@ public class FeedDAO extends GenericDAO<Feed> {
public List<Feed> findNextUpdatable(int count) {
CriteriaQuery<Feed> query = builder.createQuery(getType());
Root<Feed> root = query.from(getType());
root.fetch(Feed_.pushInfo, JoinType.LEFT);
Date now = Calendar.getInstance().getTime();

View File

@@ -0,0 +1,27 @@
package com.commafeed.backend.dao;
import java.util.List;
import javax.ejb.Stateless;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.model.FeedPushInfo_;
@Stateless
public class FeedPushInfoDAO extends GenericDAO<FeedPushInfo> {
public List<FeedPushInfo> findByTopic(String topic) {
CriteriaQuery<FeedPushInfo> query = builder.createQuery(getType());
Root<FeedPushInfo> root = query.from(getType());
root.fetch(FeedPushInfo_.feed);
query.where(builder.equal(root.get(FeedPushInfo_.topic), topic));
TypedQuery<FeedPushInfo> q = em.createQuery(query);
return q.getResultList();
}
}

View File

@@ -61,12 +61,12 @@ public class FeedFetcher {
Document doc = Jsoup.parse(html, baseUri);
String root = doc.children().get(0).tagName();
if ("html".equals(root)) {
Elements rss = doc.select("link[type=application/rss+xml]");
Elements atom = doc.select("link[type=application/atom+xml]");
if (!rss.isEmpty()) {
foundUrl = rss.get(0).attr("abs:href").toString();
} else if (!atom.isEmpty()) {
Elements rss = doc.select("link[type=application/rss+xml]");
if (!atom.isEmpty()) {
foundUrl = atom.get(0).attr("abs:href").toString();
} else if (!rss.isEmpty()) {
foundUrl = rss.get(0).attr("abs:href").toString();
}
}
return foundUrl;

View File

@@ -8,6 +8,8 @@ import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.InputSource;
import com.commafeed.backend.model.Feed;
@@ -20,11 +22,14 @@ import com.sun.syndication.feed.synd.SyndContent;
import com.sun.syndication.feed.synd.SyndEnclosure;
import com.sun.syndication.feed.synd.SyndEntry;
import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.feed.synd.SyndLink;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
public class FeedParser {
private static Logger log = LoggerFactory.getLogger(FeedParser.class);
private static final Date START = new Date(0);
private static final Date END = new Date(1000l * Integer.MAX_VALUE);
@@ -50,6 +55,8 @@ public class FeedParser {
SyndFeed rss = new SyndFeedInput().build(source);
fetchedFeed.setTitle(rss.getTitle());
fetchedFeed.setHub(findHub(rss));
fetchedFeed.setTopic(findSelf(rss));
feed.setUrl(feedUrl);
feed.setLink(rss.getLink());
List<SyndEntry> items = rss.getEntries();
@@ -124,4 +131,26 @@ public class FeedParser {
return content;
}
@SuppressWarnings("unchecked")
private String findHub(SyndFeed feed) {
for (SyndLink l : (List<SyndLink>) feed.getLinks()) {
if ("hub".equalsIgnoreCase(l.getRel())) {
log.info("found hub {} for feed {}", l.getHref(), feed.getLink());
return l.getHref();
}
}
return null;
}
@SuppressWarnings("unchecked")
private String findSelf(SyndFeed feed) {
for (SyndLink l : (List<SyndLink>) feed.getLinks()) {
if ("self".equalsIgnoreCase(l.getRel())) {
log.info("found self {} for feed {}", l.getHref(), feed.getLink());
return l.getHref();
}
}
return null;
}
}

View File

@@ -7,18 +7,28 @@ import javax.ejb.Stateless;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.pubsubhubbub.SubscriptionHandler;
import com.commafeed.backend.services.FeedUpdateService;
@Stateless
public class FeedRefreshUpdater {
protected static Logger log = LoggerFactory
.getLogger(FeedRefreshUpdater.class);
@Inject
FeedUpdateService feedUpdateService;
@Inject
SubscriptionHandler handler;
@Inject
FeedDAO feedDAO;
@@ -28,6 +38,14 @@ public class FeedRefreshUpdater {
feedUpdateService.updateEntries(feed, entries);
}
feedDAO.update(feed);
handlePubSub(feed);
}
private void handlePubSub(Feed feed) {
FeedPushInfo info = feed.getPushInfo();
if (info != null && info.isActive() == false) {
handler.subscribe(feed);
}
}
}

View File

@@ -13,12 +13,14 @@ import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.HttpGetter.NotModifiedException;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedPushInfo;
import com.sun.syndication.io.FeedException;
public class FeedRefreshWorker {
@@ -78,7 +80,6 @@ public class FeedRefreshWorker {
try {
fetchedFeed = fetcher.fetch(feed.getUrl(), false,
feed.getLastModifiedHeader(), feed.getEtagHeader());
// stops here if NotModifiedException or any other exception is
// thrown
entries = fetchedFeed.getEntries();
@@ -90,6 +91,8 @@ public class FeedRefreshWorker {
.getLastModifiedHeader());
feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader());
handlePubSub(feed, fetchedFeed);
} catch (NotModifiedException e) {
log.debug("Feed not modified (304) : " + feed.getUrl());
if (feed.getErrorCount() == 0) {
@@ -112,12 +115,32 @@ public class FeedRefreshWorker {
feed.setErrorCount(errorCount);
feed.setMessage(message);
feed.setDisabledUntil(disabledUntil);
log.info(feed.getUrl() + " disabledUntil " + disabledUntil);
feedRefreshUpdater.updateEntries(feed, entries);
}
private void handlePubSub(Feed feed, FetchedFeed fetchedFeed) {
String hub = fetchedFeed.getHub();
String topic = fetchedFeed.getTopic();
log.info("Checking for pubsub infos for {}", feed.getUrl());
if (hub != null && topic != null) {
log.info("feed {} has pubsub info: {}", feed.getUrl(), topic);
FeedPushInfo info = feed.getPushInfo();
if (info == null) {
info = new FeedPushInfo();
}
if (!StringUtils.equals(hub, info.getHub())
|| !StringUtils.equals(topic, info.getTopic())) {
info.setHub(hub);
info.setTopic(topic);
info.setFeed(feed);
info.setActive(false);
}
feed.setPushInfo(info);
}
}
private Feed getNextFeed() {
return taskGiver.take();
}

View File

@@ -16,6 +16,16 @@ public class FetchedFeed {
private long fetchDuration;
private Date publishedDate;
/**
* pubsubhubbub hub url
*/
private String hub;
/**
* pubsubhubbub topic
*/
private String topic;
public Feed getFeed() {
return feed;
}
@@ -56,4 +66,20 @@ public class FetchedFeed {
this.publishedDate = publishedDate;
}
public String getHub() {
return hub;
}
public void setHub(String hub) {
this.hub = hub;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
}

View File

@@ -3,10 +3,13 @@ package com.commafeed.backend.model;
import java.util.Date;
import java.util.Set;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.ManyToMany;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
@@ -64,6 +67,10 @@ public class Feed extends AbstractModel {
@Column(length = 255)
private String etagHeader;
@OneToOne(fetch = FetchType.LAZY, mappedBy = "feed", cascade = {
CascadeType.PERSIST, CascadeType.MERGE })
private FeedPushInfo pushInfo;
@ManyToMany(mappedBy = "feeds")
private Set<FeedEntry> entries = Sets.newHashSet();
@@ -174,4 +181,12 @@ public class Feed extends AbstractModel {
this.lastUpdateSuccess = lastUpdateSuccess;
}
public FeedPushInfo getPushInfo() {
return pushInfo;
}
public void setPushInfo(FeedPushInfo pushInfo) {
this.pushInfo = pushInfo;
}
}

View File

@@ -0,0 +1,60 @@
package com.commafeed.backend.model;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import org.hibernate.annotations.Index;
@Entity
@Table(name = "FEEDPUSHINFOS")
@SuppressWarnings("serial")
public class FeedPushInfo extends AbstractModel {
@OneToOne(fetch = FetchType.LAZY)
private Feed feed;
@Column(length = 2048, nullable = false)
@Index(name = "topic_index")
private String topic;
@Column(length = 2048, nullable = false)
private String hub;
private boolean active;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Feed getFeed() {
return feed;
}
public void setFeed(Feed feed) {
this.feed = feed;
}
public String getHub() {
return hub;
}
public void setHub(String hub) {
this.hub = hub;
}
public boolean isActive() {
return active;
}
public void setActive(boolean active) {
this.active = active;
}
}

View File

@@ -0,0 +1,77 @@
package com.commafeed.backend.pubsubhubbub;
import java.util.List;
import javax.ejb.Asynchronous;
import javax.inject.Inject;
import javax.ws.rs.core.MediaType;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.HttpGetter;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedPushInfo;
import com.commafeed.backend.services.ApplicationSettingsService;
import com.google.common.collect.Lists;
public class SubscriptionHandler {
private static Logger log = LoggerFactory
.getLogger(SubscriptionHandler.class);
@Inject
ApplicationSettingsService applicationSettingsService;
@Asynchronous
public void subscribe(Feed feed) {
FeedPushInfo info = feed.getPushInfo();
String hub = info.getHub();
String topic = info.getTopic();
String publicUrl = applicationSettingsService.get().getPublicUrl();
if (publicUrl.endsWith("/")) {
publicUrl = publicUrl.substring(0, publicUrl.length() - 1);
}
log.info("sending new pubsub subscription to {} for {}", hub, topic);
HttpPost post = new HttpPost(hub);
List<NameValuePair> nvp = Lists.newArrayList();
nvp.add(new BasicNameValuePair("hub.callback", publicUrl
+ "/rest/push/callback"));
nvp.add(new BasicNameValuePair("hub.topic", topic));
nvp.add(new BasicNameValuePair("hub.mode", "subscribe"));
nvp.add(new BasicNameValuePair("hub.verify", "async"));
nvp.add(new BasicNameValuePair("hub.secret", ""));
nvp.add(new BasicNameValuePair("hub.verify_token", ""));
nvp.add(new BasicNameValuePair("hub.lease_seconds", ""));
post.setHeader(HttpHeaders.USER_AGENT, "CommaFeed");
post.setHeader(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_FORM_URLENCODED);
HttpClient client = HttpGetter.newClient();
try {
post.setEntity(new UrlEncodedFormEntity(nvp));
HttpResponse response = client.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code != 204 && code != 202) {
throw new Exception("Unexpected response code: " + code + " "
+ response.getStatusLine().getReasonPhrase());
}
log.info("subscribed to {} for {}", hub, topic);
} catch (Exception e) {
log.error("Could not subscribe to pubsub: " + e.getMessage(), e);
} finally {
client.getConnectionManager().shutdown();
}
}
}

View File

@@ -10,6 +10,7 @@ import com.commafeed.frontend.rest.resources.ApiDocumentationREST;
import com.commafeed.frontend.rest.resources.CategoryREST;
import com.commafeed.frontend.rest.resources.EntryREST;
import com.commafeed.frontend.rest.resources.FeedREST;
import com.commafeed.frontend.rest.resources.PubSubHubbubCallbackREST;
import com.commafeed.frontend.rest.resources.ServerREST;
import com.commafeed.frontend.rest.resources.UserREST;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
@@ -36,6 +37,7 @@ public class RESTApplication extends Application {
set.add(AdminREST.class);
set.add(ApiDocumentationREST.class);
set.add(PubSubHubbubCallbackREST.class);
return set;
}

View File

@@ -125,6 +125,7 @@ public class AdminREST extends AbstractResourceREST {
userModel = new UserModel();
userModel.setId(user.getId());
userModel.setName(user.getName());
userModel.setEmail(user.getEmail());
userModel.setEnabled(!user.isDisabled());
users.put(key, userModel);
}

View File

@@ -0,0 +1,102 @@
package com.commafeed.frontend.rest.resources;
import java.util.List;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.commafeed.backend.dao.FeedPushInfoDAO;
import com.commafeed.backend.feeds.FeedParser;
import com.commafeed.backend.feeds.FeedRefreshTaskGiver;
import com.commafeed.backend.feeds.FetchedFeed;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedPushInfo;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
@Path("/push")
public class PubSubHubbubCallbackREST {
private static Logger log = LoggerFactory
.getLogger(PubSubHubbubCallbackREST.class);
@Context
HttpServletRequest request;
@Inject
FeedPushInfoDAO feedPushInfoDAO;
@Inject
FeedParser parser;
@Inject
FeedRefreshTaskGiver taskGiver;
@Path("/callback")
@GET
@Produces(MediaType.TEXT_PLAIN)
public Response verify(@QueryParam("hub.mode") String mode,
@QueryParam("hub.topic") String topic,
@QueryParam("hub.challenge") String challenge,
@QueryParam("hub.lease_seconds") String leaseSeconds,
@QueryParam("hub.verify_token") String verifyToken) {
Preconditions.checkArgument(StringUtils.isNotEmpty(topic));
Preconditions.checkArgument("subscribe".equals(mode));
log.info("confirmation callback received for {}", topic);
List<FeedPushInfo> infos = feedPushInfoDAO.findByTopic(topic);
if (infos.isEmpty() == false) {
for (FeedPushInfo info : infos) {
log.info("activated push notifications for {}", info.getFeed()
.getUrl());
info.setActive(true);
}
feedPushInfoDAO.update(infos);
return Response.ok(challenge).build();
} else {
log.info("rejecting callback: no push info for {}", topic);
return Response.status(Status.NOT_FOUND).build();
}
}
@Path("/callback")
@POST
@Consumes({ MediaType.APPLICATION_ATOM_XML, "application/rss+xml" })
public Response callback() {
log.info("content callback received");
try {
byte[] bytes = IOUtils.toByteArray(request.getInputStream());
log.info(new String(bytes));
FetchedFeed fetchedFeed = parser.parse(null, bytes);
String topic = fetchedFeed.getTopic();
if (topic != null) {
log.info("content callback received for {}", topic);
List<FeedPushInfo> infos = feedPushInfoDAO.findByTopic(topic);
for (FeedPushInfo info : infos) {
Feed feed = info.getFeed();
log.info("pushing content to queue for {}", feed.getUrl());
taskGiver.add(feed);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return Response.ok().build();
}
}