From 55ea485783ad1980bf40388dc48e8991db1d02f7 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Jun 2023 18:39:30 +0200 Subject: [PATCH 1/3] [UsageCount] split the count for result at the level of the datasource. for each indicator one unit is specified for each datasource contrinuting to that indicator value. The datasource key is the value of the key element in the unit for the measure, while the count for that datasource is in the value --- .../usagestats/SparkAtomicActionUsageJob.java | 76 ++- .../usagestats/UsageStatsResultModel.java | 17 + .../SparkAtomicActionCountJobTest.java | 594 ++++++++++++++---- .../usagestats/{ => test1}/datasourceDb | 0 .../usagestats/{ => test1}/projectDb | 0 .../actionmanager/usagestats/test1/usageDb | 9 + .../usagestats/test2/datasourceDb | 9 + .../actionmanager/usagestats/test2/projectDb | 9 + .../actionmanager/usagestats/test2/usageDb | 9 + .../dhp/actionmanager/usagestats/usageDb | 9 - 10 files changed, 604 insertions(+), 128 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/{ => test1}/datasourceDb (100%) rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/{ => test1}/projectDb (100%) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb delete mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 9b444c6fa..981da0789 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -14,6 +15,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -73,13 +75,28 @@ public class SparkAtomicActionUsageJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id"); + prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id"); prepareData(dbname, spark, 
workingPath + "/projectDb", "project_stats", "id"); - prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repositor_id"); + prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id"); writeActionSet(spark, workingPath, outputPath); }); } + private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) { + spark + .sql( + String + .format( + "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + + "from %s.%s group by %s, %s", + resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName)) + .as(Encoders.bean(UsageStatsResultModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); + } + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { spark @@ -114,16 +131,55 @@ public class SparkAtomicActionUsageJob implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } - + public static Measure newMeasureInstance(String id) { + Measure m = new Measure(); + m.setId(id); + m.setUnit(new ArrayList<>()); + return m; + } private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { - return readPath(spark, inputPath, UsageStatsModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); - return r; - }, Encoders.bean(Result.class)); + return readPath(spark, inputPath, UsageStatsResultModel.class) + .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k,it) -> { + Result r = new Result(); + r.setId("50|" + k); + //id = download or view and unit = list of key value pairs + Measure download = newMeasureInstance("downloads"); + Measure view = newMeasureInstance("views"); + UsageStatsResultModel first = it.next(); + addCountForDatasource(download, first, view); + it.forEachRemaining(usm -> { + addCountForDatasource(download, usm, view); + }); + r.setMeasures(Arrays.asList(download, view)); + return r; + }, Encoders.bean(Result.class)) +// .map((MapFunction) usm -> { +// Result r = new Result(); +// r.setId("50|" + usm.getId()); +// r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); +// return r; +// }, Encoders.bean(Result.class)); + ; + } + + private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) { + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); + view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); } private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java new file mode 100644 index 000000000..71d046c6f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java @@ -0,0 +1,17 @@ +package eu.dnetlib.dhp.actionmanager.usagestats; + +/** + * @author miriam.baglioni + * @Date 30/06/23 + */ +public class UsageStatsResultModel extends UsageStatsModel{ + private String datasourceId ; + + public String getDatasourceId() { + return datasourceId; + } + + public void setDatasourceId(String datasourceId) { + this.datasourceId = datasourceId; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 5982c8820..82728ff9d 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,6 +8,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; +import eu.dnetlib.dhp.schema.oaf.Measure; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -67,9 +68,9 @@ public class SparkAtomicActionCountJobTest { } @Test - void testMatch() { + void testUsageStatsDb2() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats") + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test2") .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); @@ -81,7 +82,7 @@ public class SparkAtomicActionCountJobTest { .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); // .map(aa -> (Result) aa.getPayload()); - Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(7, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); @@ -143,7 +144,7 @@ public class SparkAtomicActionCountJobTest { "Inferred by OpenAIRE", u.getDataInfo().getProvenanceaction().getClassname())))); - tmp + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) .foreach( r -> ((OafEntity) r.getPayload()) .getMeasures() @@ -159,113 +160,60 @@ public class SparkAtomicActionCountJobTest { u.getKey())))); Assertions - .assertEquals( - 1, - tmp - .filter( + .assertEquals( + 1, + tmp + .filter( + r -> ((OafEntity) r.getPayload()) + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + + OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first() + .getPayload(); + + entity + .getMeasures() + .stream() + .forEach( + m -> Assertions.assertEquals(3, m.getUnit().size() )); + + Measure downloads = entity.getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .findFirst() + .get(); + + + Assertions.assertEquals(String.valueOf(0), 
downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + Measure views = entity.getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .findFirst() + .get(); + + Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); - Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); Assertions .assertEquals( @@ -465,5 +413,433 @@ public class SparkAtomicActionCountJobTest { .get(0) .getValue()); } + @Test + void testMatch() { 
+ String usageScoresPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") + .getPath(); + SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); + // .map(aa -> (Result) aa.getPayload()); + + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); + + tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size())); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); + tmp + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "10|fake1", + u.getKey())))); + + tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + Assertions + .assertEquals( + 1, + tmp + .filter( + r -> ((OafEntity) r.getPayload()) + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + 
.filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> 
r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + + Assertions + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/datasourceDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/datasourceDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb similarity index 100% rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/projectDb rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/projectDb diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb new file mode 100644 index 
000000000..a636407d0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test1/usageDb @@ -0,0 +1,9 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5} +{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb new file mode 100644 index 000000000..efbb4cfbd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/datasourceDb @@ -0,0 +1,9 @@ +{"id":"d1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} +{"id":"d11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"d11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"d11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"d12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} +{"id":"d12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"d12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"d13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} +{"id":"d13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb new file mode 100644 index 000000000..0b8cd1d70 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/projectDb @@ -0,0 +1,9 @@ +{"id":"f1__________::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} +{"id":"f11_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} +{"id":"f11_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} +{"id":"f11_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} +{"id":"f12_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} +{"id":"f12_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} +{"id":"f12_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} +{"id":"f13_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} +{"id":"f13_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file diff --git 
a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb new file mode 100644 index 000000000..0712d110e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/test2/usageDb @@ -0,0 +1,9 @@ +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake1","downloads":0,"views":5} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake2","downloads":0,"views":1} +{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","datasourceId":"10|fake3","downloads":1,"views":3} +{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","datasourceId":"10|fake1","downloads":1,"views":1} +{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","datasourceId":"10|fake1","downloads":2,"views":6} +{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","datasourceId":"10|fake1","downloads":0,"views":1} +{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","datasourceId":"10|fake1","downloads":1,"views":3} +{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","datasourceId":"10|fake1","downloads":1,"views":10} +{"id":"doi_________::4938a71a884dd481d329657aa543b850","datasourceId":"10|fake1","downloads":0,"views":3} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb deleted file mode 100644 index 495ae0fc5..000000000 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/usagestats/usageDb +++ /dev/null @@ -1,9 +0,0 @@ -{"id":"dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6","downloads":0,"views":5} -{"id":"doi_________::17eda2ff77407538fbe5d3d719b9d1c0","downloads":0,"views":1} -{"id":"doi_________::1d4dc08605fd0a2be1105d30c63bfea1","downloads":1,"views":3} -{"id":"doi_________::2e3527822854ca9816f6dfea5bff61a8","downloads":1,"views":1} -{"id":"doi_________::3085e4c6e051378ca6157fe7f0430c1f","downloads":2,"views":6} -{"id":"doi_________::33f710e6dd30cc5e67e35b371ddc33cf","downloads":0,"views":1} -{"id":"doi_________::39738ebf10654732dd3a7af9f24655f8","downloads":1,"views":3} -{"id":"doi_________::3c3b65f07c1a06c7894397eda1d11bbf","downloads":1,"views":10} -{"id":"doi_________::4938a71a884dd481d329657aa543b850","downloads":0,"views":3} \ No newline at end of file -- 2.17.1 From 4c9bc4c3a58c1983ab2f246e1cbead8103761f45 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Jun 2023 19:05:15 +0200 Subject: [PATCH 2/3] refactoring --- .../usagestats/SparkAtomicActionUsageJob.java | 92 +- .../usagestats/UsageStatsResultModel.java | 17 +- .../SparkAtomicActionCountJobTest.java | 893 +++++++++--------- 3 files changed, 520 insertions(+), 482 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 981da0789..01a30ac4a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -82,19 +82,21 @@ public class SparkAtomicActionUsageJob implements Serializable { }); } 
- private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName) { + private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, + String resultAttributeName, String datasourceAttributeName) { spark - .sql( - String - .format( - "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + - "from %s.%s group by %s, %s", - resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName)) - .as(Encoders.bean(UsageStatsResultModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql( + String + .format( + "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views " + + "from %s.%s group by %s, %s", + resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, + datasourceAttributeName)) + .as(Encoders.bean(UsageStatsResultModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, @@ -131,30 +133,32 @@ public class SparkAtomicActionUsageJob implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } + public static Measure newMeasureInstance(String id) { Measure m = new Measure(); m.setId(id); m.setUnit(new ArrayList<>()); return m; } + private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { return readPath(spark, inputPath, UsageStatsResultModel.class) - .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k,it) -> { - Result r = new Result(); - r.setId("50|" + k); - //id = download or view and unit = list of key value pairs - Measure download = newMeasureInstance("downloads"); - Measure view = newMeasureInstance("views"); - UsageStatsResultModel first = it.next(); - addCountForDatasource(download, first, view); - it.forEachRemaining(usm -> { - addCountForDatasource(download, usm, view); - }); - r.setMeasures(Arrays.asList(download, view)); - return r; - }, Encoders.bean(Result.class)) + .groupByKey((MapFunction) usm -> usm.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + Result r = new Result(); + r.setId("50|" + k); + // id = download or view and unit = list of key value pairs + Measure download = newMeasureInstance("downloads"); + Measure view = newMeasureInstance("views"); + UsageStatsResultModel first = it.next(); + addCountForDatasource(download, first, view); + it.forEachRemaining(usm -> { + addCountForDatasource(download, usm, view); + }); + r.setMeasures(Arrays.asList(download, view)); + return r; + }, Encoders.bean(Result.class)) // .map((MapFunction) usm -> { // Result r = new Result(); // r.setId("50|" + usm.getId()); @@ -166,20 +170,26 @@ public class SparkAtomicActionUsageJob implements Serializable { private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) { DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils - .qualifier( - UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - ""); - 
download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); - view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_USAGE_COUNTS_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + ""); + download + .getUnit() + .add( + OafMapperUtils + .newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo)); + view + .getUnit() + .add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo)); } private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java index 71d046c6f..994e03bee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/UsageStatsResultModel.java @@ -1,17 +1,18 @@ + package eu.dnetlib.dhp.actionmanager.usagestats; /** * @author miriam.baglioni * @Date 30/06/23 */ -public class UsageStatsResultModel extends UsageStatsModel{ - private String datasourceId ; +public class UsageStatsResultModel extends UsageStatsModel { + private String datasourceId; - public String getDatasourceId() { - return datasourceId; - } + public String getDatasourceId() { + return datasourceId; + } - public void setDatasourceId(String datasourceId) { - this.datasourceId = datasourceId; - } + public void setDatasourceId(String datasourceId) { + this.datasourceId = datasourceId; + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 82728ff9d..c5a1ec126 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,7 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.Measure; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Measure; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; @@ -144,7 +144,8 @@ public class SparkAtomicActionCountJobTest { "Inferred by OpenAIRE", u.getDataInfo().getProvenanceaction().getClassname())))); - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) .foreach( r -> ((OafEntity) r.getPayload()) .getMeasures() @@ -160,60 +161,82 @@ public class SparkAtomicActionCountJobTest { u.getKey())))); Assertions - .assertEquals( - 1, - tmp - 
.filter( - r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); - - OafEntity entity = (OafEntity) tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).first() - .getPayload(); - - entity - .getMeasures() - .stream() - .forEach( - m -> Assertions.assertEquals(3, m.getUnit().size() )); - - Measure downloads = entity.getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .findFirst() - .get(); - - - Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(0), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(1), downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); - - Measure views = entity.getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .findFirst() - .get(); - - Assertions.assertEquals(String.valueOf(5), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(1), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); - Assertions.assertEquals(String.valueOf(3), views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) - .foreach( + .assertEquals( + 1, + tmp + .filter( r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); + OafEntity entity = (OafEntity) tmp + .filter( + aa -> ((OafEntity) aa.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .first() + .getPayload(); + + entity + .getMeasures() + .stream() + .forEach( + m -> Assertions.assertEquals(3, m.getUnit().size())); + + Measure downloads = entity + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .findFirst() + .get(); + + Assertions + .assertEquals( + String.valueOf(0), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(0), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(1), + downloads.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + Measure views = entity + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .findFirst() + .get(); + + Assertions + .assertEquals( + String.valueOf(5), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake1")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(1), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake2")).findFirst().get().getValue()); + Assertions + .assertEquals( + String.valueOf(3), + views.getUnit().stream().filter(u -> u.getKey().equals("10|fake3")).findFirst().get().getValue()); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + 
.getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); Assertions .assertEquals( @@ -413,19 +436,20 @@ public class SparkAtomicActionCountJobTest { .get(0) .getValue()); } + @Test void testMatch() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/test1") + .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) - .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); // .map(aa -> (Result) aa.getPayload()); Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); @@ -434,412 +458,415 @@ public class SparkAtomicActionCountJobTest { tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size())); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach(u -> Assertions.assertFalse(u.getDataInfo().getInvisible())))); tmp - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "measure:usage_counts", - u.getDataInfo().getProvenanceaction().getClassid())))); + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "measure:usage_counts", + u.getDataInfo().getProvenanceaction().getClassid())))); tmp - .foreach( + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "Inferred by OpenAIRE", + u.getDataInfo().getProvenanceaction().getClassname())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", 
+ u.getKey())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "10|fake1", + u.getKey())))); + + tmp + .filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) + .foreach( + r -> ((OafEntity) r.getPayload()) + .getMeasures() + .stream() + .forEach( + m -> m + .getUnit() + .stream() + .forEach( + u -> Assertions + .assertEquals( + "count", + u.getKey())))); + + Assertions + .assertEquals( + 1, + tmp + .filter( r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "Inferred by OpenAIRE", - u.getDataInfo().getProvenanceaction().getClassname())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "10|fake1", - u.getKey())))); - - tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")) - .foreach( - r -> ((OafEntity) r.getPayload()) - .getMeasures() - .stream() - .forEach( - m -> m - .getUnit() - .stream() - .forEach( - u -> Assertions - .assertEquals( - "count", - u.getKey())))); + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); Assertions - .assertEquals( - 1, - tmp - .filter( - r -> ((OafEntity) r.getPayload()) - .getId() - .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .count()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - 
.stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( 
+ "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + 
.collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity) r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + 
+					.get(0)
+					.getUnit()
+					.get(0)
+					.getValue());
 	}
 }
-- 
2.17.1


From a418dacb47fba75af4abd94ada4bd53c869eea94 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni" 
Date: Mon, 29 Jan 2024 18:12:33 +0100
Subject: [PATCH 3/3] [UsageCount] code extension to include also the name of the datasource

---
 .../usagestats/SparkAtomicActionUsageJob.java | 30 ++++++++++++++++---
 .../usagestats/input_actionset_parameter.json |  6 ++++
 .../usagestats/oozie_app/workflow.xml         |  1 +
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index 01a30ac4a..d6b52ad9b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -14,6 +14,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.sql.Dataset;
@@ -70,12 +71,19 @@ public class SparkAtomicActionUsageJob implements Serializable {
 
 		final String workingPath = parser.get("workingPath");
 
+		final String datasourcePath = parser.get("datasourcePath");
+
 		runWithSparkHiveSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				prepareResultData(dbname, spark, workingPath + "/usageDb", "usage_stats", "result_id", "repository_id");
+				prepareResultData(
+					dbname, spark, workingPath + "/usageDb",
+					"usage_stats",
+					"result_id",
+					"repository_id",
+					datasourcePath);
 				prepareData(dbname, spark, workingPath + "/projectDb", "project_stats", "id");
 				prepareData(dbname, spark, workingPath + "/datasourceDb", "datasource_stats", "repository_id");
 				writeActionSet(spark, workingPath, outputPath);
@@ -83,8 +91,9 @@ public class SparkAtomicActionUsageJob implements Serializable {
 	}
 
 	private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName,
-		String resultAttributeName, String datasourceAttributeName) {
-		spark
+		String resultAttributeName, String datasourceAttributeName,
+		String datasourcePath) {
+		Dataset<UsageStatsResultModel> resultModel = spark
 			.sql(
 				String
 					.format(
@@ -92,7 +101,20 @@ public class SparkAtomicActionUsageJob implements Serializable {
 						"from %s.%s group by %s, %s",
 						resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName,
 						datasourceAttributeName))
-			.as(Encoders.bean(UsageStatsResultModel.class))
+			.as(Encoders.bean(UsageStatsResultModel.class));
+		Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
+			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
+			.map((MapFunction<Datasource, Datasource>) d -> {
+				d.setId(d.getId().substring(3));
+				return d;
+			}, Encoders.bean(Datasource.class));
+		resultModel
+			.joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
+			.map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
+				UsageStatsResultModel usrm = t2._1();
+				usrm.setDatasourceId(usrm.getDatasourceId() + "||" + t2._2().getOfficialname().getValue());
+				return usrm;
+			}, Encoders.bean(UsageStatsResultModel.class))
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
index e9200d3ad..a42817f05 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json
@@ -28,5 +28,11 @@
     "paramLongName": "workingPath",
     "paramDescription": "the workingPath where to save the content of the usage_stats table",
     "paramRequired": true
+  },
+  {
+    "paramName": "dp",
+    "paramLongName": "datasourcePath",
+    "paramDescription": "the path where the datasource entities are stored, used to resolve the datasource name",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
index de188718a..88f259519 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/usagestats/oozie_app/workflow.xml
@@ -90,6 +90,7 @@
             <arg>--outputPath</arg><arg>${outputPath}</arg>
             <arg>--usagestatsdb</arg><arg>${usagestatsdb}</arg>
             <arg>--workingPath</arg><arg>${workingDir}</arg>
+            <arg>--datasourcePath</arg><arg>${datasourcePath}</arg>
-- 
2.17.1
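
For reference: the left join added in prepareResultData by PATCH 3/3 assumes that every datasourceId found in the usage_stats table matches a Datasource record in the graph dump; where the left join finds no match, t2._2() is null and the call to getOfficialname() would fail with a NullPointerException. Below is a minimal null-safe sketch of the same enrichment step, written against the classes already used in the patches; the helper class and method names are illustrative only and are not part of the committed code.

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    import eu.dnetlib.dhp.schema.oaf.Datasource;
    import scala.Tuple2;

    // Hypothetical helper, assumed to live in the same package as UsageStatsResultModel.
    public class UsageStatsDatasourceNameSketch {

        // Reproduces the "<datasourceId>||<officialname>" convention of the patch,
        // but only dereferences the joined Datasource when the left join actually matched.
        public static Dataset<UsageStatsResultModel> withDatasourceName(
            Dataset<UsageStatsResultModel> resultModel,
            Dataset<Datasource> datasource) {
            return resultModel
                .joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
                .map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
                    UsageStatsResultModel usrm = t2._1();
                    Datasource d = t2._2();
                    if (d != null && d.getOfficialname() != null) {
                        usrm.setDatasourceId(usrm.getDatasourceId() + "||" + d.getOfficialname().getValue());
                    }
                    return usrm;
                }, Encoders.bean(UsageStatsResultModel.class));
        }
    }

A variant along these lines keeps the id-plus-name convention for matched datasources while leaving the plain datasource id untouched when no matching Datasource record exists.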