diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 9811fb707..861ae5201 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -1,17 +1,21 @@ package eu.dnetlib.dhp.collection; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import org.apache.commons.cli.*; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -23,6 +27,8 @@ import org.apache.spark.util.LongAccumulator; import org.dom4j.Document; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,6 +41,8 @@ import eu.dnetlib.message.MessageType; public class GenerateNativeStoreSparkJob { + private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class); + public static MetadataRecord parseRecord( final String input, final String xpath, @@ -78,84 +86,90 @@ public class GenerateNativeStoreSparkJob { final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); final long dateOfCollection = new Long(parser.get("dateOfCollection")); - final SparkSession spark = SparkSession - .builder() - .appName("GenerateNativeStoreSparkJob") - .master(parser.get("master")) - .getOrCreate(); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final Map ongoingMap = new HashMap<>(); final Map reportMap = new HashMap<>(); final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest")); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final JavaPairRDD inputRDD = sc - .sequenceFile(parser.get("input"), IntWritable.class, Text.class); + final JavaPairRDD inputRDD = sc + .sequenceFile(parser.get("input"), IntWritable.class, Text.class); - final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); + final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); + final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); - final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); + final MessageManager manager = new MessageManager( + parser.get("rabbitHost"), + parser.get("rabbitUser"), + parser.get("rabbitPassword"), + false, + false, + null); - final MessageManager manager = new MessageManager( - parser.get("rabbitHost"), - parser.get("rabbitUser"), - parser.get("rabbitPassword"), - false, - false, - null); + final JavaRDD mappeRDD = inputRDD + .map( + item -> parseRecord( + item._2().toString(), + parser.get("xpath"), + parser.get("encoding"), + provenance, + dateOfCollection, + totalItems, + invalidRecords)) + .filter(Objects::nonNull) + .distinct(); - final JavaRDD mappeRDD = inputRDD - .map( - item -> parseRecord( - item._2().toString(), - parser.get("xpath"), - parser.get("encoding"), - provenance, - dateOfCollection, - totalItems, - invalidRecords)) - .filter(Objects::nonNull) - .distinct(); + ongoingMap.put("ongoing", "0"); + if (!test) { + manager + .sendMessage( + new Message( + parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), + parser.get("rabbitOngoingQueue"), + true, + false); + } - ongoingMap.put("ongoing", "0"); - if (!test) { - manager - .sendMessage( - new Message( - parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), - parser.get("rabbitOngoingQueue"), - true, - false); - } + final Encoder encoder = Encoders.bean(MetadataRecord.class); + final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); + final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); + mdStoreRecords.add(mdstore.count()); + ongoingMap.put("ongoing", "" + totalItems.value()); + if (!test) { + manager + .sendMessage( + new Message( + parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), + parser.get("rabbitOngoingQueue"), + true, + false); + } + mdstore.write().format("parquet").save(parser.get("output")); + reportMap.put("inputItem", "" + totalItems.value()); + reportMap.put("invalidRecords", "" + invalidRecords.value()); + reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); + if (!test) { + manager + .sendMessage( + new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), + parser.get("rabbitReportQueue"), + true, + false); + manager.close(); + } + }); - final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); - final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); - mdStoreRecords.add(mdstore.count()); - ongoingMap.put("ongoing", "" + totalItems.value()); - if (!test) { - manager - .sendMessage( - new Message( - parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), - parser.get("rabbitOngoingQueue"), - true, - false); - } - mdstore.write().format("parquet").save(parser.get("output")); - reportMap.put("inputItem", "" + totalItems.value()); - reportMap.put("invalidRecords", "" + invalidRecords.value()); - reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); - if (!test) { - manager - .sendMessage( - new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), - parser.get("rabbitReportQueue"), - true, - false); - manager.close(); - } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index 5f39717d0..8737d36ef 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -1,13 +1,17 @@ package eu.dnetlib.dhp.transformation; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + import java.io.ByteArrayInputStream; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import org.apache.commons.cli.*; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; @@ -17,8 +21,11 @@ import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary; import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper; @@ -29,6 +36,8 @@ import eu.dnetlib.message.MessageType; public class TransformSparkJobNode { + private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class); + public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -40,12 +49,18 @@ public class TransformSparkJobNode { parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String inputPath = parser.get("input"); final String outputPath = parser.get("output"); final String workflowId = parser.get("workflowId"); final String trasformationRule = extractXSLTFromTR( Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule")))); - final String master = parser.get("master"); + final String rabbitUser = parser.get("rabbitUser"); final String rabbitPassword = parser.get("rabbitPassword"); final String rabbitHost = parser.get("rabbitHost"); @@ -53,46 +68,48 @@ public class TransformSparkJobNode { final long dateOfCollection = new Long(parser.get("dateOfCollection")); final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest")); - final SparkSession spark = SparkSession - .builder() - .appName("TransformStoreSparkJob") - .master(master) - .getOrCreate(); + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + final Encoder encoder = Encoders.bean(MetadataRecord.class); + final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); + final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); + final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); + final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); + final Map vocabularies = new HashMap<>(); + vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages")); + final TransformFunction transformFunction = new TransformFunction( + totalItems, + errorItems, + transformedItems, + trasformationRule, + dateOfCollection, + vocabularies); + mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath); + if (rabbitHost != null) { + System.out.println("SEND FINAL REPORT"); + final Map reportMap = new HashMap<>(); + reportMap.put("inputItem", "" + totalItems.value()); + reportMap.put("invalidRecords", "" + errorItems.value()); + reportMap.put("mdStoreSize", "" + transformedItems.value()); + System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap)); + if (!test) { + final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, + false, + null); + manager + .sendMessage( + new Message(workflowId, "Transform", MessageType.REPORT, reportMap), + rabbitReportQueue, + true, + false); + manager.close(); + } + } + }); - final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); - final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); - final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); - final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); - final Map vocabularies = new HashMap<>(); - vocabularies.put("dnet:languages", VocabularyHelper.getVocabularyFromAPI("dnet:languages")); - final TransformFunction transformFunction = new TransformFunction( - totalItems, - errorItems, - transformedItems, - trasformationRule, - dateOfCollection, - vocabularies); - mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath); - if (rabbitHost != null) { - System.out.println("SEND FINAL REPORT"); - final Map reportMap = new HashMap<>(); - reportMap.put("inputItem", "" + totalItems.value()); - reportMap.put("invalidRecords", "" + errorItems.value()); - reportMap.put("mdStoreSize", "" + transformedItems.value()); - System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap)); - if (!test) { - final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, - null); - manager - .sendMessage( - new Message(workflowId, "Transform", MessageType.REPORT, reportMap), - rabbitReportQueue, - true, - false); - manager.close(); - } - } } private static String extractXSLTFromTR(final String tr) throws DocumentException { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json index 4b4925f27..4a6aec5ee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json @@ -1,16 +1,86 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true}, - {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, - {"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true}, - {"paramName":"x", "paramLongName":"xpath", "paramDescription": "the xpath to identify the record ifentifier", "paramRequired": true}, - {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true}, - {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, - {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, - {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, - {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}, - {"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false} + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "e", + "paramLongName": "encoding", + "paramDescription": "the encoding of the input record should be JSON or XML", + "paramRequired": true + }, + { + "paramName": "d", + "paramLongName": "dateOfCollection", + "paramDescription": "the date when the record has been stored", + "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "provenance", + "paramDescription": "the infos about the provenance of the collected records", + "paramRequired": true + }, + { + "paramName": "x", + "paramLongName": "xpath", + "paramDescription": "the xpath to identify the record identifier", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "input", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "output", + "paramDescription": "the path of the result DataFrame on HDFS", + "paramRequired": true + }, + { + "paramName": "ru", + "paramLongName": "rabbitUser", + "paramDescription": "the user to connect with RabbitMq for messaging", + "paramRequired": true + }, + { + "paramName": "rp", + "paramLongName": "rabbitPassword", + "paramDescription": "the password to connect with RabbitMq for messaging", + "paramRequired": true + }, + { + "paramName": "rh", + "paramLongName": "rabbitHost", + "paramDescription": "the host of the RabbitMq server", + "paramRequired": true + }, + { + "paramName": "ro", + "paramLongName": "rabbitOngoingQueue", + "paramDescription": "the name of the ongoing queue", + "paramRequired": true + }, + { + "paramName": "rr", + "paramLongName": "rabbitReportQueue", + "paramDescription": "the name of the report queue", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workflowId", + "paramDescription": "the identifier of the dnet Workflow", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "isTest", + "paramDescription": "the name of the report queue", + "paramRequired": false + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json index 3af21f53f..4bb5fd56a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json @@ -1,16 +1,74 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, - {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true}, - {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}, - {"paramName":"tr", "paramLongName":"transformationRule","paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true}, - {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, - {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, - {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, - {"paramName":"t", "paramLongName":"isTest", "paramDescription": "the name of the report queue", "paramRequired": false} - - + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "d", + "paramLongName": "dateOfCollection", + "paramDescription": "the date when the record has been stored", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "input", + "paramDescription": "the path of the sequencial file to read", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "output", + "paramDescription": "the path of the result DataFrame on HDFS", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workflowId", + "paramDescription": "the identifier of the dnet Workflow", + "paramRequired": true + }, + { + "paramName": "tr", + "paramLongName": "transformationRule", + "paramDescription": "the transformation Rule to apply to the input MDStore", + "paramRequired": true + }, + { + "paramName": "ru", + "paramLongName": "rabbitUser", + "paramDescription": "the user to connect with RabbitMq for messaging", + "paramRequired": true + }, + { + "paramName": "rp", + "paramLongName": "rabbitPassword", + "paramDescription": "the password to connect with RabbitMq for messaging", + "paramRequired": true + }, + { + "paramName": "rh", + "paramLongName": "rabbitHost", + "paramDescription": "the host of the RabbitMq server", + "paramRequired": true + }, + { + "paramName": "ro", + "paramLongName": "rabbitOngoingQueue", + "paramDescription": "the name of the ongoing queue", + "paramRequired": true + }, + { + "paramName": "rr", + "paramLongName": "rabbitReportQueue", + "paramDescription": "the name of the report queue", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "isTest", + "paramDescription": "the name of the report queue", + "paramRequired": false + } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java index 44364b30a..c3b05f5c9 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java @@ -9,65 +9,60 @@ import java.nio.file.Path; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.Provenance; +import eu.dnetlib.dhp.schema.common.ModelSupport; public class CollectionJobTest { - private Path testDir; + private static SparkSession spark; - @BeforeEach - public void setup() throws IOException { - testDir = Files.createTempDirectory("dhp-collection"); + @BeforeAll + public static void beforeAll() { + SparkConf conf = new SparkConf(); + conf.setAppName(CollectionJobTest.class.getSimpleName()); + conf.setMaster("local"); + spark = SparkSession.builder().config(conf).getOrCreate(); } - @AfterEach - public void teadDown() throws IOException { - FileUtils.deleteDirectory(testDir.toFile()); + @AfterAll + public static void afterAll() { + spark.stop(); } @Test - public void tesCollection() throws Exception { + public void tesCollection(@TempDir Path testDir) throws Exception { final Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); + Assertions.assertNotNull(new ObjectMapper().writeValueAsString(provenance)); + GenerateNativeStoreSparkJob .main( new String[] { - "-mt", - "local", - "-w", - "wid", - "-e", - "XML", - "-d", - "" + System.currentTimeMillis(), - "-p", - new ObjectMapper().writeValueAsString(provenance), - "-x", - "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", - "-i", - this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(), - "-o", - testDir.toString() + "/store", - "-t", - "true", - "-ru", - "", - "-rp", - "", - "-rh", - "", - "-ro", - "", - "-rr", - "" + "issm", "true", + "-w", "wid", + "-e", "XML", + "-d", "" + System.currentTimeMillis(), + "-p", new ObjectMapper().writeValueAsString(provenance), + "-x", "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", + "-i", this.getClass().getResource("/eu/dnetlib/dhp/collection/native.seq").toString(), + "-o", testDir.toString() + "/store", + "-t", "true", + "-ru", "", + "-rp", "", + "-rh", "", + "-ro", "", + "-rr", "" }); - System.out.println(new ObjectMapper().writeValueAsString(provenance)); + + // TODO introduce useful assertions + } @Test @@ -85,9 +80,8 @@ public class CollectionJobTest { null, null); - assert record != null; - System.out.println(record.getId()); - System.out.println(record.getOriginalId()); + assertNotNull(record.getId()); + assertNotNull(record.getOriginalId()); } @Test @@ -112,10 +106,12 @@ public class CollectionJobTest { System.currentTimeMillis(), null, null); - assert record != null; + record.setBody("ciao"); - assert record1 != null; record1.setBody("mondo"); + + assertNotNull(record); + assertNotNull(record1); assertEquals(record, record1); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index 01c9e3103..98c8cf66c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -12,10 +12,14 @@ import java.util.Map; import javax.xml.transform.stream.StreamSource; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; import org.dom4j.Document; import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -23,6 +27,7 @@ import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import eu.dnetlib.dhp.collection.CollectionJobTest; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.functions.Cleaner; import eu.dnetlib.dhp.transformation.vocabulary.Vocabulary; @@ -33,6 +38,21 @@ import net.sf.saxon.s9api.*; @ExtendWith(MockitoExtension.class) public class TransformationJobTest { + private static SparkSession spark; + + @BeforeAll + public static void beforeAll() { + SparkConf conf = new SparkConf(); + conf.setAppName(CollectionJobTest.class.getSimpleName()); + conf.setMaster("local"); + spark = SparkSession.builder().config(conf).getOrCreate(); + } + + @AfterAll + public static void afterAll() { + spark.stop(); + } + @Mock private LongAccumulator accumulator; @@ -78,31 +98,21 @@ public class TransformationJobTest { TransformSparkJobNode .main( new String[] { - "-mt", - "local", - "-i", - mdstore_input, - "-o", - mdstore_output, - "-d", - "1", - "-w", - "1", - "-tr", - xslt, - "-t", - "true", - "-ru", - "", - "-rp", - "", - "-rh", - "", - "-ro", - "", - "-rr", - "" + "-issm", "true", + "-i", mdstore_input, + "-o", mdstore_output, + "-d", "1", + "-w", "1", + "-tr", xslt, + "-t", "true", + "-ru", "", + "-rp", "", + "-rh", "", + "-ro", "", + "-rr", "" }); + + // TODO introduce useful assertions } @Test