Compare commits

...

6 Commits

Author SHA1 Message Date
Sandro La Bruzzo 0e80385258 fixed typo 2022-07-06 11:33:39 +03:00
Sandro La Bruzzo f413122661 added test 2022-06-20 14:46:06 +02:00
Sandro La Bruzzo 6af85fc542 removed not used field 2022-05-11 11:13:11 +02:00
Sandro La Bruzzo 452dbc68ec enhanced metric data model 2022-05-09 08:29:21 +02:00
Sandro La Bruzzo e138b60d5c Merge branch 'beta' into monitoring 2022-05-05 12:12:03 +02:00
Sandro La Bruzzo 8d96832cf6 Added first model of Metric class 2022-05-02 15:49:48 +02:00
8 changed files with 649 additions and 0 deletions

View File

@ -73,6 +73,10 @@
<groupId>me.xuender</groupId>
<artifactId>unidecode</artifactId>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.monitor;
public class MonitorConstant {
public enum AggregationType {
COLLECTION,
TRANSFORMATION
}
public enum AggregationMode {
REFRESH,
INCREMENTAL
}
public static String AGGREGATION_LABEL_NAME ="aggregation";
public static String COLLECTION_LABEL_NAME ="collection";
public static String TRANSFORM_LABEL_NAME ="transform";
public static String AGGREGATION_MODE_LABEL = "mode";
public static String DATASOURCE_ID_LABEL_NAME= "datasourceId";
public static String DATASOURCE_NAME_LABEL_NAME= "datasourceName";
public static final String DATASOURCE_API_LABEL_NAME = "api";
public static String AGGREGATION_TRANSFORM_METRIC = String.format("%s-%s",AGGREGATION_LABEL_NAME,TRANSFORM_LABEL_NAME);
public static String AGGREGATION_COLLECTION_METRIC = String.format("%s-%s",AGGREGATION_LABEL_NAME,COLLECTION_LABEL_NAME);
}

View File

@ -0,0 +1,222 @@
package eu.dnetlib.dhp.monitor.model;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import eu.dnetlib.dhp.monitor.MonitorConstant;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* The type Aggregation metric
* models the metric about the aggregation statistics
* like collection or transformation,
* giving the stats
*/
public class AggregationMetric extends Metric <Long> {
private List<MetricLabel> labels;
/**
* Instantiates a new Aggregation metric.
*/
public AggregationMetric() {
super();
}
@Override
public Point asInfluxDBPoint() {
final Point point = Point.measurement(getName())
.time(getTimestampDate(), WritePrecision.MS)
.addField("total", getTotal());
if (labels!= null && labels.size()>0)
labels.forEach(l -> point.addTag(l.getName(), l.getValue()));
return point;
}
/**
* Instantiates a new Aggregation metric.
*
* @param name the name
* @param isoDate the iso date
* @param value the value
* @param labels the labels
* @throws MonitorException the monitor exception
*/
public AggregationMetric(final String name, final String isoDate, final Long value, final List<MetricLabel> labels) throws MonitorException {
super(name, isoDate, value);
this.labels = labels;
}
/**
* Gets labels.
*
* @return the labels
*/
public List<MetricLabel> getLabels() {
return labels;
}
/**
* Sets labels.
*
* @param labels the labels
* @return the labels
*/
public AggregationMetric setLabels(List<MetricLabel> labels) {
this.labels = labels;
return this;
}
/**
* The Aggregation builder is a builder class utility to create Aggregation metrics.
*/
public static class AggregationBuilder {
private MonitorConstant.AggregationType aggregationType;
private MonitorConstant.AggregationMode aggregationMode;
private String datasourceId;
private String datasourceName;
private String api;
/**
* Creates aggregation builder already set the metric type of aggregation collection.
*
* @return the aggregation builder
*/
public AggregationBuilder createCollectionMetrics() {
this.aggregationType = MonitorConstant.AggregationType.COLLECTION;
return this;
}
/**
* Creates aggregation builder already set the metric type of aggregation transformation.
*
* @return the aggregation builder
*/
public AggregationBuilder createTransformationMetrics() {
this.aggregationType = MonitorConstant.AggregationType.TRANSFORMATION;
return this;
}
/**
* Creates aggregation builder already set the aggregation mode REFRESH.
*
* @return the aggregation builder
*/
public AggregationBuilder withRefreshMode(){
this.aggregationMode = MonitorConstant.AggregationMode.REFRESH;
return this;
}
/**
* Creates aggregation builder already set the aggregation mode INCREMENTAL.
*
* @return the aggregation builder
*/
public AggregationBuilder withIncrementalMode(){
this.aggregationMode = MonitorConstant.AggregationMode.INCREMENTAL;
return this;
}
/**
* Set the datasource Identifier to the aggregation builder
*
* @param datasourceId the datasource id
* @return the aggregation builder
*/
public AggregationBuilder withDatasourceId(final String datasourceId) {
this.datasourceId = datasourceId;
return this;
}
/**
* Set the datasource Name to the aggregation builder
*
* @param datasourceName the datasource name
* @return the aggregation builder
*/
public AggregationBuilder withDatasourceName(final String datasourceName) {
this.datasourceName = datasourceName;
return this;
}
/**
* Set the datasource API Identifier to the aggregation builder
*
* @param api the api
* @return the aggregation builder
*/
public AggregationBuilder withDatasourceAPI(final String api) {
this.api = api;
return this;
}
/**
* Build aggregation metric.
*
* Here the method check if all the fields are already set, otherwise it
* raises a Monitor Exception
*
* @param isoDate the iso date
* @param total the total
* @return the aggregation metric
* @throws MonitorException the monitor exception
*/
public AggregationMetric build(final String isoDate, final Long total) throws MonitorException {
final AggregationMetric metric= new AggregationMetric();
if(aggregationType ==null)
throw new MonitorException("Aggregation type needed please instantiate builder calling createCollectionMetrics or createTransformationMetrics");
switch(aggregationType) {
case COLLECTION:
metric.setName(MonitorConstant.AGGREGATION_COLLECTION_METRIC);
break;
case TRANSFORMATION:
metric.setName(MonitorConstant.AGGREGATION_TRANSFORM_METRIC);
break;
}
if(aggregationMode ==null)
throw new MonitorException("Aggregation mode needed please instantiate builder calling withRefreshMode or withIncrementalMode");
final List<MetricLabel> labels = new ArrayList<>();
metric.setLabels(labels);
labels.add(new MetricLabel(MonitorConstant.AGGREGATION_MODE_LABEL, aggregationMode.toString()));
if(StringUtils.isBlank(datasourceName))
throw new MonitorException("Datasource name should be not blank please set using 'withDatasourceName' function ");
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_NAME_LABEL_NAME, datasourceName));
if(StringUtils.isBlank(datasourceId))
throw new MonitorException("Datasource Identifier should be not blank please set using 'withDatasourceId' function ");
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_ID_LABEL_NAME, datasourceId));
if(StringUtils.isBlank(api))
throw new MonitorException("Datasource API should be not blank please set using 'withDatasourceIAPI' function ");
labels.add(new MetricLabel(MonitorConstant.DATASOURCE_API_LABEL_NAME, api));
metric.setIsoDate(isoDate);
metric.setTotal(total);
return metric;
}
}
}

