diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java index 0a51e8600..9ee359cd5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java @@ -391,4 +391,19 @@ public class OafMapperUtils { } return null; } + + public static KeyValue newKeyValueInstance(String key, String value, DataInfo dataInfo) { + KeyValue kv = new KeyValue(); + kv.setDataInfo(dataInfo); + kv.setKey(key); + kv.setValue(value); + return kv; + } + + public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo))); + return m; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index b790d90cb..aa25ca633 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -27,6 +27,8 @@ public class Constants { public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; + public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts"; + public static final String UPDATE_KEY_USAGE_COUNTS = "count"; public static final String FOS_CLASS_ID = "FOS"; public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java new file mode 100644 index 000000000..c284ad8bd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -0,0 +1,149 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Measure; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +/** + * created the Atomic Action for each type of results + */ +public class SparkAtomicActionUsageJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkAtomicActionUsageJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + final String dbname = parser.get("usagestatsdb"); + + final String workingPath = parser.get("workingPath"); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareResults(dbname, spark, workingPath); + prepareActionSet(spark, workingPath, outputPath); + }); + } + + public static void prepareResults(String db, SparkSession spark, String workingPath) { + spark + .sql( + "Select result_id, downloads, views " + + "from " + db + ".usage_stats") + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) { + readPath(spark, inputPath, UsageStatsModel.class) + .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + UsageStatsModel first = it.next(); + it.forEachRemaining(us -> { + first.setDownloads(first.getDownloads() + us.getDownloads()); + first.setViews(first.getViews() + us.getViews()); + }); + + Result res = new Result(); + res.setId("50|" + k); + + res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); + return res; + }, Encoders.bean(Result.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + + private static List getMeasure(Long downloads, Long views) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + OafMapperUtils + .newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), + OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); + + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java new file mode 100644 index 000000000..df8a77eb6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java @@ -0,0 +1,34 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import java.io.Serializable; + +public class UsageStatsModel implements Serializable { + private String result_id; + private Long downloads; + private Long views; + + public String getResult_id() { + return result_id; + } + + public void setResult_id(String result_id) { + this.result_id = result_id; + } + + public Long getDownloads() { + return downloads; + } + + public void setDownloads(Long downloads) { + this.downloads = downloads; + } + + public Long getViews() { + return views; + } + + public void setViews(Long views) { + this.views = views; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json new file mode 100644 index 000000000..e9200d3ad --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hive_metastore_uris", + "paramDescription": "the URI for the hive metastore", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + }, + { + "paramName": "sdb", + "paramLongName": "usagestatsdb", + "paramDescription": "the name of the db to be used", + "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the workingPath where to save the content of the usage_stats table", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml new file mode 100644 index 000000000..d94cf7d53 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -0,0 +1,99 @@ + + + + outputPath + the path where to store the actionset + + + usagestatsdb + the name of the db to be used + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn + cluster + Produces the atomic action with the usage stats count for results + eu.dnetlib.dhp.actionmanager.usagestats.SparkAtomicActionUsageJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --usagestatsdb${usagestatsdb} + --workingPath${workingDir}/usageDb + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java new file mode 100644 index 000000000..7cc9eb326 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -0,0 +1,259 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class SparkAtomicActionCountJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(SparkAtomicActionCountJobTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(SparkAtomicActionCountJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(SparkAtomicActionCountJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(SparkAtomicActionCountJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testMatch() { + String usageScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") + .getPath(); + + SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/actionSet") + .map(usm -> OBJECT_MAPPER.readValue(usm, Result.class)); + + Assertions.assertEquals(9, tmp.count()); + + tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + Assertions + .assertEquals( + 1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb new file mode 100644 index 000000000..fee74f697 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb @@ -0,0 +1,12 @@ +{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file