From 027618003951bcdc51cbd60b7d09163696795386 Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 29 Jan 2021 16:42:41 +0100
Subject: [PATCH] WIP mdstore transaction implemented on hadoop side

---
 .../mdstore/manager/common/model/MDStore.java |  12 +-
 .../common/model/MDStoreCurrentVersion.java   |   8 +-
 .../manager/common/model/MDStoreVersion.java  |  12 +-
 .../manager/common/model/MDStoreWithInfo.java |  13 +-
 .../collector/worker/model/ApiDescriptor.java |   2 +-
 .../dhp/common/rest/DNetRestClient.java       |  54 ++++++
 .../mdstore/MDStoreActionNode.java            | 164 ++++++++++++++++++
 .../GenerateNativeStoreSparkJob.java          |  46 ++++-
 .../collection/plugin/CollectorPlugin.java    |   2 +-
 .../plugin/oai/OaiCollectorPlugin.java        |   2 +-
 .../collection/worker/CollectorWorker.java    |   5 +-
 .../worker/CollectorWorkerApplication.java    |  28 ++-
 .../datacite/oozie_app/config-default.xml     |   5 +
 .../collection_input_parameters.json          |  12 +-
 .../dhp/collection/collector_parameter.json   |  28 ++-
 .../collection/mdstore_action_parameters.json |  45 +++++
 .../dhp/collection/oozie_app/workflow.xml     | 114 ++++++++++--
 .../DnetCollectorWorkerApplicationTests.java  |   9 +-
 18 files changed, 495 insertions(+), 66 deletions(-)
 rename dhp-common/src/main/java/eu/dnetlib/{ => dhp}/collector/worker/model/ApiDescriptor.java (93%)
 create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json

diff --git a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStore.java b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStore.java
index db200cd6a..59fe941ed 100644
--- a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStore.java
+++ b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStore.java
@@ -157,7 +157,9 @@ public class MDStore implements Serializable {
 	@Override
 	public String toString() {
 		return String
-			.format("MDStore [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, hdfsPath=%s, creationDate=%s]", id, format, layout, interpretation, datasourceName, datasourceId, apiId, hdfsPath, creationDate);
+			.format(
+				"MDStore [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, hdfsPath=%s, creationDate=%s]",
+				id, format, layout, interpretation, datasourceName, datasourceId, apiId, hdfsPath, creationDate);
 	}
 
 	@Override
@@ -167,8 +169,12 @@ public class MDStore implements Serializable {
 
 	@Override
 	public boolean equals(final Object obj) {
-		if (this == obj) { return true; }
-		if (!(obj instanceof MDStore)) { return false; }
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof MDStore)) {
+			return false;
+		}
 		final MDStore other = (MDStore) obj;
 		return Objects.equals(id, other.id);
 	}

diff --git a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreCurrentVersion.java b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreCurrentVersion.java
index e25e7dc2a..d808e2de7 100644
--- a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreCurrentVersion.java
+++ b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreCurrentVersion.java
@@ -62,8 +62,12 @@ public class MDStoreCurrentVersion implements Serializable {
 
 	@Override
 	public boolean equals(final Object obj) {
-		if (this == obj) { return true; }
-		if (!(obj instanceof MDStoreCurrentVersion)) { return false; }
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof MDStoreCurrentVersion)) {
+			return false;
+		}
 		final MDStoreCurrentVersion other = (MDStoreCurrentVersion) obj;
 		return Objects.equals(currentVersion, other.currentVersion) && Objects.equals(mdstore, other.mdstore);
 	}

diff --git a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreVersion.java b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreVersion.java
index 26c34fcad..38f8f275e 100644
--- a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreVersion.java
+++ b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreVersion.java
@@ -116,7 +116,9 @@ public class MDStoreVersion implements Serializable {
 	@Override
 	public String toString() {
 		return String
-			.format("MDStoreVersion [id=%s, mdstore=%s, writing=%s, readCount=%s, lastUpdate=%s, size=%s, hdfsPath=%s]", id, mdstore, writing, readCount, lastUpdate, size, hdfsPath);
+			.format(
+				"MDStoreVersion [id=%s, mdstore=%s, writing=%s, readCount=%s, lastUpdate=%s, size=%s, hdfsPath=%s]", id,
+				mdstore, writing, readCount, lastUpdate, size, hdfsPath);
 	}
 
 	@Override
@@ -126,8 +128,12 @@ public class MDStoreVersion implements Serializable {
 
 	@Override
 	public boolean equals(final Object obj) {
-		if (this == obj) { return true; }
-		if (!(obj instanceof MDStoreVersion)) { return false; }
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof MDStoreVersion)) {
+			return false;
+		}
 		final MDStoreVersion other = (MDStoreVersion) obj;
 		return Objects.equals(id, other.id);
 	}

diff --git a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreWithInfo.java b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreWithInfo.java
index e34e4c000..510c65092 100644
--- a/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreWithInfo.java
+++ b/dhp-common/src/main/java/eu/dnetlib/data/mdstore/manager/common/model/MDStoreWithInfo.java
@@ -168,7 +168,10 @@ public class MDStoreWithInfo implements Serializable {
 	@Override
 	public String toString() {
 		return String
-			.format("MDStoreWithInfo [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, currentVersion=%s, creationDate=%s, lastUpdate=%s, size=%s, numberOfVersions=%s, hdfsPath=%s]", id, format, layout, interpretation, datasourceName, datasourceId, apiId, currentVersion, creationDate, lastUpdate, size, numberOfVersions, hdfsPath);
+			.format(
+				"MDStoreWithInfo [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, currentVersion=%s, creationDate=%s, lastUpdate=%s, size=%s, numberOfVersions=%s, hdfsPath=%s]",
+				id, format, layout, interpretation, datasourceName, datasourceId, apiId, currentVersion, creationDate,
+				lastUpdate, size, numberOfVersions, hdfsPath);
 	}
 
 	@Override
@@ -178,8 +181,12 @@ public class MDStoreWithInfo implements Serializable {
 
 	@Override
 	public boolean equals(final Object obj) {
-		if (this == obj) { return true; }
-		if (!(obj instanceof MDStoreWithInfo)) { return false; }
+		if (this == obj) {
+			return true;
+		}
+		if (!(obj instanceof MDStoreWithInfo)) {
+			return false;
+		}
 		final MDStoreWithInfo other = (MDStoreWithInfo) obj;
 		return Objects.equals(id, other.id);
 	}

diff --git a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java b/dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
similarity index 93%
rename from dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
index bfd70e8c6..8ba30faeb 100644
--- a/dhp-common/src/main/java/eu/dnetlib/collector/worker/model/ApiDescriptor.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/collector/worker/model/ApiDescriptor.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.collector.worker.model;
+package eu.dnetlib.dhp.collector.worker.model;
 
 import java.util.HashMap;
 import java.util.Map;

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
new file mode 100644
index 000000000..014f18606
--- /dev/null
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/rest/DNetRestClient.java
@@ -0,0 +1,54 @@
+
+package eu.dnetlib.dhp.common.rest;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class DNetRestClient {
+
+	private static ObjectMapper mapper = new ObjectMapper();
+
+	public static <T> T doGET(final String url, Class<T> clazz) throws Exception {
+		final HttpGet httpGet = new HttpGet(url);
+		return doHTTPRequest(httpGet, clazz);
+	}
+
+	public static String doGET(final String url) throws Exception {
+		final HttpGet httpGet = new HttpGet(url);
+		return doHTTPRequest(httpGet);
+	}
+
+	public static <V> String doPOST(final String url, V objParam) throws Exception {
+		final HttpPost httpPost = new HttpPost(url);
+
+		if (objParam != null) {
+			final StringEntity entity = new StringEntity(mapper.writeValueAsString(objParam));
+			httpPost.setEntity(entity);
+			httpPost.setHeader("Accept", "application/json");
+			httpPost.setHeader("Content-type", "application/json");
+		}
+		return doHTTPRequest(httpPost);
+	}
+
+	public static <T, V> T doPOST(final String url, V objParam, Class<T> clazz) throws Exception {
+		return mapper.readValue(doPOST(url, objParam), clazz);
+	}
+
+	private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
+		CloseableHttpClient client = HttpClients.createDefault();
+		CloseableHttpResponse response = client.execute(r);
+		return IOUtils.toString(response.getEntity().getContent());
+	}
+
+	private static <T> T doHTTPRequest(final HttpUriRequest r, Class<T> clazz) throws Exception {
+		return mapper.readValue(doHTTPRequest(r), clazz);
+	}
+}
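
The helper above is what the rest of this patch calls against the MDStore manager. A minimal usage sketch — the manager URL and mdstore identifier here are invented for illustration:

    // Illustrative only: endpoint URL and mdstore id are made up.
    MDStoreVersion version = DNetRestClient
        .doGET("http://localhost:8080/mdstores/mdstore/md-1234/newVersion", MDStoreVersion.class);
    System.out.println(version.getHdfsPath());

One design caveat worth noting: doHTTPRequest creates a fresh CloseableHttpClient per call and never closes the client or the response; wrapping both in try-with-resources would avoid leaking connections.
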
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
new file mode 100644
index 000000000..d4824ed0a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/mdstore/MDStoreActionNode.java
@@ -0,0 +1,164 @@
+
+package eu.dnetlib.dhp.aggregation.mdstore;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Properties;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.worker.CollectorWorker;
+import eu.dnetlib.dhp.common.rest.DNetRestClient;
+
+public class MDStoreActionNode {
+	private static final Logger log = LoggerFactory.getLogger(MDStoreActionNode.class);
+
+	enum MDAction {
+		NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK
+
+	}
+
+	private static final ObjectMapper mapper = new ObjectMapper();
+
+	public static String NEW_VERSION_URI = "%s/mdstore/%s/newVersion";
+
+	public static final String COMMIT_VERSION_URL = "%s/version/%s/commit/%s";
+	public static final String ROLLBACK_VERSION_URL = "%s/version/%s/abort";
+
+	public static final String READ_LOCK_URL = "%s/mdstores/mdstore/%s/startReading";
+	public static final String READ_UNLOCK_URL = "%s/mdstores/version/%s/endReading";
+
+	private static final String MDSTOREVERSIONPARAM = "mdStoreVersion";
+	private static final String MDSTOREREADLOCKPARAM = "mdStoreReadLockVersion";
+
+	public static void main(String[] args) throws Exception {
+		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					CollectorWorker.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/mdstore_action_parameters.json")));
+		argumentParser.parseArgument(args);
+
+		final MDAction action = MDAction.valueOf(argumentParser.get("action"));
+		log.info("Current action is {}", action);
+
+		final String mdStoreManagerURI = argumentParser.get("mdStoreManagerURI");
+		log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
+
+		switch (action) {
+			case NEW_VERSION: {
+				final String mdStoreID = argumentParser.get("mdStoreID");
+				if (StringUtils.isBlank(mdStoreID)) {
+					throw new IllegalArgumentException("missing or empty argument mdStoreId");
+				}
+				final MDStoreVersion currentVersion = DNetRestClient
+					.doGET(String.format(NEW_VERSION_URI, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
+				populateOOZIEEnv(MDSTOREVERSIONPARAM, mapper.writeValueAsString(currentVersion));
+				break;
+			}
+			case COMMIT: {
+
+				final String hdfsuri = argumentParser.get("namenode");
+				if (StringUtils.isBlank(hdfsuri)) {
+					throw new IllegalArgumentException("missing or empty argument namenode");
+				}
+				final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
+				final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+				if (StringUtils.isBlank(mdStoreVersion.getId())) {
+					throw new IllegalArgumentException(
+						"invalid MDStoreVersion value current is " + mdStoreVersion_params);
+				}
+
+				Configuration conf = new Configuration();
+				// Set FileSystem URI
+				conf.set("fs.defaultFS", hdfsuri);
+				// Because of Maven
+				conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+				conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+				System.setProperty("hadoop.home.dir", "/");
+				// Get the filesystem - HDFS
+				FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
+				String mdStoreSizeParam = argumentParser.get("mdStoreSize");
+
+				if (StringUtils.isBlank(mdStoreSizeParam)) {
+					throw new IllegalArgumentException("missing or empty argument mdStoreSize");
+				}
+				Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
+
+				FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
+
+				final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
+
+				inputStream.close();
+				fs.create(hdfstoreSizepath);
+
+				DNetRestClient
+					.doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
+				break;
+			}
+			case ROLLBACK: {
+				final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
+				final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+				if (StringUtils.isBlank(mdStoreVersion.getId())) {
+					throw new IllegalArgumentException(
+						"invalid MDStoreVersion value current is " + mdStoreVersion_params);
+				}
+				DNetRestClient.doGET(String.format(ROLLBACK_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId()));
+				break;
+			}
+
+			case READ_LOCK: {
+				final String mdStoreID = argumentParser.get("mdStoreID");
+				if (StringUtils.isBlank(mdStoreID)) {
+					throw new IllegalArgumentException("missing or empty argument mdStoreId");
+				}
+				final MDStoreVersion currentVersion = DNetRestClient
+					.doGET(String.format(READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
+				populateOOZIEEnv(MDSTOREREADLOCKPARAM, mapper.writeValueAsString(currentVersion));
+				break;
+			}
+			case READ_UNLOCK: {
+				final String mdStoreVersion_params = argumentParser.get("readMDStoreId");
+				final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
+
+				if (StringUtils.isBlank(mdStoreVersion.getId())) {
+					throw new IllegalArgumentException(
+						"invalid MDStoreVersion value current is " + mdStoreVersion_params);
+				}
+				DNetRestClient.doGET(String.format(READ_UNLOCK_URL, mdStoreManagerURI, mdStoreVersion.getId()));
+				break;
+			}
+
+			default:
+				throw new IllegalArgumentException("invalid action");
+		}
+
+	}
+
+	public static void populateOOZIEEnv(final String paramName, String value) throws Exception {
+		File file = new File(System.getProperty("oozie.action.output.properties"));
+		Properties props = new Properties();
+
+		props.setProperty(paramName, value);
+		OutputStream os = new FileOutputStream(file);
+		props.store(os, "");
+		os.close();
+	}
+}
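
Taken together, these actions give the Oozie workflow begin/commit/abort semantics over an mdstore. Reduced to plain Java, the intended sequence looks roughly like the sketch below — the constants and methods are from the class above, but the manager URL, mdstore id, and size value are placeholders, and this is not code from the patch:

    // Sketch of the transaction protocol MDStoreActionNode implements.
    final String managerURI = "http://manager:8280/mdstores";   // hypothetical
    final MDStoreVersion v = DNetRestClient
        .doGET(String.format(MDStoreActionNode.NEW_VERSION_URI, managerURI, "md-1234"), MDStoreVersion.class);
    try {
        // ... collection and native-store generation happen against v.getHdfsPath() ...
        final long size = 42L; // in the real flow, read back from v.getHdfsPath() + "/size"
        DNetRestClient.doGET(String.format(MDStoreActionNode.COMMIT_VERSION_URL, managerURI, v.getId(), size));
    } catch (Exception e) {
        DNetRestClient.doGET(String.format(MDStoreActionNode.ROLLBACK_VERSION_URL, managerURI, v.getId()));
    }
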
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index c9c29b4ea..b28327a40 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -3,13 +3,17 @@ package eu.dnetlib.dhp.collection;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.io.ByteArrayInputStream;
+import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Properties;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -19,6 +23,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.util.LongAccumulator;
 import org.dom4j.Document;
 import org.dom4j.Node;
@@ -28,7 +33,11 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
+import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication;
+import eu.dnetlib.dhp.common.rest.DNetRestClient;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
 import eu.dnetlib.dhp.model.mdstore.Provenance;
 import eu.dnetlib.message.MessageManager;
@@ -36,6 +45,7 @@ import eu.dnetlib.message.MessageManager;
 public class GenerateNativeStoreSparkJob {
 
 	private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
+	private static final String DATASET_NAME = "/store";
 
 	public static void main(String[] args) throws Exception {
 
@@ -50,11 +60,15 @@ public class GenerateNativeStoreSparkJob {
 		final String provenanceArgument = parser.get("provenance");
 		log.info("Provenance is {}", provenanceArgument);
 		final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
+
 		final String dateOfCollectionArgs = parser.get("dateOfCollection");
 		log.info("dateOfCollection is {}", dateOfCollectionArgs);
 		final long dateOfCollection = new Long(dateOfCollectionArgs);
-		final String sequenceFileInputPath = parser.get("input");
-		log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
+
+		String mdStoreVersion = parser.get("mdStoreVersion");
+		log.info("mdStoreVersion is {}", mdStoreVersion);
+
+		final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class);
 
 		Boolean isSparkSessionManaged = Optional
 			.ofNullable(parser.get("isSparkSessionManaged"))
@@ -70,7 +84,9 @@ public class GenerateNativeStoreSparkJob {
 				final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 				final JavaPairRDD<IntWritable, Text> inputRDD = sc
-					.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
+					.sequenceFile(
+						currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENTIAL_FILE_NAME,
+						IntWritable.class, Text.class);
 
 				final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
 				final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
@@ -89,12 +105,26 @@ public class GenerateNativeStoreSparkJob {
 					.distinct();
 
 				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-				final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
-				final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
-				mdStoreRecords.add(mdstore.count());
+				Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
 
-				mdstore.write().format("parquet").save(parser.get("output"));
+				mdstore
+					.write()
+					.mode(SaveMode.Overwrite)
+					.format("parquet")
+					.save(currentVersion.getHdfsPath() + DATASET_NAME);
+				mdstore = spark.read().load(currentVersion.getHdfsPath() + DATASET_NAME).as(encoder);
+
+				final Long total = mdstore.count();
+
+				FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+
+				FSDataOutputStream output = fs.create(new Path(currentVersion.getHdfsPath() + "/size"));
+
+				final BufferedOutputStream os = new BufferedOutputStream(output);
+
+				os.write(total.toString().getBytes(StandardCharsets.UTF_8));
+
+				os.close();
 			});
 	}
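
Both the collector worker and the Spark job now receive the whole version bean as a JSON --mdStoreVersion argument instead of explicit input/output paths. Judging from the fields this patch touches (id, mdstore, writing, readCount, lastUpdate, size, hdfsPath), the argument looks roughly like the following — every value in this example is invented:

    // Hypothetical --mdStoreVersion payload, parsed the same way the job does.
    final String mdStoreVersionJson = "{\"id\":\"md-1234-1611931361000\","
        + "\"mdstore\":\"md-1234\",\"writing\":true,\"readCount\":0,"
        + "\"lastUpdate\":null,\"size\":0,\"hdfsPath\":\"/data/mdstores/md-1234/v1\"}";
    final MDStoreVersion v = new ObjectMapper().readValue(mdStoreVersionJson, MDStoreVersion.class);
    // The job then reads from v.getHdfsPath() + "/sequence_file" and writes to v.getHdfsPath() + "/store".
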
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
index 7146e610e..ba9bd662e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java
@@ -3,8 +3,8 @@ package eu.dnetlib.dhp.collection.plugin;
 
 import java.util.stream.Stream;
 
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
 import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public interface CollectorPlugin {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
index c4c52271a..a5e261553 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java
@@ -13,9 +13,9 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public class OaiCollectorPlugin implements CollectorPlugin {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
index 380db641a..3605bdfd6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
@@ -14,12 +14,9 @@ import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 public class CollectorWorker {
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
index 5e8d0f9c2..29ae98c5b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
@@ -1,15 +1,22 @@
 
 package eu.dnetlib.dhp.collection.worker;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.dhp.common.rest.DNetRestClient;
 
 /**
  * DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
@@ -24,6 +31,8 @@ public class CollectorWorkerApplication {
 
 	private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
 
+	public static String SEQUENTIAL_FILE_NAME = "/sequence_file";
+
 	/**
 	 * @param args
 	 */
@@ -38,18 +47,23 @@ public class CollectorWorkerApplication {
 		argumentParser.parseArgument(args);
 
 		final String hdfsuri = argumentParser.get("namenode");
-		log.info("hdfsURI is {}", hdfsuri);
-		final String hdfsPath = argumentParser.get("hdfsPath");
-		log.info("hdfsPath is {}" + hdfsPath);
+
 		final String apiDescriptor = argumentParser.get("apidescriptor");
-		log.info("apiDescriptor is {}" + apiDescriptor);
+		log.info("apiDescriptor is {}", apiDescriptor);
+
+		final String mdStoreVersion = argumentParser.get("mdStoreVersion");
+		log.info("mdStoreVersion is {}", mdStoreVersion);
 
 		final ObjectMapper jsonMapper = new ObjectMapper();
 
-		final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
+		final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class);
 
-		final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
+		final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
+		final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri,
+			currentVersion.getHdfsPath() + SEQUENTIAL_FILE_NAME);
 		worker.collect();
+
 	}
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
index 2e0ed9aee..dd3c32c62 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/config-default.xml
@@ -15,4 +15,9 @@
         <name>oozie.action.sharelib.for.spark</name>
         <value>spark2</value>
     </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
 </configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
index 7f5113930..c1aa03bcd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json
@@ -30,15 +30,9 @@
     "paramRequired": true
   },
   {
-    "paramName": "i",
-    "paramLongName": "input",
-    "paramDescription": "the path of the sequencial file to read",
-    "paramRequired": true
-  },
-  {
-    "paramName": "o",
-    "paramLongName": "output",
-    "paramDescription": "the path of the result DataFrame on HDFS",
+    "paramName": "mv",
+    "paramLongName": "mdStoreVersion",
+    "paramDescription": "the Metadata Store Version Info",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
index 901664e0d..60e9762ff 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
@@ -1,6 +1,26 @@
 [
-  {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
-  {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
-  {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
-  {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
+  {
+    "paramName": "a",
+    "paramLongName": "apidescriptor",
+    "paramDescription": "the JSON encoding of the API Descriptor",
+    "paramRequired": true
+  },
+  {
+    "paramName": "n",
+    "paramLongName": "namenode",
+    "paramDescription": "the Name Node URI",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mv",
+    "paramLongName": "mdStoreVersion",
+    "paramDescription": "the MDStore Version bean",
+    "paramRequired": true
+  },
+  {
+    "paramName": "w",
+    "paramLongName": "workflowId",
+    "paramDescription": "the identifier of the dnet Workflow",
+    "paramRequired": false
+  }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json
new file mode 100644
index 000000000..57a218a34
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mdstore_action_parameters.json
@@ -0,0 +1,45 @@
+[
+  {
+    "paramName": "a",
+    "paramLongName": "action",
+    "paramDescription": "the action to be executed on the MDStore manager (NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK)",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mu",
+    "paramLongName": "mdStoreManagerURI",
+    "paramDescription": "the MDStore Manager URI",
+    "paramRequired": true
+  },
+  {
+    "paramName": "mi",
+    "paramLongName": "mdStoreID",
+    "paramDescription": "the Metadata Store ID",
+    "paramRequired": false
+  },
+  {
+    "paramName": "ms",
+    "paramLongName": "mdStoreSize",
+    "paramDescription": "the Metadata Store Size",
+    "paramRequired": false
+  },
+  {
+    "paramName": "mv",
+    "paramLongName": "mdStoreVersion",
+    "paramDescription": "the Metadata Version Bean",
+    "paramRequired": false
+  },
+  {
+    "paramName": "n",
+    "paramLongName": "namenode",
+    "paramDescription": "the Name Node URI",
+    "paramRequired": false
+  },
+  {
+    "paramName": "rm",
+    "paramLongName": "readMDStoreId",
+    "paramDescription": "the ID Locked to Read",
+    "paramRequired": false
+  }
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
index 38cd83da7..28abe0965 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml
@@ -1,10 +1,5 @@
 <workflow-app xmlns="uri:oozie:workflow:0.5" name="CollectionWorkflow">
     <parameters>
-        <property>
-            <name>mdStorePath</name>
-            <description>the path of the native mdstore</description>
-        </property>
-
         <property>
             <name>apiDescription</name>
             <description>A json encoding of the API Description class</description>
@@ -16,7 +11,7 @@
         <property>
             <name>identifierPath</name>
-            <description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier</description>
+            <description>An xpath to retrieve the metadata identifier for the generation of DNet Identifier</description>
         </property>
 
         <property>
@@ -33,26 +28,78 @@
         <property>
             <name>workflowId</name>
             <description>The identifier of the workflow</description>
         </property>
+
+        <property>
+            <name>mdStoreID</name>
+            <description>The identifier of the mdStore</description>
+        </property>
+
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>The URI of the MDStore Manager</description>
+        </property>
+
+        <property>
+            <name>collectionMode</name>
+            <description>Should be Refresh or Incremental</description>
+        </property>
     </parameters>
 
     <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>
 
-    <start to="CollectionWorker"/>
+    <start to="CollectionMode"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
+
+    <decision name="CollectionMode">
+        <switch>
+            <case to="StartTransaction">${wf:conf('collectionMode') eq 'REFRESH'}</case>
+            <case to="BeginRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
+            <default to="StartTransaction"/>
+        </switch>
+    </decision>
+
+    <action name="BeginRead">
+        <java>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_LOCK</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="StartTransaction"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="StartTransaction">
+        <java>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="CollectionWorker"/>
+        <error to="Kill"/>
+    </action>
 
     <action name="CollectionWorker">
         <java>
             <main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
-            <arg>--hdfsPath</arg><arg>${workingDir}/sequenceFile_${mdstoreVersion}</arg>
             <arg>--apidescriptor</arg><arg>${apiDescription}</arg>
             <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
         </java>
-        <ok to="GenerateNativeStoreSparkJob"/>
+        <ok to="GenerateNativeStoreSparkJob"/>
         <error to="Kill"/>
     </action>
@@ -75,13 +122,56 @@
             <arg>--dateOfCollection</arg><arg>${timestamp}</arg>
             <arg>--provenance</arg><arg>${dataSourceInfo}</arg>
             <arg>--xpath</arg><arg>${identifierPath}</arg>
-            <arg>--input</arg><arg>${workingDir}/sequenceFile</arg>
-            <arg>--output</arg><arg>${mdStorePath}</arg>
-            <arg>-w</arg><arg>${workflowId}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
         </spark>
-        <ok to="End"/>
+        <ok to="EndCollection"/>
         <error to="Kill"/>
     </action>
+
+    <decision name="EndCollection">
+        <switch>
+            <case to="CommitVersion">${wf:conf('collectionMode') eq 'REFRESH'}</case>
+            <case to="EndRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
+            <default to="CommitVersion"/>
+        </switch>
+    </decision>
+
+    <action name="EndRead">
+        <java>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="CommitVersion"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="CommitVersionFromWorker">
+        <java>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('CollectionWorker')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
 
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
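
The glue between the Java actions and the rest of the workflow is Oozie's capture-output contract: populateOOZIEEnv writes a property into the file named by oozie.action.output.properties, and downstream nodes read it back with expressions like ${wf:actionData('StartTransaction')['mdStoreVersion']}. A stripped-down illustration of the producing side — the property file path is normally set by Oozie itself, and the JSON value here is invented:

    // Simulate what Oozie does before launching the action.
    System.setProperty("oozie.action.output.properties", "/tmp/action-output.properties");
    // What MDStoreActionNode does for NEW_VERSION, reduced to the essentials.
    MDStoreActionNode.populateOOZIEEnv("mdStoreVersion",
        "{\"id\":\"md-1234-1611931361000\",\"hdfsPath\":\"/data/mdstores/md-1234/v1\"}");
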
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
index fc19f2064..9abfbacac 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -2,25 +2,18 @@
 package eu.dnetlib.dhp.collector.worker;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.Mockito.*;
 
-import java.io.File;
 import java.nio.file.Path;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.collection.worker.CollectorWorker;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
-import eu.dnetlib.message.Message;
-import eu.dnetlib.message.MessageManager;
+import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
 
 @Disabled
 public class DnetCollectorWorkerApplicationTests {