use codahale metrics library instead of our own

This commit is contained in:
Athou
2013-08-18 16:29:07 +02:00
parent a0be2e0879
commit ee4eb9bb07
21 changed files with 130 additions and 257 deletions

View File

@@ -7,6 +7,9 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
/**
* Wraps a {@link ThreadPoolExecutor} instance. Blocks when queue is full instead of rejecting the task. Allow priority queueing by using
* {@link Task} instead of {@link Runnable}
@@ -19,7 +22,7 @@ public class FeedRefreshExecutor {
private ThreadPoolExecutor pool;
private LinkedBlockingDeque<Runnable> queue;
public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity) {
public FeedRefreshExecutor(final String poolName, int threads, int queueCapacity, MetricRegistry metrics) {
log.info("Creating pool {} with {} threads", poolName, threads);
this.poolName = poolName;
pool = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queue = new LinkedBlockingDeque<Runnable>(queueCapacity) {
@@ -51,20 +54,26 @@ public class FeedRefreshExecutor {
}
}
});
metrics.register(MetricRegistry.name(getClass(), poolName, "active"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return pool.getActiveCount();
}
});
metrics.register(MetricRegistry.name(getClass(), poolName, "pending"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return queue.size();
}
});
}
public void execute(Task task) {
pool.execute(task);
}
public int getQueueSize() {
return queue.size();
}
public int getActiveCount() {
return pool.getActiveCount();
}
public static interface Task extends Runnable {
boolean isUrgent();
}

View File

@@ -17,7 +17,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.time.DateUtils;
import com.commafeed.backend.MetricsBean;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.model.Feed;
import com.commafeed.backend.services.ApplicationSettingsService;
@@ -41,7 +42,7 @@ public class FeedRefreshTaskGiver {
ApplicationSettingsService applicationSettingsService;
@Inject
MetricsBean metricsBean;
MetricRegistry metrics;
@Inject
FeedRefreshWorker worker;
@@ -54,10 +55,15 @@ public class FeedRefreshTaskGiver {
private ExecutorService executor;
private Meter feedRefreshed;
private Meter threadWaited;
@PostConstruct
public void init() {
backgroundThreads = applicationSettingsService.get().getBackgroundThreads();
executor = Executors.newFixedThreadPool(1);
feedRefreshed = metrics.meter(MetricRegistry.name(getClass(), "feedRefreshed"));
threadWaited = metrics.meter(MetricRegistry.name(getClass(), "threadWaited"));
}
@PreDestroy
@@ -88,11 +94,11 @@ public class FeedRefreshTaskGiver {
try {
FeedRefreshContext context = take();
if (context != null) {
metricsBean.feedRefreshed();
feedRefreshed.mark();
worker.updateFeed(context);
} else {
log.debug("nothing to do, sleeping for 15s");
metricsBean.threadWaited();
threadWaited.mark();
try {
Thread.sleep(15000);
} catch (InterruptedException e) {

View File

@@ -19,7 +19,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import com.commafeed.backend.MetricsBean;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.backend.cache.CacheService;
import com.commafeed.backend.dao.FeedDAO;
import com.commafeed.backend.dao.FeedEntryDAO;
@@ -57,7 +58,7 @@ public class FeedRefreshUpdater {
ApplicationSettingsService applicationSettingsService;
@Inject
MetricsBean metricsBean;
MetricRegistry metrics;
@Inject
FeedSubscriptionDAO feedSubscriptionDAO;
@@ -71,12 +72,22 @@ public class FeedRefreshUpdater {
private FeedRefreshExecutor pool;
private Striped<Lock> locks;
private Meter entryCacheMiss;
private Meter entryCacheHit;
private Meter feedUpdated;
private Meter entryInserted;
@PostConstruct
public void init() {
ApplicationSettings settings = applicationSettingsService.get();
int threads = Math.max(settings.getDatabaseUpdateThreads(), 1);
pool = new FeedRefreshExecutor("feed-refresh-updater", threads, Math.min(50 * threads, 1000));
pool = new FeedRefreshExecutor("feed-refresh-updater", threads, Math.min(50 * threads, 1000), metrics);
locks = Striped.lazyWeakLock(threads * 100000);
entryCacheMiss = metrics.meter(MetricRegistry.name(getClass(), "entryCacheMiss"));
entryCacheHit = metrics.meter(MetricRegistry.name(getClass(), "entryCacheHit"));
feedUpdated = metrics.meter(MetricRegistry.name(getClass(), "feedUpdated"));
entryInserted = metrics.meter(MetricRegistry.name(getClass(), "entryInserted"));
}
@PreDestroy
@@ -116,10 +127,10 @@ public class FeedRefreshUpdater {
subscriptions = feedSubscriptionDAO.findByFeed(feed);
}
ok &= addEntry(feed, entry, subscriptions);
metricsBean.entryCacheMiss();
entryCacheMiss.mark();
} else {
log.debug("cache hit for {}", entry.getUrl());
metricsBean.entryCacheHit();
entryCacheHit.mark();
}
currentEntries.add(cacheKey);
@@ -147,7 +158,7 @@ public class FeedRefreshUpdater {
// requeue asap
feed.setDisabledUntil(new Date(0));
}
metricsBean.feedUpdated();
feedUpdated.mark();
taskGiver.giveBack(feed);
}
@@ -180,7 +191,7 @@ public class FeedRefreshUpdater {
if (locked1 && locked2) {
boolean inserted = feedUpdateService.addEntry(feed, entry);
if (inserted) {
metricsBean.entryInserted();
entryInserted.mark();
}
success = true;
} else {
@@ -213,13 +224,4 @@ public class FeedRefreshUpdater {
}
}
}
public int getQueueSize() {
return pool.getQueueSize();
}
public int getActiveCount() {
return pool.getActiveCount();
}
}

View File

@@ -13,6 +13,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.time.DateUtils;
import com.codahale.metrics.MetricRegistry;
import com.commafeed.backend.HttpGetter.NotModifiedException;
import com.commafeed.backend.feeds.FeedRefreshExecutor.Task;
import com.commafeed.backend.model.ApplicationSettings;
@@ -37,6 +38,9 @@ public class FeedRefreshWorker {
@Inject
FeedRefreshTaskGiver taskGiver;
@Inject
MetricRegistry metrics;
@Inject
ApplicationSettingsService applicationSettingsService;
@@ -46,7 +50,7 @@ public class FeedRefreshWorker {
private void init() {
ApplicationSettings settings = applicationSettingsService.get();
int threads = settings.getBackgroundThreads();
pool = new FeedRefreshExecutor("feed-refresh-worker", threads, Math.min(20 * threads, 1000));
pool = new FeedRefreshExecutor("feed-refresh-worker", threads, Math.min(20 * threads, 1000), metrics);
}
@PreDestroy
@@ -58,14 +62,6 @@ public class FeedRefreshWorker {
pool.execute(new FeedTask(context));
}
public int getQueueSize() {
return pool.getQueueSize();
}
public int getActiveCount() {
return pool.getActiveCount();
}
private class FeedTask implements Task {
private FeedRefreshContext context;

View File

@@ -1,80 +0,0 @@
package com.commafeed.backend.feeds;
import java.util.Date;
import java.util.List;
import javax.ejb.Stateless;
import javax.inject.Inject;
import com.commafeed.backend.dao.FeedCategoryDAO;
import com.commafeed.backend.dao.FeedSubscriptionDAO;
import com.commafeed.backend.model.FeedCategory;
import com.commafeed.backend.model.FeedSubscription;
import com.commafeed.backend.model.User;
import com.sun.syndication.feed.opml.Attribute;
import com.sun.syndication.feed.opml.Opml;
import com.sun.syndication.feed.opml.Outline;
@Stateless
public class OPMLExporter {
@Inject
FeedCategoryDAO feedCategoryDAO;
@Inject
FeedSubscriptionDAO feedSubscriptionDAO;
@SuppressWarnings("unchecked")
public Opml export(User user) {
Opml opml = new Opml();
opml.setFeedType("opml_1.1");
opml.setTitle(String.format("%s subscriptions in CommaFeed", user.getName()));
opml.setCreated(new Date());
List<FeedCategory> categories = feedCategoryDAO.findAll(user);
List<FeedSubscription> subscriptions = feedSubscriptionDAO.findAll(user);
for (FeedCategory cat : categories) {
opml.getOutlines().add(buildCategoryOutline(cat, subscriptions));
}
for (FeedSubscription sub : subscriptions) {
if (sub.getCategory() == null) {
opml.getOutlines().add(buildSubscriptionOutline(sub));
}
}
return opml;
}
@SuppressWarnings("unchecked")
private Outline buildCategoryOutline(FeedCategory cat, List<FeedSubscription> subscriptions) {
Outline outline = new Outline();
outline.setText(cat.getName());
outline.setTitle(cat.getName());
for (FeedCategory child : cat.getChildren()) {
outline.getChildren().add(buildCategoryOutline(child, subscriptions));
}
for (FeedSubscription sub : subscriptions) {
if (sub.getCategory() != null && sub.getCategory().getId().equals(cat.getId())) {
outline.getChildren().add(buildSubscriptionOutline(sub));
}
}
return outline;
}
@SuppressWarnings("unchecked")
private Outline buildSubscriptionOutline(FeedSubscription sub) {
Outline outline = new Outline();
outline.setText(sub.getTitle());
outline.setTitle(sub.getTitle());
outline.setType("rss");
outline.getAttributes().add(new Attribute("xmlUrl", sub.getFeed().getUrl()));
if (sub.getFeed().getLink() != null) {
outline.getAttributes().add(new Attribute("htmlUrl", sub.getFeed().getLink()));
}
return outline;
}
}

View File

@@ -1,101 +0,0 @@
package com.commafeed.backend.feeds;
import java.io.StringReader;
import java.util.List;
import javax.ejb.Asynchronous;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import com.commafeed.backend.cache.CacheService;
import com.commafeed.backend.dao.FeedCategoryDAO;
import com.commafeed.backend.model.FeedCategory;
import com.commafeed.backend.model.User;
import com.commafeed.backend.services.FeedSubscriptionService;
import com.commafeed.backend.services.FeedSubscriptionService.FeedSubscriptionException;
import com.sun.syndication.feed.opml.Opml;
import com.sun.syndication.feed.opml.Outline;
import com.sun.syndication.io.WireFeedInput;
@Stateless
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Slf4j
public class OPMLImporter {
@Inject
FeedSubscriptionService feedSubscriptionService;
@Inject
FeedCategoryDAO feedCategoryDAO;
@Inject
CacheService cache;
@SuppressWarnings("unchecked")
@Asynchronous
public void importOpml(User user, String xml) {
xml = xml.substring(xml.indexOf('<'));
WireFeedInput input = new WireFeedInput();
try {
Opml feed = (Opml) input.build(new StringReader(xml));
List<Outline> outlines = (List<Outline>) feed.getOutlines();
for (Outline outline : outlines) {
handleOutline(user, outline, null);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
@SuppressWarnings("unchecked")
private void handleOutline(User user, Outline outline, FeedCategory parent) {
if (StringUtils.isEmpty(outline.getType())) {
String name = FeedUtils.truncate(outline.getText(), 128);
if (name == null) {
name = FeedUtils.truncate(outline.getTitle(), 128);
}
FeedCategory category = feedCategoryDAO.findByName(user, name, parent);
if (category == null) {
if (StringUtils.isBlank(name)) {
name = "Unnamed category";
}
category = new FeedCategory();
category.setName(name);
category.setParent(parent);
category.setUser(user);
feedCategoryDAO.saveOrUpdate(category);
}
List<Outline> children = outline.getChildren();
for (Outline child : children) {
handleOutline(user, child, category);
}
} else {
String name = FeedUtils.truncate(outline.getText(), 128);
if (name == null) {
name = FeedUtils.truncate(outline.getTitle(), 128);
}
if (StringUtils.isBlank(name)) {
name = "Unnamed subscription";
}
// make sure we continue with the import process even a feed failed
try {
feedSubscriptionService.subscribe(user, outline.getXmlUrl(), name, parent);
} catch (FeedSubscriptionException e) {
throw e;
} catch (Exception e) {
log.error("error while importing {}: {}", outline.getXmlUrl(), e.getMessage());
}
}
cache.invalidateUserRootCategory(user);
}
}