diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
index c284ad8bd..9428f5c59 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionUsageJob.java
@@ -9,7 +9,10 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -29,6 +32,7 @@
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Measure;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
 /**
  * created the Atomic Action for each type of results
@@ -73,7 +77,7 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			spark -> {
 				removeOutputDir(spark, outputPath);
 				prepareResults(dbname, spark, workingPath);
-				prepareActionSet(spark, workingPath, outputPath);
+				writeActionSet(spark, workingPath, outputPath);
 			});
 	}
 
@@ -89,7 +93,7 @@ public class SparkAtomicActionUsageJob implements Serializable {
 			.json(workingPath);
 	}
 
-	public static void prepareActionSet(SparkSession spark, String inputPath, String outputPath) {
+	public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
 		readPath(spark, inputPath, UsageStatsModel.class)
 			.groupByKey((MapFunction<UsageStatsModel, String>) us -> us.getResult_id(), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, UsageStatsModel, Result>) (k, it) -> {
@@ -105,10 +109,13 @@
 				res.setMeasures(getMeasure(first.getDownloads(), first.getViews()));
 				return res;
 			}, Encoders.bean(Result.class))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression", "gzip")
-			.json(outputPath);
+			.toJavaRDD()
+			.map(p -> new AtomicAction(p.getClass(), p))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+
 	}
 
 	private static List<Measure> getMeasure(Long downloads, Long views) {
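Review note (commentary, not part of the patch): `writeActionSet` now wraps each aggregated `Result` in an `AtomicAction` and stores it as a `(Text, Text)` SequenceFile record, keyed by the payload's canonical class name with the JSON-serialized action as the value. A minimal sketch of the read side, assuming a plain Jackson `ObjectMapper` like the job's own; `ReadActionSetSketch` and `actionSetPath` are illustrative names, not code from this repository:

```java
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Result;

public class ReadActionSetSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// Read back the (class name, JSON) Text pairs written by writeActionSet
	// and unwrap the Result payload of each AtomicAction.
	public static JavaRDD<Result> read(JavaSparkContext sc, String actionSetPath) {
		return sc
			.sequenceFile(actionSetPath, Text.class, Text.class)
			.map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), AtomicAction.class))
			.map(aa -> (Result) aa.getPayload());
	}
}
```

This mirrors what the updated test below does with `sc.sequenceFile(...)`.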
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
index 7cc9eb326..1af408629 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/usagestats/SparkAtomicActionCountJobTest.java
@@ -8,14 +8,12 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -26,8 +24,6 @@
 import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class SparkAtomicActionCountJobTest {
@@ -72,19 +68,20 @@ public class SparkAtomicActionCountJobTest {
 	@Test
 	void testMatch() {
 		String usageScoresPath = getClass()
-			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
-			.getPath();
+			.getResource("/eu/dnetlib/dhp/actionmanager/usagestats/usagestatsdb")
+			.getPath();
 
-		SparkAtomicActionUsageJob.prepareActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
+		SparkAtomicActionUsageJob.writeActionSet(spark, usageScoresPath, workingDir.toString() + "/actionSet");
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
-		JavaRDD<Result> tmp = sc
-			.textFile(workingDir.toString() + "/actionSet")
-			.map(usm -> OBJECT_MAPPER.readValue(usm, Result.class));
+		JavaRDD<Result> tmp = sc.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+			.map(usm -> OBJECT_MAPPER.readValue(usm._2.getBytes(), AtomicAction.class))
+			.map(aa -> (Result) aa.getPayload());
 
 		Assertions.assertEquals(9, tmp.count());
 
+		tmp.foreach(r -> Assertions.assertEquals(2, r.getMeasures().size()));
 		tmp
 			.foreach(
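Review note (commentary, not part of the patch): the added assertion expects exactly two measures per result, one for downloads and one for views, matching what `getMeasure(first.getDownloads(), first.getViews())` builds in the job. A minimal sketch of the serialization round trip the rewritten test relies on, assuming a plain Jackson `ObjectMapper`; `AtomicActionRoundTripSketch` and the id value are hypothetical placeholders:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Result;

public class AtomicActionRoundTripSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	@Test
	void payloadSurvivesRoundTrip() throws Exception {
		Result result = new Result();
		result.setId("50|fake_datasource::1234"); // hypothetical id, for illustration only

		// Serialize the way writeActionSet does ...
		String json = OBJECT_MAPPER.writeValueAsString(new AtomicAction(Result.class, result));

		// ... and parse it back the way testMatch does.
		AtomicAction parsed = OBJECT_MAPPER.readValue(json, AtomicAction.class);
		Assertions.assertEquals("50|fake_datasource::1234", ((Result) parsed.getPayload()).getId());
	}
}
```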