From 869407c6e202e765ba26d15b684ad47c516eff12 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 20 Apr 2022 14:02:05 +0200 Subject: [PATCH 1/9] [Measures] added new measure (usagecounts) as action set. Measure added at the level of the result. Ref #7587 --- .../dnetlib/dhp/actionmanager/Constants.java | 2 + .../usagestats/SparkAtomicActionUsageJob.java | 165 ++++++++++++++++++ .../usagestats/UsageStatsModel.java | 34 ++++ .../usagestats/input_actionset_parameter.json | 26 +++ .../usagestats/oozie_app/config-default.xml | 30 ++++ .../usagestats/oozie_app/workflow.xml | 98 +++++++++++ .../SparkAtomicActionCountJobTest.java | 133 ++++++++++++++ .../dhp/actionmanager/usagestats/usagestatsdb | 12 ++ pom.xml | 2 +- 9 files changed, 501 insertions(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index b790d90cb..aa25ca633 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -27,6 +27,8 @@ public class Constants { public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; + public static final String UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID = "measure:usage_counts"; + public static final String UPDATE_KEY_USAGE_COUNTS = "count"; public static final String FOS_CLASS_ID = "FOS"; public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java new file mode 100644 index 000000000..04f861df5 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -0,0 +1,165 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.cxf.wsdl.service.factory.MethodNameSoapActionServiceConfiguration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.ForeachFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; +import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Measure; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import lombok.val; +import scala.Tuple2; + +/** + * created the Atomic Action for each tipe of results + */ +public class SparkAtomicActionUsageJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkAtomicActionUsageJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + final String dbname = parser.get("statsdb"); + + final String workingPath = parser.get("workingPath"); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareResults(dbname, spark, outputPath); + prepareActionSet(spark, workingPath, outputPath); + }); + } + + public static void prepareResults(String db, SparkSession spark, String workingPath) { + spark + .sql( + "Select result_id, downloads, views " + + "from " + db + ".usage_stats") + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + + public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath){ + readPath(spark, inputPath, UsageStatsModel.class) + .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + UsageStatsModel first = it.next(); + it.forEachRemaining(us -> { + first.setDownloads(first.getDownloads() + us.getDownloads()); + first.setViews(first.getViews() + us.getViews()); + }); + + Result res = new Result(); + res.setId("50|" + k); + + + res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); + return res; + }, Encoders.bean(Result.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + + private static List getMeasure(Long downloads, Long views) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + + return Arrays + .asList( + Measure + .newInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), + Measure.newInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); + + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + public static Dataset readPath( + SparkSession spark, String inputPath, Class clazz) { + return spark + .read() + .textFile(inputPath) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java new file mode 100644 index 000000000..df8a77eb6 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java @@ -0,0 +1,34 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import java.io.Serializable; + +public class UsageStatsModel implements Serializable { + private String result_id; + private Long downloads; + private Long views; + + public String getResult_id() { + return result_id; + } + + public void setResult_id(String result_id) { + this.result_id = result_id; + } + + public Long getDownloads() { + return downloads; + } + + public void setDownloads(Long downloads) { + this.downloads = downloads; + } + + public Long getViews() { + return views; + } + + public void setViews(Long views) { + this.views = views; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json new file mode 100644 index 000000000..b8ae24e12 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "hmu", + "paramLongName": "hive_metastore_uris", + "paramDescription": "the URI for the hive metastore", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + }, + { + "paramName": "sdb", + "paramLongName": "statsdb", + "paramDescription": "the name of the db to be used", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml new file mode 100644 index 000000000..781efb475 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -0,0 +1,98 @@ + + + + outputPath + the path where to store the actionset + + + statsdb + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + yarn + cluster + Produces the atomic action with the usage stats count for results + eu.dnetlib.dhp.actionmanager.usagestats.SparkAtomicActionUsageJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --hive_metastore_uris${hiveMetastoreUris} + --outputPath${outputPath} + --statsdb${statsdb} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java new file mode 100644 index 000000000..33d76893d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -0,0 +1,133 @@ + +package eu.dnetlib.dhp.actionmanager.usagestats; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class SparkAtomicActionCountJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(SparkAtomicActionCountJobTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files + .createTempDirectory(SparkAtomicActionCountJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(SparkAtomicActionCountJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(SparkAtomicActionCountJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testMatch() { + String usageScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") + .getPath(); + + SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/actionSet") + .map(usm -> OBJECT_MAPPER.readValue(usm, Result.class)); + + Assertions.assertEquals(9, tmp.count()); + + tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertEquals("measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertEquals("Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp.foreach(r -> r.getMeasures().stream().forEach(m -> + m.getUnit().stream().forEach(u -> Assertions.assertEquals("count", + u.getKey())))); + + Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + + Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + Assertions.assertEquals("5", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + + Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + Assertions.assertEquals("1", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + + Assertions.assertEquals("2", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + Assertions.assertEquals("6", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) + .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) + .getUnit().get(0).getValue()); + } + + + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb new file mode 100644 index 000000000..fee74f697 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb @@ -0,0 +1,12 @@ +{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 136b9b867..b0957393a 100644 --- a/pom.xml +++ b/pom.xml @@ -801,7 +801,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.10.32] + [2.11.34-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6] From 5feae77937df688e43f8c17988424dcf04259722 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 20 Apr 2022 15:13:09 +0200 Subject: [PATCH 2/9] [Measures] last changes to accomodate tests --- .../usagestats/SparkAtomicActionUsageJob.java | 23 +- .../usagestats/input_actionset_parameter.json | 6 + .../usagestats/oozie_app/workflow.xml | 1 + .../SparkAtomicActionCountJobTest.java | 200 ++++++++++++++---- 4 files changed, 181 insertions(+), 49 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 04f861df5..012261c39 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -94,19 +94,19 @@ public class SparkAtomicActionUsageJob implements Serializable { public static void prepareResults(String db, SparkSession spark, String workingPath) { spark - .sql( - "Select result_id, downloads, views " + - "from " + db + ".usage_stats") - .as(Encoders.bean(UsageStatsModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql( + "Select result_id, downloads, views " + + "from " + db + ".usage_stats") + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } - public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath){ - readPath(spark, inputPath, UsageStatsModel.class) - .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) + public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) { + readPath(spark, inputPath, UsageStatsModel.class) + .groupByKey((MapFunction) us -> us.getResult_id(), Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { UsageStatsModel first = it.next(); it.forEachRemaining(us -> { @@ -117,7 +117,6 @@ public class SparkAtomicActionUsageJob implements Serializable { Result res = new Result(); res.setId("50|" + k); - res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); return res; }, Encoders.bean(Result.class)) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json index b8ae24e12..821905da4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json @@ -22,5 +22,11 @@ "paramLongName": "statsdb", "paramDescription": "the name of the db to be used", "paramRequired": true + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the workingPath where to save the content of the usage_stats table", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml index 781efb475..5b552ca4b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -89,6 +89,7 @@ --hive_metastore_uris${hiveMetastoreUris} --outputPath${outputPath} --statsdb${statsdb} + --workingPath${workingDir}/usageDb diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 33d76893d..7cc9eb326 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -75,7 +75,7 @@ public class SparkAtomicActionCountJobTest { .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") .getPath(); - SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -86,48 +86,174 @@ public class SparkAtomicActionCountJobTest { Assertions.assertEquals(9, tmp.count()); tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("measure:usage_counts", - u.getDataInfo().getProvenanceaction().getClassid())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("Inferred by OpenAIRE", - u.getDataInfo().getProvenanceaction().getClassname())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); - tmp.foreach(r -> r.getMeasures().stream().forEach(m -> - m.getUnit().stream().forEach(u -> Assertions.assertEquals("count", - u.getKey())))); + tmp + .foreach( + r -> r + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); - Assertions.assertEquals(1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + Assertions + .assertEquals( + 1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); - Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("5", tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); - Assertions.assertEquals("0", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("1", tmp.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "0", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); - Assertions.assertEquals("2", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("downloads")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); - Assertions.assertEquals("6", tmp.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")).collect().get(0) - .getMeasures().stream().filter(m -> m.getId().equals("views")).collect(Collectors.toList()).get(0) - .getUnit().get(0).getValue()); + Assertions + .assertEquals( + "2", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); } - - } From 5295effc9664fea8f92e148019d36469091484fd Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 20 Apr 2022 16:20:40 +0200 Subject: [PATCH 3/9] [Measures] fixed issue --- .../dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 012261c39..261794598 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -87,7 +87,7 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareResults(dbname, spark, outputPath); + prepareResults(dbname, spark, workingPath); prepareActionSet(spark, workingPath, outputPath); }); } From c304657d9195b334c44bce4a11649a68c60a3299 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 11:27:26 +0200 Subject: [PATCH 4/9] [Measures] put the logic in common, no need to change the schema --- .../dhp/schema/oaf/utils/OafMapperUtils.java | 15 +++++++++++++ .../usagestats/SparkAtomicActionUsageJob.java | 22 ++++--------------- pom.xml | 2 +- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java index 0a51e8600..8d67ce804 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java @@ -391,4 +391,19 @@ public class OafMapperUtils { } return null; } + + public static KeyValue newKeyValueInstance (String key, String value, DataInfo dataInfo){ + KeyValue kv = new KeyValue(); + kv.setDataInfo(dataInfo); + kv.setKey(key); + kv.setValue(value); + return kv; + } + + public static Measure newMeasureInstance(String id, String value, String key, DataInfo dataInfo) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(Arrays.asList(newKeyValueInstance(key, value, dataInfo))); + return m; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 261794598..790170ccc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -3,23 +3,14 @@ package eu.dnetlib.dhp.actionmanager.usagestats; import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; -import org.apache.cxf.wsdl.service.factory.MethodNameSoapActionServiceConfiguration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; @@ -31,19 +22,15 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.DataInfo; -import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import lombok.val; -import scala.Tuple2; + /** * created the Atomic Action for each tipe of results @@ -143,9 +130,8 @@ public class SparkAtomicActionUsageJob implements Serializable { return Arrays .asList( - Measure - .newInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), - Measure.newInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); + OafMapperUtils.newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), + OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); } diff --git a/pom.xml b/pom.xml index b0957393a..136b9b867 100644 --- a/pom.xml +++ b/pom.xml @@ -801,7 +801,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.11.34-SNAPSHOT] + [2.10.32] [4.0.3] [6.0.5] [3.1.6] From b61efd613b5ba5e324f2a1e4a40f549baf52b4dc Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 12:09:37 +0200 Subject: [PATCH 5/9] [Measures] addressed comments in the PR --- .../eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java | 2 +- .../usagestats/SparkAtomicActionUsageJob.java | 9 ++++----- .../usagestats/input_actionset_parameter.json | 2 +- .../dhp/actionmanager/usagestats/oozie_app/workflow.xml | 6 +++--- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java index 8d67ce804..9ee359cd5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtils.java @@ -392,7 +392,7 @@ public class OafMapperUtils { return null; } - public static KeyValue newKeyValueInstance (String key, String value, DataInfo dataInfo){ + public static KeyValue newKeyValueInstance(String key, String value, DataInfo dataInfo) { KeyValue kv = new KeyValue(); kv.setDataInfo(dataInfo); kv.setKey(key); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 790170ccc..04533aa53 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -31,7 +30,6 @@ import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; - /** * created the Atomic Action for each tipe of results */ @@ -65,7 +63,7 @@ public class SparkAtomicActionUsageJob implements Serializable { SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); - final String dbname = parser.get("statsdb"); + final String dbname = parser.get("usagestatsdb"); final String workingPath = parser.get("workingPath"); @@ -130,8 +128,9 @@ public class SparkAtomicActionUsageJob implements Serializable { return Arrays .asList( - OafMapperUtils.newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), - OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); + OafMapperUtils + .newMeasureInstance("downloads", String.valueOf(downloads), UPDATE_KEY_USAGE_COUNTS, dataInfo), + OafMapperUtils.newMeasureInstance("views", String.valueOf(views), UPDATE_KEY_USAGE_COUNTS, dataInfo)); } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json index 821905da4..e9200d3ad 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json @@ -19,7 +19,7 @@ }, { "paramName": "sdb", - "paramLongName": "statsdb", + "paramLongName": "usagestatsdb", "paramDescription": "the name of the db to be used", "paramRequired": true }, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml index 5b552ca4b..d94cf7d53 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml @@ -5,8 +5,8 @@ the path where to store the actionset - statsdb - the path where to store the actionset + usagestatsdb + the name of the db to be used sparkDriverMemory @@ -88,7 +88,7 @@ --hive_metastore_uris${hiveMetastoreUris} --outputPath${outputPath} - --statsdb${statsdb} + --usagestatsdb${usagestatsdb} --workingPath${workingDir}/usageDb From 20de75ca64cb0237dcc6a19caa0fc1308d60f7d5 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 21 Apr 2022 12:14:03 +0200 Subject: [PATCH 6/9] [Measures] removed typo --- .../dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 04533aa53..c284ad8bd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -31,7 +31,7 @@ import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; /** - * created the Atomic Action for each tipe of results + * created the Atomic Action for each type of results */ public class SparkAtomicActionUsageJob implements Serializable { From 29150a5d0c69a540b546ffcfbc91114e68c40301 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 21 Apr 2022 13:31:56 +0200 Subject: [PATCH 7/9] code formatting --- .../oa/graph/clean/CleanContextSparkJob.java | 181 +++++---- .../dhp/oa/graph/clean/CleanContextTest.java | 375 ++++++++++++------ 2 files changed, 357 insertions(+), 199 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java index b20dcb67b..3e9b17f3f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextSparkJob.java @@ -1,16 +1,13 @@ + package eu.dnetlib.dhp.oa.graph.clean; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Context; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -21,93 +18,113 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.ObjectMapper; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class CleanContextSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(CleanContextSparkJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - CleanContextSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + String jsonConfiguration = IOUtils + .toString( + CleanContextSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/input_clean_context_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); + String inputPath = parser.get("inputPath"); + log.info("inputPath: {}", inputPath); - String workingPath = parser.get("workingPath"); - log.info("workingPath: {}", workingPath); + String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); - String contextId = parser.get("contextId"); - log.info("contextId: {}", contextId); + String contextId = parser.get("contextId"); + log.info("contextId: {}", contextId); - String verifyParam = parser.get("verifyParam"); - log.info("verifyParam: {}", verifyParam); + String verifyParam = parser.get("verifyParam"); + log.info("verifyParam: {}", verifyParam); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + Class entityClazz = (Class) Class.forName(graphTableClassName); - Class entityClazz = (Class) Class.forName(graphTableClassName); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); + }); + } - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { + private static void cleanContext(SparkSession spark, String contextId, String verifyParam, + String inputPath, Class entityClazz, String workingPath) { + Dataset res = spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)); - cleanContext(spark, contextId, verifyParam, inputPath, entityClazz, workingPath); - }); - } + res.map((MapFunction) r -> { + if (!r + .getTitle() + .stream() + .filter( + t -> t + .getQualifier() + .getClassid() + .equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) + .anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))) { + return r; + } + r + .setContext( + r + .getContext() + .stream() + .filter( + c -> !c.getId().split("::")[0] + .equalsIgnoreCase(contextId)) + .collect(Collectors.toList())); + return r; + }, Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); - private static void cleanContext(SparkSession spark, String contextId, String verifyParam, String inputPath, Class entityClazz, String workingPath) { - Dataset res = spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)); - - res.map((MapFunction) r -> { - if(!r.getTitle() - .stream() - .filter(t -> t.getQualifier().getClassid() - .equalsIgnoreCase(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) - .anyMatch(t -> t.getValue().toLowerCase().startsWith(verifyParam.toLowerCase()))){ - return r; - } - r.setContext(r.getContext().stream().filter(c -> !c.getId().split("::")[0] - .equalsIgnoreCase(contextId)).collect(Collectors.toList())); - return r; - } ,Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(workingPath); - - spark.read().textFile(workingPath).map((MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), - Encoders.bean(entityClazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath); - } + spark + .read() + .textFile(workingPath) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, entityClazz), + Encoders.bean(entityClazz)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath); + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java index bba814346..472d3781d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleanContextTest.java @@ -1,18 +1,12 @@ + package eu.dnetlib.dhp.oa.graph.clean; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.dump.Constants; -import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest; -import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; -import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.dump.oaf.Instance; -import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; -import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Locale; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -27,133 +21,280 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.Locale; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.DumpJobTest; +import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.dump.oaf.Instance; +import eu.dnetlib.dhp.schema.dump.oaf.OpenAccessRoute; +import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class CleanContextTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; + private static Path workingDir; - private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); + private static final Logger log = LoggerFactory.getLogger(CleanContextTest.class); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName()); - log.info("using work dir {}", workingDir); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(DumpJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(DumpJobTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(DumpJobTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(DumpJobTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(DumpJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - public void testResultClean() throws Exception { - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json") - .getPath(); - final String prefix = "gcube "; + @Test + public void testResultClean() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/oa/graph/clean/publication_clean_context.json") + .getPath(); + final String prefix = "gcube "; + spark + .read() + .textFile(sourcePath) + .map( + (MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), + Encoders.bean(Publication.class)) + .write() + .json(workingDir.toString() + "/publication"); - spark.read().textFile(sourcePath).map((MapFunction) r -> OBJECT_MAPPER.readValue(r, Publication.class), Encoders.bean(Publication.class)) - .write().json(workingDir.toString() + "/publication"); + CleanContextSparkJob.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--inputPath", workingDir.toString() + "/publication", + "-graphTableClassName", Publication.class.getCanonicalName(), + "-workingPath", workingDir.toString() + "/working", + "-contextId", "sobigdata", + "-verifyParam", "gCube " + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/publication") + .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); - CleanContextSparkJob.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--inputPath", workingDir.toString() + "/publication", - "-graphTableClassName", Publication.class.getCanonicalName(), - "-workingPath", workingDir.toString() + "/working", - "-contextId","sobigdata", - "-verifyParam","gCube " - }); + Assertions.assertEquals(7, tmp.count()); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .textFile(workingDir.toString() + "/publication") - .map(item -> OBJECT_MAPPER.readValue(item, Publication.class)); + // original result with sobigdata context and gcube as starting string in the main title for the publication + Assertions + .assertEquals( + 0, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")) + .collect() + .get(0) + .getContext() + .size()); - Assertions.assertEquals(7, tmp.count()); + // original result with sobigdata context without gcube as starting string in the main title for the publication + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::2", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); - //original result with sobigdata context and gcube as starting string in the main title for the publication - Assertions.assertEquals(0, - tmp.filter(p->p.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95")).collect().get(0).getContext().size()); + // original result with sobigdata context with gcube as starting string in the subtitle + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::2", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + List titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context without gcube as starting string in the main title for the publication - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9")).collect().get(0).getContext().get(0).getId() ); + // original result with sobigdata context with gcube not as starting string in the main title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "sobigdata::projects::1", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim())); + Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context with gcube as starting string in the subtitle - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::2",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getContext().get(0).getId() ); - List titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c81248c335f0aa07e06817ece6fa6af")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertEquals("subtitle", titles.get(0).getQualifier().getClassid()); + // original result with sobigdata in context and also other contexts with gcube as starting string for the main + // title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(1, titles.size()); + Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix)); + Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); - //original result with sobigdata context with gcube not as starting string in the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().size()); - Assertions.assertEquals("sobigdata::projects::1",tmp.filter(p->p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::3c9f068ddc930360bec6925488a9a97f")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertFalse(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().contains(prefix.trim()) ); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); + // original result with multiple main title one of which whith gcube as starting string and with 2 contextes + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(2, titles.size()); + Assertions + .assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); - //original result with sobigdata in context and also other contexts with gcube as starting string for the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4669a378a73661417182c208e6fdab53")).collect().get(0).getTitle(); - Assertions.assertEquals(1, titles.size()); - Assertions.assertTrue(titles.get(0).getValue().toLowerCase().startsWith(prefix) ); - Assertions.assertEquals("main title", titles.get(0).getQualifier().getClassid()); + // original result without sobigdata in context with gcube as starting string for the main title + Assertions + .assertEquals( + 1, + tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getContext() + .size()); + Assertions + .assertEquals( + "dh-ch", + tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getContext() + .get(0) + .getId()); + titles = tmp + .filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")) + .collect() + .get(0) + .getTitle(); + Assertions.assertEquals(2, titles.size()); - //original result with multiple main title one of which whith gcube as starting string and with 2 contextes - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|DansKnawCris::4a9152e80f860eab99072e921d74a0ff")).collect().get(0).getTitle(); - Assertions.assertEquals(2, titles.size()); - Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix)) ); + Assertions + .assertTrue( + titles + .stream() + .anyMatch( + t -> t.getQualifier().getClassid().equals("main title") + && t.getValue().toLowerCase().startsWith(prefix))); - - //original result without sobigdata in context with gcube as starting string for the main title - Assertions.assertEquals(1, - tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().size()); - Assertions.assertEquals("dh-ch",tmp.filter(p->p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getContext().get(0).getId() ); - titles = tmp.filter(p -> p.getId().equals("50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8")).collect().get(0).getTitle(); - Assertions.assertEquals(2, titles.size()); - - Assertions.assertTrue(titles.stream().anyMatch(t -> t.getQualifier().getClassid().equals("main title") && t.getValue().toLowerCase().startsWith(prefix))); - - - } + } } From aa12429f500e563a114b39650b3d8c13c361d3fa Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 22 Apr 2022 11:05:08 +0200 Subject: [PATCH 8/9] Modified last intersection since we lost many titles. --- .../doiboost/SparkGenerateDoiBoost.scala | 83 ++++++++----------- 1 file changed, 34 insertions(+), 49 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 9323c994c..10a5be969 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -59,52 +59,6 @@ object SparkGenerateDoiBoost { val workingDirPath = parser.get("workingPath") val openaireOrganizationPath = parser.get("openaireOrganizationPath") - val crossrefAggregator = new Aggregator[(String, Publication), Publication, Publication] with Serializable { - override def zero: Publication = new Publication - - override def reduce(b: Publication, a: (String, Publication)): Publication = { - - if (b == null) { - if (a != null && a._2 != null) { - a._2.setId(a._1) - return a._2 - } - } else { - if (a != null && a._2 != null) { - b.mergeFrom(a._2) - b.setId(a._1) - val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) - b.setAuthor(authors) - return b - } - } - new Publication - } - - override def merge(b1: Publication, b2: Publication): Publication = { - if (b1 == null) { - if (b2 != null) - return b2 - } else { - if (b2 != null) { - b1.mergeFrom(b2) - val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) - b1.setAuthor(authors) - if (b2.getId != null && b2.getId.nonEmpty) - b1.setId(b2.getId) - return b1 - } - } - new Publication - } - - override def finish(reduction: Publication): Publication = reduction - - override def bufferEncoder: Encoder[Publication] = Encoders.kryo[Publication] - - override def outputEncoder: Encoder[Publication] = Encoders.kryo[Publication] - } - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] @@ -175,12 +129,43 @@ object SparkGenerateDoiBoost { .map(DoiBoostMappingUtil.fixPublication) .map(p => (p.getId, p)) .groupByKey(_._1) - .agg(crossrefAggregator.toColumn) - .map(p => p._2) + .reduceGroups((left, right) => + { + //Check left is not null + if (left != null && left._1 != null) + { + //If right is null then return left + if (right == null || right._2 == null) + left + else { + // Here Left and Right are not null + // So we have to merge + val b1 = left._2 + val b2 = right._2 + b1.mergeFrom(b2) + b1.mergeOAFDataInfo(b2) + val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) + b1.setAuthor(authors) + if (b2.getId != null && b2.getId.nonEmpty) + b1.setId(b2.getId) + //Return publication Merged + (b1.getId, b1) + } + } + else { + // Left is Null so we return right + right + } + } + + ) + .filter(s => s!= null && s._2!=null) + .map(s => s._2._2) .write .mode(SaveMode.Overwrite) .save(s"$workingDirPath/doiBoostPublicationFiltered") + val affiliationPath = parser.get("affiliationPath") val paperAffiliationPath = parser.get("paperAffiliationPath") @@ -305,4 +290,4 @@ object SparkGenerateDoiBoost { .save(s"$workingDirPath/doiBoostOrganization") } -} +} \ No newline at end of file From a82ec3aaaf60ea77918f3ff1cfab20538ef6fe7e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Fri, 22 Apr 2022 11:08:13 +0200 Subject: [PATCH 9/9] code formatter --- .../doiboost/SparkGenerateDoiBoost.scala | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 10a5be969..2cbd53097 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -129,11 +129,9 @@ object SparkGenerateDoiBoost { .map(DoiBoostMappingUtil.fixPublication) .map(p => (p.getId, p)) .groupByKey(_._1) - .reduceGroups((left, right) => - { + .reduceGroups((left, right) => { //Check left is not null - if (left != null && left._1 != null) - { + if (left != null && left._1 != null) { //If right is null then return left if (right == null || right._2 == null) left @@ -151,21 +149,17 @@ object SparkGenerateDoiBoost { //Return publication Merged (b1.getId, b1) } - } - else { + } else { // Left is Null so we return right right } - } - - ) - .filter(s => s!= null && s._2!=null) + }) + .filter(s => s != null && s._2 != null) .map(s => s._2._2) .write .mode(SaveMode.Overwrite) .save(s"$workingDirPath/doiBoostPublicationFiltered") - val affiliationPath = parser.get("affiliationPath") val paperAffiliationPath = parser.get("paperAffiliationPath") @@ -290,4 +284,4 @@ object SparkGenerateDoiBoost { .save(s"$workingDirPath/doiBoostOrganization") } -} \ No newline at end of file +}