View File

@ -0,0 +1,152 @@
package eu.dnetlib.dhp.monitor.model;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.influxdb.client.write.Point;
import java.time.Instant;
/**
* The base Metric class.
* It contains the common attribute of the DHP Metrics like
* - name
* - isoDate
* - Total value of the metric that is a numeric value
*
* @param <T> the type parameter
*/
public abstract class Metric<T extends Number> {
// The name of the metric
private String name;
// The sting date in ISO format
private String isoDate;
private Date convertedDate;
//Total value of the metric
private T total;
/**
* Instantiates a new Metric.
*/
public Metric() {
}
/**
* Instantiates a new Metric.
*
* @param name the name
* @param isoDate the iso date
* @param total the total
* @throws MonitorException the monitor exception
*/
public Metric(final String name, final String isoDate, final T total) throws MonitorException {
this.name = name;
setIsoDate(isoDate);
this.total = total;
}
/**
* Gets iso date.
*
* @return the iso date
*/
public String getIsoDate() {
return isoDate;
}
private String valiDate(final String aDate) throws MonitorException {
try {
DateFormat df1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
convertedDate = df1.parse(aDate);
return df1.format(convertedDate);
} catch (ParseException e) {
throw new MonitorException("Error the date is not in the format yyyy-MM-dd'T'HH:mm:ss ", e);
}
}
/**
* Gets name.
*
* @return the name
*/
public String getName() {
return name;
}
/**
* Sets name.
*
* @param name the name
* @return the name
*/
public Metric<T> setName(String name) {
this.name = name;
return this;
}
/**
* Sets iso date.
* <p>
* This method verify that the Date is in ISO FORMAT like yyyy-MM-dd'T'HH:mm:ss
*
* @param isoDate the iso date
* @return the iso date
* @throws MonitorException the monitor exception
*/
public Metric<T> setIsoDate(String isoDate) throws MonitorException {
this.isoDate = valiDate(isoDate);
return this;
}
/**
* This method convert the ISO Date to Instant
* useful for influxDB
*
* @return the isoDate in Instant
*/
public Instant getTimestampDate() {
return convertedDate.toInstant();
}
/**
* Gets total.
*
* @return the total
*/
public T getTotal() {
return total;
}
/**
* Sets total.
*
* @param total the total
* @return the total
*/
public Metric<T> setTotal(T total) {
this.total = total;
return this;
}
/**
* This method convert the metric data model
* into the corresponding influxDB Point
*
* @return the point
*/
public abstract Point asInfluxDBPoint();
}

