diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index d7ca007084..b25da07e05 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -8,13 +8,12 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Optional; -import eu.dnetlib.dhp.schema.oaf.*; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; - import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -28,6 +27,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -80,31 +80,31 @@ public class SparkAtomicActionUsageJob implements Serializable { }); } - private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) { + private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, + String attribute_name) { spark - .sql( - "Select " + attribute_name + " as id, sum(downloads) as downloads, sum(views) as views " + - "from " + dbname + "." + tableName + - "group by " + attribute_name) - .as(Encoders.bean(UsageStatsModel.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(workingPath); + .sql(String.format( + "select %s as id, sum(downloads) as downloads, sum(views) as views " + + "from %s.%s group by %s", attribute_name, dbname, tableName, attribute_name)) + .as(Encoders.bean(UsageStatsModel.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(workingPath); } - - public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) { - getFinalIndicatorsResult(spark, inputPath+ "/usageDb"). - toJavaRDD(). - map(p -> new AtomicAction(p.getClass(),p)) - .union(getFinalIndicatorsProject(spark, inputPath + "/projectDb") - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p ))) - .union(getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb") - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p))) + getFinalIndicatorsResult(spark, inputPath + "/usageDb") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getFinalIndicatorsProject(spark, inputPath + "/projectDb") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) + .union( + getFinalIndicatorsDatasource(spark, inputPath + "/datasourceDb") + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p))) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) @@ -115,38 +115,36 @@ public class SparkAtomicActionUsageJob implements Serializable { private static Dataset getFinalIndicatorsResult(SparkSession spark, String inputPath) { return readPath(spark, inputPath, UsageStatsModel.class) - .map((MapFunction) usm -> { - Result r = new Result(); - r.setId("50|" + usm.getId()); - r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); - return r; - }, Encoders.bean(Result.class)); + .map((MapFunction) usm -> { + Result r = new Result(); + r.setId("50|" + usm.getId()); + r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return r; + }, Encoders.bean(Result.class)); } private static Dataset getFinalIndicatorsProject(SparkSession spark, String inputPath) { return readPath(spark, inputPath, UsageStatsModel.class) - .map((MapFunction) usm -> { - Project r = new Project(); - r.setId("40|" + usm.getId()); - r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); - return r; - }, Encoders.bean(Project.class)); + .map((MapFunction) usm -> { + Project p = new Project(); + p.setId("40|" + usm.getId()); + p.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return p; + }, Encoders.bean(Project.class)); } private static Dataset getFinalIndicatorsDatasource(SparkSession spark, String inputPath) { return readPath(spark, inputPath, UsageStatsModel.class) - .map((MapFunction) usm -> { - Datasource r = new Datasource(); - r.setId("10|" + usm.getId()); - r.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); - return r; - }, Encoders.bean(Datasource.class)); + .map((MapFunction) usm -> { + Datasource d = new Datasource(); + d.setId("10|" + usm.getId()); + d.setMeasures(getMeasure(usm.getDownloads(), usm.getViews())); + return d; + }, Encoders.bean(Datasource.class)); } - - private static List getMeasure(Long downloads, Long views) { DataInfo dataInfo = OafMapperUtils .dataInfo( diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 1512f9cf88..5982c88201 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,7 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.OafEntity; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkAtomicActionCountJobTest { @@ -79,16 +79,16 @@ public class SparkAtomicActionCountJobTest { JavaRDD tmp = sc .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)); - //.map(aa -> (Result) aa.getPayload()); + // .map(aa -> (Result) aa.getPayload()); - Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); - Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); - Assertions.assertEquals(9,tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("50|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("10|")).count()); + Assertions.assertEquals(9, tmp.filter(aa -> ((OafEntity) aa.getPayload()).getId().startsWith("40|")).count()); - tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity)r.getPayload()).getMeasures().size())); + tmp.foreach(r -> Assertions.assertEquals(2, ((OafEntity) r.getPayload()).getMeasures().size())); tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( @@ -98,14 +98,14 @@ public class SparkAtomicActionCountJobTest { .forEach(u -> Assertions.assertFalse(u.getDataInfo().getDeletedbyinference())))); tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( m -> m.getUnit().stream().forEach(u -> Assertions.assertTrue(u.getDataInfo().getInferred())))); tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( @@ -116,7 +116,7 @@ public class SparkAtomicActionCountJobTest { tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( @@ -130,7 +130,7 @@ public class SparkAtomicActionCountJobTest { u.getDataInfo().getProvenanceaction().getClassid())))); tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( @@ -145,7 +145,7 @@ public class SparkAtomicActionCountJobTest { tmp .foreach( - r -> ((OafEntity)r.getPayload()) + r -> ((OafEntity) r.getPayload()) .getMeasures() .stream() .forEach( @@ -160,13 +160,19 @@ public class SparkAtomicActionCountJobTest { Assertions .assertEquals( - 1, tmp.filter(r -> ((OafEntity)r.getPayload()).getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")).count()); + 1, + tmp + .filter( + r -> ((OafEntity) r.getPayload()) + .getId() + .equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .count()); Assertions .assertEquals( "0", tmp - .map(r -> ((OafEntity)r.getPayload())) + .map(r -> ((OafEntity) r.getPayload())) .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .collect() .get(0) @@ -182,8 +188,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "5", tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|dedup_wf_001::53575dc69e9ace947e02d47ecd54a7a6")) .collect() .get(0) .getMeasures() @@ -199,8 +205,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "0", tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .collect() .get(0) .getMeasures() @@ -215,8 +221,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "1", tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::17eda2ff77407538fbe5d3d719b9d1c0")) .collect() .get(0) .getMeasures() @@ -232,8 +238,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "2", tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .collect() .get(0) .getMeasures() @@ -248,8 +254,8 @@ public class SparkAtomicActionCountJobTest { .assertEquals( "6", tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("50|doi_________::3085e4c6e051378ca6157fe7f0430c1f")) .collect() .get(0) .getMeasures() @@ -261,205 +267,203 @@ public class SparkAtomicActionCountJobTest { .get(0) .getValue()); + Assertions + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); + Assertions + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("40|f12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - + .assertEquals( + "5", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "0", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "5", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d1__________::53575dc69e9ace947e02d47ecd54a7a6")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "1", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "0", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "2", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("downloads")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); Assertions - .assertEquals( - "1", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d11_________::17eda2ff77407538fbe5d3d719b9d1c0")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - - Assertions - .assertEquals( - "2", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("downloads")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); - Assertions - .assertEquals( - "6", - tmp - .map(r -> ((OafEntity)r.getPayload())) - .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) - .collect() - .get(0) - .getMeasures() - .stream() - .filter(m -> m.getId().equals("views")) - .collect(Collectors.toList()) - .get(0) - .getUnit() - .get(0) - .getValue()); + .assertEquals( + "6", + tmp + .map(r -> ((OafEntity) r.getPayload())) + .filter(r -> r.getId().equals("10|d12_________::3085e4c6e051378ca6157fe7f0430c1f")) + .collect() + .get(0) + .getMeasures() + .stream() + .filter(m -> m.getId().equals("views")) + .collect(Collectors.toList()) + .get(0) + .getUnit() + .get(0) + .getValue()); } }