diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
index 15e0bb454..7c5ad354d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationConstants.java
@@ -1,15 +1,14 @@
+
 package eu.dnetlib.dhp.aggregation.common;
 
 public class AggregationConstants {
 
-    public static final String SEQUENCE_FILE_NAME = "/sequence_file";
-    public static final String MDSTORE_DATA_PATH = "/store";
-    public static final String MDSTORE_SIZE_PATH = "/size";
-
-    public static final String CONTENT_TOTALITEMS = "TotalItems";
-    public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
-    public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
-
+	public static final String SEQUENCE_FILE_NAME = "/sequence_file";
+	public static final String MDSTORE_DATA_PATH = "/store";
+	public static final String MDSTORE_SIZE_PATH = "/size";
+	public static final String CONTENT_TOTALITEMS = "TotalItems";
+	public static final String CONTENT_INVALIDRECORDS = "InvalidRecords";
+	public static final String CONTENT_TRANSFORMEDRECORDS = "transformedItems";
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
index d657dee02..7332ac071 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java
@@ -5,7 +5,6 @@ import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -15,6 +14,8 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 13813623c..fdf3965d6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -1,11 +1,16 @@
 
 package eu.dnetlib.dhp.collection;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
-import eu.dnetlib.dhp.model.mdstore.Provenance;
+import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -22,18 +27,15 @@ import org.dom4j.Node;
 import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.model.mdstore.Provenance;
 import scala.Tuple2;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
-import java.util.Optional;
-
-import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 public class GenerateNativeStoreSparkJob {
 
 	private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
index f8ddf47e2..0a01faf1e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
@@ -1,19 +1,17 @@
 
 package eu.dnetlib.dhp.transformation;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*;
 import static eu.dnetlib.dhp.aggregation.common.AggregationConstants.*;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.saveDataset;
+import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.writeTotalSizeOnHDFS;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 
-import eu.dnetlib.dhp.aggregation.common.AggregationConstants;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-
-import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -25,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
-import eu.dnetlib.dhp.aggregation.common.AggregationUtility;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
@@ -67,7 +64,6 @@ public class TransformSparkJobNode {
 		final String dateOfTransformation = parser.get("dateOfTransformation");
 		log.info(String.format("dateOfTransformation: %s", dateOfTransformation));
 
-
 		final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
 
 		final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
@@ -94,15 +90,15 @@ public class TransformSparkJobNode {
 			final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
 			saveDataset(
-                spark.read()
-                    .format("parquet")
-                    .load(inputPath)
-                    .as(encoder)
-                    .map(
-                        TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
-                        encoder),
-                outputPath + MDSTORE_DATA_PATH);
-
+				spark
+					.read()
+					.format("parquet")
+					.load(inputPath)
+					.as(encoder)
+					.map(
+						TransformationFactory.getTransformationPlugin(args, ct, isLookUpService),
+						encoder),
+				outputPath + MDSTORE_DATA_PATH);
 
 			log.info("Transformed item " + ct.getProcessedItems().count());
 			log.info("Total item " + ct.getTotalItems().count());
 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
index ac65ef6a9..d5ecc9cb0 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/aggregation/AggregationJobTest.java
@@ -12,11 +12,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -35,163 +30,170 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class AggregationJobTest {
 
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    private static SparkSession spark;
+	private static SparkSession spark;
 
-    private static Path workingDir;
+	private static Path workingDir;
 
-    private static Encoder<MetadataRecord> encoder;
+	private static Encoder<MetadataRecord> encoder;
 
-    private static final String encoding = "XML";
-    private static final String dateOfCollection = System.currentTimeMillis() + "";
-    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
-    private static String provenance;
+	private static final String encoding = "XML";
+	private static final String dateOfCollection = System.currentTimeMillis() + "";
+	private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
+	private static String provenance;
 
-    private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
+	private static final Logger log = LoggerFactory.getLogger(AggregationJobTest.class);
 
-    @BeforeAll
-    public static void beforeAll() throws IOException {
-        provenance = IOUtils.toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
-        workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
-        log.info("using work dir {}", workingDir);
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		provenance = IOUtils
+			.toString(AggregationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
+		workingDir = Files.createTempDirectory(AggregationJobTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
 
-        SparkConf conf = new SparkConf();
+		SparkConf conf = new SparkConf();
 
-        conf.setAppName(AggregationJobTest.class.getSimpleName());
+		conf.setAppName(AggregationJobTest.class.getSimpleName());
 
-        conf.setMaster("local[*]");
-        conf.set("spark.driver.host", "localhost");
-        conf.set("hive.metastore.local", "true");
-        conf.set("spark.ui.enabled", "false");
-        conf.set("spark.sql.warehouse.dir", workingDir.toString());
-        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
 
-        encoder = Encoders.bean(MetadataRecord.class);
-        spark = SparkSession
-            .builder()
-            .appName(AggregationJobTest.class.getSimpleName())
-            .config(conf)
-            .getOrCreate();
-    }
+		encoder = Encoders.bean(MetadataRecord.class);
+		spark = SparkSession
+			.builder()
+			.appName(AggregationJobTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
 
-    @AfterAll
-    public static void afterAll() throws IOException {
-        FileUtils.deleteDirectory(workingDir.toFile());
-        spark.stop();
-    }
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
 
-    @Test
-    @Order(1)
-    public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
+	@Test
+	@Order(1)
+	public void testGenerateNativeStoreSparkJobRefresh() throws Exception {
 
-        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
-        FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
+		MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+		FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath()));
 
-        IOUtils
-            .copy(
-                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
-                new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
+		IOUtils
+			.copy(
+				getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+				new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
 
-        GenerateNativeStoreSparkJob
-            .main(
-                new String[]{
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-encoding", encoding,
-                    "-dateOfCollection", dateOfCollection,
-                    "-provenance", provenance,
-                    "-xpath", xpath,
-                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
-                    "-readMdStoreVersion", "",
-                    "-workflowId", "abc"
-                });
+		GenerateNativeStoreSparkJob
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-encoding", encoding,
+					"-dateOfCollection", dateOfCollection,
+					"-provenance", provenance,
+					"-xpath", xpath,
+					"-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+					"-readMdStoreVersion", "",
+					"-workflowId", "abc"
+				});
 
-        verify(mdStoreV1);
-    }
+		verify(mdStoreV1);
+	}
 
-    @Test
-    @Order(2)
-    public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
+	@Test
+	@Order(2)
+	public void testGenerateNativeStoreSparkJobIncremental() throws Exception {
 
-        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
-        FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
+		MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+		FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath()));
 
-        IOUtils
-            .copy(
-                getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
-                new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
+		IOUtils
+			.copy(
+				getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"),
+				new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
 
-        MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
+		MDStoreVersion mdStoreV1 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
 
-        GenerateNativeStoreSparkJob
-            .main(
-                new String[]{
-                    "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-encoding", encoding,
-                    "-dateOfCollection", dateOfCollection,
-                    "-provenance", provenance,
-                    "-xpath", xpath,
-                    "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
-                    "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
-                    "-workflowId", "abc"
-                });
+		GenerateNativeStoreSparkJob
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-encoding", encoding,
+					"-dateOfCollection", dateOfCollection,
+					"-provenance", provenance,
+					"-xpath", xpath,
+					"-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+					"-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1),
+					"-workflowId", "abc"
+				});
 
-        verify(mdStoreV2);
-    }
+		verify(mdStoreV2);
+	}
 
