[Measures] added new measure (UsageCounts) #214

@@ -391,4 +391,19 @@ public class OafMapperUtils {
        }
        return null;
    }

    public static KeyValue newKeyValueInstance(String key, String value, DataInfo dataInfo) {
        KeyValue kv = new KeyValue();
        kv.setDataInfo(dataInfo);
        kv.setKey(key);
        kv.setValue(value);
        return kv;
    }

    public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) {
        Measure m = new Measure();
        m.setId(id);
        m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo)));
        return m;
    }
}

@@ -27,6 +27,8 @@ public class Constants {
    public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
    public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
    public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
    public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts";
    public static final String UPDATE_KEY_USAGE_COUNTS = "count";

    public static final String FOS_CLASS_ID = "FOS";
    public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";

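Taken together, the two hunks above compose as follows. This is a condensed, illustrative restatement of the getMeasure(...) method added further down in this PR; the count values "10" and "25" are placeholders, not values taken from the change itself.

    // Illustrative sketch only: mirrors SparkAtomicActionUsageJob.getMeasure(...) below.
    DataInfo dataInfo = OafMapperUtils
        .dataInfo(
            false,
            UPDATE_DATA_INFO_TYPE,
            true,
            false,
            OafMapperUtils
                .qualifier(
                    UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
                    UPDATE_CLASS_NAME,
                    ModelConstants.DNET_PROVENANCE_ACTIONS,
                    ModelConstants.DNET_PROVENANCE_ACTIONS),
            "");

    // Each Measure carries a single unit keyed by UPDATE_KEY_USAGE_COUNTS ("count") holding the value.
    Measure downloads = OafMapperUtils.newMeasureInstance("downloads", "10", UPDATE_KEY_USAGE_COUNTS, dataInfo);
    Measure views = OafMapperUtils.newMeasureInstance("views", "25", UPDATE_KEY_USAGE_COUNTS, dataInfo);
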
@@ -0,0 +1,149 @@

package eu.dnetlib.dhp.actionmanager.usagestats;

import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;

/**
 * Creates the Atomic Action for each type of result
 */
public class SparkAtomicActionUsageJob implements Serializable {

miriam.baglioni marked this conversation as resolved
claudio.atzori commented
typo: tipe == type

    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static <I extends Result> void main(String[] args) throws Exception {

        String jsonConfiguration = IOUtils
            .toString(
                SparkAtomicActionUsageJob.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);

        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        final String dbname = parser.get("usagestatsdb");

        final String workingPath = parser.get("workingPath");

miriam.baglioni marked this conversation as resolved
claudio.atzori commented
Minor: I would name the parameter as `usagestatsdb` to avoid confusion with the other stats database. In this context there is no ambiguity, but from the workflow caller there might be.

        runWithSparkHiveSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                removeOutputDir(spark, outputPath);
                prepareResults(dbname, spark, workingPath);
                prepareActionSet(spark, workingPath, outputPath);
            });
    }

    public static void prepareResults(String db, SparkSession spark, String workingPath) {
        spark
            .sql(
                "Select result_id, downloads, views " +
                    "from " + db + ".usage_stats")
            .as(Encoders.bean(UsageStatsModel.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath);
    }

    public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
        readPath(spark, inputPath, UsageStatsModel.class)
            .groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
                UsageStatsModel first = it.next();
                it.forEachRemaining(us -> {
                    first.setDownloads(first.getDownloads() + us.getDownloads());
                    first.setViews(first.getViews() + us.getViews());
                });

                Result res = new Result();
                res.setId("50|" + k);

                res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
                return res;
            }, Encoders.bean(Result.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    private static List<Measure> getMeasure(Long downloads, Long views) {
        DataInfo dataInfo = OafMapperUtils
            .dataInfo(
                false,
                UPDATE_DATA_INFO_TYPE,
                true,
                false,
                OafMapperUtils
                    .qualifier(
                        UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID,
                        UPDATE_CLASS_NAME,
                        ModelConstants.DNET_PROVENANCE_ACTIONS,
                        ModelConstants.DNET_PROVENANCE_ACTIONS),
                "");

        return Arrays
            .asList(
                OafMapperUtils
                    .newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo),
                OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo));
    }

    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    public static <R> Dataset<R> readPath(
        SparkSession spark, String inputPath, Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

}

@@ -0,0 +1,34 @@

package eu.dnetlib.dhp.actionmanager.usagestats;

import java.io.Serializable;

public class UsageStatsModel implements Serializable {
    private String result_id;
    private Long downloads;
    private Long views;

    public String getResult_id() {
        return result_id;
    }

    public void setResult_id(String result_id) {
        this.result_id = result_id;
    }

    public Long getDownloads() {
        return downloads;
    }

    public void setDownloads(Long downloads) {
        this.downloads = downloads;
    }

    public Long getViews() {
        return views;
    }

    public void setViews(Long views) {
        this.views = views;
    }
}

@@ -0,0 +1,32 @@
[
    {
        "paramName": "issm",
        "paramLongName": "isSparkSessionManaged",
        "paramDescription": "when true will stop SparkSession after job execution",
        "paramRequired": false
    },
    {
        "paramName": "hmu",
        "paramLongName": "hive_metastore_uris",
        "paramDescription": "the URI for the hive metastore",
        "paramRequired": true
    },
    {
        "paramName": "o",
        "paramLongName": "outputPath",
        "paramDescription": "the path of the new ActionSet",
        "paramRequired": true
    },
    {
        "paramName": "sdb",
        "paramLongName": "usagestatsdb",

claudio.atzori commented
Minor: I would name the parameter as `usagestatsdb` to avoid confusion with the other stats database. In this context there is no ambiguity, but from the workflow caller there might be.

        "paramDescription": "the name of the db to be used",
        "paramRequired": true
    },
    {
        "paramName": "wp",
        "paramLongName": "workingPath",
        "paramDescription": "the workingPath where to save the content of the usage_stats table",
        "paramRequired": true
    }
]

@@ -0,0 +1,30 @@
<configuration>
    <property>
        <name>jobTracker</name>
        <value>yarnRM</value>
    </property>
    <property>
        <name>nameNode</name>
        <value>hdfs://nameservice1</value>
    </property>
    <property>
        <name>oozie.use.system.libpath</name>
        <value>true</value>
    </property>
    <property>
        <name>hiveMetastoreUris</name>
        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
    </property>
    <property>
        <name>hiveJdbcUrl</name>
        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
    </property>
    <property>
        <name>hiveDbName</name>
        <value>openaire</value>
    </property>
    <property>
        <name>oozie.launcher.mapreduce.user.classpath.first</name>
        <value>true</value>
    </property>
</configuration>

@@ -0,0 +1,99 @@
<workflow-app name="UsageStatsCounts" xmlns="uri:oozie:workflow:0.5">
    <parameters>
        <property>
            <name>outputPath</name>
            <description>the path where to store the actionset</description>
        </property>
        <property>
            <name>usagestatsdb</name>

claudio.atzori commented
Minor: I would name the parameter as `usagestatsdb` to avoid confusion with the other stats database. In this context there is no ambiguity, but from the workflow caller there might be.

            <description>the name of the db to be used</description>

claudio.atzori commented
the description is wrong. Looks like a copy&paste.

        </property>
        <property>
            <name>sparkDriverMemory</name>
            <description>memory for driver process</description>
        </property>
        <property>
            <name>sparkExecutorMemory</name>
            <description>memory for individual executor</description>
        </property>
        <property>
            <name>sparkExecutorCores</name>
            <description>number of cores used by single executor</description>
        </property>
        <property>
            <name>oozieActionShareLibForSpark2</name>
            <description>oozie action sharelib for spark 2.*</description>
        </property>
        <property>
            <name>spark2ExtraListeners</name>
            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
            <description>spark 2.* extra listeners classname</description>
        </property>
        <property>
            <name>spark2SqlQueryExecutionListeners</name>
            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
            <description>spark 2.* sql query execution listeners classname</description>
        </property>
        <property>
            <name>spark2YarnHistoryServerAddress</name>
            <description>spark 2.* yarn history server address</description>
        </property>
        <property>
            <name>spark2EventLogDir</name>
            <description>spark 2.* event log dir location</description>
        </property>
    </parameters>

    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${queueName}</value>
            </property>
            <property>
                <name>oozie.launcher.mapred.job.queue.name</name>
                <value>${oozieLauncherQueueName}</value>
            </property>
            <property>
                <name>oozie.action.sharelib.for.spark</name>
                <value>${oozieActionShareLibForSpark2}</value>
            </property>
        </configuration>
    </global>

    <start to="atomicactions"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <action name="atomicactions">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
            <name>Produces the atomic action with the usage stats count for results</name>
            <class>eu.dnetlib.dhp.actionmanager.usagestats.SparkAtomicActionUsageJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
                --executor-cores=${sparkExecutorCores}
                --driver-memory=${sparkDriverMemory}
                --conf spark.extraListeners=${spark2ExtraListeners}
                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
            <arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
            <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
            <arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>
</workflow-app>

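For reference, the Oozie action above simply forwards the arguments declared in the parameter JSON to the job's main method. A minimal invocation sketch is shown below; it only illustrates the argument wiring and assumes a reachable Hive metastore, and the URI, database name, and paths are placeholders rather than values taken from this PR.

    // Sketch only: same argument names the <arg> elements above pass to SparkAtomicActionUsageJob.
    SparkAtomicActionUsageJob
        .main(
            new String[] {
                "--hive_metastore_uris", "thrift://example-host:9083",
                "--outputPath", "/tmp/usageCounts/actionSet",
                "--usagestatsdb", "usagestats_db",
                "--workingPath", "/tmp/usageCounts/workingDir/usageDb"
            });
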
@@ -0,0 +1,259 @@

package eu.dnetlib.dhp.actionmanager.usagestats;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Result;

public class SparkAtomicActionCountJobTest {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;

    private static Path workingDir;
    private static final Logger log = LoggerFactory
        .getLogger(SparkAtomicActionCountJobTest.class);

    @BeforeAll
    public static void beforeAll() throws IOException {
        workingDir = Files
            .createTempDirectory(SparkAtomicActionCountJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf conf = new SparkConf();
        conf.setAppName(SparkAtomicActionCountJobTest.class.getSimpleName());

        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession
            .builder()
            .appName(SparkAtomicActionCountJobTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
        spark.stop();
    }

    @Test
    void testMatch() {
        String usageScoresPath = getClass()
            .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
            .getPath();

        SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");

        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        JavaRDD<Result> tmp = sc
            .textFile(workingDir.toString() + "/actionSet")
            .map(usm -> OBJECT_MAPPER.readValue(usm, Result.class));

        Assertions.assertEquals(9, tmp.count());

        tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m
                            .getUnit()
                            .stream()
                            .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m
                            .getUnit()
                            .stream()
                            .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible()))));

        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m
                            .getUnit()
                            .stream()
                            .forEach(
                                u -> Assertions
                                    .assertEquals(
                                        "measure:usage_counts",
                                        u.getDataInfo().getProvenanceaction().getClassid()))));
        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m
                            .getUnit()
                            .stream()
                            .forEach(
                                u -> Assertions
                                    .assertEquals(
                                        "Inferred by OpenAIRE",
                                        u.getDataInfo().getProvenanceaction().getClassname()))));

        tmp
            .foreach(
                r -> r
                    .getMeasures()
                    .stream()
                    .forEach(
                        m -> m
                            .getUnit()
                            .stream()
                            .forEach(
                                u -> Assertions
                                    .assertEquals(
                                        "count",
                                        u.getKey()))));

        Assertions
            .assertEquals(
                1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());

        Assertions
            .assertEquals(
                "0",
                tmp
                    .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("downloads"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());
        Assertions
            .assertEquals(
                "5",
                tmp
                    .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("views"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());

        Assertions
            .assertEquals(
                "0",
                tmp
                    .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("downloads"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());
        Assertions
            .assertEquals(
                "1",
                tmp
                    .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("views"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());

        Assertions
            .assertEquals(
                "2",
                tmp
                    .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("downloads"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());
        Assertions
            .assertEquals(
                "6",
                tmp
                    .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
                    .collect()
                    .get(0)
                    .getMeasures()
                    .stream()
                    .filter(m -> m.getId().equals("views"))
                    .collect(Collectors.toList())
                    .get(0)
                    .getUnit()
                    .get(0)
                    .getValue());
    }

}

@@ -0,0 +1,12 @@
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3}

Please compile before pushing. It seems the code formatting was not applied here.