Merge branch 'lpoirothattermann-master'

This commit is contained in:
Athou
2026-02-18 20:53:50 +01:00
69 changed files with 2293 additions and 196 deletions

View File

@@ -68,6 +68,12 @@ public interface CommaFeedConfiguration {
@ConfigDocSection
FeedRefresh feedRefresh();
/**
* Push notification settings.
*/
@ConfigDocSection
PushNotifications pushNotifications();
/**
* Database settings.
*/
@@ -138,10 +144,9 @@ public interface CommaFeedConfiguration {
* Prevent access to local addresses to mitigate server-side request forgery (SSRF) attacks, which could potentially expose internal
* resources.
*
* You may want to disable this if you subscribe to feeds that are only available on your local network and you trust all users of
* your CommaFeed instance.
* You may want to enable this if you host a public instance of CommaFeed with regisration open.
*/
@WithDefault("true")
@WithDefault("false")
boolean blockLocalAddresses();
/**
@@ -242,6 +247,28 @@ public interface CommaFeedConfiguration {
Duration forceRefreshCooldownDuration();
}
interface PushNotifications {
/**
* Whether to enable push notifications to notify users of new entries in their feeds.
*/
@WithDefault("true")
boolean enabled();
/**
* Amount of threads used to send external notifications about new entries.
*/
@Min(1)
@WithDefault("5")
int threads();
/**
* Maximum amount of notifications that can be queued before new notifications are discarded.
*/
@Min(1)
@WithDefault("100")
int queueCapacity();
}
interface FeedRefreshErrorHandling {
/**
* Number of retries before backoff is applied.

View File

@@ -0,0 +1,124 @@
package com.commafeed.backend;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.SequencedMap;
import java.util.zip.GZIPInputStream;
import jakarta.inject.Singleton;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.entity.DeflateInputStream;
import org.apache.hc.client5.http.entity.InputStreamFactory;
import org.apache.hc.client5.http.entity.compress.ContentCoding;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.brotli.dec.BrotliInputStream;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.CommaFeedVersion;
import com.google.common.net.HttpHeaders;
import lombok.RequiredArgsConstructor;
import nl.altindag.ssl.SSLFactory;
import nl.altindag.ssl.apache5.util.Apache5SslUtils;
@Singleton
@RequiredArgsConstructor
public class HttpClientFactory {
private final CommaFeedConfiguration config;
private final CommaFeedVersion version;
public CloseableHttpClient newClient(int poolSize) {
PoolingHttpClientConnectionManager connectionManager = newConnectionManager(config, poolSize);
String userAgent = config.httpClient()
.userAgent()
.orElseGet(() -> String.format("CommaFeed/%s (https://github.com/Athou/commafeed)", version.getVersion()));
return newClient(connectionManager, userAgent, config.httpClient().idleConnectionsEvictionInterval());
}
private CloseableHttpClient newClient(HttpClientConnectionManager connectionManager, String userAgent,
Duration idleConnectionsEvictionInterval) {
List<Header> headers = new ArrayList<>();
headers.add(new BasicHeader(HttpHeaders.ACCEPT_LANGUAGE, "en"));
headers.add(new BasicHeader(HttpHeaders.PRAGMA, "No-cache"));
headers.add(new BasicHeader(HttpHeaders.CACHE_CONTROL, "no-cache"));
SequencedMap<String, InputStreamFactory> contentDecoderMap = new LinkedHashMap<>();
contentDecoderMap.put(ContentCoding.GZIP.token(), GZIPInputStream::new);
contentDecoderMap.put(ContentCoding.DEFLATE.token(), DeflateInputStream::new);
contentDecoderMap.put(ContentCoding.BROTLI.token(), BrotliInputStream::new);
return HttpClientBuilder.create()
.useSystemProperties()
.disableAutomaticRetries()
.disableCookieManagement()
.setUserAgent(userAgent)
.setDefaultHeaders(headers)
.setConnectionManager(connectionManager)
.evictExpiredConnections()
.evictIdleConnections(TimeValue.of(idleConnectionsEvictionInterval))
.setContentDecoderRegistry(new LinkedHashMap<>(contentDecoderMap))
.build();
}
private PoolingHttpClientConnectionManager newConnectionManager(CommaFeedConfiguration config, int poolSize) {
SSLFactory sslFactory = SSLFactory.builder().withUnsafeTrustMaterial().withUnsafeHostnameVerifier().build();
DnsResolver dnsResolver = config.httpClient().blockLocalAddresses()
? new BlockLocalAddressesDnsResolver(SystemDefaultDnsResolver.INSTANCE)
: SystemDefaultDnsResolver.INSTANCE;
return PoolingHttpClientConnectionManagerBuilder.create()
.setTlsSocketStrategy(Apache5SslUtils.toTlsSocketStrategy(sslFactory))
.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(Timeout.of(config.httpClient().connectTimeout()))
.setSocketTimeout(Timeout.of(config.httpClient().socketTimeout()))
.setTimeToLive(Timeout.of(config.httpClient().connectionTimeToLive()))
.build())
.setDefaultTlsConfig(TlsConfig.custom().setHandshakeTimeout(Timeout.of(config.httpClient().sslHandshakeTimeout())).build())
.setMaxConnPerRoute(poolSize)
.setMaxConnTotal(poolSize)
.setDnsResolver(dnsResolver)
.build();
}
private record BlockLocalAddressesDnsResolver(DnsResolver delegate) implements DnsResolver {
@Override
public InetAddress[] resolve(String host) throws UnknownHostException {
InetAddress[] addresses = delegate.resolve(host);
for (InetAddress addr : addresses) {
if (isLocalAddress(addr)) {
throw new UnknownHostException("Access to address blocked: " + addr.getHostAddress());
}
}
return addresses;
}
@Override
public String resolveCanonicalHostname(String host) throws UnknownHostException {
return delegate.resolveCanonicalHostname(host);
}
private boolean isLocalAddress(InetAddress address) {
return address.isSiteLocalAddress() || address.isAnyLocalAddress() || address.isLinkLocalAddress()
|| address.isLoopbackAddress() || address.isMulticastAddress();
}
}
}

View File

@@ -2,20 +2,12 @@ package com.commafeed.backend;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.SequencedMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import jakarta.inject.Singleton;
import jakarta.ws.rs.core.CacheControl;
@@ -24,36 +16,22 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.entity.DeflateInputStream;
import org.apache.hc.client5.http.entity.InputStreamFactory;
import org.apache.hc.client5.http.entity.compress.ContentCoding;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.protocol.RedirectLocations;
import org.apache.hc.client5.http.utils.DateUtils;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.brotli.dec.BrotliInputStream;
import org.jboss.resteasy.reactive.common.headers.CacheControlDelegate;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.CommaFeedConfiguration.HttpClientCache;
import com.commafeed.CommaFeedVersion;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
@@ -66,8 +44,6 @@ import lombok.Getter;
import lombok.Lombok;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import nl.altindag.ssl.SSLFactory;
import nl.altindag.ssl.apache5.util.Apache5SslUtils;
/**
* Smart HTTP getter: handles gzip, ssl, last modified and etag headers
@@ -82,42 +58,27 @@ public class HttpGetter {
private final CloseableHttpClient client;
private final Cache<HttpRequest, HttpResponse> cache;
public HttpGetter(CommaFeedConfiguration config, InstantSource instantSource, CommaFeedVersion version, MetricRegistry metrics) {
public HttpGetter(CommaFeedConfiguration config, InstantSource instantSource, HttpClientFactory httpClientFactory,
MetricRegistry metrics) {
this.config = config;
this.instantSource = instantSource;
PoolingHttpClientConnectionManager connectionManager = newConnectionManager(config);
String userAgent = config.httpClient()
.userAgent()
.orElseGet(() -> String.format("CommaFeed/%s (https://github.com/Athou/commafeed)", version.getVersion()));
this.client = newClient(connectionManager, userAgent, config.httpClient().idleConnectionsEvictionInterval());
this.client = httpClientFactory.newClient(config.feedRefresh().httpThreads());
this.cache = newCache(config);
metrics.registerGauge(MetricRegistry.name(getClass(), "pool", "max"), () -> connectionManager.getTotalStats().getMax());
metrics.registerGauge(MetricRegistry.name(getClass(), "pool", "size"),
() -> connectionManager.getTotalStats().getAvailable() + connectionManager.getTotalStats().getLeased());
metrics.registerGauge(MetricRegistry.name(getClass(), "pool", "leased"), () -> connectionManager.getTotalStats().getLeased());
metrics.registerGauge(MetricRegistry.name(getClass(), "pool", "pending"), () -> connectionManager.getTotalStats().getPending());
metrics.registerGauge(MetricRegistry.name(getClass(), "cache", "size"), () -> cache == null ? 0 : cache.size());
metrics.registerGauge(MetricRegistry.name(getClass(), "cache", "memoryUsage"),
() -> cache == null ? 0 : cache.asMap().values().stream().mapToInt(e -> ArrayUtils.getLength(e.content)).sum());
}
public HttpResult get(String url)
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException, HostNotAllowedException {
public HttpResult get(String url) throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException {
return get(HttpRequest.builder(url).build());
}
public HttpResult get(HttpRequest request)
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException, HostNotAllowedException {
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException {
URI uri = URI.create(request.getUrl());
ensureHttpScheme(uri.getScheme());
if (config.httpClient().blockLocalAddresses()) {
ensurePublicAddress(uri.getHost());
}
final HttpResponse response;
if (cache == null) {
response = invoke(request);
@@ -164,22 +125,6 @@ public class HttpGetter {
}
}
private void ensurePublicAddress(String host) throws HostNotAllowedException, UnknownHostException {
if (host == null) {
throw new HostNotAllowedException(null);
}
InetAddress[] addresses = DNS_RESOLVER.resolve(host);
if (Stream.of(addresses).anyMatch(this::isPrivateAddress)) {
throw new HostNotAllowedException(host);
}
}
private boolean isPrivateAddress(InetAddress address) {
return address.isSiteLocalAddress() || address.isAnyLocalAddress() || address.isLinkLocalAddress() || address.isLoopbackAddress()
|| address.isMulticastAddress();
}
private HttpResponse invoke(HttpRequest request) throws IOException {
log.debug("fetching {}", request.getUrl());
@@ -268,50 +213,6 @@ public class HttpGetter {
}
}
private PoolingHttpClientConnectionManager newConnectionManager(CommaFeedConfiguration config) {
SSLFactory sslFactory = SSLFactory.builder().withUnsafeTrustMaterial().withUnsafeHostnameVerifier().build();
int poolSize = config.feedRefresh().httpThreads();
return PoolingHttpClientConnectionManagerBuilder.create()
.setTlsSocketStrategy(Apache5SslUtils.toTlsSocketStrategy(sslFactory))
.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(Timeout.of(config.httpClient().connectTimeout()))
.setSocketTimeout(Timeout.of(config.httpClient().socketTimeout()))
.setTimeToLive(Timeout.of(config.httpClient().connectionTimeToLive()))
.build())
.setDefaultTlsConfig(TlsConfig.custom().setHandshakeTimeout(Timeout.of(config.httpClient().sslHandshakeTimeout())).build())
.setMaxConnPerRoute(poolSize)
.setMaxConnTotal(poolSize)
.setDnsResolver(DNS_RESOLVER)
.build();
}
private static CloseableHttpClient newClient(HttpClientConnectionManager connectionManager, String userAgent,
Duration idleConnectionsEvictionInterval) {
List<Header> headers = new ArrayList<>();
headers.add(new BasicHeader(HttpHeaders.ACCEPT_LANGUAGE, "en"));
headers.add(new BasicHeader(HttpHeaders.PRAGMA, "No-cache"));
headers.add(new BasicHeader(HttpHeaders.CACHE_CONTROL, "no-cache"));
SequencedMap<String, InputStreamFactory> contentDecoderMap = new LinkedHashMap<>();
contentDecoderMap.put(ContentCoding.GZIP.token(), GZIPInputStream::new);
contentDecoderMap.put(ContentCoding.DEFLATE.token(), DeflateInputStream::new);
contentDecoderMap.put(ContentCoding.BROTLI.token(), BrotliInputStream::new);
return HttpClientBuilder.create()
.useSystemProperties()
.disableAutomaticRetries()
.disableCookieManagement()
.setUserAgent(userAgent)
.setDefaultHeaders(headers)
.setConnectionManager(connectionManager)
.evictExpiredConnections()
.evictIdleConnections(TimeValue.of(idleConnectionsEvictionInterval))
.setContentDecoderRegistry(new LinkedHashMap<>(contentDecoderMap))
.build();
}
private static Cache<HttpRequest, HttpResponse> newCache(CommaFeedConfiguration config) {
HttpClientCache cacheConfig = config.httpClient().cache();
if (!cacheConfig.enabled()) {
@@ -333,14 +234,6 @@ public class HttpGetter {
}
}
public static class HostNotAllowedException extends Exception {
private static final long serialVersionUID = 1L;
public HostNotAllowedException(String host) {
super("Host not allowed: " + host);
}
}
@Getter
public static class NotModifiedException extends Exception {
private static final long serialVersionUID = 1L;

View File

@@ -53,6 +53,10 @@ public class Urls {
}
public static String removeTrailingSlash(String url) {
if (url == null) {
return null;
}
if (url.endsWith("/")) {
url = url.substring(0, url.length() - 1);
}

View File

@@ -14,7 +14,6 @@ import org.apache.hc.core5.net.URIBuilder;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.HttpGetter;
import com.commafeed.backend.HttpGetter.HostNotAllowedException;
import com.commafeed.backend.HttpGetter.HttpResult;
import com.commafeed.backend.HttpGetter.NotModifiedException;
import com.commafeed.backend.HttpGetter.SchemeNotAllowedException;
@@ -98,7 +97,7 @@ public class YoutubeFaviconFetcher extends AbstractFaviconFetcher {
}
private byte[] fetchForUser(String googleAuthKey, String userId)
throws IOException, NotModifiedException, TooManyRequestsException, HostNotAllowedException, SchemeNotAllowedException {
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException {
URI uri = UriBuilder.fromUri("https://www.googleapis.com/youtube/v3/channels")
.queryParam("part", PART_SNIPPET)
.queryParam("key", googleAuthKey)
@@ -108,7 +107,7 @@ public class YoutubeFaviconFetcher extends AbstractFaviconFetcher {
}
private byte[] fetchForChannel(String googleAuthKey, String channelId)
throws IOException, NotModifiedException, TooManyRequestsException, HostNotAllowedException, SchemeNotAllowedException {
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException {
URI uri = UriBuilder.fromUri("https://www.googleapis.com/youtube/v3/channels")
.queryParam("part", PART_SNIPPET)
.queryParam("key", googleAuthKey)
@@ -118,7 +117,7 @@ public class YoutubeFaviconFetcher extends AbstractFaviconFetcher {
}
private byte[] fetchForPlaylist(String googleAuthKey, String playlistId)
throws IOException, NotModifiedException, TooManyRequestsException, HostNotAllowedException, SchemeNotAllowedException {
throws IOException, NotModifiedException, TooManyRequestsException, SchemeNotAllowedException {
URI uri = UriBuilder.fromUri("https://www.googleapis.com/youtube/v3/playlists")
.queryParam("part", PART_SNIPPET)
.queryParam("key", googleAuthKey)

View File

@@ -13,7 +13,6 @@ import org.apache.commons.lang3.Strings;
import com.commafeed.backend.Digests;
import com.commafeed.backend.HttpGetter;
import com.commafeed.backend.HttpGetter.HostNotAllowedException;
import com.commafeed.backend.HttpGetter.HttpRequest;
import com.commafeed.backend.HttpGetter.HttpResult;
import com.commafeed.backend.HttpGetter.NotModifiedException;
@@ -46,7 +45,7 @@ public class FeedFetcher {
public FeedFetcherResult fetch(String feedUrl, boolean extractFeedUrlFromHtml, String lastModified, String eTag,
Instant lastPublishedDate, String lastContentHash) throws FeedParsingException, IOException, NotModifiedException,
TooManyRequestsException, SchemeNotAllowedException, HostNotAllowedException, NoFeedFoundException {
TooManyRequestsException, SchemeNotAllowedException, NoFeedFoundException {
log.debug("Fetching feed {}", feedUrl);
HttpResult result = getter.get(HttpRequest.builder(feedUrl).lastModified(lastModified).eTag(eTag).build());

View File

@@ -7,6 +7,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -21,6 +22,8 @@ import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.model.AbstractModel;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedSubscription;
import lombok.extern.slf4j.Slf4j;
@@ -32,6 +35,7 @@ public class FeedRefreshEngine {
private final FeedDAO feedDAO;
private final FeedRefreshWorker worker;
private final FeedRefreshUpdater updater;
private final FeedUpdateNotifier notifier;
private final CommaFeedConfiguration config;
private final Meter refill;
@@ -42,13 +46,15 @@ public class FeedRefreshEngine {
private final ExecutorService refillExecutor;
private final ThreadPoolExecutor workerExecutor;
private final ThreadPoolExecutor databaseUpdaterExecutor;
private final ThreadPoolExecutor notifierExecutor;
public FeedRefreshEngine(UnitOfWork unitOfWork, FeedDAO feedDAO, FeedRefreshWorker worker, FeedRefreshUpdater updater,
CommaFeedConfiguration config, MetricRegistry metrics) {
FeedUpdateNotifier notifier, CommaFeedConfiguration config, MetricRegistry metrics) {
this.unitOfWork = unitOfWork;
this.feedDAO = feedDAO;
this.worker = worker;
this.updater = updater;
this.notifier = notifier;
this.config = config;
this.refill = metrics.meter(MetricRegistry.name(getClass(), "refill"));
@@ -59,10 +65,14 @@ public class FeedRefreshEngine {
this.refillExecutor = newDiscardingSingleThreadExecutorService();
this.workerExecutor = newBlockingExecutorService(config.feedRefresh().httpThreads());
this.databaseUpdaterExecutor = newBlockingExecutorService(config.feedRefresh().databaseThreads());
this.notifierExecutor = newDiscardingExecutorService(config.pushNotifications().threads(),
config.pushNotifications().queueCapacity());
metrics.register(MetricRegistry.name(getClass(), "queue", "size"), (Gauge<Integer>) queue::size);
metrics.register(MetricRegistry.name(getClass(), "worker", "active"), (Gauge<Integer>) workerExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "updater", "active"), (Gauge<Integer>) databaseUpdaterExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "notifier", "active"), (Gauge<Integer>) notifierExecutor::getActiveCount);
metrics.register(MetricRegistry.name(getClass(), "notifier", "queue"), (Gauge<Integer>) () -> notifierExecutor.getQueue().size());
}
public void start() {
@@ -152,10 +162,19 @@ public class FeedRefreshEngine {
private void processFeedAsync(Feed feed) {
CompletableFuture.supplyAsync(() -> worker.update(feed), workerExecutor)
.thenApplyAsync(r -> updater.update(r.feed(), r.entries()), databaseUpdaterExecutor)
.whenComplete((data, ex) -> {
if (ex != null) {
log.error("error while processing feed {}", feed.getUrl(), ex);
}
.thenCompose(r -> {
List<CompletableFuture<Void>> futures = r.insertedUnreadEntriesBySubscription().entrySet().stream().map(e -> {
FeedSubscription sub = e.getKey();
List<FeedEntry> entries = e.getValue();
notifier.notifyOverWebsocket(sub, entries);
return CompletableFuture.runAsync(() -> notifier.sendPushNotifications(sub, entries), notifierExecutor);
}).toList();
return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
})
.exceptionally(ex -> {
log.error("error while processing feed {}", feed.getUrl(), ex);
return null;
});
}
@@ -183,6 +202,7 @@ public class FeedRefreshEngine {
this.refillExecutor.shutdownNow();
this.workerExecutor.shutdownNow();
this.databaseUpdaterExecutor.shutdownNow();
this.notifierExecutor.shutdownNow();
}
/**
@@ -194,6 +214,16 @@ public class FeedRefreshEngine {
return pool;
}
/**
* returns an ExecutorService that discards tasks if the queue is full
*/
private ThreadPoolExecutor newDiscardingExecutorService(int threads, int queueCapacity) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueCapacity));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return pool;
}
/**
* returns an ExecutorService that blocks submissions until a thread is available
*/

