diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java index ce65e710f9..0b59dcce07 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java @@ -26,13 +26,13 @@ public class MetadataRecord implements Serializable { private String body; /** the date when the record has been stored */ - private long dateOfCollection; + private Long dateOfCollection; /** the date when the record has been stored */ - private long dateOfTransformation; + private Long dateOfTransformation; public MetadataRecord() { - this.dateOfCollection = System.currentTimeMillis(); + } public MetadataRecord( @@ -40,7 +40,7 @@ public class MetadataRecord implements Serializable { String encoding, Provenance provenance, String body, - long dateOfCollection) { + Long dateOfCollection) { this.originalId = originalId; this.encoding = encoding; @@ -90,19 +90,19 @@ public class MetadataRecord implements Serializable { this.body = body; } - public long getDateOfCollection() { + public Long getDateOfCollection() { return dateOfCollection; } - public void setDateOfCollection(long dateOfCollection) { + public void setDateOfCollection(Long dateOfCollection) { this.dateOfCollection = dateOfCollection; } - public long getDateOfTransformation() { + public Long getDateOfTransformation() { return dateOfTransformation; } - public void setDateOfTransformation(long dateOfTransformation) { + public void setDateOfTransformation(Long dateOfTransformation) { this.dateOfTransformation = dateOfTransformation; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java index 1f5ed27cb1..eb971c4754 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationUtility.java @@ -8,21 +8,38 @@ import java.nio.charset.StandardCharsets; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; public class AggregationUtility { + private static final Logger log = LoggerFactory.getLogger(AggregationUtility.class); + public static void writeTotalSizeOnHDFS(final SparkSession spark, final Long total, final String path) throws IOException { - FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + log.info("writing size ({}) info file {}", total, path); + try (FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); + BufferedOutputStream os = new BufferedOutputStream(fs.create(new Path(path)))) { + os.write(total.toString().getBytes(StandardCharsets.UTF_8)); + os.flush(); + } - FSDataOutputStream output = fs.create(new Path(path)); - - final BufferedOutputStream os = new BufferedOutputStream(output); - - os.write(total.toString().getBytes(StandardCharsets.UTF_8)); - - os.close(); } + + public static void saveDataset(final Dataset mdstore, final String targetPath) { + log.info("saving dataset in: {}", targetPath); + mdstore + .write() + .mode(SaveMode.Overwrite) + .format("parquet") + .save(targetPath); + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 553a3dc5fb..bbed36a9c3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -1,23 +1,20 @@ package eu.dnetlib.dhp.collection; +import static eu.dnetlib.dhp.aggregation.common.AggregationUtility.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.*; import java.nio.charset.StandardCharsets; -import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; @@ -30,31 +27,155 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.aggregation.common.AggregationUtility; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.Provenance; +import net.sf.saxon.expr.Component; import scala.Tuple2; public class GenerateNativeStoreSparkJob { private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String DATASET_NAME = "/store"; + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateNativeStoreSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); + parser.parseArgument(args); + + final String provenanceArgument = parser.get("provenance"); + log.info("Provenance is {}", provenanceArgument); + final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class); + + final String dateOfCollectionArgs = parser.get("dateOfCollection"); + log.info("dateOfCollection is {}", dateOfCollectionArgs); + final Long dateOfCollection = new Long(dateOfCollectionArgs); + + String mdStoreVersion = parser.get("mdStoreVersion"); + log.info("mdStoreVersion is {}", mdStoreVersion); + + final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); + + String readMdStoreVersionParam = parser.get("readMdStoreVersion"); + log.info("readMdStoreVersion is {}", readMdStoreVersionParam); + + final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null + : MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class); + + final String xpath = parser.get("xpath"); + log.info("xpath is {}", xpath); + + final String encoding = parser.get("encoding"); + log.info("encoding is {}", encoding); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + SparkConf conf = new SparkConf(); + /* + * conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); conf .registerKryoClasses( new + * Class[] { MetadataRecord.class, Provenance.class }); + */ + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> createNativeMDStore( + spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion)); + } + + private static void createNativeMDStore(SparkSession spark, + Provenance provenance, + Long dateOfCollection, + String xpath, + String encoding, + MDStoreVersion currentVersion, + MDStoreVersion readVersion) throws IOException { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); + final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); + + final String seqFilePath = currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENCE_FILE_NAME; + final JavaRDD nativeStore = sc + .sequenceFile(seqFilePath, IntWritable.class, Text.class) + .map( + item -> parseRecord( + item._2().toString(), + xpath, + encoding, + provenance, + dateOfCollection, + totalItems, + invalidRecords)) + .filter(Objects::nonNull) + .distinct(); + + final Encoder encoder = Encoders.bean(MetadataRecord.class); + final Dataset mdstore = spark.createDataset(nativeStore.rdd(), encoder); + + final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME; + + if (readVersion != null) { // INCREMENTAL MODE + log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath()); + Dataset currentMdStoreVersion = spark + .read() + .load(readVersion.getHdfsPath() + DATASET_NAME) + .as(encoder); + TypedColumn aggregator = new MDStoreAggregator().toColumn(); + + final Dataset map = currentMdStoreVersion + .union(mdstore) + .groupByKey( + (MapFunction) MetadataRecord::getId, + Encoders.STRING()) + .agg(aggregator) + .map((MapFunction, MetadataRecord>) Tuple2::_2, encoder); + + map.select("id").takeAsList(100).forEach(s -> log.info(s.toString())); + + saveDataset(map, targetPath); + + } else { + saveDataset(mdstore, targetPath); + } + + final Long total = spark.read().load(targetPath).count(); + log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName()); + + writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size"); + } + public static class MDStoreAggregator extends Aggregator { @Override public MetadataRecord zero() { - return new MetadataRecord(); + return null; } @Override public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) { + return getLatestRecord(b, a); + } + @Override + public MetadataRecord merge(MetadataRecord b, MetadataRecord a) { return getLatestRecord(b, a); } @@ -68,136 +189,22 @@ public class GenerateNativeStoreSparkJob { } @Override - public MetadataRecord merge(MetadataRecord b, MetadataRecord a) { - return getLatestRecord(b, a); - } - - @Override - public MetadataRecord finish(MetadataRecord j) { - return j; + public MetadataRecord finish(MetadataRecord r) { + return r; } @Override public Encoder bufferEncoder() { - return Encoders.kryo(MetadataRecord.class); + return Encoders.bean(MetadataRecord.class); } @Override public Encoder outputEncoder() { - return Encoders.kryo(MetadataRecord.class); + return Encoders.bean(MetadataRecord.class); } } - public static void main(String[] args) throws Exception { - - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - GenerateNativeStoreSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); - parser.parseArgument(args); - final ObjectMapper jsonMapper = new ObjectMapper(); - final String provenanceArgument = parser.get("provenance"); - log.info("Provenance is {}", provenanceArgument); - final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class); - - final String dateOfCollectionArgs = parser.get("dateOfCollection"); - log.info("dateOfCollection is {}", dateOfCollectionArgs); - final long dateOfCollection = new Long(dateOfCollectionArgs); - - String mdStoreVersion = parser.get("mdStoreVersion"); - log.info("mdStoreVersion is {}", mdStoreVersion); - - final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class); - - String readMdStoreVersionParam = parser.get("readMdStoreVersion"); - log.info("readMdStoreVersion is {}", readMdStoreVersionParam); - - final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam) ? null - : jsonMapper.readValue(readMdStoreVersionParam, MDStoreVersion.class); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(Collections.singleton(MetadataRecord.class).toArray(new Class[] {})); - - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - final JavaPairRDD inputRDD = sc - .sequenceFile( - currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENTIAL_FILE_NAME, - IntWritable.class, Text.class); - - final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); - final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); - - final JavaRDD nativeStore = inputRDD - .map( - item -> parseRecord( - item._2().toString(), - parser.get("xpath"), - parser.get("encoding"), - provenance, - dateOfCollection, - totalItems, - invalidRecords)) - .filter(Objects::nonNull) - .distinct(); - - final Encoder encoder = Encoders.bean(MetadataRecord.class); - - final Dataset mdstore = spark.createDataset(nativeStore.rdd(), encoder); - - final String targetPath = currentVersion.getHdfsPath() + DATASET_NAME; - if (readMdStoreVersion != null) { - // INCREMENTAL MODE - - Dataset currentMdStoreVersion = spark - .read() - .load(readMdStoreVersion.getHdfsPath() + DATASET_NAME) - .as(encoder); - TypedColumn aggregator = new MDStoreAggregator().toColumn(); - - saveDataset( - currentMdStoreVersion - .union(mdstore) - .groupByKey( - (MapFunction) MetadataRecord::getId, - Encoders.STRING()) - .agg(aggregator) - .map((MapFunction, MetadataRecord>) Tuple2::_2, encoder), - targetPath); - - } else { - saveDataset(mdstore, targetPath); - } - - final Long total = spark.read().load(targetPath).count(); - - AggregationUtility.writeTotalSizeOnHDFS(spark, total, currentVersion.getHdfsPath() + "/size"); - }); - } - - private static void saveDataset(final Dataset currentMdStore, final String targetPath) { - currentMdStore - .write() - .mode(SaveMode.Overwrite) - .format("parquet") - .save(targetPath); - - } - public static MetadataRecord parseRecord( final String input, final String xpath, @@ -219,7 +226,7 @@ public class GenerateNativeStoreSparkJob { invalidRecords.add(1); return null; } - return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection); + return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection); } catch (Throwable e) { invalidRecords.add(1); return null; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java index 29ae98c5bd..e24b9ad1da 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java @@ -1,11 +1,6 @@ package eu.dnetlib.dhp.collection.worker; -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStream; -import java.util.Properties; - import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +11,6 @@ import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory; import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor; -import eu.dnetlib.dhp.common.rest.DNetRestClient; /** * DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module @@ -31,7 +25,7 @@ public class CollectorWorkerApplication { private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); - public static String SEQUENTIAL_FILE_NAME = "/sequence_file"; + public static String SEQUENCE_FILE_NAME = "/sequence_file"; /** * @param args @@ -61,7 +55,7 @@ public class CollectorWorkerApplication { final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class); final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, - currentVersion.getHdfsPath() + SEQUENTIAL_FILE_NAME); + currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME); worker.collect(); } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java new file mode 100644 index 0000000000..715ad8fa6f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJobTest.java @@ -0,0 +1,169 @@ + +package eu.dnetlib.dhp.collection; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class GenerateNativeStoreSparkJobTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + private static Encoder encoder; + + private static final String encoding = "XML"; + private static final String dateOfCollection = System.currentTimeMillis() + ""; + private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']"; + private static String provenance; + + private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + provenance = IOUtils.toString(GenerateNativeStoreSparkJobTest.class.getResourceAsStream("provenance.json")); + workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + + conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + encoder = Encoders.bean(MetadataRecord.class); + spark = SparkSession + .builder() + .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + @Order(1) + public void testGenerateNativeStoreSparkJobRefresh() throws Exception { + + MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json"); + FileUtils.forceMkdir(new File(mdStoreV1.getHdfsPath())); + + IOUtils + .copy( + getClass().getResourceAsStream("sequence_file"), + new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file")); + + GenerateNativeStoreSparkJob + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-encoding", encoding, + "-dateOfCollection", dateOfCollection, + "-provenance", provenance, + "-xpath", xpath, + "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1), + "-readMdStoreVersion", "", + "-workflowId", "abc" + }); + + verify(mdStoreV1); + } + + @Test + @Order(2) + public void testGenerateNativeStoreSparkJobIncremental() throws Exception { + + MDStoreVersion mdStoreV2 = prepareVersion("mdStoreVersion_2.json"); + FileUtils.forceMkdir(new File(mdStoreV2.getHdfsPath())); + + IOUtils + .copy( + getClass().getResourceAsStream("sequence_file"), + new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file")); + + MDStoreVersion mdStoreV1 = prepareVersion("mdStoreVersion_1.json"); + + GenerateNativeStoreSparkJob + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-encoding", encoding, + "-dateOfCollection", dateOfCollection, + "-provenance", provenance, + "-xpath", xpath, + "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV2), + "-readMdStoreVersion", OBJECT_MAPPER.writeValueAsString(mdStoreV1), + "-workflowId", "abc" + }); + + verify(mdStoreV2); + } + + protected void verify(MDStoreVersion mdStoreVersion) throws IOException { + Assertions.assertTrue(new File(mdStoreVersion.getHdfsPath()).exists()); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + long seqFileSize = sc + .sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class) + .count(); + + final Dataset mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder); + long mdStoreSize = mdstore.count(); + + long declaredSize = Long.parseLong(IOUtils.toString(new FileReader(mdStoreVersion.getHdfsPath() + "/size"))); + + Assertions.assertEquals(seqFileSize, declaredSize, "the size must be equal"); + Assertions.assertEquals(seqFileSize, mdStoreSize, "the size must be equal"); + + long uniqueIds = mdstore + .map((MapFunction) MetadataRecord::getId, Encoders.STRING()) + .distinct() + .count(); + + Assertions.assertEquals(seqFileSize, uniqueIds, "the size must be equal"); + } + + private MDStoreVersion prepareVersion(String filename) throws IOException { + MDStoreVersion mdstore = OBJECT_MAPPER + .readValue(IOUtils.toString(getClass().getResource(filename)), MDStoreVersion.class); + mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString())); + return mdstore; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/input.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/input.json deleted file mode 100644 index 4ffc33d247..0000000000 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/input.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "id": "md-7557225f-77cc-407d-bdf4-d2fe03131464-1611935085410", - "mdstore": "md-7557225f-77cc-407d-bdf4-d2fe03131464", - "writing": true, - "readCount": 0, - "lastUpdate": null, - "size": 0, - "hdfsPath": "/data/dnet.dev/mdstore/md-7557225f-77cc-407d-bdf4-d2fe03131464/md-7557225f-77cc-407d-bdf4-d2fe03131464-1611935085410" -} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json new file mode 100644 index 0000000000..8945c3d881 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_1.json @@ -0,0 +1,9 @@ +{ + "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187678801", + "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42", + "writing":true, + "readCount":0, + "lastUpdate":null, + "size":0, + "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v1" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json new file mode 100644 index 0000000000..c3d4617cb8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/mdStoreVersion_2.json @@ -0,0 +1,9 @@ +{ + "id":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42-1612187459108", + "mdstore":"md-84e86d00-5771-4ed9-b17f-177ef4b46e42", + "writing":false, + "readCount":1, + "lastUpdate":1612187563099, + "size":71, + "hdfsPath":"%s/mdstore/md-84e86d00-5771-4ed9-b17f-177ef4b46e42/v2" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json new file mode 100644 index 0000000000..2cf0dab706 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/provenance.json @@ -0,0 +1,5 @@ +{ + "datasourceId":"74912366-d6df-49c1-a1fd-8a52fa98ce5f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU\u003d", + "datasourceName":"PSNC Institutional Repository", + "nsPrefix":"psnc______pl" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file new file mode 100644 index 0000000000..309645a5f2 Binary files /dev/null and b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/collection/sequence_file differ