diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index 6a0938708..8eafaadca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -11,6 +11,9 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; @@ -26,6 +29,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.message.MessageSender; import eu.dnetlib.dhp.schema.mdstore.MetadataRecord; +import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -33,6 +37,8 @@ public class TransformSparkJobNode { private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class); + private static int RECORDS_PER_TASK = 200; + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -67,6 +73,11 @@ public class TransformSparkJobNode { final String dateOfTransformation = parser.get("dateOfTransformation"); log.info(String.format("dateOfTransformation: %s", dateOfTransformation)); + final Integer rpt = Optional + .ofNullable(parser.get("recordsPerTask")) + .map(Integer::valueOf) + .orElse(RECORDS_PER_TASK); + final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl); final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService); @@ -79,12 +90,12 @@ public class TransformSparkJobNode { isSparkSessionManaged, spark -> { transformRecords( - parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath); + parser.getObjectMap(), isLookupService, spark, inputPath, outputBasePath, rpt); }); } public static void transformRecords(final Map args, final ISLookUpService isLookUpService, - final SparkSession spark, final String inputPath, final String outputBasePath) + final SparkSession spark, final String inputPath, final String outputBasePath, final Integer rpt) throws DnetTransformationException, IOException { final LongAccumulator totalItems = spark.sparkContext().longAccumulator(CONTENT_TOTALITEMS); @@ -99,18 +110,25 @@ public class TransformSparkJobNode { final String workflowId = args.get("workflowId"); log.info("workflowId is {}", workflowId); + MapFunction x = TransformationFactory + .getTransformationPlugin(args, ct, isLookUpService); + + final Dataset inputMDStore = spark + .read() + .format("parquet") + .load(inputPath) + .as(encoder); + + final long totalInput = inputMDStore.count(); + final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId); try (AggregatorReport report = new AggregatorReport(messageSender)) { try { - final Dataset mdstore = spark - .read() - .format("parquet") - .load(inputPath) - .as(encoder) - .map( - TransformationFactory.getTransformationPlugin(args, ct, isLookUpService), - encoder); - saveDataset(mdstore, outputBasePath + MDSTORE_DATA_PATH); + JavaRDD mdstore = inputMDStore + .javaRDD() + .repartition(getRepartitionNumber(totalInput, rpt)) + .map((Function) x::call); + saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH); log.info("Transformed item " + ct.getProcessedItems().count()); log.info("Total item " + ct.getTotalItems().count()); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java index f803c7cbc..e6f42fe09 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.transformation.xslt; import java.io.ByteArrayInputStream; +import java.io.Serializable; import java.io.StringWriter; import java.nio.charset.StandardCharsets; @@ -15,7 +16,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.mdstore.MetadataRecord; import net.sf.saxon.s9api.*; -public class XSLTTransformationFunction implements MapFunction { +public class XSLTTransformationFunction implements MapFunction, Serializable { public final static String QNAME_BASE_URI = "http://eu/dnetlib/transform"; @@ -27,6 +28,8 @@ public class XSLTTransformationFunction implements MapFunctiondnetMessageManagerURL The URI of the Dnet Message Manager + + recordsPerTask + 200 + The URI of the Dnet Message Manager + + @@ -103,6 +109,7 @@ --transformationPlugin${transformationPlugin} --transformationRuleId${transformationRuleId} --isLookupUrl${isLookupUrl} + --recordsPerTask${recordsPerTask} --workflowId${workflowId} --dnetMessageManagerURL${dnetMessageManagerURL} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json index ee9099dde..4cc2da0c4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json @@ -48,6 +48,12 @@ "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true }, + { + "paramName": "rpt", + "paramLongName": "recordsPerTask", + "paramDescription": "the number of records transformed by a single Task", + "paramRequired": false + }, { "paramName": "tp", "paramLongName": "transformationPlugin", diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java index b8eb58ec2..7016d39fa 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java @@ -180,7 +180,7 @@ public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest { TransformSparkJobNode .transformRecords( parameters, isLookUpService, spark, mdStoreV2.getHdfsPath() + MDSTORE_DATA_PATH, - mdStoreCleanedVersion.getHdfsPath()); + mdStoreCleanedVersion.getHdfsPath(), 200); final Encoder encoder = Encoders.bean(MetadataRecord.class); final Dataset mOutput = spark diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index 1669fe89d..0a96fe406 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -167,7 +167,8 @@ public class TransformationJobTest extends AbstractVocabularyTest { }).collect(Collectors.toMap(data -> data[0], data -> data[1])); - TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output); + TransformSparkJobNode + .transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output, 200); // TODO introduce useful assertions @@ -221,7 +222,8 @@ public class TransformationJobTest extends AbstractVocabularyTest { }).collect(Collectors.toMap(data -> data[0], data -> data[1])); - TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output); + TransformSparkJobNode + .transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output, 200); // TODO introduce useful assertions diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/zenodo_tr.xslt b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/zenodo_tr.xslt index 9a02c9071..2ee3c1719 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/zenodo_tr.xslt +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/zenodo_tr.xslt @@ -1,16 +1,13 @@ - + - @@ -29,34 +26,27 @@ + select="'sgov________::'"/> - + - - - - - - - - - + @@ -68,55 +58,44 @@ - - - - - - - + + - - + + - + - + - - + - @@ -147,17 +126,12 @@ - - + - - - @@ -308,13 +282,10 @@ select="concat($varWT, replace(normalize-space(.), '(info:eu-repo/grantagreement/wt/.*?/)([^/]*)(/.*)?', '$2', 'i'))"/> - - - + @@ -322,7 +293,6 @@ - @@ -343,24 +313,19 @@ - - - - - @@ -370,7 +335,6 @@ select="concat('http://hdl.handle.net/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/> - @@ -380,7 +344,6 @@ select="concat('http://nbn-resolving.org/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/> - @@ -390,11 +353,8 @@ select="concat('http://dx.doi.org/', //*[local-name() = 'resource']/*[local-name()='identifier'])"/> - - - @@ -404,8 +364,7 @@ - + @@ -413,8 +372,7 @@ - + @@ -422,17 +380,12 @@ - + - - - - @@ -441,5 +394,4 @@ - \ No newline at end of file