diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index 5f099b8f2..3dbda3788 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -9,6 +9,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.schema.common.MainEntityType;
+import eu.dnetlib.dhp.schema.oaf.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -28,9 +30,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Measure;
-import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 
 import scala.Tuple2;
@@ -76,41 +75,37 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				prepareResults(dbname, spark, workingPath);
+				prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats");
+				prepareData(dbname, spark, workingPath + "/projectDb", "project_stats");
+				prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats");
 				writeActionSet(spark, workingPath, outputPath);
 			});
 	}
 
-	public static void prepareResults(String db, SparkSession spark, String workingPath) {
+	private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName) {
+		// the alias keeps the column aligned with the renamed UsageStatsModel.id bean property
 		spark
-			.sql(
-				"Select result_id, downloads, views " +
-					"from " + db + ".usage_stats")
-			.as(Encoders.bean(UsageStatsModel.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(workingPath);
+			.sql(
+				"Select result_id as id, downloads, views " +
+					"from " + dbname + "." + tableName)
+			.as(Encoders.bean(UsageStatsModel.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(workingPath);
 	}
+
+
 	public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
-		readPath(spark, inputPath, UsageStatsModel.class)
-			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
-			.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
-				UsageStatsModel first = it.next();
-				it.forEachRemaining(us -> {
-					first.setDownloads(first.getDownloads() + us.getDownloads());
-					first.setViews(first.getViews() + us.getViews());
-				});
-
-				Result res = new Result();
-				res.setId("50|" + k);
-
-				res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
-				return res;
-			}, Encoders.bean(Result.class))
-			.toJavaRDD()
-			.map(p -> new AtomicAction(p.getClass(), p))
+		// one action set, merging the usage counts of results (50|), projects (40|) and datasources (10|)
+		getFinalIndicatorsResult(spark, inputPath + "/usageDb")
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p))
+			.union(getFinalIndicatorsProject(spark, inputPath + "/projectDb")
+				.toJavaRDD()
+				.map(p -> new AtomicAction(p.getClass(), p)))
+			.union(getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb")
+				.toJavaRDD()
+				.map(p -> new AtomicAction(p.getClass(), p)))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
 					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
@@ -118,6 +113,54 @@
 
 	}
 
