Compare commits
6 Commits
master
...
monitoring
Author | SHA1 | Date |
---|---|---|
Sandro La Bruzzo | 0e80385258 | |
Sandro La Bruzzo | f413122661 | |
Sandro La Bruzzo | 6af85fc542 | |
Sandro La Bruzzo | 452dbc68ec | |
Sandro La Bruzzo | e138b60d5c | |
Sandro La Bruzzo | 8d96832cf6 |
|
@ -73,6 +73,10 @@
|
|||
<groupId>me.xuender</groupId>
|
||||
<artifactId>unidecode</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.influxdb</groupId>
|
||||
<artifactId>influxdb-client-java</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package eu.dnetlib.dhp.monitor;
|
||||
|
||||
public class MonitorConstant {
|
||||
|
||||
|
||||
|
||||
public enum AggregationType {
|
||||
COLLECTION,
|
||||
TRANSFORMATION
|
||||
}
|
||||
|
||||
public enum AggregationMode {
|
||||
REFRESH,
|
||||
INCREMENTAL
|
||||
|
||||
}
|
||||
|
||||
public static String AGGREGATION_LABEL_NAME ="aggregation";
|
||||
public static String COLLECTION_LABEL_NAME ="collection";
|
||||
public static String TRANSFORM_LABEL_NAME ="transform";
|
||||
|
||||
public static String AGGREGATION_MODE_LABEL = "mode";
|
||||
|
||||
public static String DATASOURCE_ID_LABEL_NAME= "datasourceId";
|
||||
|
||||
public static String DATASOURCE_NAME_LABEL_NAME= "datasourceName";
|
||||
|
||||
public static final String DATASOURCE_API_LABEL_NAME = "api";
|
||||
|
||||
public static String AGGREGATION_TRANSFORM_METRIC = String.format("%s-%s",AGGREGATION_LABEL_NAME,TRANSFORM_LABEL_NAME);
|
||||
|
||||
public static String AGGREGATION_COLLECTION_METRIC = String.format("%s-%s",AGGREGATION_LABEL_NAME,COLLECTION_LABEL_NAME);
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
package eu.dnetlib.dhp.monitor.model;
|
||||
|
||||
import com.influxdb.client.domain.WritePrecision;
|
||||
import com.influxdb.client.write.Point;
|
||||
import eu.dnetlib.dhp.monitor.MonitorConstant;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The type Aggregation metric
|
||||
* models the metric about the aggregation statistics
|
||||
* like collection or transformation,
|
||||
* giving the stats
|
||||
*/
|
||||
public class AggregationMetric extends Metric <Long> {
|
||||
|
||||
private List<MetricLabel> labels;
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a new Aggregation metric.
|
||||
*/
|
||||
public AggregationMetric() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Point asInfluxDBPoint() {
|
||||
final Point point = Point.measurement(getName())
|
||||
.time(getTimestampDate(), WritePrecision.MS)
|
||||
.addField("total", getTotal());
|
||||
if (labels!= null && labels.size()>0)
|
||||
labels.forEach(l -> point.addTag(l.getName(), l.getValue()));
|
||||
return point;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Aggregation metric.
|
||||
*
|
||||
* @param name the name
|
||||
* @param isoDate the iso date
|
||||
* @param value the value
|
||||
* @param labels the labels
|
||||
* @throws MonitorException the monitor exception
|
||||
*/
|
||||
public AggregationMetric(final String name, final String isoDate, final Long value, final List<MetricLabel> labels) throws MonitorException {
|
||||
super(name, isoDate, value);
|
||||
this.labels = labels;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets labels.
|
||||
*
|
||||
* @return the labels
|
||||
*/
|
||||
public List<MetricLabel> getLabels() {
|
||||
return labels;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets labels.
|
||||
*
|
||||
* @param labels the labels
|
||||
* @return the labels
|
||||
*/
|
||||
public AggregationMetric setLabels(List<MetricLabel> labels) {
|
||||
this.labels = labels;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The Aggregation builder is a builder class utility to create Aggregation metrics.
|
||||
*/
|
||||
public static class AggregationBuilder {
|
||||
|
||||
private MonitorConstant.AggregationType aggregationType;
|
||||
|
||||
private MonitorConstant.AggregationMode aggregationMode;
|
||||
|
||||
private String datasourceId;
|
||||
|
||||
private String datasourceName;
|
||||
|
||||
private String api;
|
||||
|
||||
|
||||
/**
|
||||
* Creates aggregation builder already set the metric type of aggregation collection.
|
||||
*
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder createCollectionMetrics() {
|
||||
this.aggregationType = MonitorConstant.AggregationType.COLLECTION;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates aggregation builder already set the metric type of aggregation transformation.
|
||||
*
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder createTransformationMetrics() {
|
||||
this.aggregationType = MonitorConstant.AggregationType.TRANSFORMATION;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates aggregation builder already set the aggregation mode REFRESH.
|
||||
*
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder withRefreshMode(){
|
||||
this.aggregationMode = MonitorConstant.AggregationMode.REFRESH;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates aggregation builder already set the aggregation mode INCREMENTAL.
|
||||
*
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder withIncrementalMode(){
|
||||
this.aggregationMode = MonitorConstant.AggregationMode.INCREMENTAL;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the datasource Identifier to the aggregation builder
|
||||
*
|
||||
* @param datasourceId the datasource id
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder withDatasourceId(final String datasourceId) {
|
||||
this.datasourceId = datasourceId;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the datasource Name to the aggregation builder
|
||||
*
|
||||
* @param datasourceName the datasource name
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder withDatasourceName(final String datasourceName) {
|
||||
this.datasourceName = datasourceName;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set the datasource API Identifier to the aggregation builder
|
||||
*
|
||||
* @param api the api
|
||||
* @return the aggregation builder
|
||||
*/
|
||||
public AggregationBuilder withDatasourceAPI(final String api) {
|
||||
this.api = api;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build aggregation metric.
|
||||
*
|
||||
* Here the method check if all the fields are already set, otherwise it
|
||||
* raises a Monitor Exception
|
||||
*
|
||||
* @param isoDate the iso date
|
||||
* @param total the total
|
||||
* @return the aggregation metric
|
||||
* @throws MonitorException the monitor exception
|
||||
*/
|
||||
public AggregationMetric build(final String isoDate, final Long total) throws MonitorException {
|
||||
|
||||
final AggregationMetric metric= new AggregationMetric();
|
||||
|
||||
if(aggregationType ==null)
|
||||
throw new MonitorException("Aggregation type needed please instantiate builder calling createCollectionMetrics or createTransformationMetrics");
|
||||
|
||||
switch(aggregationType) {
|
||||
case COLLECTION:
|
||||
metric.setName(MonitorConstant.AGGREGATION_COLLECTION_METRIC);
|
||||
break;
|
||||
case TRANSFORMATION:
|
||||
metric.setName(MonitorConstant.AGGREGATION_TRANSFORM_METRIC);
|
||||
break;
|
||||
}
|
||||
|
||||
if(aggregationMode ==null)
|
||||
throw new MonitorException("Aggregation mode needed please instantiate builder calling withRefreshMode or withIncrementalMode");
|
||||
|
||||
final List<MetricLabel> labels = new ArrayList<>();
|
||||
metric.setLabels(labels);
|
||||
|
||||
labels.add(new MetricLabel(MonitorConstant.AGGREGATION_MODE_LABEL, aggregationMode.toString()));
|
||||
|
||||
if(StringUtils.isBlank(datasourceName))
|
||||
throw new MonitorException("Datasource name should be not blank please set using 'withDatasourceName' function ");
|
||||
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_NAME_LABEL_NAME, datasourceName));
|
||||
|
||||
if(StringUtils.isBlank(datasourceId))
|
||||
throw new MonitorException("Datasource Identifier should be not blank please set using 'withDatasourceId' function ");
|
||||
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_ID_LABEL_NAME, datasourceId));
|
||||
|
||||
if(StringUtils.isBlank(api))
|
||||
throw new MonitorException("Datasource API should be not blank please set using 'withDatasourceIAPI' function ");
|
||||
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_API_LABEL_NAME, api));
|
||||
|
||||
metric.setIsoDate(isoDate);
|
||||
metric.setTotal(total);
|
||||
return metric;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
package eu.dnetlib.dhp.monitor.model;
|
||||
|
||||
import java.text.DateFormat;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import com.influxdb.client.write.Point;
|
||||
import java.time.Instant;
|
||||
|
||||
|
||||
/**
|
||||
* The base Metric class.
|
||||
* It contains the common attribute of the DHP Metrics like
|
||||
* - name
|
||||
* - isoDate
|
||||
* - Total value of the metric that is a numeric value
|
||||
*
|
||||
* @param <T> the type parameter
|
||||
*/
|
||||
public abstract class Metric<T extends Number> {
|
||||
|
||||
|
||||
// The name of the metric
|
||||
private String name;
|
||||
|
||||
// The sting date in ISO format
|
||||
private String isoDate;
|
||||
|
||||
|
||||
private Date convertedDate;
|
||||
|
||||
//Total value of the metric
|
||||
private T total;
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a new Metric.
|
||||
*/
|
||||
public Metric() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Metric.
|
||||
*
|
||||
* @param name the name
|
||||
* @param isoDate the iso date
|
||||
* @param total the total
|
||||
* @throws MonitorException the monitor exception
|
||||
*/
|
||||
public Metric(final String name, final String isoDate, final T total) throws MonitorException {
|
||||
this.name = name;
|
||||
setIsoDate(isoDate);
|
||||
this.total = total;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets iso date.
|
||||
*
|
||||
* @return the iso date
|
||||
*/
|
||||
public String getIsoDate() {
|
||||
return isoDate;
|
||||
}
|
||||
|
||||
private String valiDate(final String aDate) throws MonitorException {
|
||||
try {
|
||||
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
|
||||
convertedDate = df1.parse(aDate);
|
||||
return df1.format(convertedDate);
|
||||
} catch (ParseException e) {
|
||||
throw new MonitorException("Error the date is not in the format yyyy-MM-dd'T'HH:mm:ss ", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets name.
|
||||
*
|
||||
* @return the name
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets name.
|
||||
*
|
||||
* @param name the name
|
||||
* @return the name
|
||||
*/
|
||||
public Metric<T> setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets iso date.
|
||||
* <p>
|
||||
* This method verify that the Date is in ISO FORMAT like yyyy-MM-dd'T'HH:mm:ss
|
||||
*
|
||||
* @param isoDate the iso date
|
||||
* @return the iso date
|
||||
* @throws MonitorException the monitor exception
|
||||
*/
|
||||
public Metric<T> setIsoDate(String isoDate) throws MonitorException {
|
||||
this.isoDate = valiDate(isoDate);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method convert the ISO Date to Instant
|
||||
* useful for influxDB
|
||||
*
|
||||
* @return the isoDate in Instant
|
||||
*/
|
||||
public Instant getTimestampDate() {
|
||||
return convertedDate.toInstant();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets total.
|
||||
*
|
||||
* @return the total
|
||||
*/
|
||||
public T getTotal() {
|
||||
return total;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets total.
|
||||
*
|
||||
* @param total the total
|
||||
* @return the total
|
||||
*/
|
||||
public Metric<T> setTotal(T total) {
|
||||
this.total = total;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This method convert the metric data model
|
||||
* into the corresponding influxDB Point
|
||||
*
|
||||
* @return the point
|
||||
*/
|
||||
public abstract Point asInfluxDBPoint();
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package eu.dnetlib.dhp.monitor.model;
|
||||
|
||||
/**
|
||||
* This class represents the Base Metric Label
|
||||
* it contains the common attribute like
|
||||
* - name
|
||||
* - value
|
||||
*
|
||||
* Each metrics should contain one or more MetricLabel
|
||||
*/
|
||||
public class MetricLabel {
|
||||
|
||||
/**
|
||||
* The Name.
|
||||
*/
|
||||
public String name;
|
||||
|
||||
private String value;
|
||||
|
||||
/**
|
||||
* Instantiates a new Metric label.
|
||||
*/
|
||||
public MetricLabel() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates a new Metric label.
|
||||
*
|
||||
* @param name the name
|
||||
* @param value the value
|
||||
*/
|
||||
public MetricLabel(String name, String value) {
|
||||
this.name = name;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Gets name.
|
||||
*
|
||||
* @return the name
|
||||
*/
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets name.
|
||||
*
|
||||
* @param name the name
|
||||
* @return the name
|
||||
*/
|
||||
public MetricLabel setName(String name) {
|
||||
this.name = name;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets value.
|
||||
*
|
||||
* @return the value
|
||||
*/
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets value.
|
||||
*
|
||||
* @param value the value
|
||||
* @return the value
|
||||
*/
|
||||
public MetricLabel setValue(String value) {
|
||||
this.value = value;
|
||||
return this;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package eu.dnetlib.dhp.monitor.model;
|
||||
|
||||
/**
|
||||
* The Monitor exception thrown by monitor stuff.
|
||||
*/
|
||||
public class MonitorException extends Exception{
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a new Monitor exception.
|
||||
*/
|
||||
public MonitorException() {super();}
|
||||
|
||||
/**
|
||||
* Instantiates a new Monitor exception.
|
||||
*
|
||||
* @param message the message
|
||||
*/
|
||||
public MonitorException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Instantiates a new Monitor exception.
|
||||
*
|
||||
* @param message the message
|
||||
* @param e the exception
|
||||
*/
|
||||
public MonitorException(final String message, final Throwable e) {
|
||||
super(message,e);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
package eu.dnetlib.dhp.monitor;
|
||||
|
||||
import com.influxdb.client.InfluxDBClient;
|
||||
import com.influxdb.client.InfluxDBClientFactory;
|
||||
import eu.dnetlib.dhp.monitor.model.AggregationMetric;
|
||||
import eu.dnetlib.dhp.monitor.model.MonitorException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* The type Aggregation metrics test.
|
||||
*/
|
||||
public class AggregationMetricsTest {
|
||||
|
||||
/**
|
||||
* Test aggregation builder
|
||||
* Here we test case where builder should work
|
||||
* and where it should raise exception
|
||||
*
|
||||
* @throws Exception the exception
|
||||
*/
|
||||
@Test
|
||||
public void testAggregationBuilder() throws Exception {
|
||||
|
||||
// TEST AGGREGATION METRICS INITIALIZED CORRECTLY
|
||||
AggregationMetric metric = new AggregationMetric.AggregationBuilder()
|
||||
.createCollectionMetrics()
|
||||
.withRefreshMode()
|
||||
.withDatasourceAPI("API___1234")
|
||||
.withDatasourceName("A Datasource")
|
||||
.withDatasourceId("DS_ID_1")
|
||||
.build("2021-06-27T13:03:14+00:00", 100L);
|
||||
|
||||
assertNotNull(metric);
|
||||
|
||||
assertEquals(MonitorConstant.AGGREGATION_COLLECTION_METRIC, metric.getName());
|
||||
|
||||
assertEquals(100L, metric.getTotal());
|
||||
|
||||
assertEquals(1624791794000L, metric.getTimestampDate().toEpochMilli());
|
||||
|
||||
assertEquals(4, metric.getLabels().size());
|
||||
|
||||
|
||||
System.out.println(metric.asInfluxDBPoint().toLineProtocol());
|
||||
|
||||
|
||||
|
||||
assertThrows(MonitorException.class, ()-> {
|
||||
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder();
|
||||
builder.build("A-b_c",null);
|
||||
});
|
||||
|
||||
assertThrows(MonitorException.class, ()-> {
|
||||
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
|
||||
.createTransformationMetrics()
|
||||
.withIncrementalMode();
|
||||
builder.build("A-b_c",null);
|
||||
});
|
||||
|
||||
assertThrows(MonitorException.class, ()-> {
|
||||
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
|
||||
.createTransformationMetrics()
|
||||
.withIncrementalMode();
|
||||
builder.build("2021-06-27T13:03:14+00:00",null);
|
||||
});
|
||||
|
||||
assertDoesNotThrow(() -> {
|
||||
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
|
||||
.createTransformationMetrics()
|
||||
.withIncrementalMode()
|
||||
.withDatasourceAPI("PAI")
|
||||
.withDatasourceId("ID")
|
||||
.withDatasourceName("NAME");
|
||||
|
||||
builder.build("2021-06-27T13:03:14+00:00",100L);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConnection () throws Exception {
|
||||
|
||||
String token = "jhB7Ixgn5uAUhy2Jgwod3rGscifmc30woUln05v0RgRYOqpGzpPTnTIA8bjwbqTXccSVcEfUwiHl_ESrmqz8Lg==";
|
||||
String bucket = "Aggregation";
|
||||
String org = "ISTI CNR";
|
||||
|
||||
InfluxDBClient client = InfluxDBClientFactory.create("https://ip-90-147-167-221.ct1.garrservices.it:8086", token.toCharArray());
|
||||
|
||||
String fileName = "/Users/sandro/Develop/python/monitor/metrics/part-00000";
|
||||
|
||||
//read file into stream, try-with-resources
|
||||
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
|
||||
final AtomicInteger count=new AtomicInteger(0);
|
||||
stream.forEach(s-> count.incrementAndGet());
|
||||
System.out.println("count = " + count.get());
|
||||
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
6
pom.xml
6
pom.xml
|
@ -200,6 +200,12 @@
|
|||
<version>${dhp.commons.lang.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.influxdb</groupId>
|
||||
<artifactId>influxdb-client-java</artifactId>
|
||||
<version>3.1.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.github.sisyphsu</groupId>
|
||||
<artifactId>dateparser</artifactId>
|
||||
|
|
Loading…
Reference in New Issue