View File

@ -0,0 +1,78 @@
package eu.dnetlib.dhp.monitor.model;
/**
* This class represents the Base Metric Label
* it contains the common attribute like
* - name
* - value
*
* Each metrics should contain one or more MetricLabel
*/
public class MetricLabel {
/**
* The Name.
*/
public String name;
private String value;
/**
* Instantiates a new Metric label.
*/
public MetricLabel() {
}
/**
* Instantiates a new Metric label.
*
* @param name the name
* @param value the value
*/
public MetricLabel(String name, String value) {
this.name = name;
this.value = value;
}
/**
* Gets name.
*
* @return the name
*/
public String getName() {
return name;
}
/**
* Sets name.
*
* @param name the name
* @return the name
*/
public MetricLabel setName(String name) {
this.name = name;
return this;
}
/**
* Gets value.
*
* @return the value
*/
public String getValue() {
return value;
}
/**
* Sets value.
*
* @param value the value
* @return the value
*/
public MetricLabel setValue(String value) {
this.value = value;
return this;
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.monitor.model;
/**
* The Monitor exception thrown by monitor stuff.
*/
public class MonitorException extends Exception{
/**
* Instantiates a new Monitor exception.
*/
public MonitorException() {super();}
/**
* Instantiates a new Monitor exception.
*
* @param message the message
*/
public MonitorException(final String message) {
super(message);
}
/**
* Instantiates a new Monitor exception.
*
* @param message the message
* @param e the exception
*/
public MonitorException(final String message, final Throwable e) {
super(message,e);
}
}

View File

@ -0,0 +1,116 @@
package eu.dnetlib.dhp.monitor;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import eu.dnetlib.dhp.monitor.model.AggregationMetric;
import eu.dnetlib.dhp.monitor.model.MonitorException;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
/**
* The type Aggregation metrics test.
*/
public class AggregationMetricsTest {
/**
* Test aggregation builder
* Here we test case where builder should work
* and where it should raise exception
*
* @throws Exception the exception
*/
@Test
public void testAggregationBuilder() throws Exception {
// TEST AGGREGATION METRICS INITIALIZED CORRECTLY
AggregationMetric metric = new AggregationMetric.AggregationBuilder()
.createCollectionMetrics()
.withRefreshMode()
.withDatasourceAPI("API___1234")
.withDatasourceName("A Datasource")
.withDatasourceId("DS_ID_1")
.build("2021-06-27T13:03:14+00:00", 100L);
assertNotNull(metric);
assertEquals(MonitorConstant.AGGREGATION_COLLECTION_METRIC, metric.getName());
assertEquals(100L, metric.getTotal());
assertEquals(1624791794000L, metric.getTimestampDate().toEpochMilli());
assertEquals(4, metric.getLabels().size());
System.out.println(metric.asInfluxDBPoint().toLineProtocol());
assertThrows(MonitorException.class, ()-> {
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder();
builder.build("A-b_c",null);
});
assertThrows(MonitorException.class, ()-> {
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
.createTransformationMetrics()
.withIncrementalMode();
builder.build("A-b_c",null);
});
assertThrows(MonitorException.class, ()-> {
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
.createTransformationMetrics()
.withIncrementalMode();
builder.build("2021-06-27T13:03:14+00:00",null);
});
assertDoesNotThrow(() -> {
AggregationMetric.AggregationBuilder builder = new AggregationMetric.AggregationBuilder()
.createTransformationMetrics()
.withIncrementalMode()
.withDatasourceAPI("PAI")
.withDatasourceId("ID")
.withDatasourceName("NAME");
builder.build("2021-06-27T13:03:14+00:00",100L);
});
}
@Test
public void testConnection () throws Exception {
String token = "jhB7Ixgn5uAUhy2Jgwod3rGscifmc30woUln05v0RgRYOqpGzpPTnTIA8bjwbqTXccSVcEfUwiHl_ESrmqz8Lg==";
String bucket = "Aggregation";
String org = "ISTI CNR";
InfluxDBClient client = InfluxDBClientFactory.create("https://ip-90-147-167-221.ct1.garrservices.it:8086", token.toCharArray());
String fileName = "/Users/sandro/Develop/python/monitor/metrics/part-00000";
//read file into stream, try-with-resources
try (Stream<String> stream = Files.lines(Paths.get(fileName))) {
final AtomicInteger count=new AtomicInteger(0);
stream.forEach(s-> count.incrementAndGet());
System.out.println("count = " + count.get());
} catch (IOException e) {
e.printStackTrace();
}
}
}

View File

@ -200,6 +200,12 @@
<version>${dhp.commons.lang.version}</version>
</dependency>
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com.github.sisyphsu</groupId>
<artifactId>dateparser</artifactId>