forked from D-Net/dnet-hadoop
[Measures] added new measure (usagecounts) as action set. Measure added at the level of the result. Ref #7587
This commit is contained in:
parent
73c172926a
commit
869407c6e2
|
@ -27,6 +27,8 @@ public class Constants {
|
||||||
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
|
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
|
||||||
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
|
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
|
||||||
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
|
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
|
||||||
|
public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts";
|
||||||
|
public static final String UPDATE_KEY_USAGE_COUNTS = "count";
|
||||||
|
|
||||||
public static final String FOS_CLASS_ID = "FOS";
|
public static final String FOS_CLASS_ID = "FOS";
|
||||||
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
|
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.actionmanager.Constants.*;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.cxf.wsdl.service.factory.MethodNameSoapActionServiceConfiguration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.ForeachFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
|
||||||
|
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Measure;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
|
import lombok.val;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* created the Atomic Action for each tipe of results
|
||||||
|
*/
|
||||||
|
public class SparkAtomicActionUsageJob implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class);
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
public static <I extends Result> void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
SparkAtomicActionUsageJob.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("outputPath");
|
||||||
|
log.info("outputPath {}: ", outputPath);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
|
||||||
|
|
||||||
|
final String dbname = parser.get("statsdb");
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
|
||||||
|
runWithSparkHiveSession(
|
||||||
|
conf,
|
||||||
|
isSparkSessionManaged,
|
||||||
|
spark -> {
|
||||||
|
removeOutputDir(spark, outputPath);
|
||||||
|
prepareResults(dbname, spark, outputPath);
|
||||||
|
prepareActionSet(spark, workingPath, outputPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void prepareResults(String db, SparkSession spark, String workingPath) {
|
||||||
|
spark
|
||||||
|
.sql(
|
||||||
|
"Select result_id, downloads, views " +
|
||||||
|
"from " + db + ".usage_stats")
|
||||||
|
.as(Encoders.bean(UsageStatsModel.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(workingPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath){
|
||||||
|
readPath(spark, inputPath, UsageStatsModel.class)
|
||||||
|
.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
|
||||||
|
UsageStatsModel first = it.next();
|
||||||
|
it.forEachRemaining(us -> {
|
||||||
|
first.setDownloads(first.getDownloads() + us.getDownloads());
|
||||||
|
first.setViews(first.getViews() + us.getViews());
|
||||||
|
});
|
||||||
|
|
||||||
|
Result res = new Result();
|
||||||
|
res.setId("50|" + k);
|
||||||
|
|
||||||
|
|
||||||
|
res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
|
||||||
|
return res;
|
||||||
|
}, Encoders.bean(Result.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<Measure> getMeasure(Long downloads, Long views) {
|
||||||
|
DataInfo dataInfo = OafMapperUtils
|
||||||
|
.dataInfo(
|
||||||
|
false,
|
||||||
|
UPDATE_DATA_INFO_TYPE,
|
||||||
|
true,
|
||||||
|
false,
|
||||||
|
OafMapperUtils
|
||||||
|
.qualifier(
|
||||||
|
UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
|
||||||
|
UPDATE_CLASS_NAME,
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||||
|
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||||
|
"");
|
||||||
|
|
||||||
|
return Arrays
|
||||||
|
.asList(
|
||||||
|
Measure
|
||||||
|
.newInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo),
|
||||||
|
Measure.newInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <R> Dataset<R> readPath(
|
||||||
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
|
return spark
|
||||||
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
public class UsageStatsModel implements Serializable {
|
||||||
|
private String result_id;
|
||||||
|
private Long downloads;
|
||||||
|
private Long views;
|
||||||
|
|
||||||
|
public String getResult_id() {
|
||||||
|
return result_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResult_id(String result_id) {
|
||||||
|
this.result_id = result_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getDownloads() {
|
||||||
|
return downloads;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDownloads(Long downloads) {
|
||||||
|
this.downloads = downloads;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getViews() {
|
||||||
|
return views;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setViews(Long views) {
|
||||||
|
this.views = views;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "issm",
|
||||||
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
"paramDescription": "when true will stop SparkSession after job execution",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "hmu",
|
||||||
|
"paramLongName": "hive_metastore_uris",
|
||||||
|
"paramDescription": "the URI for the hive metastore",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "o",
|
||||||
|
"paramLongName": "outputPath",
|
||||||
|
"paramDescription": "the path of the new ActionSet",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "sdb",
|
||||||
|
"paramLongName": "statsdb",
|
||||||
|
"paramDescription": "the name of the db to be used",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,30 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveMetastoreUris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveJdbcUrl</name>
|
||||||
|
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hiveDbName</name>
|
||||||
|
<value>openaire</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,98 @@
|
||||||
|
<workflow-app name="UsageStatsCounts" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>outputPath</name>
|
||||||
|
<description>the path where to store the actionset</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>statsdb</name>
|
||||||
|
<description>the path where to store the actionset</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<description>memory for driver process</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<description>number of cores used by single executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozieActionShareLibForSpark2</name>
|
||||||
|
<description>oozie action sharelib for spark 2.*</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||||
|
<description>spark 2.* extra listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||||
|
<description>spark 2.* sql query execution listeners classname</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<description>spark 2.* yarn history server address</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<description>spark 2.* event log dir location</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.queuename</name>
|
||||||
|
<value>${queueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapred.job.queue.name</name>
|
||||||
|
<value>${oozieLauncherQueueName}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
<start to="atomicactions"/>
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="atomicactions">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Produces the atomic action with the usage stats count for results</name>
|
||||||
|
<class>eu.dnetlib.dhp.actionmanager.usagestats.SparkAtomicActionUsageJob</class>
|
||||||
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
|
<arg>--statsdb</arg><arg>${statsdb}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -0,0 +1,133 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.actionmanager.usagestats;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.Row;
|
||||||
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
|
||||||
|
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
|
public class SparkAtomicActionCountJobTest {
|
||||||
|
|
||||||
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
private static Path workingDir;
|
||||||
|
private static final Logger log = LoggerFactory
|
||||||
|
.getLogger(SparkAtomicActionCountJobTest.class);
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void beforeAll() throws IOException {
|
||||||
|
workingDir = Files
|
||||||
|
.createTempDirectory(SparkAtomicActionCountJobTest.class.getSimpleName());
|
||||||
|
log.info("using work dir {}", workingDir);
|
||||||
|
|
||||||
|
SparkConf conf = new SparkConf();
|
||||||
|
conf.setAppName(SparkAtomicActionCountJobTest.class.getSimpleName());
|
||||||
|
|
||||||
|
conf.setMaster("local[*]");
|
||||||
|
conf.set("spark.driver.host", "localhost");
|
||||||
|
conf.set("hive.metastore.local", "true");
|
||||||
|
conf.set("spark.ui.enabled", "false");
|
||||||
|
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||||
|
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||||
|
|
||||||
|
spark = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkAtomicActionCountJobTest.class.getSimpleName())
|
||||||
|
.config(conf)
|
||||||
|
.getOrCreate();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
public static void afterAll() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(workingDir.toFile());
|
||||||
|
spark.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testMatch() {
|
||||||
|
String usageScoresPath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
|
SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
|
||||||
|
|
||||||
|
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Result> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/actionSet")
|
||||||
|
.map(usm -> OBJECT_MAPPER.readValue(usm, Result.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(9, tmp.count());
|
||||||
|
|
||||||
|
tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));
|
||||||
|
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertEquals("measure:usage_counts",
|
||||||
|
u.getDataInfo().getProvenanceaction().getClassid()))));
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertEquals("Inferred by OpenAIRE",
|
||||||
|
u.getDataInfo().getProvenanceaction().getClassname()))));
|
||||||
|
|
||||||
|
tmp.foreach(r -> r.getMeasures().stream().forEach(m ->
|
||||||
|
m.getUnit().stream().forEach(u -> Assertions.assertEquals("count",
|
||||||
|
u.getKey()))));
|
||||||
|
|
||||||
|
Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
|
||||||
|
|
||||||
|
Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
Assertions.assertEquals("5", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
|
||||||
|
Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
Assertions.assertEquals("1", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
|
||||||
|
Assertions.assertEquals("2", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
Assertions.assertEquals("6", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0)
|
||||||
|
.getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0)
|
||||||
|
.getUnit().get(0).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
|
||||||
|
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
|
||||||
|
{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
|
||||||
|
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
|
||||||
|
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
|
||||||
|
{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
|
||||||
|
{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
|
||||||
|
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
|
||||||
|
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
|
||||||
|
{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}
|
2
pom.xml
2
pom.xml
|
@ -801,7 +801,7 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
<dhp-schemas.version>[2.10.32]</dhp-schemas.version>
|
<dhp-schemas.version>[2.11.34-SNAPSHOT]</dhp-schemas.version>
|
||||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||||
|
|
Loading…
Reference in New Issue