WIP mdstore

transaction implemented on hadoop side
pull/94/head
Sandro La Bruzzo 3 years ago
parent d942d0c77d
commit 0276180039

@ -157,7 +157,9 @@ public class MDStore implements Serializable {
@Override
public String toString() {
return String
.format("MDStore [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, hdfsPath=%s, creationDate=%s]", id, format, layout, interpretation, datasourceName, datasourceId, apiId, hdfsPath, creationDate);
.format(
"MDStore [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, hdfsPath=%s, creationDate=%s]",
id, format, layout, interpretation, datasourceName, datasourceId, apiId, hdfsPath, creationDate);
}
@Override
@ -167,8 +169,12 @@ public class MDStore implements Serializable {
@Override
public boolean equals(final Object obj) {
if (this == obj) { return true; }
if (!(obj instanceof MDStore)) { return false; }
if (this == obj) {
return true;
}
if (!(obj instanceof MDStore)) {
return false;
}
final MDStore other = (MDStore) obj;
return Objects.equals(id, other.id);
}

@ -62,8 +62,12 @@ public class MDStoreCurrentVersion implements Serializable {
@Override
public boolean equals(final Object obj) {
if (this == obj) { return true; }
if (!(obj instanceof MDStoreCurrentVersion)) { return false; }
if (this == obj) {
return true;
}
if (!(obj instanceof MDStoreCurrentVersion)) {
return false;
}
final MDStoreCurrentVersion other = (MDStoreCurrentVersion) obj;
return Objects.equals(currentVersion, other.currentVersion) && Objects.equals(mdstore, other.mdstore);
}

@ -116,7 +116,9 @@ public class MDStoreVersion implements Serializable {
@Override
public String toString() {
return String
.format("MDStoreVersion [id=%s, mdstore=%s, writing=%s, readCount=%s, lastUpdate=%s, size=%s, hdfsPath=%s]", id, mdstore, writing, readCount, lastUpdate, size, hdfsPath);
.format(
"MDStoreVersion [id=%s, mdstore=%s, writing=%s, readCount=%s, lastUpdate=%s, size=%s, hdfsPath=%s]", id,
mdstore, writing, readCount, lastUpdate, size, hdfsPath);
}
@Override
@ -126,8 +128,12 @@ public class MDStoreVersion implements Serializable {
@Override
public boolean equals(final Object obj) {
if (this == obj) { return true; }
if (!(obj instanceof MDStoreVersion)) { return false; }
if (this == obj) {
return true;
}
if (!(obj instanceof MDStoreVersion)) {
return false;
}
final MDStoreVersion other = (MDStoreVersion) obj;
return Objects.equals(id, other.id);
}

@ -168,7 +168,10 @@ public class MDStoreWithInfo implements Serializable {
@Override
public String toString() {
return String
.format("MDStoreWithInfo [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, currentVersion=%s, creationDate=%s, lastUpdate=%s, size=%s, numberOfVersions=%s, hdfsPath=%s]", id, format, layout, interpretation, datasourceName, datasourceId, apiId, currentVersion, creationDate, lastUpdate, size, numberOfVersions, hdfsPath);
.format(
"MDStoreWithInfo [id=%s, format=%s, layout=%s, interpretation=%s, datasourceName=%s, datasourceId=%s, apiId=%s, currentVersion=%s, creationDate=%s, lastUpdate=%s, size=%s, numberOfVersions=%s, hdfsPath=%s]",
id, format, layout, interpretation, datasourceName, datasourceId, apiId, currentVersion, creationDate,
lastUpdate, size, numberOfVersions, hdfsPath);
}
@Override
@ -178,8 +181,12 @@ public class MDStoreWithInfo implements Serializable {
@Override
public boolean equals(final Object obj) {
if (this == obj) { return true; }
if (!(obj instanceof MDStoreWithInfo)) { return false; }
if (this == obj) {
return true;
}
if (!(obj instanceof MDStoreWithInfo)) {
return false;
}
final MDStoreWithInfo other = (MDStoreWithInfo) obj;
return Objects.equals(id, other.id);
}

@ -1,5 +1,5 @@
package eu.dnetlib.collector.worker.model;
package eu.dnetlib.dhp.collector.worker.model;
import java.util.HashMap;
import java.util.Map;

@ -0,0 +1,54 @@
package eu.dnetlib.dhp.common.rest;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import com.fasterxml.jackson.databind.ObjectMapper;
public class DNetRestClient {
private static ObjectMapper mapper = new ObjectMapper();
public static <T> T doGET(final String url, Class<T> clazz) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet, clazz);
}
public static String doGET(final String url) throws Exception {
final HttpGet httpGet = new HttpGet(url);
return doHTTPRequest(httpGet);
}
public static <V> String doPOST(final String url, V objParam) throws Exception {
final HttpPost httpPost = new HttpPost(url);
if (objParam != null) {
final StringEntity entity = new StringEntity(mapper.writeValueAsString(objParam));
httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
}
return doHTTPRequest(httpPost);
}
public static <T, V> T doPOST(final String url, V objParam, Class<T> clazz) throws Exception {
return mapper.readValue(doPOST(url, objParam), clazz);
}
private static String doHTTPRequest(final HttpUriRequest r) throws Exception {
CloseableHttpClient client = HttpClients.createDefault();
CloseableHttpResponse response = client.execute(r);
return IOUtils.toString(response.getEntity().getContent());
}
private static <T> T doHTTPRequest(final HttpUriRequest r, Class<T> clazz) throws Exception {
return mapper.readValue(doHTTPRequest(r), clazz);
}
}

@ -0,0 +1,164 @@
package eu.dnetlib.dhp.aggregation.mdstore;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
public class MDStoreActionNode {
private static final Logger log = LoggerFactory.getLogger(MDStoreActionNode.class);
enum MDAction {
NEW_VERSION, ROLLBACK, COMMIT, READ_LOCK, READ_UNLOCK
}
private static final ObjectMapper mapper = new ObjectMapper();
public static String NEW_VERSION_URI = "%s/mdstore/%s/newVersion";
public static final String COMMIT_VERSION_URL = "%s/version/%s/commit/%s";
public static final String ROLLBACK_VERSION_URL = "%s/version/%s/abort";
public static final String READ_LOCK_URL = "%s/mdstores/mdstore/%s/startReading";
public static final String READ_UNLOCK_URL = "%s/mdstores/version/%s/endReading";
private static final String MDSTOREVERSIONPARAM = "mdStoreVersion";
private static final String MDSTOREREADLOCKPARAM = "mdStoreReadLockVersion";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
IOUtils
.toString(
CollectorWorker.class
.getResourceAsStream(
"/eu/dnetlib/dhp/collection/mdstore_action_parameters.json")));
argumentParser.parseArgument(args);
final MDAction action = MDAction.valueOf(argumentParser.get("action"));
log.info("Curren action is {}", action);
final String mdStoreManagerURI = argumentParser.get("mdStoreManagerURI");
log.info("mdStoreManagerURI is {}", mdStoreManagerURI);
switch (action) {
case NEW_VERSION: {
final String mdStoreID = argumentParser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreId");
}
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(NEW_VERSION_URI, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
populateOOZIEEnv(MDSTOREVERSIONPARAM, mapper.writeValueAsString(currentVersion));
break;
}
case COMMIT: {
final String hdfsuri = argumentParser.get("namenode");
if (StringUtils.isBlank(hdfsuri)) {
throw new IllegalArgumentException("missing or empty argument namenode");
}
final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
Configuration conf = new Configuration();
// Set FileSystem URI
conf.set("fs.defaultFS", hdfsuri);
// Because of Maven
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("hadoop.home.dir", "/");
// Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
String mdStoreSizeParam = argumentParser.get("mdStoreSize");
if (StringUtils.isBlank(mdStoreSizeParam)) {
throw new IllegalArgumentException("missing or empty argument mdStoreSize");
}
Path hdfstoreSizepath = new Path(mdStoreVersion.getHdfsPath() + "/size");
FSDataInputStream inputStream = fs.open(hdfstoreSizepath);
final Long mdStoreSize = Long.parseLong(IOUtils.toString(inputStream));
inputStream.close();
fs.create(hdfstoreSizepath);
DNetRestClient
.doGET(String.format(COMMIT_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId(), mdStoreSize));
break;
}
case ROLLBACK: {
final String mdStoreVersion_params = argumentParser.get("mdStoreVersion");
final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
DNetRestClient.doGET(String.format(ROLLBACK_VERSION_URL, mdStoreManagerURI, mdStoreVersion.getId()));
break;
}
case READ_LOCK: {
final String mdStoreID = argumentParser.get("mdStoreID");
if (StringUtils.isBlank(mdStoreID)) {
throw new IllegalArgumentException("missing or empty argument mdStoreId");
}
final MDStoreVersion currentVersion = DNetRestClient
.doGET(String.format(READ_LOCK_URL, mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
populateOOZIEEnv(MDSTOREREADLOCKPARAM, mapper.writeValueAsString(currentVersion));
break;
}
case READ_UNLOCK: {
final String mdStoreVersion_params = argumentParser.get("readMDStoreId");
final MDStoreVersion mdStoreVersion = mapper.readValue(mdStoreVersion_params, MDStoreVersion.class);
if (StringUtils.isBlank(mdStoreVersion.getId())) {
throw new IllegalArgumentException(
"invalid MDStoreVersion value current is " + mdStoreVersion_params);
}
DNetRestClient.doGET(String.format(READ_UNLOCK_URL, mdStoreManagerURI, mdStoreVersion.getId()));
break;
}
default:
throw new IllegalArgumentException("invalid action");
}
}
public static void populateOOZIEEnv(final String paramName, String value) throws Exception {
File file = new File(System.getProperty("oozie.action.output.properties"));
Properties props = new Properties();
props.setProperty(paramName, value);
OutputStream os = new FileOutputStream(file);
props.store(os, "");
os.close();
}
}

@ -3,13 +3,17 @@ package eu.dnetlib.dhp.collection;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.ByteArrayInputStream;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
@ -19,6 +23,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
@ -28,7 +33,11 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.message.MessageManager;
@ -36,6 +45,7 @@ import eu.dnetlib.message.MessageManager;
public class GenerateNativeStoreSparkJob {
private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
private static final String DATASET_NAME = "/store";
public static void main(String[] args) throws Exception {
@ -50,11 +60,15 @@ public class GenerateNativeStoreSparkJob {
final String provenanceArgument = parser.get("provenance");
log.info("Provenance is {}", provenanceArgument);
final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class);
final String dateOfCollectionArgs = parser.get("dateOfCollection");
log.info("dateOfCollection is {}", dateOfCollectionArgs);
final long dateOfCollection = new Long(dateOfCollectionArgs);
final String sequenceFileInputPath = parser.get("input");
log.info("sequenceFileInputPath is {}", dateOfCollectionArgs);
String mdStoreVersion = parser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion);
final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
@ -70,7 +84,9 @@ public class GenerateNativeStoreSparkJob {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final JavaPairRDD<IntWritable, Text> inputRDD = sc
.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);
.sequenceFile(
currentVersion.getHdfsPath() + CollectorWorkerApplication.SEQUENTIAL_FILE_NAME,
IntWritable.class, Text.class);
final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
@ -89,12 +105,26 @@ public class GenerateNativeStoreSparkJob {
.distinct();
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
mdstore
.write()
.mode(SaveMode.Overwrite)
.format("parquet")
.save(currentVersion.getHdfsPath() + DATASET_NAME);
mdstore = spark.read().load(currentVersion.getHdfsPath() + DATASET_NAME).as(encoder);
final Long total = mdstore.count();
FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
FSDataOutputStream output = fs.create(new Path(currentVersion.getHdfsPath() + "/size"));
final BufferedOutputStream os = new BufferedOutputStream(output);
mdstore.write().format("parquet").save(parser.get("output"));
os.write(total.toString().getBytes(StandardCharsets.UTF_8));
os.close();
});
}

@ -3,8 +3,8 @@ package eu.dnetlib.dhp.collection.plugin;
import java.util.stream.Stream;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public interface CollectorPlugin {

@ -13,9 +13,9 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.CollectorException;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class OaiCollectorPlugin implements CollectorPlugin {

@ -14,12 +14,9 @@ import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
public class CollectorWorker {

@ -1,15 +1,22 @@
package eu.dnetlib.dhp.collection.worker;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
/**
* DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. This module
@ -24,6 +31,8 @@ public class CollectorWorkerApplication {
private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
public static String SEQUENTIAL_FILE_NAME = "/sequence_file";
/**
* @param args
*/
@ -38,18 +47,23 @@ public class CollectorWorkerApplication {
argumentParser.parseArgument(args);
final String hdfsuri = argumentParser.get("namenode");
log.info("hdfsURI is {}", hdfsuri);
final String hdfsPath = argumentParser.get("hdfsPath");
log.info("hdfsPath is {}" + hdfsPath);
final String apiDescriptor = argumentParser.get("apidescriptor");
log.info("apiDescriptor is {}" + apiDescriptor);
log.info("apiDescriptor is {}", apiDescriptor);
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
log.info("mdStoreVersion is {}", mdStoreVersion);
final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
final MDStoreVersion currentVersion = jsonMapper.readValue(mdStoreVersion, MDStoreVersion.class);
final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri,
currentVersion.getHdfsPath() + SEQUENTIAL_FILE_NAME);
worker.collect();
}
}

@ -15,4 +15,9 @@
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

@ -30,15 +30,9 @@
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "input",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "output",
"paramDescription": "the path of the result DataFrame on HDFS",
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the Metadata Store Version Info",
"paramRequired": true
},
{

@ -1,6 +1,26 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false}
{
"paramName": "a",
"paramLongName": "apidescriptor",
"paramDescription": "the JSON encoding of the API Descriptor",
"paramRequired": true
},
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": true
},
{
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the MDStore Version bean",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workflowId",
"paramDescription": "the identifier of the dnet Workflow",
"paramRequired": false
}
]

@ -0,0 +1,45 @@
[
{
"paramName": "a",
"paramLongName": "action",
"paramDescription": "the JSON encoding of the API Descriptor",
"paramRequired": true
},
{
"paramName": "mu",
"paramLongName": "mdStoreManagerURI",
"paramDescription": "the MDStore Manager URI",
"paramRequired": true
},
{
"paramName": "mi",
"paramLongName": "mdStoreID",
"paramDescription": "the Metadata Store ID",
"paramRequired": false
},
{
"paramName": "ms",
"paramLongName": "mdStoreSize",
"paramDescription": "the Metadata Store Size",
"paramRequired": false
},
{
"paramName": "mv",
"paramLongName": "mdStoreVersion",
"paramDescription": "the Metadata Version Bean",
"paramRequired": false
},
{
"paramName": "n",
"paramLongName": "namenode",
"paramDescription": "the Name Node URI",
"paramRequired": false
},
{
"paramName": "rm",
"paramLongName": "readMDStoreId",
"paramDescription": "the ID Locked to Read",
"paramRequired": false
}
]

@ -1,10 +1,5 @@
<workflow-app name="CollectionWorkflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
</property>
<property>
<name>apiDescription</name>
<description>A json encoding of the API Description class</description>
@ -16,7 +11,7 @@
</property>
<property>
<name>identifierPath</name>
<description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier </description>
<description>An xpath to retrieve the metadata identifier for the generation of DNet Identifier </description>
</property>
<property>
@ -33,26 +28,78 @@
<name>workflowId</name>
<description>The identifier of the workflow</description>
</property>
<property>
<name>mdStoreID</name>
<description>The identifier of the mdStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>The URI of the MDStore Manager</description>
</property>
<property>
<name>collectionMode</name>
<description>Should be Refresh or Incremental</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</global>
<start to="CollectionWorker"/>
<start to="StartTransaction"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="collection_mode">
<switch>
<case to="StartTransaction">${wf:conf('collectionMode') eq 'REFRESH'}</case>
<case to="BeginRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
<default to="ImportDatacite"/>
</switch>
</decision>
<action name="BeginRead">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_LOCK</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="StartTransaction"/>
<error to="Kill"/>
</action>
<action name="StartTransaction">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreID}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<java>
<main-class>eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/sequenceFile_${mdstoreVersion}</arg>
<arg>--apidescriptor</arg><arg>${apiDescription}</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
</java>
<ok to="GenerateNativeStoreSparkJob"/>
<error to="Kill"/>
<error to="RollBack"/>
</action>
<action name="GenerateNativeStoreSparkJob">
@ -75,13 +122,56 @@
<arg>--dateOfCollection</arg><arg>${timestamp}</arg>
<arg>--provenance</arg><arg>${dataSourceInfo}</arg>
<arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>--input</arg><arg>${workingDir}/sequenceFile</arg>
<arg>--output</arg><arg>${mdStorePath}</arg>
<arg>-w</arg><arg>${workflowId}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
</spark>
<ok to="collection_mode_end"/>
<error to="RollBack"/>
</action>
<decision name="collection_mode_end">
<switch>
<case to="CommitVersion">${wf:conf('collectionMode') eq 'REFRESH'}</case>
<case to="EndRead">${wf:conf('collectionMode') eq 'INCREMENTAL'}</case>
<default to="ImportDatacite"/>
</switch>
</decision>
<action name="EndRead">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="CommitVersion"/>
<error to="Kill"/>
</action>
<action name="CommitVersion">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('CollectionWorker')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -2,25 +2,18 @@
package eu.dnetlib.dhp.collector.worker;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.Mockito.*;
import java.io.File;
import java.nio.file.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.worker.CollectorWorker;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
@Disabled
public class DnetCollectorWorkerApplicationTests {

Loading…
Cancel
Save