From 0634674add8c18d393e65ef68d200ba2be3bd6da Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Tue, 2 Feb 2021 12:12:14 +0100
Subject: [PATCH] implemented transformation test

---
 .../GenerateDataciteDatasetSpark.scala        |   2 +-
 .../transformation/TransformSparkJobNode.java |  15 +-
 .../transformation/TransformationFactory.java |   4 +-
 .../oozie_app/config-default.xml              |   5 +-
 .../dhp/transformation/oozie_app/workflow.xml |  53 ++++-
 .../dhp/aggregation/AggregationJobTest.java   | 197 ++++++++++++++++++
 .../GenerateNativeStoreSparkJobTest.java      | 169 ---------------
 .../transformation/TransformationJobTest.java |   4 +
 .../dhp/collection/mdStoreCleanedVersion.json |   9 +
 9 files changed, 275 insertions(+), 183 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
 delete mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
index 6837e94b2..f04f92c63 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/GenerateDataciteDatasetSpark.scala
@@ -27,7 +27,7 @@ object GenerateDataciteDatasetSpark {
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
-
+    log.info(s"vocabulary size is ${vocabularies.getTerms("dnet:languages").size()}")
     val spark: SparkSession = SparkSession.builder().config(conf)
       .appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
       .master(master)
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index b9df902a1..193da3878 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -24,6 +24,7 @@
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
 import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -60,15 +61,23 @@ public class TransformSparkJobNode {
         final String isLookupUrl = parser.get("isLookupUrl");
         log.info(String.format("isLookupUrl: %s", isLookupUrl));
 
+        final String dateOfTransformation = parser.get("dateOfTransformation");
+        log.info(String.format("dateOfTransformation: %s", dateOfTransformation));
+
+        final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
+        final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
+
+        log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
+
         SparkConf conf = new SparkConf();
         runWithSparkSession(
             conf,
             isSparkSessionManaged,
             spark -> transformRecords(
-                parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath(),
-                cleanedMdStoreVersion.getHdfsPath()));
+                parser.getObjectMap(), isLookupService, spark, nativeMdStoreVersion.getHdfsPath() + "/store",
+                cleanedMdStoreVersion.getHdfsPath() + "/store"));
     }
 
     public static void transformRecords(final Map<String, String> args, final ISLookUpService isLookUpService,
@@ -82,7 +91,7 @@ public class TransformSparkJobNode {
         final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
         final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
         final MapFunction<MetadataRecord, MetadataRecord> XSLTTransformationFunction = TransformationFactory
-            .getTransformationPlugin(args, ct, isLookUpService);
+            .getTransformationPlugin(args, ct, isLookUpService);
         mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath + "/store");
 
         log.info("Transformed item " + ct.getProcessedItems().count());
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
index d1f896964..45ba2981f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java
@@ -18,7 +18,7 @@ public class TransformationFactory {
 
     private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class);
-    public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value = \"%s\" return $x//CODE/text()";
+    public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//RESOURCE_IDENTIFIER/@value = \"%s\" return $x//CODE/*[local-name() =\"stylesheet\"]";
 
     public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
         final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
@@ -57,7 +57,7 @@ public class TransformationFactory {
     private static String queryTransformationRuleFromIS(final String transformationRuleId,
         final ISLookUpService isLookUpService) throws Exception {
         final String query = String.format(TRULE_XQUERY, transformationRuleId);
-        log.info("asking query to IS: " + query);
+        System.out.println("asking query to IS: " + query);
         List<String> result = isLookUpService.quickSearchProfile(query);
 
         if (result == null || result.isEmpty())
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
index e77dd09c9..bdd48b0ab 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml
@@ -15,8 +15,5 @@
         <name>oozie.action.sharelib.for.spark</name>
         <value>spark2</value>
     </property>
-    <property>
-        <name>oozie.launcher.mapreduce.user.classpath.first</name>
-        <value>true</value>
-    </property>
-</configuration>
+</configuration>
\ No newline at end of file
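Note on the TransformationFactory change above: TRULE_XQUERY now returns the <xsl:stylesheet> element itself instead of CODE/text(), so the IS lookup yields a well-formed XSLT document rather than possibly escaped text. A minimal, self-contained sketch of that fetch-and-apply flow, using plain JAXP in place of the project's Saxon-based XSLTTransformationFunction; the rule id and the stylesheet body are stand-ins, not values from this patch:

import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

public class XsltRuleSketch {

    static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') "
        + "where $x//RESOURCE_IDENTIFIER/@value = \"%s\" return $x//CODE/*[local-name() =\"stylesheet\"]";

    public static void main(String[] args) throws Exception {
        // the rule id would normally arrive via --transformationRuleId; this one is a placeholder
        final String query = String.format(TRULE_XQUERY, "some-rule-id");
        System.out.println("query sent to the IS: " + query);

        // stand-in for isLookUpService.quickSearchProfile(query).get(0): a complete
        // <xsl:stylesheet> element, which is exactly what the new XQuery returns
        final String xslt = "<xsl:stylesheet version=\"1.0\" xmlns:xsl=\"http://www.w3.org/1999/XSL/Transform\">"
            + "<xsl:template match=\"/\"><out><xsl:value-of select=\"//id\"/></out></xsl:template>"
            + "</xsl:stylesheet>";

        final Transformer transformer = TransformerFactory
            .newInstance()
            .newTransformer(new StreamSource(new StringReader(xslt)));
        final StringWriter out = new StringWriter();
        transformer
            .transform(new StreamSource(new StringReader("<record><id>oai:1</id></record>")), new StreamResult(out));
        System.out.println(out); // prints the transformed record, e.g. <out>oai:1</out>
    }
}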
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
index aff87dc79..43b270eaf 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml
@@ -18,12 +18,17 @@
         <property>
             <name>transformationPlugin</name>
+            <value>XSLT_TRANSFORM</value>
             <description>The transformation Plugin</description>
         </property>
         <property>
             <name>dateOfTransformation</name>
             <description>The timestamp of the transformation date</description>
         </property>
+        <property>
+            <name>isLookupUrl</name>
+            <description>The IS lookUp service endpoint</description>
+        </property>
     </parameters>
@@ -35,22 +40,36 @@
     <action name="BeginRead">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>READ_LOCK</arg>
             <arg>--mdStoreID</arg><arg>${mdStoreInputId}</arg>
             <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
             <capture-output/>
         </java>
         <ok to="StartTransaction"/>
         <error to="Kill"/>
     </action>
     <action name="StartTransaction">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>NEW_VERSION</arg>
             <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
             <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
@@ -62,7 +81,7 @@
             <mode>cluster</mode>
             <name>Transform MetadataStore</name>
             <class>eu.dnetlib.dhp.transformation.TransformSparkJobNode</class>
-            <jar>dhp-aggregations-${projectVersion}.jar</jar>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
                 --executor-cores=${sparkExecutorCores}
@@ -72,11 +91,12 @@
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>--mdstoreInputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
-            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdstoreInputVersion</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
             <arg>--dateOfTransformation</arg><arg>${dateOfTransformation}</arg>
             <arg>--transformationPlugin</arg><arg>${transformationPlugin}</arg>
             <arg>--transformationRuleId</arg><arg>${transformationRuleId}</arg>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
@@ -84,6 +104,13 @@
     <action name="EndRead">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>READ_UNLOCK</arg>
             <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
@@ -96,6 +123,12 @@
     <action name="CommitVersion">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>COMMIT</arg>
             <arg>--namenode</arg><arg>${nameNode}</arg>
@@ -108,18 +141,30 @@
     <action name="EndReadRollBack">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>READ_UNLOCK</arg>
             <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
             <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
-
+
     <action name="RollBack">
         <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+
             <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
             <arg>--action</arg><arg>ROLLBACK</arg>
             <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
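The workflow changes above do three things: every MDStoreActionNode java action now sets oozie.launcher.mapreduce.user.classpath.first=true in its launcher configuration, the spark action's version arguments are corrected (--mdstoreInputVersion now comes from the BeginRead lock, --mdstoreOutputVersion from the StartTransaction version, previously swapped), and --isLookupUrl is passed through to the job. A compact sketch of the transaction discipline these actions encode; the MdStoreClient interface is hypothetical, standing in for the REST calls MDStoreActionNode issues against ${mdStoreManagerURI}:

public class MdStoreTransactionSketch {

    interface MdStoreClient {
        String readLock(String mdStoreId);        // BeginRead      -> READ_LOCK
        String newVersion(String mdStoreId);      // StartTransaction -> NEW_VERSION
        void readUnlock(String readVersionId);    // EndRead / EndReadRollBack -> READ_UNLOCK
        void commit(String versionId);            // CommitVersion  -> COMMIT
        void rollback(String versionId);          // RollBack       -> ROLLBACK
    }

    static void run(MdStoreClient client, String inputId, String outputId) {
        final String readVersion = client.readLock(inputId);     // lock the native store for reading
        final String writeVersion = client.newVersion(outputId); // open a new cleaned version
        try {
            transform(readVersion, writeVersion);                // the TransformJob spark action
            client.readUnlock(readVersion);                      // EndRead
            client.commit(writeVersion);                         // CommitVersion
        } catch (Exception e) {
            client.readUnlock(readVersion);                      // EndReadRollBack
            client.rollback(writeVersion);                       // RollBack
        }
    }

    static void transform(String in, String out) {
        // stand-in for TransformSparkJobNode reading in + "/store" and writing out + "/store"
    }
}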
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
new file mode 100644
index 000000000..c9ccbc7ff
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
@@ -0,0 +1,197 @@
+
+package eu.dnetlib.dhp.aggregation;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class AggregationJobTest {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static SparkSession spark;
+
+    private static Path workingDir;
+
+    private static Encoder<MetadataRecord> encoder;
+
+    private static final String encoding = "XML";
+    private static final String dateOfCollection = System.currentTimeMillis() + "";
+    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+    private static String provenance;
+
+    private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
+
+    @BeforeAll
+    public static void beforeAll() throws IOException {
+        provenance = IOUtils
+            .toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+        workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
+        log.info("using work dir {}", workingDir);
+
+        SparkConf conf = new SparkConf();
+
+        conf.setAppName(AggregationJobTest.class.getSimpleName());
+
+        conf.setMaster("local[*]");
+        conf.set("spark.driver.host", "localhost");
+        conf.set("hive.metastore.local", "true");
+        conf.set("spark.ui.enabled", "false");
+        conf.set("spark.sql.warehouse.dir", workingDir.toString());
+        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+        encoder = Encoders.bean(MetadataRecord.class);
+        spark = SparkSession
+            .builder()
+            .appName(AggregationJobTest.class.getSimpleName())
+            .config(conf)
+            .getOrCreate();
+    }
+
+    @AfterAll
+    public static void afterAll() throws IOException {
+        FileUtils.deleteDirectory(workingDir.toFile());
+        spark.stop();
+    }
+
+    @Test
+    @Order(1)
+    public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
+
+        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+        FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
+
+        IOUtils
+            .copy(
+                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+                new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
+
+        GenerateNativeStoreSparkJob
+            .main(
+                new String[]{
+                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                    "-encoding", encoding,
+                    "-dateOfCollection", dateOfCollection,
+                    "-provenance", provenance,
+                    "-xpath", xpath,
+                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+                    "-readMdStoreVersion", "",
+                    "-workflowId", "abc"
+                });
+
+        verify(mdStoreV1);
+    }
+
+    @Test
+    @Order(2)
+    public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
+
+        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+        FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
+
+        IOUtils
+            .copy(
+                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+                new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
+
+        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+
+        GenerateNativeStoreSparkJob
+            .main(
+                new String[]{
+                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                    "-encoding", encoding,
+                    "-dateOfCollection", dateOfCollection,
+                    "-provenance", provenance,
+                    "-xpath", xpath,
+                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+                    "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+                    "-workflowId", "abc"
+                });
+
+        verify(mdStoreV2);
+    }
+
+    //@Test
+    @Order(3)
+    public void testTransformSparkJob() throws Exception {
+
+        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+        MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
+
+        TransformSparkJobNode.main(new String[]{
+            "-isSparkSessionManaged", Boolean.FALSE.toString(),
+            "-dateOfTransformation", dateOfCollection,
+            "-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+            "-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
+            "-transformationPlugin", "XSLT_TRANSFORM",
+            "-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
+            "-transformationRuleId",
+            "183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="});
+
+    }
+
+    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
+        Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
+
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        long seqFileSize = sc
+            .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
+            .count();
+
+        final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
+        long mdStoreSize = mdstore.count();
+
+        long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
+
+        Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
+        Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
+
+        long uniqueIds = mdstore
+            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
+            .distinct()
+            .count();
+
+        Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
+    }
+
+    private MDStoreVersion prepareVersion(String filename) throws IOException {
+        MDStoreVersion mdstore = OBJECT_MAPPER
+            .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
+        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+        return mdstore;
+    }
+
+}
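The @Order(3) test above stays disabled (//@Test) because it depends on a live IS endpoint for the transformation rule and the vocabularies. A hedged sketch of how the lookup could be stubbed instead, assuming Mockito is available on the test classpath; note that TransformSparkJobNode.main builds its own client from --isLookupUrl, so actually using such a stub would require an injection seam this patch does not provide:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.List;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

public class LookupStubSketch {

    public static ISLookUpService stubbedLookup(String xslt) throws Exception {
        final ISLookUpService lookup = mock(ISLookUpService.class);
        // quickSearchProfile is what TransformationFactory calls to fetch the rule
        final List<String> profiles = Collections.singletonList(xslt);
        when(lookup.quickSearchProfile(anyString())).thenReturn(profiles);
        return lookup;
    }
}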
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
deleted file mode 100644
index 715ad8fa6..000000000
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-
-package eu.dnetlib.dhp.collection;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-public class GenerateNativeStoreSparkJobTest {
-
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    private static SparkSession spark;
-
-    private static Path workingDir;
-
-    private static Encoder<MetadataRecord> encoder;
-
-    private static final String encoding = "XML";
-    private static final String dateOfCollection = System.currentTimeMillis() + "";
-    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
-    private static String provenance;
-
-    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);
-
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        provenance = IOUtils.toString(GenerateNativeStoreSparkJobTest.class.getResourceAsStream("provenance.json"));
-        workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
-
-        SparkConf conf = new SparkConf();
-
-        conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
-
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
-
-        encoder = Encoders.bean(MetadataRecord.class);
-        spark = SparkSession
-            .builder()
-            .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
-
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
-
-    @Test
-    @Order(1)
-    public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
-
-        MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json");
-        FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
-
-        IOUtils
-            .copy(
-                getClass().getResourceAsStream("sequence_file"),
-                new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
-
-        GenerateNativeStoreSparkJob
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-encoding", encoding,
-                    "-dateOfCollection", dateOfCollection,
-                    "-provenance", provenance,
-                    "-xpath", xpath,
-                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
-                    "-readMdStoreVersion", "",
-                    "-workflowId", "abc"
-                });
-
-        verify(mdStoreV1);
-    }
-
-    @Test
-    @Order(2)
-    public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
-
-        MDStoreVersion mdStoreV2 = prepareVersion("mdStoreVersion_2.json");
-        FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
-
-        IOUtils
-            .copy(
-                getClass().getResourceAsStream("sequence_file"),
-                new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
-
-        MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json");
-
-        GenerateNativeStoreSparkJob
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-encoding", encoding,
-                    "-dateOfCollection", dateOfCollection,
-                    "-provenance", provenance,
-                    "-xpath", xpath,
-                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
-                    "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
-                    "-workflowId", "abc"
-                });
-
-        verify(mdStoreV2);
-    }
-
-    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
-        Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
-
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-        long seqFileSize = sc
-            .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
-            .count();
-
-        final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
-        long mdStoreSize = mdstore.count();
-
-        long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
-
-        Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
-        Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
-
-        long uniqueIds = mdstore
-            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
-            .distinct()
-            .count();
-
-        Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
-    }
-
-    private MDStoreVersion prepareVersion(String filename) throws IOException {
-        MDStoreVersion mdstore = OBJECT_MAPPER
-            .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
-        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
-        return mdstore;
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 6a80e01e2..9e46b5f95 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -38,6 +38,7 @@ import eu.dnetlib.dhp.collection.CollectionJobTest;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
@@ -74,6 +75,9 @@ public class TransformationJobTest {
         spark.stop();
     }
 
+
+
+
     @Test
     @DisplayName("Test Transform Single XML using XSLTTransformator")
     public void testTransformSaxonHE() throws Exception {
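The only substantive change to TransformationJobTest above is the new ISLookupClientFactory import. For reference, the pattern it enables is the one this patch adds to the production jobs; a minimal sketch using only calls that appear elsewhere in this patch (the endpoint is the development URL from the disabled test, shown purely as an example):

import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

public class VocabularyLoadSketch {

    public static void main(String[] args) throws Exception {
        final ISLookUpService isLookupService = ISLookupClientFactory
            .getLookUpService("https://dev-openaire.d4science.org/is/services/isLookUp");
        final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
        // the same sanity checks this patch adds as log statements
        System.out.println("vocabularies retrieved: " + vocabularies.vocabularyNames().size());
        System.out.println("dnet:languages terms: " + vocabularies.getTerms("dnet:languages").size());
    }
}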
+ "hdfsPath":"%s/mdstore/md-cleaned" +} \ No newline at end of file