From 4ff46965c4bd73c3e1575a66c058fd6a3857ef18 Mon Sep 17 00:00:00 2001 From: Athou Date: Tue, 17 Jan 2023 21:14:38 +0100 Subject: [PATCH] add websocket support to immediately refresh tree when new entries are available --- commafeed-client/package-lock.json | 13 +++- commafeed-client/package.json | 3 +- commafeed-client/src/hooks/useWebSocket.ts | 24 ++++++++ commafeed-client/src/pages/app/Layout.tsx | 2 + commafeed-client/vite.config.ts | 1 + commafeed-server/pom.xml | 10 +-- .../com/commafeed/CommaFeedApplication.java | 14 ++++- .../backend/feed/FeedRefreshUpdater.java | 41 ++++++++++--- .../service/FeedSubscriptionService.java | 2 +- .../frontend/session/SessionHelper.java | 11 ++-- .../frontend/ws/WebSocketConfigurator.java | 42 +++++++++++++ .../frontend/ws/WebSocketEndpoint.java | 61 +++++++++++++++++++ .../frontend/ws/WebSocketMessageBuilder.java | 14 +++++ .../frontend/ws/WebSocketSessions.java | 44 +++++++++++++ pom.xml | 10 +++ 15 files changed, 268 insertions(+), 24 deletions(-) create mode 100644 commafeed-client/src/hooks/useWebSocket.ts create mode 100644 commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketConfigurator.java create mode 100644 commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketEndpoint.java create mode 100644 commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketMessageBuilder.java create mode 100644 commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketSessions.java diff --git a/commafeed-client/package-lock.json b/commafeed-client/package-lock.json index 00054cfe..9c60d960 100644 --- a/commafeed-client/package-lock.json +++ b/commafeed-client/package-lock.json @@ -36,7 +36,8 @@ "react-router-dom": "^6.4.3", "react-swipeable": "^7.0.0", "swagger-ui-react": "^4.15.2", - "tinycon": "^0.6.8" + "tinycon": "^0.6.8", + "websocket-heartbeat-js": "^1.1.0" }, "devDependencies": { "@lingui/cli": "^3.15.0", @@ -10175,6 +10176,11 @@ "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==" }, + "node_modules/websocket-heartbeat-js": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/websocket-heartbeat-js/-/websocket-heartbeat-js-1.1.0.tgz", + "integrity": "sha512-5BSa6e8LUs0I8XrZXPUxAzo5Zmd45s69WmuY+7rNUjhgSzN1YUjFs1QWQJqfuq+JKpAuwp0fdlNNxODZNHGXhA==" + }, "node_modules/whatwg-url": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz", @@ -17479,6 +17485,11 @@ "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==" }, + "websocket-heartbeat-js": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/websocket-heartbeat-js/-/websocket-heartbeat-js-1.1.0.tgz", + "integrity": "sha512-5BSa6e8LUs0I8XrZXPUxAzo5Zmd45s69WmuY+7rNUjhgSzN1YUjFs1QWQJqfuq+JKpAuwp0fdlNNxODZNHGXhA==" + }, "whatwg-url": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz", diff --git a/commafeed-client/package.json b/commafeed-client/package.json index 8e7cd94d..1203512d 100644 --- a/commafeed-client/package.json +++ b/commafeed-client/package.json @@ -43,7 +43,8 @@ "react-router-dom": "^6.4.3", "react-swipeable": "^7.0.0", "swagger-ui-react": "^4.15.2", - "tinycon": "^0.6.8" + "tinycon": "^0.6.8", + "websocket-heartbeat-js": "^1.1.0" }, "devDependencies": { "@lingui/cli": "^3.15.0", diff --git a/commafeed-client/src/hooks/useWebSocket.ts b/commafeed-client/src/hooks/useWebSocket.ts new file mode 100644 index 00000000..7ee42eb7 --- /dev/null +++ b/commafeed-client/src/hooks/useWebSocket.ts @@ -0,0 +1,24 @@ +import { reloadTree } from "app/slices/tree" +import { useAppDispatch } from "app/store" +import { useEffect } from "react" +import WebsocketHeartbeatJs from "websocket-heartbeat-js" + +export const useWebSocket = () => { + const dispatch = useAppDispatch() + + useEffect(() => { + const currentUrl = new URL(window.location.href) + const wsProtocol = currentUrl.protocol === "http:" ? "ws" : "wss" + const wsUrl = `${wsProtocol}://${currentUrl.hostname}:${currentUrl.port}/ws` + + const ws = new WebsocketHeartbeatJs({ url: wsUrl, pingMsg: "ping" }) + ws.onmessage = event => { + const { data } = event + if (typeof data === "string") { + if (data.startsWith("new-feed-entries:")) dispatch(reloadTree()) + } + } + + return () => ws.close() + }, [dispatch]) +} diff --git a/commafeed-client/src/pages/app/Layout.tsx b/commafeed-client/src/pages/app/Layout.tsx index f650f533..1808a531 100644 --- a/commafeed-client/src/pages/app/Layout.tsx +++ b/commafeed-client/src/pages/app/Layout.tsx @@ -25,6 +25,7 @@ import { Logo } from "components/Logo" import { OnDesktop } from "components/responsive/OnDesktop" import { OnMobile } from "components/responsive/OnMobile" import { useAppLoading } from "hooks/useAppLoading" +import { useWebSocket } from "hooks/useWebSocket" import { LoadingPage } from "pages/LoadingPage" import { ReactNode, Suspense, useEffect } from "react" import { TbPlus } from "react-icons/tb" @@ -85,6 +86,7 @@ export default function Layout({ sidebar, header }: LayoutProps) { const { loading } = useAppLoading() const mobileMenuOpen = useAppSelector(state => state.tree.mobileMenuOpen) const dispatch = useAppDispatch() + useWebSocket() useEffect(() => { dispatch(reloadSettings()) diff --git a/commafeed-client/vite.config.ts b/commafeed-client/vite.config.ts index 001e4583..1f822582 100644 --- a/commafeed-client/vite.config.ts +++ b/commafeed-client/vite.config.ts @@ -22,6 +22,7 @@ export default defineConfig({ port: 8082, proxy: { "/rest": "http://localhost:8083", + "/ws": "ws://localhost:8083", "/swagger": "http://localhost:8083", }, }, diff --git a/commafeed-server/pom.xml b/commafeed-server/pom.xml index 32f946c2..14bf0e87 100644 --- a/commafeed-server/pom.xml +++ b/commafeed-server/pom.xml @@ -38,11 +38,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - 3.10.1 - org.apache.maven.plugins maven-surefire-plugin @@ -285,6 +280,11 @@ dropwizard-web 1.5.0 + + be.tomcools + dropwizard-websocket-jee7-bundle + 2.0.0 + javax.xml.bind diff --git a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java index 5bfb3f05..0532c350 100644 --- a/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java +++ b/commafeed-server/src/main/java/com/commafeed/CommaFeedApplication.java @@ -13,6 +13,7 @@ import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; +import javax.websocket.server.ServerEndpointConfig; import org.hibernate.cfg.AvailableSettings; @@ -47,11 +48,14 @@ import com.commafeed.frontend.servlet.CustomCssServlet; import com.commafeed.frontend.servlet.LogoutServlet; import com.commafeed.frontend.servlet.NextUnreadServlet; import com.commafeed.frontend.session.SessionHelperFactoryProvider; +import com.commafeed.frontend.ws.WebSocketConfigurator; +import com.commafeed.frontend.ws.WebSocketEndpoint; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.TypeLiteral; +import be.tomcools.dropwizard.websocket.WebsocketBundle; import io.dropwizard.Application; import io.dropwizard.assets.AssetsBundle; import io.dropwizard.configuration.EnvironmentVariableSubstitutor; @@ -60,7 +64,6 @@ import io.dropwizard.db.DataSourceFactory; import io.dropwizard.forms.MultiPartBundle; import io.dropwizard.hibernate.HibernateBundle; import io.dropwizard.migrations.MigrationsBundle; -import io.dropwizard.server.DefaultServerFactory; import io.dropwizard.servlets.CacheBustingFilter; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; @@ -75,6 +78,7 @@ public class CommaFeedApplication extends Application { public static final Date STARTUP_TIME = new Date(); private HibernateBundle hibernateBundle; + private WebsocketBundle websocketBundle; @Override public String getName() { @@ -85,6 +89,7 @@ public class CommaFeedApplication extends Application { public void initialize(Bootstrap bootstrap) { bootstrap.getObjectMapper().registerModule(new MetricsModule(TimeUnit.SECONDS, TimeUnit.SECONDS, false)); + bootstrap.addBundle(websocketBundle = new WebsocketBundle<>()); bootstrap.addBundle(hibernateBundle = new HibernateBundle(AbstractModel.class, Feed.class, FeedCategory.class, FeedEntry.class, FeedEntryContent.class, FeedEntryStatus.class, FeedEntryTag.class, FeedSubscription.class, User.class, UserRole.class, UserSettings.class) { @@ -140,7 +145,6 @@ public class CommaFeedApplication extends Application { // REST resources environment.jersey().setUrlPattern("/rest/*"); - ((DefaultServerFactory) config.getServerFactory()).setJerseyRootPath("/rest/*"); environment.jersey().register(injector.getInstance(AdminREST.class)); environment.jersey().register(injector.getInstance(CategoryREST.class)); environment.jersey().register(injector.getInstance(EntryREST.class)); @@ -155,6 +159,12 @@ public class CommaFeedApplication extends Application { environment.servlets().addServlet("customCss", injector.getInstance(CustomCssServlet.class)).addMapping("/custom_css.css"); environment.servlets().addServlet("analytics.js", injector.getInstance(AnalyticsServlet.class)).addMapping("/analytics.js"); + // WebSocket endpoint + ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketEndpoint.class, "/ws") + .configurator(injector.getInstance(WebSocketConfigurator.class)) + .build(); + websocketBundle.addEndpoint(serverEndpointConfig); + // Scheduled tasks Set tasks = injector.getInstance(Key.get(new TypeLiteral>() { })); diff --git a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java index a819c69c..c23479ea 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/feed/FeedRefreshUpdater.java @@ -32,9 +32,12 @@ import com.commafeed.backend.model.FeedSubscription; import com.commafeed.backend.model.User; import com.commafeed.backend.service.FeedUpdateService; import com.commafeed.backend.service.PubSubService; +import com.commafeed.frontend.ws.WebSocketMessageBuilder; +import com.commafeed.frontend.ws.WebSocketSessions; import com.google.common.util.concurrent.Striped; import io.dropwizard.lifecycle.Managed; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -48,6 +51,7 @@ public class FeedRefreshUpdater implements Managed { private final CommaFeedConfiguration config; private final FeedSubscriptionDAO feedSubscriptionDAO; private final CacheService cache; + private final WebSocketSessions webSocketSessions; private final FeedRefreshExecutor pool; private final Striped locks; @@ -60,7 +64,7 @@ public class FeedRefreshUpdater implements Managed { @Inject public FeedRefreshUpdater(SessionFactory sessionFactory, FeedUpdateService feedUpdateService, PubSubService pubSubService, FeedQueues queues, CommaFeedConfiguration config, MetricRegistry metrics, FeedSubscriptionDAO feedSubscriptionDAO, - CacheService cache) { + CacheService cache, WebSocketSessions webSocketSessions) { this.sessionFactory = sessionFactory; this.feedUpdateService = feedUpdateService; this.pubSubService = pubSubService; @@ -68,6 +72,7 @@ public class FeedRefreshUpdater implements Managed { this.config = config; this.feedSubscriptionDAO = feedSubscriptionDAO; this.cache = cache; + this.webSocketSessions = webSocketSessions; ApplicationSettings settings = config.getApplicationSettings(); int threads = Math.max(settings.getDatabaseUpdateThreads(), 1); @@ -94,8 +99,9 @@ public class FeedRefreshUpdater implements Managed { pool.execute(new EntryTask(context)); } - private boolean addEntry(final Feed feed, final FeedEntry entry, final List subscriptions) { - boolean success = false; + private AddEntryResult addEntry(final Feed feed, final FeedEntry entry, final List subscriptions) { + boolean processed = false; + boolean inserted = false; // lock on feed, make sure we are not updating the same feed twice at // the same time @@ -112,14 +118,15 @@ public class FeedRefreshUpdater implements Managed { boolean locked1 = false; boolean locked2 = false; try { + // try to lock, give up after 1 minute locked1 = lock1.tryLock(1, TimeUnit.MINUTES); locked2 = lock2.tryLock(1, TimeUnit.MINUTES); if (locked1 && locked2) { - boolean inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions)); + processed = true; + inserted = UnitOfWork.call(sessionFactory, () -> feedUpdateService.addEntry(feed, entry, subscriptions)); if (inserted) { entryInserted.mark(); } - success = true; } else { log.error("lock timeout for " + feed.getUrl() + " - " + key1); } @@ -133,7 +140,7 @@ public class FeedRefreshUpdater implements Managed { lock2.unlock(); } } - return success; + return new AddEntryResult(processed, inserted); } private void handlePubSub(final Feed feed) { @@ -169,7 +176,9 @@ public class FeedRefreshUpdater implements Managed { @Override public void run() { - boolean ok = true; + boolean processed = true; + boolean insertedAtLeastOneEntry = false; + final Feed feed = context.getFeed(); List entries = context.getEntries(); if (entries.isEmpty()) { @@ -186,7 +195,10 @@ public class FeedRefreshUpdater implements Managed { if (subscriptions == null) { subscriptions = UnitOfWork.call(sessionFactory, () -> feedSubscriptionDAO.findByFeed(feed)); } - ok &= addEntry(feed, entry, subscriptions); + AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions); + processed &= addEntryResult.processed; + insertedAtLeastOneEntry |= addEntryResult.inserted; + entryCacheMiss.mark(); } else { log.debug("cache hit for {}", entry.getUrl()); @@ -199,17 +211,20 @@ public class FeedRefreshUpdater implements Managed { if (subscriptions == null) { feed.setMessage("No new entries found"); - } else if (!subscriptions.isEmpty()) { + } else if (insertedAtLeastOneEntry) { List users = subscriptions.stream().map(FeedSubscription::getUser).collect(Collectors.toList()); cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0])); cache.invalidateUserRootCategory(users.toArray(new User[0])); + + // notify over websocket + subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub))); } } if (config.getApplicationSettings().getPubsubhubbub()) { handlePubSub(feed); } - if (!ok) { + if (!processed) { // requeue asap feed.setDisabledUntil(new Date(0)); } @@ -223,4 +238,10 @@ public class FeedRefreshUpdater implements Managed { } } + @AllArgsConstructor + private static class AddEntryResult { + private final boolean processed; + private final boolean inserted; + } + } diff --git a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java index c675f170..263f5341 100644 --- a/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java +++ b/commafeed-server/src/main/java/com/commafeed/backend/service/FeedSubscriptionService.java @@ -76,7 +76,7 @@ public class FeedSubscriptionService { sub.setTitle(FeedUtils.truncate(title, 128)); feedSubscriptionDAO.saveOrUpdate(sub); - queues.add(feed, false); + queues.add(feed, true); cache.invalidateUserRootCategory(user); return sub.getId(); } diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/session/SessionHelper.java b/commafeed-server/src/main/java/com/commafeed/frontend/session/SessionHelper.java index 143e35c1..3c385f0c 100644 --- a/commafeed-server/src/main/java/com/commafeed/frontend/session/SessionHelper.java +++ b/commafeed-server/src/main/java/com/commafeed/frontend/session/SessionHelper.java @@ -9,7 +9,7 @@ import com.commafeed.backend.model.User; import lombok.RequiredArgsConstructor; -@RequiredArgsConstructor() +@RequiredArgsConstructor public class SessionHelper { private static final String SESSION_KEY_USER = "user"; @@ -18,15 +18,18 @@ public class SessionHelper { public Optional getLoggedInUser() { Optional session = getSession(false); - if (session.isPresent()) { - User user = (User) session.get().getAttribute(SESSION_KEY_USER); - return Optional.ofNullable(user); + return getLoggedInUser(session.get()); } return Optional.empty(); } + public static Optional getLoggedInUser(HttpSession session) { + User user = (User) session.getAttribute(SESSION_KEY_USER); + return Optional.ofNullable(user); + } + public void setLoggedInUser(User user) { Optional session = getSession(true); session.get().setAttribute(SESSION_KEY_USER, user); diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketConfigurator.java b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketConfigurator.java new file mode 100644 index 00000000..38aafcc9 --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketConfigurator.java @@ -0,0 +1,42 @@ +package com.commafeed.frontend.ws; + +import java.util.Optional; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.servlet.http.HttpSession; +import javax.websocket.HandshakeResponse; +import javax.websocket.server.HandshakeRequest; +import javax.websocket.server.ServerEndpointConfig; +import javax.websocket.server.ServerEndpointConfig.Configurator; + +import com.commafeed.backend.model.User; +import com.commafeed.frontend.session.SessionHelper; + +import lombok.RequiredArgsConstructor; + +@Singleton +@RequiredArgsConstructor(onConstructor = @__({ @Inject })) +public class WebSocketConfigurator extends Configurator { + + public static final String SESSIONKEY_USERID = "userId"; + + private final WebSocketSessions webSocketSessions; + + @Override + public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) { + HttpSession httpSession = (HttpSession) request.getHttpSession(); + if (httpSession != null) { + Optional user = SessionHelper.getLoggedInUser(httpSession); + if (user.isPresent()) { + config.getUserProperties().put(SESSIONKEY_USERID, user.get().getId()); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public T getEndpointInstance(Class endpointClass) throws InstantiationException { + return (T) new WebSocketEndpoint(webSocketSessions); + } +} diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketEndpoint.java b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketEndpoint.java new file mode 100644 index 00000000..67fd3c9f --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketEndpoint.java @@ -0,0 +1,61 @@ +package com.commafeed.frontend.ws; + +import java.io.IOException; + +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCodes; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.Session; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Singleton +@RequiredArgsConstructor(onConstructor = @__({ @Inject })) +public class WebSocketEndpoint extends Endpoint { + + private final WebSocketSessions sessions; + + @Override + public void onOpen(Session session, EndpointConfig config) { + Long userId = (Long) config.getUserProperties().get(WebSocketConfigurator.SESSIONKEY_USERID); + if (userId == null) { + reject(session); + } else { + log.debug("created websocket session for user {}", userId); + sessions.add(userId, session); + } + + // converting this anonymous inner class to a lambda causes the following error when a message is sent from the client + // Unable to find decoder for type + // this error is only visible when registering a listener to ws.onclose on the client + session.addMessageHandler(new MessageHandler.Whole() { + @Override + public void onMessage(String message) { + if ("ping".equals(message)) { + session.getAsyncRemote().sendText("pong"); + } + } + }); + } + + private void reject(Session session) { + try { + session.close(new CloseReason(CloseCodes.VIOLATED_POLICY, "unauthorized")); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onClose(Session session, CloseReason reason) { + sessions.remove(session); + + } + +} diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketMessageBuilder.java b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketMessageBuilder.java new file mode 100644 index 00000000..cf35eb8d --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketMessageBuilder.java @@ -0,0 +1,14 @@ +package com.commafeed.frontend.ws; + +import com.commafeed.backend.model.FeedSubscription; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class WebSocketMessageBuilder { + + public static final String newFeedEntries(FeedSubscription subscription) { + return String.format("%s:%s", "new-feed-entries", subscription.getId()); + } + +} diff --git a/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketSessions.java b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketSessions.java new file mode 100644 index 00000000..aadb1e57 --- /dev/null +++ b/commafeed-server/src/main/java/com/commafeed/frontend/ws/WebSocketSessions.java @@ -0,0 +1,44 @@ +package com.commafeed.frontend.ws; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import javax.inject.Singleton; +import javax.websocket.Session; + +import com.commafeed.backend.model.User; + +import lombok.extern.slf4j.Slf4j; + +@Singleton +@Slf4j +public class WebSocketSessions { + + // a user may have multiple sessions (two tabs, on mobile, ...) + private final Map> sessions = new ConcurrentHashMap<>(); + + public void add(Long userId, Session session) { + sessions.computeIfAbsent(userId, v -> ConcurrentHashMap.newKeySet()).add(session); + } + + public void remove(Session session) { + sessions.values().forEach(v -> v.removeIf(e -> e.equals(session))); + } + + public void sendMessage(User user, String text) { + Set userSessions = sessions.entrySet() + .stream() + .filter(e -> e.getKey().equals(user.getId())) + .flatMap(e -> e.getValue().stream()) + .collect(Collectors.toSet()); + + log.debug("sending '{}' to {} users via websocket", text, userSessions.size()); + for (Session userSession : userSessions) { + if (userSession.isOpen()) { + userSession.getAsyncRemote().sendText(text); + } + } + } +} diff --git a/pom.xml b/pom.xml index bb0493c6..4ce1cf30 100644 --- a/pom.xml +++ b/pom.xml @@ -28,6 +28,16 @@ + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.10.1 + + + + commafeed-client commafeed-server