From 89657a0b78d214017e11613ee4e34b0af805330f Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 9 May 2022 14:43:27 +0200 Subject: [PATCH] [UsageCount] refactoring --- .../usagestats/SparkAtomicActionUsageJob.java | 14 +++++++------- .../usagestats/SparkAtomicActionCountJobTest.java | 14 +++++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java index 9428f5c59d..5f099b8f26 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java @@ -9,7 +9,6 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import eu.dnetlib.dhp.schema.action.AtomicAction; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -27,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Measure; @@ -109,12 +109,12 @@ public class SparkAtomicActionUsageJob implements Serializable { res.setMeasures(getMeasure(first.getDownloads(), first.getViews())); return res; }, Encoders.bean(Result.class)) - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java index 1af408629a..8aa718bae5 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java @@ -8,7 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.action.AtomicAction; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -24,6 +23,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkAtomicActionCountJobTest { @@ -68,20 +68,20 @@ public class SparkAtomicActionCountJobTest { @Test void testMatch() { String usageScoresPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb") + .getPath(); SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet"); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD tmp = sc.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) - .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)) - .map(aa -> (Result) aa.getPayload()); + JavaRDD tmp = sc + .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) + .map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class)) + .map(aa -> (Result) aa.getPayload()); Assertions.assertEquals(9, tmp.count()); - tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size())); tmp .foreach(