add websocket support to immediately refresh tree when new entries are available

This commit is contained in:
Athou
2023-01-17 21:14:38 +01:00
parent 33e3f7ea3c
commit 4ff46965c4
15 changed files with 268 additions and 24 deletions

View File

@@ -13,6 +13,7 @@ import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.websocket.server.ServerEndpointConfig;
import org.hibernate.cfg.AvailableSettings;
@@ -47,11 +48,14 @@ import com.commafeed.frontend.servlet.CustomCssServlet;
import com.commafeed.frontend.servlet.LogoutServlet;
import com.commafeed.frontend.servlet.NextUnreadServlet;
import com.commafeed.frontend.session.SessionHelperFactoryProvider;
import com.commafeed.frontend.ws.WebSocketConfigurator;
import com.commafeed.frontend.ws.WebSocketEndpoint;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import be.tomcools.dropwizard.websocket.WebsocketBundle;
import io.dropwizard.Application;
import io.dropwizard.assets.AssetsBundle;
import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
@@ -60,7 +64,6 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.forms.MultiPartBundle;
import io.dropwizard.hibernate.HibernateBundle;
import io.dropwizard.migrations.MigrationsBundle;
import io.dropwizard.server.DefaultServerFactory;
import io.dropwizard.servlets.CacheBustingFilter;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
@@ -75,6 +78,7 @@ public class CommaFeedApplication extends Application<CommaFeedConfiguration> {
public static final Date STARTUP_TIME = new Date();
private HibernateBundle<CommaFeedConfiguration> hibernateBundle;
private WebsocketBundle<CommaFeedConfiguration> websocketBundle;
@Override
public String getName() {
@@ -85,6 +89,7 @@ public class CommaFeedApplication extends Application<CommaFeedConfiguration> {
public void initialize(Bootstrap<CommaFeedConfiguration> bootstrap) {
bootstrap.getObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.SECONDS, false));
bootstrap.addBundle(websocketBundle = new WebsocketBundle<>());
bootstrap.addBundle(hibernateBundle = new HibernateBundle<CommaFeedConfiguration>(AbstractModel.class, Feed.class,
FeedCategory.class, FeedEntry.class, FeedEntryContent.class, FeedEntryStatus.class, FeedEntryTag.class,
FeedSubscription.class, User.class, UserRole.class, UserSettings.class) {
@@ -140,7 +145,6 @@ public class CommaFeedApplication extends Application<CommaFeedConfiguration> {
// REST resources
environment.jersey().setUrlPattern("/rest/*");
((DefaultServerFactory) config.getServerFactory()).setJerseyRootPath("/rest/*");
environment.jersey().register(injector.getInstance(AdminREST.class));
environment.jersey().register(injector.getInstance(CategoryREST.class));
environment.jersey().register(injector.getInstance(EntryREST.class));
@@ -155,6 +159,12 @@ public class CommaFeedApplication extends Application<CommaFeedConfiguration> {
environment.servlets().addServlet("customCss", injector.getInstance(CustomCssServlet.class)).addMapping("/custom_css.css");
environment.servlets().addServlet("analytics.js", injector.getInstance(AnalyticsServlet.class)).addMapping("/analytics.js");
// WebSocket endpoint
ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketEndpoint.class, "/ws")
.configurator(injector.getInstance(WebSocketConfigurator.class))
.build();
websocketBundle.addEndpoint(serverEndpointConfig);
// Scheduled tasks
Set<ScheduledTask> tasks = injector.getInstance(Key.get(new TypeLiteral<Set<ScheduledTask>>() {
}));

View File

@@ -32,9 +32,12 @@ import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.commafeed.backend.service.FeedUpdateService;
import com.commafeed.backend.service.PubSubService;
import com.commafeed.frontend.ws.WebSocketMessageBuilder;
import com.commafeed.frontend.ws.WebSocketSessions;
import com.google.common.util.concurrent.Striped;
import io.dropwizard.lifecycle.Managed;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -48,6 +51,7 @@ public class FeedRefreshUpdater implements Managed {
private final CommaFeedConfiguration config;
private final FeedSubscriptionDAO feedSubscriptionDAO;
private final CacheService cache;
private final WebSocketSessions webSocketSessions;
private final FeedRefreshExecutor pool;
private final Striped<Lock> locks;
@@ -60,7 +64,7 @@ public class FeedRefreshUpdater implements Managed {
@Inject
public FeedRefreshUpdater(SessionFactory sessionFactory, FeedUpdateService feedUpdateService, PubSubService pubSubService,
FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO,
CacheService cache) {
CacheService cache, WebSocketSessions webSocketSessions) {
this.sessionFactory = sessionFactory;
this.feedUpdateService = feedUpdateService;
this.pubSubService = pubSubService;
@@ -68,6 +72,7 @@ public class FeedRefreshUpdater implements Managed {
this.config = config;
this.feedSubscriptionDAO = feedSubscriptionDAO;
this.cache = cache;
this.webSocketSessions = webSocketSessions;
ApplicationSettings settings = config.getApplicationSettings();
int threads = Math.max(settings.getDatabaseUpdateThreads(), 1);
@@ -94,8 +99,9 @@ public class FeedRefreshUpdater implements Managed {
pool.execute(new EntryTask(context));
}
private boolean addEntry(final Feed feed, final FeedEntry entry, final List<FeedSubscription> subscriptions) {
boolean success = false;
private AddEntryResult addEntry(final Feed feed, final FeedEntry entry, final List<FeedSubscription> subscriptions) {
boolean processed = false;
boolean inserted = false;
// lock on feed, make sure we are not updating the same feed twice at
// the same time
@@ -112,14 +118,15 @@ public class FeedRefreshUpdater implements Managed {
boolean locked1 = false;
boolean locked2 = false;
try {
// try to lock, give up after 1 minute
locked1 = lock1.tryLock(1, TimeUnit.MINUTES);
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
if (locked1 && locked2) {
boolean inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions));
processed = true;
inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions));
if (inserted) {
entryInserted.mark();
}
success = true;
} else {
log.error("lock timeout for " + feed.getUrl() + " - " + key1);
}
@@ -133,7 +140,7 @@ public class FeedRefreshUpdater implements Managed {
lock2.unlock();
}
}
return success;
return new AddEntryResult(processed, inserted);
}
private void handlePubSub(final Feed feed) {
@@ -169,7 +176,9 @@ public class FeedRefreshUpdater implements Managed {
@Override
public void run() {
boolean ok = true;
boolean processed = true;
boolean insertedAtLeastOneEntry = false;
final Feed feed = context.getFeed();
List<FeedEntry> entries = context.getEntries();
if (entries.isEmpty()) {
@@ -186,7 +195,10 @@ public class FeedRefreshUpdater implements Managed {
if (subscriptions == null) {
subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed));
}
ok &= addEntry(feed, entry, subscriptions);
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
processed &= addEntryResult.processed;
insertedAtLeastOneEntry |= addEntryResult.inserted;
entryCacheMiss.mark();
} else {
log.debug("cache hit for {}", entry.getUrl());
@@ -199,17 +211,20 @@ public class FeedRefreshUpdater implements Managed {
if (subscriptions == null) {
feed.setMessage("No new entries found");
} else if (!subscriptions.isEmpty()) {
} else if (insertedAtLeastOneEntry) {
List<User> users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList());
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
cache.invalidateUserRootCategory(users.toArray(new User[0]));
// notify over websocket
subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub)));
}
}
if (config.getApplicationSettings().getPubsubhubbub()) {
handlePubSub(feed);
}
if (!ok) {
if (!processed) {
// requeue asap
feed.setDisabledUntil(new Date(0));
}
@@ -223,4 +238,10 @@ public class FeedRefreshUpdater implements Managed {
}
}
@AllArgsConstructor
private static class AddEntryResult {
private final boolean processed;
private final boolean inserted;
}
}

View File

@@ -76,7 +76,7 @@ public class FeedSubscriptionService {
sub.setTitle(FeedUtils.truncate(title, 128));
feedSubscriptionDAO.saveOrUpdate(sub);
queues.add(feed, false);
queues.add(feed, true);
cache.invalidateUserRootCategory(user);
return sub.getId();
}

View File

@@ -9,7 +9,7 @@ import com.commafeed.backend.model.User;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor()
@RequiredArgsConstructor
public class SessionHelper {
private static final String SESSION_KEY_USER = "user";
@@ -18,15 +18,18 @@ public class SessionHelper {
public Optional<User> getLoggedInUser() {
Optional<HttpSession> session = getSession(false);
if (session.isPresent()) {
User user = (User) session.get().getAttribute(SESSION_KEY_USER);
return Optional.ofNullable(user);
return getLoggedInUser(session.get());
}
return Optional.empty();
}
public static Optional<User> getLoggedInUser(HttpSession session) {
User user = (User) session.getAttribute(SESSION_KEY_USER);
return Optional.ofNullable(user);
}
public void setLoggedInUser(User user) {
Optional<HttpSession> session = getSession(true);
session.get().setAttribute(SESSION_KEY_USER, user);

View File

@@ -0,0 +1,42 @@
package com.commafeed.frontend.ws;
import java.util.Optional;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
import com.commafeed.backend.model.User;
import com.commafeed.frontend.session.SessionHelper;
import lombok.RequiredArgsConstructor;
@Singleton
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class WebSocketConfigurator extends Configurator {
public static final String SESSIONKEY_USERID = "userId";
private final WebSocketSessions webSocketSessions;
@Override
public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession = (HttpSession) request.getHttpSession();
if (httpSession != null) {
Optional<User> user = SessionHelper.getLoggedInUser(httpSession);
if (user.isPresent()) {
config.getUserProperties().put(SESSIONKEY_USERID, user.get().getId());
}
}
}
@SuppressWarnings("unchecked")
@Override
public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException {
return (T) new WebSocketEndpoint(webSocketSessions);
}
}

View File

@@ -0,0 +1,61 @@
package com.commafeed.frontend.ws;
import java.io.IOException;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor = @__({ @Inject }))
public class WebSocketEndpoint extends Endpoint {
private final WebSocketSessions sessions;
@Override
public void onOpen(Session session, EndpointConfig config) {
Long userId = (Long) config.getUserProperties().get(WebSocketConfigurator.SESSIONKEY_USERID);
if (userId == null) {
reject(session);
} else {
log.debug("created websocket session for user {}", userId);
sessions.add(userId, session);
}
// converting this anonymous inner class to a lambda causes the following error when a message is sent from the client
// Unable to find decoder for type <javax.websocket.MessageHandler$Whole>
// this error is only visible when registering a listener to ws.onclose on the client
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
if ("ping".equals(message)) {
session.getAsyncRemote().sendText("pong");
}
}
});
}
private void reject(Session session) {
try {
session.close(new CloseReason(CloseCodes.VIOLATED_POLICY, "unauthorized"));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void onClose(Session session, CloseReason reason) {
sessions.remove(session);
}
}

View File

@@ -0,0 +1,14 @@
package com.commafeed.frontend.ws;
import com.commafeed.backend.model.FeedSubscription;
import lombok.experimental.UtilityClass;
@UtilityClass
public class WebSocketMessageBuilder {
public static final String newFeedEntries(FeedSubscription subscription) {
return String.format("%s:%s", "new-feed-entries", subscription.getId());
}
}

View File

@@ -0,0 +1,44 @@
package com.commafeed.frontend.ws;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.inject.Singleton;
import javax.websocket.Session;
import com.commafeed.backend.model.User;
import lombok.extern.slf4j.Slf4j;
@Singleton
@Slf4j
public class WebSocketSessions {
// a user may have multiple sessions (two tabs, on mobile, ...)
private final Map<Long, Set<Session>> sessions = new ConcurrentHashMap<>();
public void add(Long userId, Session session) {
sessions.computeIfAbsent(userId, v -> ConcurrentHashMap.newKeySet()).add(session);
}
public void remove(Session session) {
sessions.values().forEach(v -> v.removeIf(e -> e.equals(session)));
}
public void sendMessage(User user, String text) {
Set<Session> userSessions = sessions.entrySet()
.stream()
.filter(e -> e.getKey().equals(user.getId()))
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toSet());
log.debug("sending '{}' to {} users via websocket", text, userSessions.size());
for (Session userSession : userSessions) {
if (userSession.isOpen()) {
userSession.getAsyncRemote().sendText(text);
}
}
}
}