forked from Archives/Athou_commafeed
websocket notification now takes entry filtering into account (#1191)
This commit is contained in:
@@ -26,8 +26,8 @@ public class FeedEntryDAO extends GenericDAO<FeedEntry> {
|
||||
super(sessionFactory);
|
||||
}
|
||||
|
||||
public Long findExisting(String guidHash, Feed feed) {
|
||||
return query().select(entry.id).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne();
|
||||
public FeedEntry findExisting(String guidHash, Feed feed) {
|
||||
return query().select(entry).from(entry).where(entry.guidHash.eq(guidHash), entry.feed.eq(feed)).limit(1).fetchOne();
|
||||
}
|
||||
|
||||
public List<FeedCapacity> findFeedsExceedingCapacity(long maxCapacity, long max) {
|
||||
|
||||
@@ -3,8 +3,11 @@ package com.commafeed.backend.feed;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
@@ -20,6 +23,7 @@ import com.commafeed.backend.dao.UnitOfWork;
|
||||
import com.commafeed.backend.feed.parser.FeedParserResult.Content;
|
||||
import com.commafeed.backend.feed.parser.FeedParserResult.Entry;
|
||||
import com.commafeed.backend.model.Feed;
|
||||
import com.commafeed.backend.model.FeedEntry;
|
||||
import com.commafeed.backend.model.FeedSubscription;
|
||||
import com.commafeed.backend.model.User;
|
||||
import com.commafeed.backend.service.FeedEntryService;
|
||||
@@ -75,6 +79,7 @@ public class FeedRefreshUpdater {
|
||||
private AddEntryResult addEntry(final Feed feed, final Entry entry, final List<FeedSubscription> subscriptions) {
|
||||
boolean processed = false;
|
||||
boolean inserted = false;
|
||||
Set<FeedSubscription> subscriptionsForWhichEntryIsUnread = new HashSet<>();
|
||||
|
||||
// lock on feed, make sure we are not updating the same feed twice at
|
||||
// the same time
|
||||
@@ -96,10 +101,21 @@ public class FeedRefreshUpdater {
|
||||
locked2 = lock2.tryLock(1, TimeUnit.MINUTES);
|
||||
if (locked1 && locked2) {
|
||||
processed = true;
|
||||
inserted = unitOfWork.call(() -> feedEntryService.addEntry(feed, entry, subscriptions));
|
||||
if (inserted) {
|
||||
entryInserted.mark();
|
||||
}
|
||||
inserted = unitOfWork.call(() -> {
|
||||
Instant now = Instant.now();
|
||||
FeedEntry feedEntry = feedEntryService.findOrCreate(feed, entry);
|
||||
boolean newEntry = !feedEntry.getInserted().isBefore(now);
|
||||
if (newEntry) {
|
||||
entryInserted.mark();
|
||||
for (FeedSubscription sub : subscriptions) {
|
||||
boolean unread = feedEntryService.applyFilter(sub, feedEntry);
|
||||
if (unread) {
|
||||
subscriptionsForWhichEntryIsUnread.add(sub);
|
||||
}
|
||||
}
|
||||
}
|
||||
return newEntry;
|
||||
});
|
||||
} else {
|
||||
log.error("lock timeout for " + feed.getUrl() + " - " + key1);
|
||||
}
|
||||
@@ -113,12 +129,13 @@ public class FeedRefreshUpdater {
|
||||
lock2.unlock();
|
||||
}
|
||||
}
|
||||
return new AddEntryResult(processed, inserted);
|
||||
return new AddEntryResult(processed, inserted, subscriptionsForWhichEntryIsUnread);
|
||||
}
|
||||
|
||||
public boolean update(Feed feed, List<Entry> entries) {
|
||||
boolean processed = true;
|
||||
long inserted = 0;
|
||||
Map<FeedSubscription, Long> unreadCountBySubscription = new HashMap<>();
|
||||
|
||||
if (!entries.isEmpty()) {
|
||||
Set<String> lastEntries = cache.getLastEntries(feed);
|
||||
@@ -135,6 +152,7 @@ public class FeedRefreshUpdater {
|
||||
AddEntryResult addEntryResult = addEntry(feed, entry, subscriptions);
|
||||
processed &= addEntryResult.processed;
|
||||
inserted += addEntryResult.inserted ? 1 : 0;
|
||||
addEntryResult.subscriptionsForWhichEntryIsUnread.forEach(sub -> unreadCountBySubscription.merge(sub, 1L, Long::sum));
|
||||
|
||||
entryCacheMiss.mark();
|
||||
} else {
|
||||
@@ -153,7 +171,7 @@ public class FeedRefreshUpdater {
|
||||
cache.invalidateUnreadCount(subscriptions.toArray(new FeedSubscription[0]));
|
||||
cache.invalidateUserRootCategory(users.toArray(new User[0]));
|
||||
|
||||
notifyOverWebsocket(subscriptions, inserted);
|
||||
notifyOverWebsocket(unreadCountBySubscription);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,14 +189,16 @@ public class FeedRefreshUpdater {
|
||||
return processed;
|
||||
}
|
||||
|
||||
private void notifyOverWebsocket(List<FeedSubscription> subscriptions, long inserted) {
|
||||
subscriptions.forEach(sub -> webSocketSessions.sendMessage(sub.getUser(), WebSocketMessageBuilder.newFeedEntries(sub, inserted)));
|
||||
private void notifyOverWebsocket(Map<FeedSubscription, Long> unreadCountBySubscription) {
|
||||
unreadCountBySubscription.forEach((sub, unreadCount) -> webSocketSessions.sendMessage(sub.getUser(),
|
||||
WebSocketMessageBuilder.newFeedEntries(sub, unreadCount)));
|
||||
}
|
||||
|
||||
@AllArgsConstructor
|
||||
private static class AddEntryResult {
|
||||
private final boolean processed;
|
||||
private final boolean inserted;
|
||||
private final Set<FeedSubscription> subscriptionsForWhichEntryIsUnread;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ import com.commafeed.backend.model.FeedEntry;
|
||||
import com.commafeed.backend.model.FeedEntryStatus;
|
||||
import com.commafeed.backend.model.FeedSubscription;
|
||||
import com.commafeed.backend.model.User;
|
||||
import com.commafeed.backend.service.FeedEntryFilteringService.FeedEntryFilterException;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
@@ -38,33 +39,34 @@ public class FeedEntryService {
|
||||
/**
|
||||
* this is NOT thread-safe
|
||||
*/
|
||||
public boolean addEntry(Feed feed, Entry entry, List<FeedSubscription> subscriptions) {
|
||||
public FeedEntry findOrCreate(Feed feed, Entry entry) {
|
||||
String guid = FeedUtils.truncate(entry.guid(), 2048);
|
||||
String guidHash = DigestUtils.sha1Hex(entry.guid());
|
||||
Long existing = feedEntryDAO.findExisting(guidHash, feed);
|
||||
FeedEntry existing = feedEntryDAO.findExisting(guidHash, feed);
|
||||
if (existing != null) {
|
||||
return false;
|
||||
return existing;
|
||||
}
|
||||
|
||||
FeedEntry feedEntry = buildEntry(feed, entry, guid, guidHash);
|
||||
feedEntryDAO.saveOrUpdate(feedEntry);
|
||||
return feedEntry;
|
||||
}
|
||||
|
||||
// if filter does not match the entry, mark it as read
|
||||
for (FeedSubscription sub : subscriptions) {
|
||||
boolean matches = true;
|
||||
try {
|
||||
matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), feedEntry);
|
||||
} catch (FeedEntryFilteringService.FeedEntryFilterException e) {
|
||||
log.error("could not evaluate filter {}", sub.getFilter(), e);
|
||||
}
|
||||
if (!matches) {
|
||||
FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, feedEntry);
|
||||
status.setRead(true);
|
||||
feedEntryStatusDAO.saveOrUpdate(status);
|
||||
}
|
||||
public boolean applyFilter(FeedSubscription sub, FeedEntry entry) {
|
||||
boolean matches = true;
|
||||
try {
|
||||
matches = feedEntryFilteringService.filterMatchesEntry(sub.getFilter(), entry);
|
||||
} catch (FeedEntryFilterException e) {
|
||||
log.error("could not evaluate filter {}", sub.getFilter(), e);
|
||||
}
|
||||
|
||||
return true;
|
||||
if (!matches) {
|
||||
FeedEntryStatus status = new FeedEntryStatus(sub.getUser(), sub, entry);
|
||||
status.setRead(true);
|
||||
feedEntryStatusDAO.saveOrUpdate(status);
|
||||
}
|
||||
|
||||
return matches;
|
||||
}
|
||||
|
||||
private FeedEntry buildEntry(Feed feed, Entry e, String guid, String guidHash) {
|
||||
|
||||
Reference in New Issue
Block a user