remove wonky pubsub support

This commit is contained in:
Athou
2023-12-18 08:39:22 +01:00
parent c45f832131
commit 00faf44c94
15 changed files with 17 additions and 424 deletions

View File

@@ -31,7 +31,6 @@ import com.commafeed.frontend.resource.AdminREST;
import com.commafeed.frontend.resource.CategoryREST;
import com.commafeed.frontend.resource.EntryREST;
import com.commafeed.frontend.resource.FeedREST;
import com.commafeed.frontend.resource.PubSubHubbubCallbackREST;
import com.commafeed.frontend.resource.ServerREST;
import com.commafeed.frontend.resource.UserREST;
import com.commafeed.frontend.resource.fever.FeverREST;
@@ -159,7 +158,6 @@ public class CommaFeedApplication extends Application<CommaFeedConfiguration> {
environment.jersey().register(injector.getInstance(CategoryREST.class));
environment.jersey().register(injector.getInstance(EntryREST.class));
environment.jersey().register(injector.getInstance(FeedREST.class));
environment.jersey().register(injector.getInstance(PubSubHubbubCallbackREST.class));
environment.jersey().register(injector.getInstance(ServerREST.class));
environment.jersey().register(injector.getInstance(UserREST.class));
environment.jersey().register(injector.getInstance(FeverREST.class));

View File

@@ -127,10 +127,6 @@ public class CommaFeedConfiguration extends Configuration implements WebsocketBu
@Valid
private Boolean heavyLoad;
@NotNull
@Valid
private Boolean pubsubhubbub;
@NotNull
@Valid
private Boolean imageProxyEnabled;

View File

@@ -54,10 +54,6 @@ public class FeedDAO extends GenericDAO<Feed> {
return null;
}
public List<Feed> findByTopic(String topic) {
return query().selectFrom(feed).where(feed.pushTopicHash.eq(DigestUtils.sha1Hex(topic))).fetch();
}
public List<Feed> findWithoutSubscriptions(int max) {
QFeedSubscription sub = QFeedSubscription.feedSubscription;
return query().selectFrom(feed).where(JPAExpressions.selectOne().from(sub).where(sub.feed.eq(feed)).notExists()).limit(max).fetch();

View File

@@ -70,8 +70,6 @@ public class FeedParser {
String title = rss.getTitle();
Feed feed = new Feed();
feed.setPushHub(findHub(rss));
feed.setPushTopic(findSelf(rss));
feed.setUrl(feedUrl);
feed.setLink(rss.getLink());

View File

@@ -11,7 +11,6 @@ import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@@ -26,7 +25,6 @@ import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.commafeed.backend.service.FeedEntryService;
import com.commafeed.backend.service.FeedService;
import com.commafeed.backend.service.PubSubService;
import com.commafeed.frontend.ws.WebSocketMessageBuilder;
import com.commafeed.frontend.ws.WebSocketSessions;
import com.google.common.util.concurrent.Striped;
@@ -47,7 +45,6 @@ public class FeedRefreshUpdater implements Managed {
private final UnitOfWork unitOfWork;
private final FeedService feedService;
private final FeedEntryService feedEntryService;
private final PubSubService pubSubService;
private final CommaFeedConfiguration config;
private final FeedSubscriptionDAO feedSubscriptionDAO;
private final CacheService cache;
@@ -62,12 +59,11 @@ public class FeedRefreshUpdater implements Managed {
@Inject
public FeedRefreshUpdater(UnitOfWork unitOfWork, FeedService feedService, FeedEntryService feedEntryService,
PubSubService pubSubService, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO,
CacheService cache, WebSocketSessions webSocketSessions) {
CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO, CacheService cache,
WebSocketSessions webSocketSessions) {
this.unitOfWork = unitOfWork;
this.feedService = feedService;
this.feedEntryService = feedEntryService;
this.pubSubService = pubSubService;
this.config = config;
this.feedSubscriptionDAO = feedSubscriptionDAO;
this.cache = cache;
@@ -125,29 +121,6 @@ public class FeedRefreshUpdater implements Managed {
return new AddEntryResult(processed, inserted);
}
private void handlePubSub(final Feed feed) {
if (feed.getPushHub() != null && feed.getPushTopic() != null) {
Date lastPing = feed.getPushLastPing();
Date now = new Date();
if (lastPing == null || lastPing.before(DateUtils.addDays(now, -3))) {
new Thread() {
@Override
public void run() {
try {
// make sure the feed has been updated in the database so that the
// callback works
Thread.sleep(30000);
} catch (InterruptedException e1) {
// do nothing
}
pubSubService.subscribe(feed);
}
}.start();
}
}
}
public boolean update(Feed feed, List<FeedEntry> entries) {
boolean processed = true;
boolean insertedAtLeastOneEntry = false;
@@ -190,9 +163,6 @@ public class FeedRefreshUpdater implements Managed {
}
}
if (Boolean.TRUE.equals(config.getApplicationSettings().getPubsubhubbub())) {
handlePubSub(feed);
}
if (!processed) {
// requeue asap
feed.setDisabledUntil(new Date(0));

View File

@@ -5,7 +5,6 @@ import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Meter;
@@ -73,8 +72,6 @@ public class FeedRefreshWorker {
feed.setMessage(null);
feed.setDisabledUntil(refreshIntervalCalculator.onFetchSuccess(feedFetcherResult.getFeed()));
handlePubSub(feed, feedFetcherResult.getFeed());
return new FeedRefreshWorkerResult(feed, entries);
} catch (NotModifiedException e) {
log.debug("Feed not modified : {} - {}", feed.getUrl(), e.getMessage());
@@ -105,28 +102,6 @@ public class FeedRefreshWorker {
}
}
private void handlePubSub(Feed feed, Feed fetchedFeed) {
String hub = fetchedFeed.getPushHub();
String topic = fetchedFeed.getPushTopic();
if (hub != null && topic != null) {
if (hub.contains("hubbub.api.typepad.com")) {
// that hub does not exist anymore
return;
}
if (topic.startsWith("www.")) {
topic = "http://" + topic;
} else if (topic.startsWith("feed://")) {
topic = "http://" + topic.substring(7);
} else if (!topic.startsWith("http")) {
topic = "http://" + topic;
}
log.debug("feed {} has pubsub info: {}", feed.getUrl(), topic);
feed.setPushHub(hub);
feed.setPushTopic(topic);
feed.setPushTopicHash(DigestUtils.sha1Hex(topic));
}
}
@Value
public static class FeedRefreshWorkerResult {
Feed feed;

View File

@@ -99,25 +99,4 @@ public class Feed extends AbstractModel {
@Column(length = 40)
private String lastContentHash;
/**
* detected hub for pubsubhubbub
*/
@Column(length = 2048)
private String pushHub;
/**
* detected topic for pubsubhubbub
*/
@Column(length = 2048)
private String pushTopic;
@Column(name = "push_topic_hash", length = 2048)
private String pushTopicHash;
/**
* last time we subscribed for that topic on that hub
*/
@Temporal(TemporalType.TIMESTAMP)
private Date pushLastPing;
}

View File

@@ -1,91 +0,0 @@
package com.commafeed.backend.service;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.HttpGetter;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.feed.FeedUtils;
import com.commafeed.backend.model.Feed;
import com.commafeed.frontend.resource.PubSubHubbubCallbackREST;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.ws.rs.core.MediaType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Sends push subscription requests. Callback is handled by {@link PubSubHubbubCallbackREST}
*
*/
@Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
@Singleton
public class PubSubService {
private final CommaFeedConfiguration config;
private final FeedService feedService;
private final UnitOfWork unitOfWork;
public void subscribe(Feed feed) {
String hub = feed.getPushHub();
String topic = feed.getPushTopic();
String publicUrl = FeedUtils.removeTrailingSlash(config.getApplicationSettings().getPublicUrl());
log.debug("sending new pubsub subscription to {} for {}", hub, topic);
HttpPost post = new HttpPost(hub);
List<NameValuePair> nvp = new ArrayList<>();
nvp.add(new BasicNameValuePair("hub.callback", publicUrl + "/rest/push/callback"));
nvp.add(new BasicNameValuePair("hub.topic", topic));
nvp.add(new BasicNameValuePair("hub.mode", "subscribe"));
nvp.add(new BasicNameValuePair("hub.verify", "async"));
nvp.add(new BasicNameValuePair("hub.secret", ""));
nvp.add(new BasicNameValuePair("hub.verify_token", ""));
nvp.add(new BasicNameValuePair("hub.lease_seconds", ""));
post.setHeader(HttpHeaders.USER_AGENT, "CommaFeed");
post.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED);
CloseableHttpClient client = HttpGetter.newClient(20000);
CloseableHttpResponse response = null;
try {
post.setEntity(new UrlEncodedFormEntity(nvp));
response = client.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code != 204 && code != 202 && code != 200) {
String message = EntityUtils.toString(response.getEntity());
String pushpressError = " is value is not allowed. You may only subscribe to";
if (code == 400 && StringUtils.contains(message, pushpressError)) {
String[] tokens = message.split(" ");
feed.setPushTopic(tokens[tokens.length - 1]);
unitOfWork.run(() -> feedService.save(feed));
log.debug("handled pushpress subfeed {} : {}", topic, feed.getPushTopic());
} else {
throw new Exception(
"Unexpected response code: " + code + " " + response.getStatusLine().getReasonPhrase() + " - " + message);
}
}
log.debug("subscribed to {} for {}", hub, topic);
} catch (Exception e) {
log.error("Could not subscribe to {} for {} : " + e.getMessage(), hub, topic);
} finally {
IOUtils.closeQuietly(response);
IOUtils.closeQuietly(client);
}
}
}

View File

@@ -1,125 +0,0 @@
package com.commafeed.frontend.resource;
import java.util.Date;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.annotation.Timed;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.feed.FeedParser;
import com.commafeed.backend.feed.FeedParser.FeedParserResult;
import com.commafeed.backend.feed.FeedRefreshEngine;
import com.commafeed.backend.model.Feed;
import com.google.common.base.Preconditions;
import io.dropwizard.hibernate.UnitOfWork;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Path("/push")
@Slf4j
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
@Singleton
public class PubSubHubbubCallbackREST {
@Context
private HttpServletRequest request;
private final FeedDAO feedDAO;
private final FeedParser parser;
private final FeedRefreshEngine feedRefreshEngine;
private final CommaFeedConfiguration config;
private final MetricRegistry metricRegistry;
@Path("/callback")
@GET
@UnitOfWork
@Produces(MediaType.TEXT_PLAIN)
@Timed
public Response verify(@QueryParam("hub.mode") String mode, @QueryParam("hub.topic") String topic,
@QueryParam("hub.challenge") String challenge, @QueryParam("hub.lease_seconds") String leaseSeconds,
@QueryParam("hub.verify_token") String verifyToken) {
if (!config.getApplicationSettings().getPubsubhubbub()) {
return Response.status(Status.FORBIDDEN).entity("pubsubhubbub is disabled").build();
}
Preconditions.checkArgument(StringUtils.isNotEmpty(topic));
Preconditions.checkArgument("subscribe".equals(mode));
log.debug("confirmation callback received for {}", topic);
List<Feed> feeds = feedDAO.findByTopic(topic);
if (!feeds.isEmpty()) {
for (Feed feed : feeds) {
log.debug("activated push notifications for {}", feed.getPushTopic());
feed.setPushLastPing(new Date());
}
feedDAO.saveOrUpdate(feeds);
return Response.ok(challenge).build();
} else {
log.debug("rejecting callback: no push info for {}", topic);
return Response.status(Status.NOT_FOUND).build();
}
}
@Path("/callback")
@POST
@UnitOfWork
@Consumes({ MediaType.APPLICATION_ATOM_XML, "application/rss+xml" })
@Timed
public Response callback() {
if (!config.getApplicationSettings().getPubsubhubbub()) {
return Response.status(Status.FORBIDDEN).entity("pubsubhubbub is disabled").build();
}
try {
byte[] bytes = IOUtils.toByteArray(request.getInputStream());
if (ArrayUtils.isEmpty(bytes)) {
return Response.status(Status.BAD_REQUEST).entity("empty body received").build();
}
FeedParserResult feedParserResult = parser.parse(null, bytes);
String topic = feedParserResult.getFeed().getPushTopic();
if (StringUtils.isBlank(topic)) {
return Response.status(Status.BAD_REQUEST).entity("empty topic received").build();
}
log.debug("content callback received for {}", topic);
List<Feed> feeds = feedDAO.findByTopic(topic);
if (feeds.isEmpty()) {
return Response.status(Status.BAD_REQUEST).entity("no feeds found for that topic").build();
}
for (Feed feed : feeds) {
log.debug("pushing content to queue for {}", feed.getUrl());
feedRefreshEngine.refreshImmediately(feed);
}
metricRegistry.meter(MetricRegistry.name(getClass(), "pushReceived")).mark();
} catch (Exception e) {
log.error("Could not parse pubsub callback: " + e.getMessage());
}
return Response.ok().build();
}
}