View File

@@ -1,5 +1,6 @@
package com.commafeed.backend.feed;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,8 +28,6 @@ import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.Models;
import com.commafeed.backend.service.FeedEntryService;
import com.commafeed.backend.service.FeedService;
import com.commafeed.frontend.ws.WebSocketMessageBuilder;
import com.commafeed.frontend.ws.WebSocketSessions;
import com.google.common.util.concurrent.Striped;
import lombok.extern.slf4j.Slf4j;
@@ -44,7 +43,6 @@ public class FeedRefreshUpdater {
private final FeedService feedService;
private final FeedEntryService feedEntryService;
private final FeedSubscriptionDAO feedSubscriptionDAO;
private final WebSocketSessions webSocketSessions;
private final Striped<Lock> locks;
@@ -52,12 +50,11 @@ public class FeedRefreshUpdater {
private final Meter entryInserted;
public FeedRefreshUpdater(UnitOfWork unitOfWork, FeedService feedService, FeedEntryService feedEntryService, MetricRegistry metrics,
FeedSubscriptionDAO feedSubscriptionDAO, WebSocketSessions webSocketSessions) {
FeedSubscriptionDAO feedSubscriptionDAO) {
this.unitOfWork = unitOfWork;
this.feedService = feedService;
this.feedEntryService = feedEntryService;
this.feedSubscriptionDAO = feedSubscriptionDAO;
this.webSocketSessions = webSocketSessions;
locks = Striped.lazyWeakLock(100000);
@@ -67,7 +64,7 @@ public class FeedRefreshUpdater {
private AddEntryResult addEntry(final Feed feed, final Entry entry, final List<FeedSubscription> subscriptions) {
boolean processed = false;
boolean inserted = false;
FeedEntry insertedEntry = null;
Set<FeedSubscription> subscriptionsForWhichEntryIsUnread = new HashSet<>();
// lock on feed, make sure we are not updating the same feed twice at
@@ -90,23 +87,21 @@ public class FeedRefreshUpdater {
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
if (locked1 && locked2) {
processed = true;
inserted = unitOfWork.call(() -> {
boolean newEntry = false;
FeedEntry feedEntry = feedEntryService.find(feed, entry);
if (feedEntry == null) {
feedEntry = feedEntryService.create(feed, entry);
newEntry = true;
insertedEntry = unitOfWork.call(() -> {
if (feedEntryService.find(feed, entry) != null) {
// entry already exists, nothing to do
return null;
}
if (newEntry) {
entryInserted.mark();
for (FeedSubscription sub : subscriptions) {
boolean unread = feedEntryService.applyFilter(sub, feedEntry);
if (unread) {
subscriptionsForWhichEntryIsUnread.add(sub);
}
FeedEntry feedEntry = feedEntryService.create(feed, entry);
entryInserted.mark();
for (FeedSubscription sub : subscriptions) {
boolean unread = feedEntryService.applyFilter(sub, feedEntry);
if (unread) {
subscriptionsForWhichEntryIsUnread.add(sub);
}
}
return newEntry;
return feedEntry;
});
} else {
log.error("lock timeout for {} - {}", feed.getUrl(), key1);
@@ -122,13 +117,13 @@ public class FeedRefreshUpdater {
lock2.unlock();
}
}
return new AddEntryResult(processed, inserted, subscriptionsForWhichEntryIsUnread);
return new AddEntryResult(processed, insertedEntry, subscriptionsForWhichEntryIsUnread);
}
public boolean update(Feed feed, List<Entry> entries) {
public FeedRefreshUpdaterResult update(Feed feed, List<Entry> entries) {
boolean processed = true;
long inserted = 0;
Map<FeedSubscription, Long> unreadCountBySubscription = new HashMap<>();
Map<FeedSubscription, List<FeedEntry>> insertedUnreadEntriesBySubscription = new HashMap<>();
if (!entries.isEmpty()) {
List<FeedSubscription> subscriptions = null;
@@ -138,8 +133,12 @@ public class FeedRefreshUpdater {
}
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
processed &= addEntryResult.processed;
inserted += addEntryResult.inserted ? 1 : 0;
addEntryResult.subscriptionsForWhichEntryIsUnread.forEach(sub -> unreadCountBySubscription.merge(sub, 1L, Long::sum));
inserted += addEntryResult.insertedEntry != null ? 1 : 0;
addEntryResult.subscriptionsForWhichEntryIsUnread.forEach(sub -> {
if (addEntryResult.insertedEntry != null) {
insertedUnreadEntriesBySubscription.computeIfAbsent(sub, k -> new ArrayList<>()).add(addEntryResult.insertedEntry);
}
});
}
if (inserted == 0) {
@@ -160,17 +159,13 @@ public class FeedRefreshUpdater {
unitOfWork.run(() -> feedService.update(feed));
notifyOverWebsocket(unreadCountBySubscription);
return processed;
return new FeedRefreshUpdaterResult(insertedUnreadEntriesBySubscription);
}
private void notifyOverWebsocket(Map<FeedSubscription, Long> unreadCountBySubscription) {
unreadCountBySubscription.forEach((sub, unreadCount) -> webSocketSessions.sendMessage(sub.getUser(),
WebSocketMessageBuilder.newFeedEntries(sub, unreadCount)));
private record AddEntryResult(boolean processed, FeedEntry insertedEntry, Set<FeedSubscription> subscriptionsForWhichEntryIsUnread) {
}
private record AddEntryResult(boolean processed, boolean inserted, Set<FeedSubscription> subscriptionsForWhichEntryIsUnread) {
public record FeedRefreshUpdaterResult(Map<FeedSubscription, List<FeedEntry>> insertedUnreadEntriesBySubscription) {
}
}

View File

@@ -0,0 +1,50 @@
package com.commafeed.backend.feed;
import java.util.List;
import jakarta.inject.Singleton;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.dao.UnitOfWork;
import com.commafeed.backend.dao.UserSettingsDAO;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.UserSettings;
import com.commafeed.backend.service.PushNotificationService;
import com.commafeed.frontend.ws.WebSocketMessageBuilder;
import com.commafeed.frontend.ws.WebSocketSessions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Singleton
@RequiredArgsConstructor
public class FeedUpdateNotifier {
private final CommaFeedConfiguration config;
private final UnitOfWork unitOfWork;
private final UserSettingsDAO userSettingsDAO;
private final WebSocketSessions webSocketSessions;
private final PushNotificationService pushNotificationService;
public void notifyOverWebsocket(FeedSubscription sub, List<FeedEntry> entries) {
if (!entries.isEmpty()) {
webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub, entries.size()));
}
}
public void sendPushNotifications(FeedSubscription sub, List<FeedEntry> entries) {
if (!config.pushNotifications().enabled() || !sub.isPushNotificationsEnabled() || entries.isEmpty()) {
return;
}
UserSettings settings = unitOfWork.call(() -> userSettingsDAO.findByUser(sub.getUser()));
if (settings != null && settings.getPushNotificationType() != null) {
for (FeedEntry entry : entries) {
pushNotificationService.notify(settings, sub, entry);
}
}
}
}

View File

@@ -46,4 +46,7 @@ public class FeedSubscription extends AbstractModel {
@Column(name = "filtering_expression_legacy", length = 4096)
private String filterLegacy;
@Column(name = "push_notifications_enabled")
private boolean pushNotificationsEnabled;
}

View File

@@ -77,6 +77,17 @@ public class UserSettings extends AbstractModel {
ON_MOBILE
}
public enum PushNotificationType {
@JsonProperty("ntfy")
NTFY,
@JsonProperty("gotify")
GOTIFY,
@JsonProperty("pushover")
PUSHOVER
}
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "user_id", nullable = false, unique = true)
private User user;
@@ -133,6 +144,22 @@ public class UserSettings extends AbstractModel {
private boolean unreadCountFavicon;
private boolean disablePullToRefresh;
@Enumerated(EnumType.STRING)
@Column(name = "push_notification_type", length = 16)
private PushNotificationType pushNotificationType;
@Column(name = "push_notification_server_url", length = 1024)
private String pushNotificationServerUrl;
@Column(name = "push_notification_user_id", length = 512)
private String pushNotificationUserId;
@Column(name = "push_notification_user_secret", length = 512)
private String pushNotificationUserSecret;
@Column(name = "push_notification_topic", length = 256)
private String pushNotificationTopic;
private boolean email;
private boolean gmail;
private boolean facebook;

View File

@@ -77,7 +77,7 @@ public class OPMLImporter {
}
// make sure we continue with the import process even if a feed failed
try {
feedSubscriptionService.subscribe(user, outline.getXmlUrl(), name, parent, position);
feedSubscriptionService.subscribe(user, outline.getXmlUrl(), name, parent, position, false);
} catch (Exception e) {
log.error("error while importing {}: {}", outline.getXmlUrl(), e.getMessage());
}

View File

@@ -49,15 +49,7 @@ public class FeedSubscriptionService {
});
}
public long subscribe(User user, String url, String title) {
return subscribe(user, url, title, null, 0);
}
public long subscribe(User user, String url, String title, FeedCategory parent) {
return subscribe(user, url, title, parent, 0);
}
public long subscribe(User user, String url, String title, FeedCategory category, int position) {
public long subscribe(User user, String url, String title, FeedCategory category, int position, boolean pushNotificationsEnabled) {
Integer maxFeedsPerUser = config.database().cleanup().maxFeedsPerUser();
if (maxFeedsPerUser > 0 && feedSubscriptionDAO.count(user) >= maxFeedsPerUser) {
String message = String.format("You cannot subscribe to more feeds on this CommaFeed instance (max %s feeds per user)",
@@ -81,6 +73,7 @@ public class FeedSubscriptionService {
sub.setCategory(category);
sub.setPosition(position);
sub.setTitle(FeedUtils.truncate(title, 128));
sub.setPushNotificationsEnabled(pushNotificationsEnabled);
return feedSubscriptionDAO.merge(sub).getId();
}

View File

@@ -0,0 +1,178 @@
package com.commafeed.backend.service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.entity.UrlEncodedFormEntity;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicNameValuePair;
import org.apache.hc.core5.util.Timeout;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.HttpClientFactory;
import com.commafeed.backend.Urls;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.UserSettings;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class PushNotificationService {
private final CloseableHttpClient httpClient;
private final Meter meter;
private final CommaFeedConfiguration config;
public PushNotificationService(HttpClientFactory httpClientFactory, MetricRegistry metrics, CommaFeedConfiguration config) {
this.httpClient = httpClientFactory.newClient(config.pushNotifications().threads());
this.meter = metrics.meter(MetricRegistry.name(getClass(), "notify"));
this.config = config;
}
public void notify(UserSettings settings, FeedSubscription subscription, FeedEntry entry) {
if (!config.pushNotifications().enabled() || settings.getPushNotificationType() == null) {
return;
}
log.debug("sending {} push notification for entry {} in feed {}", settings.getPushNotificationType(), entry.getId(),
subscription.getFeed().getId());
String entryTitle = entry.getContent() != null ? entry.getContent().getTitle() : null;
String entryUrl = entry.getUrl();
String feedTitle = subscription.getTitle();
if (StringUtils.isBlank(entryTitle)) {
entryTitle = "New entry";
}
try {
switch (settings.getPushNotificationType()) {
case NTFY -> sendNtfy(settings, feedTitle, entryTitle, entryUrl);
case GOTIFY -> sendGotify(settings, feedTitle, entryTitle, entryUrl);
case PUSHOVER -> sendPushover(settings, feedTitle, entryTitle, entryUrl);
default -> throw new IllegalStateException("unsupported notification type: " + settings.getPushNotificationType());
}
} catch (IOException e) {
throw new PushNotificationException("Failed to send external notification", e);
}
meter.mark();
}
private void sendNtfy(UserSettings settings, String feedTitle, String entryTitle, String entryUrl) throws IOException {
String serverUrl = Urls.removeTrailingSlash(settings.getPushNotificationServerUrl());
String topic = settings.getPushNotificationTopic();
if (StringUtils.isBlank(serverUrl) || StringUtils.isBlank(topic)) {
log.warn("ntfy notification skipped: missing server URL or topic");
return;
}
HttpPost request = new HttpPost(serverUrl + "/" + topic);
request.setConfig(RequestConfig.custom().setResponseTimeout(Timeout.of(config.httpClient().responseTimeout())).build());
request.addHeader("Title", feedTitle);
request.setEntity(new StringEntity(entryTitle, StandardCharsets.UTF_8));
if (StringUtils.isNotBlank(entryUrl)) {
request.addHeader("Click", entryUrl);
}
if (StringUtils.isNotBlank(settings.getPushNotificationUserSecret())) {
request.addHeader("Authorization", "Bearer " + settings.getPushNotificationUserSecret());
}
httpClient.execute(request, response -> {
if (response.getCode() >= 400) {
throw new PushNotificationException("ntfy notification failed with status " + response.getCode());
}
return null;
});
}
private void sendGotify(UserSettings settings, String feedTitle, String entryTitle, String entryUrl) throws IOException {
String serverUrl = Urls.removeTrailingSlash(settings.getPushNotificationServerUrl());
String token = settings.getPushNotificationUserSecret();
if (StringUtils.isBlank(serverUrl) || StringUtils.isBlank(token)) {
log.warn("gotify notification skipped: missing server URL or token");
return;
}
JsonObject json = new JsonObject();
json.put("title", feedTitle);
json.put("message", entryTitle);
json.put("priority", 5);
if (StringUtils.isNotBlank(entryUrl)) {
json.put("extras",
new JsonObject().put("client::notification", new JsonObject().put("click", new JsonObject().put("url", entryUrl))));
}
HttpPost request = new HttpPost(serverUrl + "/message");
request.setConfig(RequestConfig.custom().setResponseTimeout(Timeout.of(config.httpClient().responseTimeout())).build());
request.addHeader("X-Gotify-Key", token);
request.setEntity(new StringEntity(json.toString(), ContentType.APPLICATION_JSON));
httpClient.execute(request, response -> {
if (response.getCode() >= 400) {
throw new PushNotificationException("gotify notification failed with status " + response.getCode());
}
return null;
});
}
private void sendPushover(UserSettings settings, String feedTitle, String entryTitle, String entryUrl) throws IOException {
String token = settings.getPushNotificationUserSecret();
String userKey = settings.getPushNotificationUserId();
if (StringUtils.isBlank(token) || StringUtils.isBlank(userKey)) {
log.warn("pushover notification skipped: missing token or user key");
return;
}
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("token", token));
params.add(new BasicNameValuePair("user", userKey));
params.add(new BasicNameValuePair("title", feedTitle));
params.add(new BasicNameValuePair("message", entryTitle));
if (StringUtils.isNotBlank(entryUrl)) {
params.add(new BasicNameValuePair("url", entryUrl));
}
HttpPost request = new HttpPost("https://api.pushover.net/1/messages.json");
request.setConfig(RequestConfig.custom().setResponseTimeout(Timeout.of(config.httpClient().responseTimeout())).build());
request.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
httpClient.execute(request, response -> {
if (response.getCode() >= 400) {
throw new PushNotificationException("pushover notification failed with status " + response.getCode());
}
return null;
});
}
public static class PushNotificationException extends RuntimeException {
private static final long serialVersionUID = -3392881821584833819L;
public PushNotificationException(String message) {
super(message);
}
public PushNotificationException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@@ -52,4 +52,7 @@ public class ServerInfo implements Serializable {
@Schema(required = true)
private int minimumPasswordLength;
@Schema(required = true)
private boolean pushNotificationsEnabled;
}

View File

@@ -5,6 +5,7 @@ import java.io.Serializable;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
import com.commafeed.backend.model.UserSettings.IconDisplayMode;
import com.commafeed.backend.model.UserSettings.PushNotificationType;
import com.commafeed.backend.model.UserSettings.ReadingMode;
import com.commafeed.backend.model.UserSettings.ReadingOrder;
import com.commafeed.backend.model.UserSettings.ScrollMode;
@@ -81,6 +82,28 @@ public class Settings implements Serializable {
@Schema(description = "sharing settings", required = true)
private SharingSettings sharingSettings = new SharingSettings();
@Schema(description = "push notification settings", required = true)
private PushNotificationSettings pushNotificationSettings = new PushNotificationSettings();
@Schema(description = "User notification settings")
@Data
public static class PushNotificationSettings implements Serializable {
@Schema(description = "notification provider type")
private PushNotificationType type;
@Schema(description = "server URL for ntfy or gotify")
private String serverUrl;
@Schema(description = "user Id")
private String userId;
@Schema(description = "user secret for authentication with the service")
private String userSecret;
@Schema(description = "topic")
private String topic;
}
@Schema(description = "User sharing settings")
@Data
public static class SharingSettings implements Serializable {

View File

@@ -65,6 +65,9 @@ public class Subscription implements Serializable {
@Schema(description = "JEXL legacy filter")
private String filterLegacy;
@Schema(description = "whether to send push notifications for new entries of this feed", required = true)
private boolean pushNotificationsEnabled;
public static Subscription build(FeedSubscription subscription, UnreadCount unreadCount) {
FeedCategory category = subscription.getCategory();
Feed feed = subscription.getFeed();
@@ -85,6 +88,7 @@ public class Subscription implements Serializable {
sub.setCategoryId(category == null ? null : String.valueOf(category.getId()));
sub.setFilter(subscription.getFilter());
sub.setFilterLegacy(subscription.getFilterLegacy());
sub.setPushNotificationsEnabled(subscription.isPushNotificationsEnabled());
return sub;
}

View File

@@ -31,4 +31,7 @@ public class FeedModificationRequest implements Serializable {
@Size(max = 4096)
private String filter;
@Schema(description = "whether to send push notifications for new entries of this feed")
private boolean pushNotificationsEnabled;
}

View File

@@ -28,4 +28,7 @@ public class SubscribeRequest implements Serializable {
@Size(max = 128)
private String categoryId;
@Schema(description = "whether to send push notifications for new entries of this feed")
private boolean pushNotificationsEnabled;
}

View File

@@ -365,7 +365,8 @@ public class FeedREST {
FeedInfo info = fetchFeedInternal(prependHttp(req.getUrl()));
User user = authenticationContext.getCurrentUser();
long subscriptionId = feedSubscriptionService.subscribe(user, info.getUrl(), req.getTitle(), category);
long subscriptionId = feedSubscriptionService.subscribe(user, info.getUrl(), req.getTitle(), category, 0,
req.isPushNotificationsEnabled());
return Response.ok(subscriptionId).build();
} catch (Exception e) {
log.error("Failed to subscribe to URL {}: {}", req.getUrl(), e.getMessage(), e);
@@ -384,7 +385,7 @@ public class FeedREST {
Preconditions.checkNotNull(url);
FeedInfo info = fetchFeedInternal(prependHttp(url));
User user = authenticationContext.getCurrentUser();
feedSubscriptionService.subscribe(user, info.getUrl(), info.getTitle());
feedSubscriptionService.subscribe(user, info.getUrl(), info.getTitle(), null, 0, false);
} catch (Exception e) {
log.info("Could not subscribe to url {} : {}", url, e.getMessage());
}
@@ -438,6 +439,8 @@ public class FeedREST {
subscription.setFilterLegacy(null);
}
subscription.setPushNotificationsEnabled(req.isPushNotificationsEnabled());
if (StringUtils.isNotBlank(req.getName())) {
subscription.setTitle(req.getName());
}

View File

@@ -62,6 +62,7 @@ public class ServerREST {
infos.setForceRefreshCooldownDuration(config.feedRefresh().forceRefreshCooldownDuration().toMillis());
infos.setInitialSetupRequired(databaseStartupService.isInitialSetupRequired());
infos.setMinimumPasswordLength(config.users().minimumPasswordLength());
infos.setPushNotificationsEnabled(config.pushNotifications().enabled());
return infos;
}

View File

@@ -125,6 +125,12 @@ public class UserREST {
s.setUnreadCountFavicon(settings.isUnreadCountFavicon());
s.setDisablePullToRefresh(settings.isDisablePullToRefresh());
s.setPrimaryColor(settings.getPrimaryColor());
s.getPushNotificationSettings().setType(settings.getPushNotificationType());
s.getPushNotificationSettings().setServerUrl(settings.getPushNotificationServerUrl());
s.getPushNotificationSettings().setUserId(settings.getPushNotificationUserId());
s.getPushNotificationSettings().setUserSecret(settings.getPushNotificationUserSecret());
s.getPushNotificationSettings().setTopic(settings.getPushNotificationTopic());
} else {
s.setReadingMode(ReadingMode.UNREAD);
s.setReadingOrder(ReadingOrder.DESC);
@@ -190,6 +196,12 @@ public class UserREST {
s.setDisablePullToRefresh(settings.isDisablePullToRefresh());
s.setPrimaryColor(settings.getPrimaryColor());
s.setPushNotificationType(settings.getPushNotificationSettings().getType());
s.setPushNotificationServerUrl(settings.getPushNotificationSettings().getServerUrl());
s.setPushNotificationUserId(settings.getPushNotificationSettings().getUserId());
s.setPushNotificationUserSecret(settings.getPushNotificationSettings().getUserSecret());
s.setPushNotificationTopic(settings.getPushNotificationSettings().getTopic());
s.setEmail(settings.getSharingSettings().isEmail());
s.setGmail(settings.getSharingSettings().isGmail());
s.setFacebook(settings.getSharingSettings().isFacebook());

View File

@@ -17,5 +17,20 @@
</update>
</changeSet>
<changeSet id="add-push-notification-settings" author="athou">
<addColumn tableName="USERSETTINGS">
<column name="push_notification_type" type="VARCHAR(16)" />
<column name="push_notification_server_url" type="VARCHAR(1024)" />
<column name="push_notification_user_id" type="VARCHAR(512)" />
<column name="push_notification_user_secret" type="VARCHAR(512)" />
<column name="push_notification_topic" type="VARCHAR(256)" />
</addColumn>
<addColumn tableName="FEEDSUBSCRIPTIONS">
<column name="push_notifications_enabled" type="BOOLEAN" valueBoolean="false">
<constraints nullable="false" />
</column>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@@ -6,6 +6,7 @@ import java.io.OutputStream;
import java.math.BigInteger;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
@@ -56,6 +57,7 @@ class HttpGetterTest {
private CommaFeedConfiguration config;
private HttpClientFactory provider;
private HttpGetter getter;
@BeforeEach
@@ -78,7 +80,8 @@ class HttpGetterTest {
Mockito.when(config.httpClient().cache().expiration()).thenReturn(Duration.ofMinutes(1));
Mockito.when(config.feedRefresh().httpThreads()).thenReturn(3);
this.getter = new HttpGetter(config, () -> NOW, Mockito.mock(CommaFeedVersion.class), Mockito.mock(MetricRegistry.class));
this.provider = new HttpClientFactory(config, Mockito.mock(CommaFeedVersion.class));
this.getter = new HttpGetter(config, () -> NOW, provider, Mockito.mock(MetricRegistry.class));
}
@ParameterizedTest
@@ -172,7 +175,7 @@ class HttpGetterTest {
@Test
void dataTimeout() {
Mockito.when(config.httpClient().responseTimeout()).thenReturn(Duration.ofMillis(500));
this.getter = new HttpGetter(config, () -> NOW, Mockito.mock(CommaFeedVersion.class), Mockito.mock(MetricRegistry.class));
this.getter = new HttpGetter(config, () -> NOW, provider, Mockito.mock(MetricRegistry.class));
this.mockServerClient.when(HttpRequest.request().withMethod("GET"))
.respond(HttpResponse.response().withDelay(Delay.milliseconds(1000)));
@@ -183,7 +186,7 @@ class HttpGetterTest {
@Test
void connectTimeout() {
Mockito.when(config.httpClient().connectTimeout()).thenReturn(Duration.ofMillis(500));
this.getter = new HttpGetter(config, () -> NOW, Mockito.mock(CommaFeedVersion.class), Mockito.mock(MetricRegistry.class));
this.getter = new HttpGetter(config, () -> NOW, provider, Mockito.mock(MetricRegistry.class));
// try to connect to a non-routable address
// https://stackoverflow.com/a/904609
Exception e = Assertions.assertThrows(Exception.class, () -> getter.get("http://10.255.255.1"));
@@ -367,44 +370,44 @@ class HttpGetterTest {
@BeforeEach
void init() {
Mockito.when(config.httpClient().blockLocalAddresses()).thenReturn(true);
getter = new HttpGetter(config, () -> NOW, Mockito.mock(CommaFeedVersion.class), Mockito.mock(MetricRegistry.class));
getter = new HttpGetter(config, () -> NOW, provider, Mockito.mock(MetricRegistry.class));
}
@Test
void localhost() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://localhost"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://127.0.0.1"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://2130706433"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://0x7F.0x00.0x00.0X01"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://localhost"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://127.0.0.1"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://2130706433"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://0x7F.0x00.0x00.0X01"));
}
@Test
void zero() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://0.0.0.0"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://0.0.0.0"));
}
@Test
void linkLocal() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://169.254.12.34"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://169.254.169.254"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://169.254.12.34"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://169.254.169.254"));
}
@Test
void multicast() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://224.2.3.4"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://239.255.255.254"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://224.2.3.4"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://239.255.255.254"));
}
@Test
void privateIpv4Ranges() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://10.0.0.1"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://172.16.0.1"));
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://192.168.0.1"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://10.0.0.1"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://172.16.0.1"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://192.168.0.1"));
}
@Test
void privateIpv6Ranges() {
Assertions.assertThrows(HttpGetter.HostNotAllowedException.class, () -> getter.get("http://fd12:3456:789a:1::1"));
Assertions.assertThrows(UnknownHostException.class, () -> getter.get("http://[fe80::215:5dff:fe15:102]"));
}
}

View File

@@ -46,7 +46,8 @@ class OPMLImporterTest {
importer.importOpml(user, xml);
Mockito.verify(feedSubscriptionService)
.subscribe(Mockito.eq(user), Mockito.anyString(), Mockito.anyString(), Mockito.any(FeedCategory.class), Mockito.anyInt());
.subscribe(Mockito.eq(user), Mockito.anyString(), Mockito.anyString(), Mockito.any(FeedCategory.class), Mockito.anyInt(),
Mockito.anyBoolean());
}
}

View File

@@ -0,0 +1,136 @@
package com.commafeed.backend.service;
import java.time.Duration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockserver.client.MockServerClient;
import org.mockserver.junit.jupiter.MockServerExtension;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.JsonBody;
import org.mockserver.model.MediaType;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.CommaFeedConfiguration;
import com.commafeed.backend.HttpClientFactory;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.model.FeedEntry;
import com.commafeed.backend.model.FeedEntryContent;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.UserSettings;
import com.commafeed.backend.model.UserSettings.PushNotificationType;
@ExtendWith(MockServerExtension.class)
class PushNotificationServiceTest {
private MockServerClient mockServerClient;
private PushNotificationService pushNotificationService;
private CommaFeedConfiguration config;
private UserSettings userSettings;
private FeedSubscription subscription;
private FeedEntry entry;
@BeforeEach
void init(MockServerClient mockServerClient) {
this.mockServerClient = mockServerClient;
this.mockServerClient.reset();
this.config = Mockito.mock(CommaFeedConfiguration.class, Mockito.RETURNS_DEEP_STUBS);
Mockito.when(config.pushNotifications().enabled()).thenReturn(true);
Mockito.when(config.pushNotifications().threads()).thenReturn(1);
Mockito.when(config.httpClient().responseTimeout()).thenReturn(Duration.ofSeconds(30));
HttpClientFactory httpClientFactory = new HttpClientFactory(config, Mockito.mock(com.commafeed.CommaFeedVersion.class));
MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
Mockito.when(metricRegistry.meter(Mockito.anyString())).thenReturn(Mockito.mock(Meter.class));
this.pushNotificationService = new PushNotificationService(httpClientFactory, metricRegistry, config);
this.userSettings = new UserSettings();
this.subscription = createSubscription("Test Feed");
this.entry = createEntry("Test Entry", "http://example.com/entry");
}
@Test
void testNtfyNotification() {
userSettings.setPushNotificationType(PushNotificationType.NTFY);
userSettings.setPushNotificationServerUrl("http://localhost:" + mockServerClient.getPort());
userSettings.setPushNotificationTopic("test-topic");
userSettings.setPushNotificationUserSecret("test-secret");
mockServerClient.when(HttpRequest.request()
.withMethod("POST")
.withPath("/test-topic")
.withHeader("Title", "Test Feed")
.withHeader("Click", "http://example.com/entry")
.withHeader("Authorization", "Bearer test-secret")
.withBody("Test Entry")).respond(HttpResponse.response().withStatusCode(200));
Assertions.assertDoesNotThrow(() -> pushNotificationService.notify(userSettings, subscription, entry));
}
@Test
void testGotifyNotification() {
userSettings.setPushNotificationType(PushNotificationType.GOTIFY);
userSettings.setPushNotificationServerUrl("http://localhost:" + mockServerClient.getPort());
userSettings.setPushNotificationUserSecret("gotify-token");
mockServerClient.when(HttpRequest.request()
.withMethod("POST")
.withPath("/message")
.withHeader("X-Gotify-Key", "gotify-token")
.withContentType(MediaType.APPLICATION_JSON_UTF_8)
.withBody(JsonBody.json("""
{
"title": "Test Feed",
"message": "Test Entry",
"priority": 5,
"extras": {
"client::notification": {
"click": {
"url": "http://example.com/entry"
}
}
}
}
"""))).respond(HttpResponse.response().withStatusCode(200));
Assertions.assertDoesNotThrow(() -> pushNotificationService.notify(userSettings, subscription, entry));
}
@Test
void testPushNotificationDisabled() {
Mockito.when(config.pushNotifications().enabled()).thenReturn(false);
userSettings.setPushNotificationType(PushNotificationType.NTFY);
userSettings.setPushNotificationServerUrl("http://localhost:" + mockServerClient.getPort());
userSettings.setPushNotificationTopic("test-topic");
Assertions.assertDoesNotThrow(() -> pushNotificationService.notify(userSettings, subscription, entry));
mockServerClient.verifyZeroInteractions();
}
private static FeedSubscription createSubscription(String title) {
FeedSubscription subscription = new FeedSubscription();
subscription.setTitle(title);
subscription.setFeed(new Feed());
return subscription;
}
private static FeedEntry createEntry(String title, String url) {
FeedEntry entry = new FeedEntry();
FeedEntryContent content = new FeedEntryContent();
content.setTitle(title);
entry.setContent(content);
entry.setUrl(url);
return entry;
}
}

View File

@@ -39,6 +39,7 @@ public abstract class BaseIT {
private static final HttpRequest FEED_REQUEST = HttpRequest.request().withMethod("GET").withPath("/");
@Getter
private MockServerClient mockServerClient;
private Client client;
private String feedUrl;
@@ -122,10 +123,15 @@ public abstract class BaseIT {
}
protected Long subscribe(String feedUrl, String categoryId) {
return subscribe(feedUrl, categoryId, false);
}
protected Long subscribe(String feedUrl, String categoryId, boolean pushNotificationsEnabled) {
SubscribeRequest subscribeRequest = new SubscribeRequest();
subscribeRequest.setUrl(feedUrl);
subscribeRequest.setTitle("my title for this feed");
subscribeRequest.setCategoryId(categoryId);
subscribeRequest.setPushNotificationsEnabled(pushNotificationsEnabled);
return RestAssured.given()
.body(subscribeRequest)
.contentType(ContentType.JSON)

View File

@@ -0,0 +1,57 @@
package com.commafeed.integration;
import java.time.Duration;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.verify.VerificationTimes;
import com.commafeed.TestConstants;
import com.commafeed.backend.model.UserSettings.PushNotificationType;
import com.commafeed.frontend.model.Settings;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
@QuarkusTest
class PushNotificationIT extends BaseIT {
@BeforeEach
void setup() {
initialSetup(TestConstants.ADMIN_USERNAME, TestConstants.ADMIN_PASSWORD);
RestAssured.authentication = RestAssured.preemptive().basic(TestConstants.ADMIN_USERNAME, TestConstants.ADMIN_PASSWORD);
}
@AfterEach
void tearDown() {
RestAssured.reset();
}
@Test
void receivedPushNotifications() {
// mock ntfy server
HttpRequest ntfyPost = HttpRequest.request().withMethod("POST").withPath("/ntfy/integration-test");
getMockServerClient().when(ntfyPost).respond(HttpResponse.response().withStatusCode(200));
// enable push notifications
Settings settings = RestAssured.given().get("rest/user/settings").then().extract().as(Settings.class);
settings.getPushNotificationSettings().setType(PushNotificationType.NTFY);
settings.getPushNotificationSettings().setServerUrl("http://localhost:" + getMockServerClient().getPort() + "/ntfy");
settings.getPushNotificationSettings().setTopic("integration-test");
RestAssured.given().body(settings).contentType(ContentType.JSON).post("rest/user/settings").then().statusCode(200);
// subscribe with push notifications enabled
subscribe(getFeedUrl(), null, true);
// await push notification for the two entries in the feed
Awaitility.await()
.atMost(Duration.ofSeconds(20))
.untilAsserted(() -> getMockServerClient().verify(ntfyPost, VerificationTimes.exactly(2)));
}
}

View File

@@ -23,6 +23,7 @@ class ServerIT extends BaseIT {
Assertions.assertEquals(30000, serverInfos.getTreeReloadInterval());
Assertions.assertEquals(60000, serverInfos.getForceRefreshCooldownDuration());
Assertions.assertEquals(4, serverInfos.getMinimumPasswordLength());
Assertions.assertTrue(serverInfos.isPushNotificationsEnabled());
}
}