+	private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
+
+		return getUsageStatsModelDataset(spark, inputPath)
+			.map((MapFunction<UsageStatsModel, Result>) usm -> {
+				Result r = new Result();
+				r.setId("50|" + usm.getId());
+				r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+				return r;
+			}, Encoders.bean(Result.class));
+	}
+
+	private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
+
+		return getUsageStatsModelDataset(spark, inputPath)
+			.map((MapFunction<UsageStatsModel, Project>) usm -> {
+				Project p = new Project();
+				p.setId("40|" + usm.getId());
+				p.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+				return p;
+			}, Encoders.bean(Project.class));
+	}
+
+	private static Dataset<Datasource> getFinalIndicatorsDatasource(SparkSession spark, String inputPath) {
+
+		return getUsageStatsModelDataset(spark, inputPath)
+			.map((MapFunction<UsageStatsModel, Datasource>) usm -> {
+				Datasource d = new Datasource();
+				d.setId("10|" + usm.getId());
+				d.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
+				return d;
+			}, Encoders.bean(Datasource.class));
+	}
+
+	private static Dataset<UsageStatsModel> getUsageStatsModelDataset(SparkSession spark, String inputPath) {
+		// aggregate the dump per entity id, summing downloads and views
+		return readPath(spark, inputPath, UsageStatsModel.class)
+			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getId(), Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, UsageStatsModel, UsageStatsModel>) (k, it) -> {
+				UsageStatsModel first = it.next();
+				it.forEachRemaining(us -> {
+					first.setDownloads(first.getDownloads() + us.getDownloads());
+					first.setViews(first.getViews() + us.getViews());
+				});
+				first.setId(k);
+				return first;
+			}, Encoders.bean(UsageStatsModel.class));
+	}
+
 	private static List<Measure> getMeasure(Long downloads, Long views) {
 		DataInfo dataInfo = OafMapperUtils
 			.dataInfo(
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java
index df8a77eb6..07f69b0bb 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsModel.java
@@ -4,16 +4,16 @@ package eu.dnetlib.dhp.actionmanager.usagestats;
 import java.io.Serializable;
 
 public class UsageStatsModel implements Serializable {
-	private String result_id;
+	private String id;
 	private Long downloads;
 	private Long views;
 
-	public String getResult_id() {
-		return result_id;
+	public String getId() {
+		return id;
 	}
 
-	public void setResult_id(String result_id) {
-		this.result_id = result_id;
+	public void setId(String id) {
+		this.id = id;
 	}
 
 	public Long getDownloads() {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
index d94cf7d53..de188718a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
@@ -89,7 +89,7 @@
             <arg>--hive_metastore_uris</arg><arg>${hiveMetastoreUris}</arg>
             <arg>--outputPath</arg><arg>${outputPath}</arg>
             <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
-            <arg>--workingPath</arg><arg>${workingDir}/usageDb</arg>
+            <arg>--workingPath</arg><arg>${workingDir}</arg>
         </spark>
         <ok to="End"/>
         <error to="Kill"/>
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
index 8aa718bae..8ff3c36f7 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
@@ -8,6 +8,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -68,24 +69,26 @@ public class SparkAtomicActionCountJobTest {
 	@Test
 	void testMatch() {
 		String usageScoresPath = getClass()
-			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
+			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats")
 			.getPath();
 
 		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-		JavaRDD<Result> tmp = sc
+		JavaRDD<AtomicAction> tmp = sc
 			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
-			.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
-			.map(aa -> (Result) aa.getPayload());
+			.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class));
+		// .map(aa -> (Result) aa.getPayload());
 
-		Assertions.assertEquals(9, tmp.count());
+		Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count());
+		Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count());
+		Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count());
 
-		tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
+		tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size()));
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
@@ -95,14 +98,14 @@
 							.forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference()))));
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
 						m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred()))));
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
@@ -113,7 +116,7 @@
 
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
@@ -127,7 +130,7 @@
 									u.getDataInfo().getProvenanceaction().getClassid()))));
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
@@ -142,7 +145,7 @@
 
 		tmp
 			.foreach(
-				r -> r
+				r -> ((OafEntity) r.getPayload())
 					.getMeasures()
 					.stream()
 					.forEach(
@@ -157,12 +160,13 @@
 
 		Assertions
 			.assertEquals(
-				1, tmp.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
+				1, tmp.filter(r -> ((OafEntity) r.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count());
 
 		Assertions
 			.assertEquals(
 				"0",
 				tmp
+					.map(r -> ((OafEntity) r.getPayload()))
 					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
 					.collect()
 					.get(0)
@@ -178,7 +182,8 @@
 			.assertEquals(
 				"5",
 				tmp
-					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6"))
 					.collect()
 					.get(0)
 					.getMeasures()
@@ -194,7 +199,8 @@
 			.assertEquals(
 				"0",
 				tmp
-					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
 					.collect()
 					.get(0)
 					.getMeasures()
@@ -209,7 +215,8 @@
 			.assertEquals(
 				"1",
 				tmp
-					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0"))
 					.collect()
 					.get(0)
 					.getMeasures()
@@ -225,7 +232,8 @@
 			.assertEquals(
 				"2",
 				tmp
-					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
 					.collect()
 					.get(0)
 					.getMeasures()
@@ -240,7 +248,8 @@
 			.assertEquals(
 				"6",
 				tmp
-					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
+					.map(r -> ((OafEntity) r.getPayload()))
+					.filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f"))
 					.collect()
 					.get(0)
 					.getMeasures()
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb
new file mode 100644
index 000000000..7337ba3e2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb
@@ -0,0 +1,12 @@
+{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4}
+{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1}
+{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1}
+{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3}
+{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1}
+{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3}
+{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3}
+{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1}
+{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3}
+{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8}
+{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2}
+{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb new file mode 100644 index 000000000..0ecab2a82 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb @@ -0,0 +1,12 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb new file mode 100644 index 000000000..eb3290eda --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb @@ -0,0 +1,12 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb deleted file mode 100644 index fee74f697..000000000 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb +++ /dev/null @@ -1,12 +0,0 @@ -{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":4} -{"result_id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":1} 
-{"result_id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} -{"result_id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} -{"result_id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":3} -{"result_id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":0,"views":3} -{"result_id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} -{"result_id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":8} -{"result_id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":0,"views":2} -{"result_id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9b60b9078..839188a62 100644 --- a/pom.xml +++ b/pom.xml @@ -807,7 +807,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [3.15.0] + [3.15.1-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6]