+	@Test
+	@Order(3)
+	public void testTransformSparkJob() throws Exception {
 
-
-    @Test
-    @Order(3)
-    public void testTransformSparkJob() throws Exception {
+		MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
+		MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
 
-        MDStoreVersion mdStoreV2 = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
-        MDStoreVersion mdStoreCleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
+		TransformSparkJobNode.main(new String[] {
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-dateOfTransformation", dateOfCollection,
+			"-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
+			"-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
+			"-transformationPlugin", "XSLT_TRANSFORM",
+			"-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
+			"-transformationRuleId",
+			"183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="
+		});
 
-        TransformSparkJobNode.main(new String[]{
-            "-isSparkSessionManaged", Boolean.FALSE.toString(),
-            "-dateOfTransformation", dateOfCollection,
-            "-mdstoreInputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2),
-            "-mdstoreOutputVersion", OBJECT_MAPPER.writeValueAsString(mdStoreCleanedVersion),
-            "-transformationPlugin", "XSLT_TRANSFORM",
-            "-isLookupUrl", "https://dev-openaire.d4science.org/is/services/isLookUp",
-            "-transformationRuleId", "183dde52-a69b-4db9-a07e-1ef2be105294_VHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZXMvVHJhbnNmb3JtYXRpb25SdWxlRFNSZXNvdXJjZVR5cGU="});
+	}
 
-    }
+	protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
+		Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
 
-    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
-        Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists());
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+		long seqFileSize = sc
+			.sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
+			.count();
 
-        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-        long seqFileSize = sc
-            .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
-            .count();
+		final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
+		long mdStoreSize = mdstore.count();
 
-        final Dataset<MetadataRecord> mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
-        long mdStoreSize = mdstore.count();
+		long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
 
-        long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
+		Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
+		Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
 
-        Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal");
-        Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal");
+		long uniqueIds = mdstore
+			.map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
+			.distinct()
+			.count();
 
-        long uniqueIds = mdstore
-            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
-            .distinct()
-            .count();
+		Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
+	}
 
-        Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal");
-    }
-
-    private MDStoreVersion prepareVersion(String filename) throws IOException {
-        MDStoreVersion mdstore = OBJECT_MAPPER
-            .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
-        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
-        return mdstore;
-    }
+	private MDStoreVersion prepareVersion(String filename) throws IOException {
+		MDStoreVersion mdstore = OBJECT_MAPPER
+			.readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class);
+		mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
+		return mdstore;
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 9e46b5f95..d03c3acef 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -75,9 +75,6 @@ public class TransformationJobTest {
 		spark.stop();
 	}
 
-
-
-
 	@Test
 	@DisplayName("Test Transform Single XML using XSLTTransformator")
 	public void testTransformSaxonHE() throws Exception {