diff --git a/src/main/java/com/commafeed/backend/HttpGetter.java b/src/main/java/com/commafeed/backend/HttpGetter.java index 1b59905d..364743e4 100644 --- a/src/main/java/com/commafeed/backend/HttpGetter.java +++ b/src/main/java/com/commafeed/backend/HttpGetter.java @@ -179,7 +179,7 @@ public class HttpGetter { } - private static HttpClient newClient() { + public static HttpClient newClient() { DefaultHttpClient client = new DefaultHttpClient(); SSLSocketFactory ssf = new SSLSocketFactory(SSL_CONTEXT, VERIFIER); diff --git a/src/main/java/com/commafeed/backend/dao/FeedDAO.java b/src/main/java/com/commafeed/backend/dao/FeedDAO.java index 6f15150b..3c101017 100644 --- a/src/main/java/com/commafeed/backend/dao/FeedDAO.java +++ b/src/main/java/com/commafeed/backend/dao/FeedDAO.java @@ -7,6 +7,7 @@ import java.util.List; import javax.ejb.Stateless; import javax.persistence.TypedQuery; import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.JoinType; import javax.persistence.criteria.Predicate; import javax.persistence.criteria.Root; @@ -25,6 +26,7 @@ public class FeedDAO extends GenericDAO { public List findNextUpdatable(int count) { CriteriaQuery query = builder.createQuery(getType()); Root root = query.from(getType()); + root.fetch(Feed_.pushInfo, JoinType.LEFT); Date now = Calendar.getInstance().getTime(); diff --git a/src/main/java/com/commafeed/backend/dao/FeedPushInfoDAO.java b/src/main/java/com/commafeed/backend/dao/FeedPushInfoDAO.java new file mode 100644 index 00000000..1cec3645 --- /dev/null +++ b/src/main/java/com/commafeed/backend/dao/FeedPushInfoDAO.java @@ -0,0 +1,27 @@ +package com.commafeed.backend.dao; + +import java.util.List; + +import javax.ejb.Stateless; +import javax.persistence.TypedQuery; +import javax.persistence.criteria.CriteriaQuery; +import javax.persistence.criteria.Root; + +import com.commafeed.backend.model.FeedPushInfo; +import com.commafeed.backend.model.FeedPushInfo_; + +@Stateless +public class FeedPushInfoDAO extends GenericDAO { + + public List findByTopic(String topic) { + + CriteriaQuery query = builder.createQuery(getType()); + Root root = query.from(getType()); + root.fetch(FeedPushInfo_.feed); + query.where(builder.equal(root.get(FeedPushInfo_.topic), topic)); + + TypedQuery q = em.createQuery(query); + return q.getResultList(); + } + +} diff --git a/src/main/java/com/commafeed/backend/feeds/FeedFetcher.java b/src/main/java/com/commafeed/backend/feeds/FeedFetcher.java index 87433fe4..5b0a746c 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedFetcher.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedFetcher.java @@ -61,12 +61,12 @@ public class FeedFetcher { Document doc = Jsoup.parse(html, baseUri); String root = doc.children().get(0).tagName(); if ("html".equals(root)) { - Elements rss = doc.select("link[type=application/rss+xml]"); Elements atom = doc.select("link[type=application/atom+xml]"); - if (!rss.isEmpty()) { - foundUrl = rss.get(0).attr("abs:href").toString(); - } else if (!atom.isEmpty()) { + Elements rss = doc.select("link[type=application/rss+xml]"); + if (!atom.isEmpty()) { foundUrl = atom.get(0).attr("abs:href").toString(); + } else if (!rss.isEmpty()) { + foundUrl = rss.get(0).attr("abs:href").toString(); } } return foundUrl; diff --git a/src/main/java/com/commafeed/backend/feeds/FeedParser.java b/src/main/java/com/commafeed/backend/feeds/FeedParser.java index 2a8349b3..e01353b1 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedParser.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedParser.java @@ -8,6 +8,8 @@ import java.util.List; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xml.sax.InputSource; import com.commafeed.backend.model.Feed; @@ -20,11 +22,14 @@ import com.sun.syndication.feed.synd.SyndContent; import com.sun.syndication.feed.synd.SyndEnclosure; import com.sun.syndication.feed.synd.SyndEntry; import com.sun.syndication.feed.synd.SyndFeed; +import com.sun.syndication.feed.synd.SyndLink; import com.sun.syndication.io.FeedException; import com.sun.syndication.io.SyndFeedInput; public class FeedParser { + private static Logger log = LoggerFactory.getLogger(FeedParser.class); + private static final Date START = new Date(0); private static final Date END = new Date(1000l * Integer.MAX_VALUE); @@ -50,6 +55,8 @@ public class FeedParser { SyndFeed rss = new SyndFeedInput().build(source); fetchedFeed.setTitle(rss.getTitle()); + fetchedFeed.setHub(findHub(rss)); + fetchedFeed.setTopic(findSelf(rss)); feed.setUrl(feedUrl); feed.setLink(rss.getLink()); List items = rss.getEntries(); @@ -124,4 +131,26 @@ public class FeedParser { return content; } + @SuppressWarnings("unchecked") + private String findHub(SyndFeed feed) { + for (SyndLink l : (List) feed.getLinks()) { + if ("hub".equalsIgnoreCase(l.getRel())) { + log.info("found hub {} for feed {}", l.getHref(), feed.getLink()); + return l.getHref(); + } + } + return null; + } + + @SuppressWarnings("unchecked") + private String findSelf(SyndFeed feed) { + for (SyndLink l : (List) feed.getLinks()) { + if ("self".equalsIgnoreCase(l.getRel())) { + log.info("found self {} for feed {}", l.getHref(), feed.getLink()); + return l.getHref(); + } + } + return null; + } + } diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java index 93664737..63e64802 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshUpdater.java @@ -7,18 +7,28 @@ import javax.ejb.Stateless; import javax.inject.Inject; import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.commafeed.backend.dao.FeedDAO; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; +import com.commafeed.backend.model.FeedPushInfo; +import com.commafeed.backend.pubsubhubbub.SubscriptionHandler; import com.commafeed.backend.services.FeedUpdateService; @Stateless public class FeedRefreshUpdater { + protected static Logger log = LoggerFactory + .getLogger(FeedRefreshUpdater.class); + @Inject FeedUpdateService feedUpdateService; + @Inject + SubscriptionHandler handler; + @Inject FeedDAO feedDAO; @@ -28,6 +38,14 @@ public class FeedRefreshUpdater { feedUpdateService.updateEntries(feed, entries); } feedDAO.update(feed); + handlePubSub(feed); + } + + private void handlePubSub(Feed feed) { + FeedPushInfo info = feed.getPushInfo(); + if (info != null && info.isActive() == false) { + handler.subscribe(feed); + } } } diff --git a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java index 0928be6a..b047efae 100644 --- a/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java +++ b/src/main/java/com/commafeed/backend/feeds/FeedRefreshWorker.java @@ -13,12 +13,14 @@ import javax.transaction.RollbackException; import javax.transaction.SystemException; import org.apache.commons.lang.mutable.MutableBoolean; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.commafeed.backend.HttpGetter.NotModifiedException; import com.commafeed.backend.model.Feed; import com.commafeed.backend.model.FeedEntry; +import com.commafeed.backend.model.FeedPushInfo; import com.sun.syndication.io.FeedException; public class FeedRefreshWorker { @@ -78,7 +80,6 @@ public class FeedRefreshWorker { try { fetchedFeed = fetcher.fetch(feed.getUrl(), false, feed.getLastModifiedHeader(), feed.getEtagHeader()); - // stops here if NotModifiedException or any other exception is // thrown entries = fetchedFeed.getEntries(); @@ -90,6 +91,8 @@ public class FeedRefreshWorker { .getLastModifiedHeader()); feed.setEtagHeader(fetchedFeed.getFeed().getEtagHeader()); + handlePubSub(feed, fetchedFeed); + } catch (NotModifiedException e) { log.debug("Feed not modified (304) : " + feed.getUrl()); if (feed.getErrorCount() == 0) { @@ -112,12 +115,32 @@ public class FeedRefreshWorker { feed.setErrorCount(errorCount); feed.setMessage(message); feed.setDisabledUntil(disabledUntil); - log.info(feed.getUrl() + " disabledUntil " + disabledUntil); feedRefreshUpdater.updateEntries(feed, entries); } + private void handlePubSub(Feed feed, FetchedFeed fetchedFeed) { + String hub = fetchedFeed.getHub(); + String topic = fetchedFeed.getTopic(); + log.info("Checking for pubsub infos for {}", feed.getUrl()); + if (hub != null && topic != null) { + log.info("feed {} has pubsub info: {}", feed.getUrl(), topic); + FeedPushInfo info = feed.getPushInfo(); + if (info == null) { + info = new FeedPushInfo(); + } + if (!StringUtils.equals(hub, info.getHub()) + || !StringUtils.equals(topic, info.getTopic())) { + info.setHub(hub); + info.setTopic(topic); + info.setFeed(feed); + info.setActive(false); + } + feed.setPushInfo(info); + } + } + private Feed getNextFeed() { return taskGiver.take(); } diff --git a/src/main/java/com/commafeed/backend/feeds/FetchedFeed.java b/src/main/java/com/commafeed/backend/feeds/FetchedFeed.java index b1be3e25..829323eb 100644 --- a/src/main/java/com/commafeed/backend/feeds/FetchedFeed.java +++ b/src/main/java/com/commafeed/backend/feeds/FetchedFeed.java @@ -16,6 +16,16 @@ public class FetchedFeed { private long fetchDuration; private Date publishedDate; + /** + * pubsubhubbub hub url + */ + private String hub; + + /** + * pubsubhubbub topic + */ + private String topic; + public Feed getFeed() { return feed; } @@ -56,4 +66,20 @@ public class FetchedFeed { this.publishedDate = publishedDate; } + public String getHub() { + return hub; + } + + public void setHub(String hub) { + this.hub = hub; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + } diff --git a/src/main/java/com/commafeed/backend/model/Feed.java b/src/main/java/com/commafeed/backend/model/Feed.java index cb10ab25..01a3022e 100644 --- a/src/main/java/com/commafeed/backend/model/Feed.java +++ b/src/main/java/com/commafeed/backend/model/Feed.java @@ -3,10 +3,13 @@ package com.commafeed.backend.model; import java.util.Date; import java.util.Set; +import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; +import javax.persistence.FetchType; import javax.persistence.ManyToMany; import javax.persistence.OneToMany; +import javax.persistence.OneToOne; import javax.persistence.Table; import javax.persistence.Temporal; import javax.persistence.TemporalType; @@ -64,6 +67,10 @@ public class Feed extends AbstractModel { @Column(length = 255) private String etagHeader; + @OneToOne(fetch = FetchType.LAZY, mappedBy = "feed", cascade = { + CascadeType.PERSIST, CascadeType.MERGE }) + private FeedPushInfo pushInfo; + @ManyToMany(mappedBy = "feeds") private Set entries = Sets.newHashSet(); @@ -174,4 +181,12 @@ public class Feed extends AbstractModel { this.lastUpdateSuccess = lastUpdateSuccess; } + public FeedPushInfo getPushInfo() { + return pushInfo; + } + + public void setPushInfo(FeedPushInfo pushInfo) { + this.pushInfo = pushInfo; + } + } diff --git a/src/main/java/com/commafeed/backend/model/FeedPushInfo.java b/src/main/java/com/commafeed/backend/model/FeedPushInfo.java new file mode 100644 index 00000000..1b32b38b --- /dev/null +++ b/src/main/java/com/commafeed/backend/model/FeedPushInfo.java @@ -0,0 +1,60 @@ +package com.commafeed.backend.model; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.OneToOne; +import javax.persistence.Table; + +import org.hibernate.annotations.Index; + +@Entity +@Table(name = "FEEDPUSHINFOS") +@SuppressWarnings("serial") +public class FeedPushInfo extends AbstractModel { + + @OneToOne(fetch = FetchType.LAZY) + private Feed feed; + + @Column(length = 2048, nullable = false) + @Index(name = "topic_index") + private String topic; + + @Column(length = 2048, nullable = false) + private String hub; + + private boolean active; + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public Feed getFeed() { + return feed; + } + + public void setFeed(Feed feed) { + this.feed = feed; + } + + public String getHub() { + return hub; + } + + public void setHub(String hub) { + this.hub = hub; + } + + public boolean isActive() { + return active; + } + + public void setActive(boolean active) { + this.active = active; + } + +} diff --git a/src/main/java/com/commafeed/backend/pubsubhubbub/SubscriptionHandler.java b/src/main/java/com/commafeed/backend/pubsubhubbub/SubscriptionHandler.java new file mode 100644 index 00000000..e98b43e9 --- /dev/null +++ b/src/main/java/com/commafeed/backend/pubsubhubbub/SubscriptionHandler.java @@ -0,0 +1,77 @@ +package com.commafeed.backend.pubsubhubbub; + +import java.util.List; + +import javax.ejb.Asynchronous; +import javax.inject.Inject; +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.message.BasicNameValuePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.commafeed.backend.HttpGetter; +import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedPushInfo; +import com.commafeed.backend.services.ApplicationSettingsService; +import com.google.common.collect.Lists; + +public class SubscriptionHandler { + + private static Logger log = LoggerFactory + .getLogger(SubscriptionHandler.class); + + @Inject + ApplicationSettingsService applicationSettingsService; + + @Asynchronous + public void subscribe(Feed feed) { + FeedPushInfo info = feed.getPushInfo(); + String hub = info.getHub(); + String topic = info.getTopic(); + String publicUrl = applicationSettingsService.get().getPublicUrl(); + if (publicUrl.endsWith("/")) { + publicUrl = publicUrl.substring(0, publicUrl.length() - 1); + } + + log.info("sending new pubsub subscription to {} for {}", hub, topic); + + HttpPost post = new HttpPost(hub); + List nvp = Lists.newArrayList(); + nvp.add(new BasicNameValuePair("hub.callback", publicUrl + + "/rest/push/callback")); + nvp.add(new BasicNameValuePair("hub.topic", topic)); + nvp.add(new BasicNameValuePair("hub.mode", "subscribe")); + nvp.add(new BasicNameValuePair("hub.verify", "async")); + nvp.add(new BasicNameValuePair("hub.secret", "")); + nvp.add(new BasicNameValuePair("hub.verify_token", "")); + nvp.add(new BasicNameValuePair("hub.lease_seconds", "")); + + post.setHeader(HttpHeaders.USER_AGENT, "CommaFeed"); + post.setHeader(HttpHeaders.CONTENT_TYPE, + MediaType.APPLICATION_FORM_URLENCODED); + + HttpClient client = HttpGetter.newClient(); + try { + post.setEntity(new UrlEncodedFormEntity(nvp)); + HttpResponse response = client.execute(post); + + int code = response.getStatusLine().getStatusCode(); + if (code != 204 && code != 202) { + throw new Exception("Unexpected response code: " + code + " " + + response.getStatusLine().getReasonPhrase()); + } + log.info("subscribed to {} for {}", hub, topic); + } catch (Exception e) { + log.error("Could not subscribe to pubsub: " + e.getMessage(), e); + } finally { + client.getConnectionManager().shutdown(); + } + } +} diff --git a/src/main/java/com/commafeed/frontend/rest/RESTApplication.java b/src/main/java/com/commafeed/frontend/rest/RESTApplication.java index 24608770..d73db166 100644 --- a/src/main/java/com/commafeed/frontend/rest/RESTApplication.java +++ b/src/main/java/com/commafeed/frontend/rest/RESTApplication.java @@ -10,6 +10,7 @@ import com.commafeed.frontend.rest.resources.ApiDocumentationREST; import com.commafeed.frontend.rest.resources.CategoryREST; import com.commafeed.frontend.rest.resources.EntryREST; import com.commafeed.frontend.rest.resources.FeedREST; +import com.commafeed.frontend.rest.resources.PubSubHubbubCallbackREST; import com.commafeed.frontend.rest.resources.ServerREST; import com.commafeed.frontend.rest.resources.UserREST; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; @@ -36,6 +37,7 @@ public class RESTApplication extends Application { set.add(AdminREST.class); set.add(ApiDocumentationREST.class); + set.add(PubSubHubbubCallbackREST.class); return set; } diff --git a/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java b/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java index ec76cae5..3ccad02f 100644 --- a/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java +++ b/src/main/java/com/commafeed/frontend/rest/resources/AdminREST.java @@ -125,6 +125,7 @@ public class AdminREST extends AbstractResourceREST { userModel = new UserModel(); userModel.setId(user.getId()); userModel.setName(user.getName()); + userModel.setEmail(user.getEmail()); userModel.setEnabled(!user.isDisabled()); users.put(key, userModel); } diff --git a/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java b/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java new file mode 100644 index 00000000..c0eefa24 --- /dev/null +++ b/src/main/java/com/commafeed/frontend/rest/resources/PubSubHubbubCallbackREST.java @@ -0,0 +1,102 @@ +package com.commafeed.frontend.rest.resources; + +import java.util.List; + +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.commafeed.backend.dao.FeedPushInfoDAO; +import com.commafeed.backend.feeds.FeedParser; +import com.commafeed.backend.feeds.FeedRefreshTaskGiver; +import com.commafeed.backend.feeds.FetchedFeed; +import com.commafeed.backend.model.Feed; +import com.commafeed.backend.model.FeedPushInfo; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; + +@Path("/push") +public class PubSubHubbubCallbackREST { + + private static Logger log = LoggerFactory + .getLogger(PubSubHubbubCallbackREST.class); + + @Context + HttpServletRequest request; + + @Inject + FeedPushInfoDAO feedPushInfoDAO; + + @Inject + FeedParser parser; + + @Inject + FeedRefreshTaskGiver taskGiver; + + @Path("/callback") + @GET + @Produces(MediaType.TEXT_PLAIN) + public Response verify(@QueryParam("hub.mode") String mode, + @QueryParam("hub.topic") String topic, + @QueryParam("hub.challenge") String challenge, + @QueryParam("hub.lease_seconds") String leaseSeconds, + @QueryParam("hub.verify_token") String verifyToken) { + Preconditions.checkArgument(StringUtils.isNotEmpty(topic)); + Preconditions.checkArgument("subscribe".equals(mode)); + + log.info("confirmation callback received for {}", topic); + + List infos = feedPushInfoDAO.findByTopic(topic); + + if (infos.isEmpty() == false) { + for (FeedPushInfo info : infos) { + log.info("activated push notifications for {}", info.getFeed() + .getUrl()); + info.setActive(true); + } + feedPushInfoDAO.update(infos); + return Response.ok(challenge).build(); + } else { + log.info("rejecting callback: no push info for {}", topic); + return Response.status(Status.NOT_FOUND).build(); + } + } + + @Path("/callback") + @POST + @Consumes({ MediaType.APPLICATION_ATOM_XML, "application/rss+xml" }) + public Response callback() { + log.info("content callback received"); + try { + byte[] bytes = IOUtils.toByteArray(request.getInputStream()); + log.info(new String(bytes)); + FetchedFeed fetchedFeed = parser.parse(null, bytes); + String topic = fetchedFeed.getTopic(); + if (topic != null) { + log.info("content callback received for {}", topic); + List infos = feedPushInfoDAO.findByTopic(topic); + for (FeedPushInfo info : infos) { + Feed feed = info.getFeed(); + log.info("pushing content to queue for {}", feed.getUrl()); + taskGiver.add(feed); + } + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return Response.ok().build(); + } +}