Compare commits
6 Commits
master
...
native_rec
Author | SHA1 | Date |
---|---|---|
Claudio Atzori | f249f9d00c | |
Claudio Atzori | 998262321c | |
Claudio Atzori | 67a65442ee | |
Claudio Atzori | 548886fe91 | |
Claudio Atzori | fe34928d54 | |
Claudio Atzori | f6dbf60ab8 |
|
@ -6,6 +6,8 @@ import java.util.Arrays;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
|
@ -65,7 +67,18 @@ public class DNetRestClient {
|
|||
.map(h -> h.getName() + ":" + h.getValue())
|
||||
.collect(Collectors.joining(",")));
|
||||
|
||||
return IOUtils.toString(client.execute(r).getEntity().getContent());
|
||||
try (final CloseableHttpResponse response = client.execute(r)) {
|
||||
|
||||
final int statusCode = response.getStatusLine().getStatusCode();
|
||||
if (HttpStatus.SC_OK != statusCode) {
|
||||
final String err = IOUtils.toString(response.getEntity().getContent());
|
||||
throw new IOException(String.format("got http response: %s, response body: %s", statusCode, err));
|
||||
}
|
||||
|
||||
final String res = IOUtils.toString(response.getEntity().getContent());
|
||||
log.info("got response: {}", res);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,173 @@
|
|||
|
||||
package eu.dnetlib.dhp.migration;
|
||||
|
||||
import static eu.dnetlib.dhp.common.Constants.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoder;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.Element;
|
||||
import org.dom4j.Node;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.github.sisyphsu.dateparser.DateParserUtils;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
||||
|
||||
public class MigrateNativeStoreSparkJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MigrateNativeStoreSparkJob.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
MigrateNativeStoreSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/migration/migrate_native_input_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final String encoding = parser.get("encoding");
|
||||
log.info("encoding is {}", encoding);
|
||||
|
||||
String mdStoreVersion = parser.get("mdStoreVersion");
|
||||
log.info("mdStoreVersion is {}", mdStoreVersion);
|
||||
|
||||
final String datasourceId = parser.get("datasourceId");
|
||||
log.info("datasourceId is {}", datasourceId);
|
||||
|
||||
final String datasourceName = parser.get("datasourceName");
|
||||
log.info("datasourceName is {}", datasourceName);
|
||||
|
||||
final String nsPrefix = parser.get("nsPrefix");
|
||||
log.info("nsPrefix is {}", nsPrefix);
|
||||
|
||||
final Provenance provenance = new Provenance(datasourceId, datasourceName, nsPrefix);
|
||||
|
||||
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
|
||||
|
||||
Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> migrateNativeMDStore(
|
||||
spark, provenance, encoding, currentVersion));
|
||||
}
|
||||
|
||||
private static void migrateNativeMDStore(SparkSession spark,
|
||||
Provenance provenance,
|
||||
String encoding,
|
||||
MDStoreVersion currentVersion) throws IOException {
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
final LongAccumulator totalItems = sc.sc().longAccumulator(CONTENT_TOTALITEMS);
|
||||
final LongAccumulator invalidRecords = sc.sc().longAccumulator(CONTENT_INVALIDRECORDS);
|
||||
|
||||
final String seqFilePath = currentVersion.getHdfsPath() + SEQUENCE_FILE_NAME;
|
||||
|
||||
final JavaRDD<MetadataRecord> nativeStore = sc
|
||||
.sequenceFile(seqFilePath, IntWritable.class, Text.class)
|
||||
.map(
|
||||
item -> parseRecord(
|
||||
item._2().toString(),
|
||||
encoding,
|
||||
provenance,
|
||||
totalItems,
|
||||
invalidRecords))
|
||||
.filter(Objects::nonNull)
|
||||
.distinct();
|
||||
|
||||
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
|
||||
final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
|
||||
|
||||
final String targetPath = currentVersion.getHdfsPath() + MDSTORE_DATA_PATH;
|
||||
|
||||
saveDataset(mdstore, targetPath);
|
||||
|
||||
final Long total = spark.read().load(targetPath).count();
|
||||
log.info("migrated {} records for datasource '{}'", total, provenance.getDatasourceName());
|
||||
|
||||
writeHdfsFile(
|
||||
spark.sparkContext().hadoopConfiguration(), total.toString(),
|
||||
currentVersion.getHdfsPath() + MDSTORE_SIZE_PATH);
|
||||
}
|
||||
|
||||
public static MetadataRecord parseRecord(
|
||||
final String input,
|
||||
final String encoding,
|
||||
final Provenance provenance,
|
||||
final LongAccumulator totalItems,
|
||||
final LongAccumulator invalidRecords) {
|
||||
|
||||
if (totalItems != null)
|
||||
totalItems.add(1);
|
||||
try {
|
||||
SAXReader reader = new SAXReader();
|
||||
reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
|
||||
Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
|
||||
document.normalize();
|
||||
String id = document.valueOf("//dri:objIdentifier/text()");
|
||||
String dateOfCollection = document.valueOf("//dri:dateOfCollection/text()");
|
||||
final LocalDate date = DateParserUtils
|
||||
.parseDate(dateOfCollection.trim())
|
||||
.toInstant()
|
||||
.atZone(ZoneId.systemDefault())
|
||||
.toLocalDate();
|
||||
|
||||
document
|
||||
.selectSingleNode(
|
||||
"/*[local-name() = 'record']/*[local-name() = 'header']/*[local-name() = 'objIdentifier']")
|
||||
.detach();
|
||||
document
|
||||
.selectSingleNode(
|
||||
"/*[local-name() = 'record']/*[local-name() = 'header']/*[local-name() = 'recordIdentifier']")
|
||||
.detach();
|
||||
document
|
||||
.selectSingleNode(
|
||||
"/*[local-name() = 'record']/*[local-name() = 'header']/*[local-name() = 'dateOfCollection']")
|
||||
.detach();
|
||||
document
|
||||
.selectSingleNode(
|
||||
"/*[local-name() = 'record']/*[local-name() = 'header']/*[local-name() = 'datasourceprefix']")
|
||||
.detach();
|
||||
document.selectSingleNode("/*[local-name() = 'record']/*[local-name() = 'about']").detach();
|
||||
|
||||
return new MetadataRecord(id, encoding, provenance, document.asXML(), date.toEpochDay());
|
||||
} catch (Throwable e) {
|
||||
invalidRecords.add(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
|
||||
package eu.dnetlib.dhp.migration;
|
||||
|
||||
import static eu.dnetlib.dhp.common.Constants.DNET_MESSAGE_MGR_URL;
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.MAPPER;
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.getHadoopConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.Spliterator;
|
||||
import java.util.Spliterators;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.bson.Document;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.mongodb.client.MongoCollection;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.collection.ApiDescriptor;
|
||||
import eu.dnetlib.dhp.collection.CollectorWorker;
|
||||
import eu.dnetlib.dhp.collection.CollectorWorkerApplication;
|
||||
import eu.dnetlib.dhp.collection.UnknownCollectorPluginException;
|
||||
import eu.dnetlib.dhp.common.MdstoreClient;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.common.collection.HttpClientParams;
|
||||
import eu.dnetlib.dhp.message.MessageSender;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
|
||||
|
||||
public class MongoDbMDStoreCollectionReader {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(MongoDbMDStoreCollectionReader.class);
|
||||
|
||||
private final FileSystem fileSystem;
|
||||
|
||||
public MongoDbMDStoreCollectionReader(FileSystem fileSystem) {
|
||||
this.fileSystem = fileSystem;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param args
|
||||
*/
|
||||
public static void main(final String[] args)
|
||||
throws ParseException, IOException, UnknownCollectorPluginException, CollectorException {
|
||||
|
||||
final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
MongoDbMDStoreCollectionReader.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/migration/native_records_migration_input_parameter.json")));
|
||||
argumentParser.parseArgument(args);
|
||||
|
||||
log.info("Java Xmx: {}m", Runtime.getRuntime().maxMemory() / (1024 * 1024));
|
||||
|
||||
final String hdfsuri = argumentParser.get("namenode");
|
||||
log.info("hdfsURI is {}", hdfsuri);
|
||||
|
||||
final String mongoBaseUrl = Optional
|
||||
.ofNullable(argumentParser.get("mongoDbUri"))
|
||||
.orElseThrow(
|
||||
() -> new IOException("missing mongodb baseUrl"));
|
||||
log.info("mongoBaseUrl: {}", mongoBaseUrl);
|
||||
|
||||
final String dbName = Optional
|
||||
.ofNullable(argumentParser.get("mongoDbName"))
|
||||
.orElseThrow(() -> new IOException("missing parameter 'mongoDbName'"));
|
||||
log.info("dbName: {}", dbName);
|
||||
|
||||
final String mdId = Optional
|
||||
.ofNullable(argumentParser.get("sourceMDStoreId"))
|
||||
.orElseThrow(() -> new IOException("missing parameter 'sourceMDStoreId'"));
|
||||
log.info("sourceMDStoreId: {}", mdId);
|
||||
|
||||
final String mdStoreVersion = argumentParser.get("mdStoreVersion");
|
||||
log.info("mdStoreVersion is {}", mdStoreVersion);
|
||||
|
||||
final String dnetMessageManagerURL = argumentParser.get(DNET_MESSAGE_MGR_URL);
|
||||
log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
|
||||
|
||||
final String workflowId = argumentParser.get("workflowId");
|
||||
log.info("workflowId is {}", workflowId);
|
||||
|
||||
final FileSystem fileSystem = FileSystem.get(getHadoopConfiguration(hdfsuri));
|
||||
|
||||
new MongoDbMDStoreCollectionReader(fileSystem)
|
||||
.run(mongoBaseUrl, dbName, mdId, mdStoreVersion, dnetMessageManagerURL, workflowId);
|
||||
}
|
||||
|
||||
protected void run(String mongoBaseUrl, String dbName, String mdId, String mdStoreVersion,
|
||||
String dnetMessageManagerURL, String workflowId)
|
||||
throws IOException, UnknownCollectorPluginException, CollectorException {
|
||||
|
||||
final ApiDescriptor api = new ApiDescriptor();
|
||||
api.setProtocol("other");
|
||||
api.setBaseUrl(mongoBaseUrl);
|
||||
api.getParams().put("other_plugin_type", "mdstore_mongodb");
|
||||
api.getParams().put("mongodb_dbname", dbName);
|
||||
api.getParams().put("mdstore_id", mdId);
|
||||
|
||||
final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);
|
||||
final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId);
|
||||
|
||||
try (AggregatorReport report = new AggregatorReport(ms)) {
|
||||
|
||||
new CollectorWorker(api, fileSystem, currentVersion, null, report).collect();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
[
|
||||
{
|
||||
"paramName": "e",
|
||||
"paramLongName": "encoding",
|
||||
"paramDescription": "record encoding, XML or JSON",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mdv",
|
||||
"paramLongName": "mdStoreVersion",
|
||||
"paramDescription": "the MDStore Version bean, serialized as JSON",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "did",
|
||||
"paramLongName": "datasourceId",
|
||||
"paramDescription": "datasource identifier",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dname",
|
||||
"paramLongName": "datasourceName",
|
||||
"paramDescription": "datasource name",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "np",
|
||||
"paramLongName": "nsPrefix",
|
||||
"paramDescription": "datasource namespace prefix",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "issm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "is the Spark Session Managed",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
|
@ -0,0 +1,44 @@
|
|||
[
|
||||
{
|
||||
"paramName": "n",
|
||||
"paramLongName": "namenode",
|
||||
"paramDescription": "the Name Node URI",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mdbu",
|
||||
"paramLongName": "mongoDbUri",
|
||||
"paramDescription": "mongodb server uri",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mdbn",
|
||||
"paramLongName": "mongoDbName",
|
||||
"paramDescription": "mongodb database name",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "smdid",
|
||||
"paramLongName": "sourceMDStoreId",
|
||||
"paramDescription": "source mdstore identifier",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "mv",
|
||||
"paramLongName": "mdStoreVersion",
|
||||
"paramDescription": "the MDStore Version bean",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "dm",
|
||||
"paramLongName": "dnetMessageManagerURL",
|
||||
"paramDescription": "the End point URL to send Messages",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "w",
|
||||
"paramLongName": "workflowId",
|
||||
"paramDescription": "the identifier of the dnet Workflow",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,22 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,157 @@
|
|||
<workflow-app name="Native records migration" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>datasourceId</name>
|
||||
<description>datasource identifier</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>datasourceName</name>
|
||||
<description>datasource name</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>nsPrefix</name>
|
||||
<description>datasource namespace prefix</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>metadataEncoding</name>
|
||||
<description>The encoding of the metadata records: XML or JSON</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>workflowId</name>
|
||||
<description>The identifier of the workflow</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>targetMdStoreID</name>
|
||||
<description>The identifier of the target mdStore</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sourceMDStoreId</name>
|
||||
<description>The identifier of the source mdStore</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mdStoreManagerURI</name>
|
||||
<description>The URI of the MDStore Manager</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dnetMessageManagerURL</name>
|
||||
<description>The URI of the Dnet Message Manager</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mongoDbUri</name>
|
||||
<description>MongoDB server URI</description>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>mongoDbName</name>
|
||||
<value>mdstore</value>
|
||||
<description>MDStore database name</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>collection_java_xmx</name>
|
||||
<value>-Xmx200m</value>
|
||||
<description>Used to configure the heap size for the map JVM process. Should be 80% of mapreduce.map.memory.mb.</description>
|
||||
</property>
|
||||
|
||||
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
</global>
|
||||
|
||||
<start to="StartTransaction"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="StartTransaction">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<java-opts>${collection_java_xmx}</java-opts>
|
||||
<arg>--action</arg><arg>NEW_VERSION</arg>
|
||||
<arg>--mdStoreID</arg><arg>${targetMdStoreID}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
<capture-output/>
|
||||
</java>
|
||||
<ok to="CopyNativeRecords"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="CopyNativeRecords">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.migration.MongoDbMDStoreCollectionReader</main-class>
|
||||
<java-opts>${collection_java_xmx}</java-opts>
|
||||
<arg>--mongoDbUri</arg><arg>${mongoDbUri}</arg>
|
||||
<arg>--mongoDbName</arg><arg>${mongoDbName}</arg>
|
||||
<arg>--sourceMDStoreId</arg><arg>${sourceMDStoreId}</arg>
|
||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--workflowId</arg><arg>${workflowId}</arg>
|
||||
<arg>--dnetMessageManagerURL</arg><arg>${dnetMessageManagerURL}</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
</java>
|
||||
<ok to="MigrateNativeStoreSparkJob"/>
|
||||
<error to="RollBack"/>
|
||||
</action>
|
||||
|
||||
<action name="MigrateNativeStoreSparkJob">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Migrate Native MetadataStore</name>
|
||||
<class>eu.dnetlib.dhp.migration.MigrateNativeStoreSparkJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--encoding</arg><arg>${metadataEncoding}</arg>
|
||||
<arg>--datasourceId</arg><arg>${datasourceId}</arg>
|
||||
<arg>--datasourceName</arg><arg>${datasourceName}</arg>
|
||||
<arg>--nsPrefix</arg><arg>${nsPrefix}</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
</spark>
|
||||
<ok to="CommitVersion"/>
|
||||
<error to="RollBack"/>
|
||||
</action>
|
||||
|
||||
<action name="CommitVersion">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<java-opts>${collection_java_xmx}</java-opts>
|
||||
<arg>--action</arg><arg>COMMIT</arg>
|
||||
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
</java>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="RollBack">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
|
||||
<java-opts>${collection_java_xmx}</java-opts>
|
||||
<arg>--action</arg><arg>ROLLBACK</arg>
|
||||
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
|
||||
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
|
||||
</java>
|
||||
<ok to="Kill"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
Loading…
Reference in New Issue