new metrics

This commit is contained in:
Michele Artini 2021-07-30 14:12:09 +02:00
parent 853d4b66e5
commit 53750ff818
16 changed files with 227 additions and 53 deletions

View File

@ -4,16 +4,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.broker.common.elasticsearch.EventRepository;
import eu.dnetlib.broker.common.elasticsearch.NotificationRepository;
import eu.dnetlib.broker.common.stats.OpenaireDsStatRepository;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.common.topics.TopicTypeRepository;
import eu.dnetlib.broker.controllers.objects.BufferStatus;
@ -57,7 +56,7 @@ public class AjaxController extends AbstractDnetController {
private ThreadManager threadManager;
@Autowired
private JdbcTemplate jdbcTemplate;
private OpenaireDsStatRepository openaireDsStatRepository;
@Value("${lbs.elastic.homepage}")
private String elasticSearchUiUrl;
@ -98,7 +97,7 @@ public class AjaxController extends AbstractDnetController {
currentStatus.setThreads(threads);
currentStatus.getTotals().put("topics", topicTypeRepo.count());
currentStatus.getTotals().put("events_es", eventRepository.count());
currentStatus.getTotals().put("events_db", countEventsInTheDb());
currentStatus.getTotals().put("events_db", openaireDsStatRepository.totalEvents());
currentStatus.getTotals().put("subscriptions", subscriptionRepo.count());
currentStatus.getTotals().put("notifications_es", notificationRepository.count());
@ -106,15 +105,6 @@ public class AjaxController extends AbstractDnetController {
}
}
private Long countEventsInTheDb() {
try {
final String sql = IOUtils.toString(getClass().getResourceAsStream("/sql/totalEvents.sql"));
return jdbcTemplate.queryForObject(sql, Long.class);
} catch (final Exception e) {
return 0l;
}
}
@GetMapping("/resetCounters")
public CurrentStatus resetCounters() {
dispatcherManager.getDispatchers().forEach(NotificationDispatcher::resetCount);

View File

@ -1 +0,0 @@
select sum(size) from oa_datasource_stats;

View File

@ -4,6 +4,7 @@ import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -57,6 +58,7 @@ import eu.dnetlib.broker.common.elasticsearch.EventRepository;
import eu.dnetlib.broker.common.elasticsearch.Notification;
import eu.dnetlib.broker.common.elasticsearch.NotificationRepository;
import eu.dnetlib.broker.common.properties.ElasticSearchProperties;
import eu.dnetlib.broker.common.stats.OpenaireDsStatRepository;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
@ -82,6 +84,9 @@ public class OpenairePublicController extends AbstractDnetController {
@Autowired
private SubscriptionRepository subscriptionRepo;
@Autowired
private OpenaireDsStatRepository openaireDsStatRepository;
@Autowired
private ElasticSearchProperties props;
@ -154,7 +159,7 @@ public class OpenairePublicController extends AbstractDnetController {
boolean first = true;
IOUtils.write("[\n", gzOut);
IOUtils.write("[\n", gzOut, StandardCharsets.UTF_8);
ScrollPage<ShortEventMessage> page = null;
@ -165,13 +170,13 @@ public class OpenairePublicController extends AbstractDnetController {
if (first) {
first = false;
} else {
IOUtils.write(",\n", gzOut);
IOUtils.write(",\n", gzOut, StandardCharsets.UTF_8);
}
IOUtils.write(gson.toJson(msg), gzOut);
IOUtils.write(gson.toJson(msg), gzOut, StandardCharsets.UTF_8);
}
} while (!page.isCompleted());
IOUtils.write("\n]\n", gzOut);
IOUtils.write("\n]\n", gzOut, StandardCharsets.UTF_8);
gzOut.flush();
@ -196,7 +201,7 @@ public class OpenairePublicController extends AbstractDnetController {
final GZIPOutputStream gzOut = new GZIPOutputStream(out)) {
boolean first = true;
IOUtils.write("[\n", gzOut);
IOUtils.write("[\n", gzOut, StandardCharsets.UTF_8);
try {
for (final FileStatus fileStatus : fs.listStatus(pathDir)) {
@ -223,7 +228,7 @@ public class OpenairePublicController extends AbstractDnetController {
} catch (final FileNotFoundException e) {
log.warn("File not found - " + e.getMessage());
}
IOUtils.write("\n]\n", gzOut);
IOUtils.write("\n]\n", gzOut, StandardCharsets.UTF_8);
gzOut.flush();
} catch (final Throwable e) {
log.error("Error accessing hdfs file", e);
@ -237,10 +242,10 @@ public class OpenairePublicController extends AbstractDnetController {
if (first) {
first = false;
} else {
IOUtils.write(",\n", gzOut);
IOUtils.write(",\n", gzOut, StandardCharsets.UTF_8);
}
IOUtils.write(line, gzOut);
IOUtils.write(line, gzOut, StandardCharsets.UTF_8);
line = br.readLine();
}
@ -258,7 +263,8 @@ public class OpenairePublicController extends AbstractDnetController {
private Map<String, Long> status() {
final Map<String, Long> res = new LinkedHashMap<>();
res.put("n_subscriptions", subscriptionRepo.count());
res.put("n_events", eventRepository.count());
res.put("n_events_es", eventRepository.count());
res.put("n_events_db", openaireDsStatRepository.totalEvents());
res.put("n_notifications", notificationRepository.count());
return res;
}

View File

@ -0,0 +1,21 @@
package eu.dnetlib.organizations.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.common.metrics.MetricInfo;
import eu.dnetlib.organizations.repository.OrganizationRepository;
import eu.dnetlib.organizations.utils.OrganizationStatus;
@Component("valid_organizations_total")
public class ValidOrganizationsMetric extends MetricInfo {
@Autowired
private OrganizationRepository organizationRepository;
@Override
public double obtainValue(final MetricInfo info) {
return organizationRepository.countByStatus(OrganizationStatus.approved.toString());
}
}

View File

@ -26,4 +26,6 @@ public interface OrganizationRepository extends JpaRepository<Organization, Stri
@Query(value = "insert into organizations(id) values (?1)", nativeQuery = true)
void preparePendingOrg(String id);
double countByStatus(String string);
}

View File

@ -0,0 +1,34 @@
package eu.dnetlib.common.metrics;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import io.micrometer.core.instrument.Metrics;
public abstract class MetricInfo implements BeanNameAware {
private static final Log log = LogFactory.getLog(MetricInfo.class);
private String beanName;
abstract public double obtainValue(MetricInfo info);
public String getBeanName() {
return beanName;
}
@Override
public final void setBeanName(final String beanName) {
this.beanName = beanName;
}
@PostConstruct
public final void register() {
log.info("Prometheus - new metric registered: " + getBeanName());
Metrics.gauge(getBeanName(), this, this::obtainValue);
}
}

View File

@ -84,34 +84,13 @@
<version>3.2.1</version>
</dependency>
<!-- for /metrics and /health controllers -->
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_spring_boot</artifactId>
<version>${prometheus.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</exclusion>
</exclusions>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dnet-apps-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_servlet</artifactId>
<version>${prometheus.version}</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_spring_web</artifactId>
<version>0.3.0</version>
</dependency>
<!-- JUNIT -->
<dependency>
<groupId>junit</groupId>

View File

@ -3,8 +3,6 @@ package eu.dnetlib.broker.common.elasticsearch;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@ -29,8 +27,6 @@ public class EventStatsManager {
@Autowired
private ElasticSearchProperties elasticSearchProperties;
private static final Log log = LogFactory.getLog(EventStatsManager.class);
public class BrowseEntry {
private final String value;

View File

@ -0,0 +1,20 @@
package eu.dnetlib.broker.common.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.stats.OpenaireDsStatRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_datasources_with_events")
public class TotalDatasourcesWithEventsMetric extends MetricInfo {
@Autowired
private OpenaireDsStatRepository openaireDsStatRepository;
@Override
public double obtainValue(final MetricInfo info) {
return openaireDsStatRepository.countDatasourcesWithEvents();
}
}

View File

@ -0,0 +1,37 @@
package eu.dnetlib.broker.common.metrics;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.subscriptions.ConditionOperator;
import eu.dnetlib.broker.common.subscriptions.MapCondition;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_datasources_with_subscriptions")
public class TotalDatasourcesWithSubscriptionsMetric extends MetricInfo {
@Autowired
private SubscriptionRepository subscriptionRepository;
@Override
public double obtainValue(final MetricInfo info) {
return StreamSupport.stream(subscriptionRepository.findAll().spliterator(), false)
.map(Subscription::getConditionsAsList)
.flatMap(List::stream)
.filter(c -> c.getField().equals("targetDatasourceName"))
.filter(c -> c.getOperator() == ConditionOperator.EXACT)
.map(MapCondition::getListParams)
.filter(l -> !l.isEmpty())
.map(l -> l.get(0).getValue())
.filter(StringUtils::isNotBlank)
.distinct()
.count();
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.broker.common.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.stats.OpenaireDsStatRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_events")
public class TotalEventsMetric extends MetricInfo {
@Autowired
private OpenaireDsStatRepository openaireDsStatRepository;
@Override
public double obtainValue(final MetricInfo info) {
return openaireDsStatRepository.totalEvents();
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.broker.common.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.elasticsearch.NotificationRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_notifications")
public class TotalNotificationsMetric extends MetricInfo {
@Autowired
private NotificationRepository notificationRepository;
@Override
public double obtainValue(final MetricInfo info) {
return notificationRepository.count();
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.broker.common.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_subscribers")
public class TotalSubscribersMetric extends MetricInfo {
@Autowired
private SubscriptionRepository subscriptionRepository;
@Override
public double obtainValue(final MetricInfo info) {
return subscriptionRepository.countSubscribers();
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.broker.common.metrics;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import eu.dnetlib.broker.common.subscriptions.SubscriptionRepository;
import eu.dnetlib.common.metrics.MetricInfo;
@Component("provide_broker_subscriptions")
public class TotalSubscriptionsMetric extends MetricInfo {
@Autowired
private SubscriptionRepository subscriptionRepository;
@Override
public double obtainValue(final MetricInfo info) {
return subscriptionRepository.count();
}
}

View File

@ -1,7 +1,13 @@
package eu.dnetlib.broker.common.stats;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
public interface OpenaireDsStatRepository extends CrudRepository<OpenaireDsStat, OpenaireDsStatPK> {
@Query(value = "select sum(size) from oa_datasource_stats", nativeQuery = true)
long totalEvents();
@Query(value = "select count(distinct name) from oa_datasource_stats where size > 0", nativeQuery = true)
long countDatasourcesWithEvents();
}

View File

@ -1,5 +1,6 @@
package eu.dnetlib.broker.common.subscriptions;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;
@ -10,4 +11,7 @@ public interface SubscriptionRepository extends CrudRepository<Subscription, Str
Iterable<Subscription> findBySubscriber(String email);
@Query(value = "select count(distinct subscriber) from subscriptions", nativeQuery = true)
long countSubscribers();
}