diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java new file mode 100644 index 0000000000..15e0bb454c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.aggregation.common; + +public class AggregationConstants { + + public static final String SEQUENCE_FILE_NAME = "/sequence_file"; + public static final String MDSTORE_DATA_PATH = "/store"; + public static final String MDSTORE_SIZE_PATH = "/size"; + + public static final String CONTENT_TOTALITEMS = "TotalItems"; + public static final String CONTENT_INVALIDRECORDS = "InvalidRecords"; + public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems"; + + + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java index eb971c4754..d657dee027 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java @@ -5,6 +5,7 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -21,6 +22,8 @@ public class AggregationUtility { private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class); + public static final ObjectMapper MAPPER = new ObjectMapper(); + public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path) throws IOException { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index bbed36a9c3..13813623cd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -1,15 +1,11 @@ package eu.dnetlib.dhp.collection; -import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; +import eu.dnetlib.dhp.model.mdstore.Provenance; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; @@ -26,26 +22,22 @@ import org.dom4j.Node; import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - -import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication; -import eu.dnetlib.dhp.model.mdstore.MetadataRecord; -import eu.dnetlib.dhp.model.mdstore.Provenance; -import net.sf.saxon.expr.Component; import scala.Tuple2; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; + +import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + public class GenerateNativeStoreSparkJob { private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - - private static final String DATASET_NAME = "/store"; - public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -88,11 +80,6 @@ public class GenerateNativeStoreSparkJob { log.info("isSparkSessionManaged: {}", isSparkSessionManaged); SparkConf conf = new SparkConf(); - /* - * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf .registerKryoClasses( new - * Class[] { MetadataRecord.class, Provenance.class }); - */ - runWithSparkSession( conf, isSparkSessionManaged, @@ -109,10 +96,10 @@ public class GenerateNativeStoreSparkJob { MDStoreVersion readVersion) throws IOException { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); - final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); + final LongAccumulator totalItems = sc.sc().longAccumulator(CONTENT_TOTALITEMS); + final LongAccumulator invalidRecords = sc.sc().longAccumulator(CONTENT_INVALIDRECORDS); - final String seqFilePath = currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENCE_FILE_NAME; + final String seqFilePath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME; final JavaRDD nativeStore = sc .sequenceFile(seqFilePath, IntWritable.class, Text.class) .map( @@ -130,13 +117,13 @@ public class GenerateNativeStoreSparkJob { final Encoder encoder = Encoders.bean(MetadataRecord.class); final Dataset mdstore = spark.createDataset(nativeStore.rdd(), encoder); - final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME; + final String targetPath = currentVersion.getHdfsPath() + MDSTORE_DATA_PATH; if (readVersion != null) { // INCREMENTAL MODE log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath()); Dataset currentMdStoreVersion = spark .read() - .load(readVersion.getHdfsPath() + DATASET_NAME) + .load(readVersion.getHdfsPath() + MDSTORE_DATA_PATH) .as(encoder); TypedColumn aggregator = new MDStoreAggregator().toColumn(); @@ -159,7 +146,7 @@ public class GenerateNativeStoreSparkJob { final Long total = spark.read().load(targetPath).count(); log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName()); - writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size"); + writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH); } public static class MDStoreAggregator extends Aggregator { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java index e24b9ad1da..da5b197d64 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java @@ -1,6 +1,8 @@ package eu.dnetlib.dhp.collection.worker; +import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; + import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,8 +27,6 @@ public class CollectorWorkerApplication { private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); - public static String SEQUENCE_FILE_NAME = "/sequence_file"; - /** * @param args */ diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index 193da38788..f8ddf47e2d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -2,14 +2,17 @@ package eu.dnetlib.dhp.transformation; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; +import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*; import java.io.IOException; import java.util.Map; import java.util.Optional; +import eu.dnetlib.dhp.aggregation.common.AggregationConstants; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; + import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; @@ -76,29 +79,36 @@ public class TransformSparkJobNode { conf, isSparkSessionManaged, spark -> transformRecords( - parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + "/store", - cleanedMdStoreVersion.getHdfsPath() + "/store")); + parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH, + cleanedMdStoreVersion.getHdfsPath() + MDSTORE_DATA_PATH)); } public static void transformRecords(final Map args, final ISLookUpService isLookUpService, final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException, IOException { - final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); - final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); - final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); + final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS); + final LongAccumulator errorItems = spark.sparkContext().longAccumulator(CONTENT_INVALIDRECORDS); + final LongAccumulator transformedItems = spark.sparkContext().longAccumulator(CONTENT_TRANSFORMEDRECORDS); final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems); final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); - final MapFunction XSLTTransformationFunction = TransformationFactory - .getTransformationPlugin(args, ct, isLookUpService); - mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath + "/store"); + + saveDataset( + spark.read() + .format("parquet") + .load(inputPath) + .as(encoder) + .map( + TransformationFactory.getTransformationPlugin(args, ct, isLookUpService), + encoder), + outputPath + MDSTORE_DATA_PATH); + log.info("Transformed item " + ct.getProcessedItems().count()); log.info("Total item " + ct.getTotalItems().count()); log.info("Transformation Error item " + ct.getErrorItems().count()); - AggregationUtility.writeTotalSizeOnHDFS(spark, ct.getProcessedItems().count(), outputPath + "/size"); + writeTotalSizeOnHDFS(spark, ct.getProcessedItems().count(), outputPath + MDSTORE_SIZE_PATH); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java index c9ccbc7ff4..ac65ef6a9a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java @@ -145,7 +145,7 @@ public class AggregationJobTest { } - //@Test + @Test @Order(3) public void testTransformSparkJob() throws Exception {