actionsets migration workflow moved in dhp-workflows/dhp-actionmanager

This commit is contained in:
Claudio Atzori 2020-04-20 15:24:33 +02:00
parent d714bfb4d4
commit 9147af7fed
12 changed files with 357 additions and 268 deletions

View File

@ -47,6 +47,16 @@
<artifactId>jaxen</artifactId> <artifactId>jaxen</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-data-protos</artifactId>
</dependency>
<dependency> <dependency>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId> <artifactId>dhp-schemas</artifactId>
@ -57,6 +67,44 @@
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-api</artifactId> <artifactId>dnet-actionmanager-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-actionmanager-common</artifactId>
<exclusions>
<exclusion>
<groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaireplus-mapping-utils</artifactId>
</exclusion>
<exclusion>
<groupId>saxonica</groupId>
<artifactId>saxon</artifactId>
</exclusion>
<exclusion>
<groupId>saxonica</groupId>
<artifactId>saxon-dom</artifactId>
</exclusion>
<exclusion>
<groupId>jgrapht</groupId>
<artifactId>jgrapht</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.*</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>apache</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.migration.actions; package eu.dnetlib.dhp.actionmanager.migration;
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
import java.util.Comparator; import java.util.Comparator;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.migration.actions; package eu.dnetlib.dhp.actionmanager.migration;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -9,35 +9,36 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MigrateActionSet { public class MigrateActionSet {
private static final Log log = LogFactory.getLog(MigrateActionSet.class); private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class);
private static final String SEPARATOR = "/"; private static final String SEPARATOR = "/";
private static final String TARGET_PATHS = "target_paths"; private static final String TARGET_PATHS = "target_paths";
private static final String RAWSET_PREFIX = "rawset_"; private static final String RAWSET_PREFIX = "rawset_";
private static Boolean DEFAULT_TRANSFORM_ONLY = false;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = final ArgumentApplicationParser parser =
new ArgumentApplicationParser( new ArgumentApplicationParser(
IOUtils.toString( IOUtils.toString(
MigrateActionSet.class.getResourceAsStream( MigrateActionSet.class.getResourceAsStream(
"/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json"))); "/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
new MigrateActionSet().run(parser); new MigrateActionSet().run(parser);
@ -56,11 +57,11 @@ public class MigrateActionSet {
final String transform_only_s = parser.get("transform_only"); final String transform_only_s = parser.get("transform_only");
log.info("transform only param: " + transform_only_s); log.info("transform only param: {}", transform_only_s);
final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only")); final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only"));
log.info("transform only: " + transformOnly); log.info("transform only: {}", transformOnly);
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl); ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@ -79,22 +80,19 @@ public class MigrateActionSet {
final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp); final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
log.info( log.info(
String.format( "paths to process:\n{}",
"paths to process:\n%s", sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n")));
sourcePaths.stream()
.map(p -> p.toString())
.collect(Collectors.joining("\n"))));
for (Path source : sourcePaths) { for (Path source : sourcePaths) {
if (!sourceFS.exists(source)) { if (!sourceFS.exists(source)) {
log.warn(String.format("skipping unexisting path: %s", source)); log.warn("skipping unexisting path: {}", source);
} else { } else {
LinkedList<String> pathQ = LinkedList<String> pathQ =
Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath())); Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
final String rawSet = pathQ.pollLast(); final String rawSet = pathQ.pollLast();
log.info(String.format("got RAWSET: %s", rawSet)); log.info("got RAWSET: {}", rawSet);
if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) { if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) {
@ -109,7 +107,7 @@ public class MigrateActionSet {
+ SEPARATOR + SEPARATOR
+ rawSet); + rawSet);
log.info(String.format("using TARGET PATH: %s", targetPath)); log.info("using TARGET PATH: {}", targetPath);
if (!transformOnly) { if (!transformOnly) {
if (targetFS.exists(targetPath)) { if (targetFS.exists(targetPath)) {

View File

@ -1,4 +1,9 @@
package eu.dnetlib.dhp.migration.actions; package eu.dnetlib.dhp.actionmanager.migration;
import static eu.dnetlib.data.proto.KindProtos.Kind.entity;
import static eu.dnetlib.data.proto.KindProtos.Kind.relation;
import static eu.dnetlib.data.proto.TypeProtos.*;
import static eu.dnetlib.data.proto.TypeProtos.Type.*;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.googlecode.protobuf.format.JsonFormat; import com.googlecode.protobuf.format.JsonFormat;

View File

@ -0,0 +1,174 @@
package eu.dnetlib.dhp.actionmanager.migration;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
public class TransformActions implements Serializable {
private static final Logger log = LoggerFactory.getLogger(TransformActions.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String SEPARATOR = "/";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
IOUtils.toString(
MigrateActionSet.class.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json")));
parser.parseArgument(args);
Boolean isSparkSessionManaged =
Optional.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String inputPaths = parser.get("inputPaths");
if (StringUtils.isBlank(inputPaths)) {
throw new RuntimeException("empty inputPaths");
}
log.info("inputPaths: {}", inputPaths);
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> transformActions(inputPaths, targetBaseDir, spark));
}
private static void transformActions(
String inputPaths, String targetBaseDir, SparkSession spark) throws IOException {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
LinkedList<String> pathQ =
Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
final String rawset = pathQ.pollLast();
final String actionSetDirectory = pathQ.pollLast();
final Path targetDirectory =
new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
if (fs.exists(targetDirectory)) {
log.info("found target directory '{}", targetDirectory);
fs.delete(targetDirectory, true);
log.info("deleted target directory '{}", targetDirectory);
}
log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory);
sc.sequenceFile(sourcePath, Text.class, Text.class)
.map(
a ->
eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
a._2().toString()))
.map(a -> doTransform(a))
// .filter(Objects::isNull)
// .filter(a -> a.getPayload() == null)
.mapToPair(
a ->
new Tuple2<>(
a.getClazz().toString(),
OBJECT_MAPPER.writeValueAsString(a)))
.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
.saveAsNewAPIHadoopFile(
targetDirectory.toString(),
Text.class,
Text.class,
SequenceFileOutputFormat.class,
sc.hadoopConfiguration());
}
}
private static AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
throws InvalidProtocolBufferException {
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
final Oaf oaf = ProtoConverter.convert(proto_oaf);
switch (proto_oaf.getKind()) {
case entity:
switch (proto_oaf.getEntity().getType()) {
case datasource:
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
case organization:
return new AtomicAction<>(Organization.class, (Organization) oaf);
case project:
return new AtomicAction<>(Project.class, (Project) oaf);
case result:
final String resulttypeid =
proto_oaf
.getEntity()
.getResult()
.getMetadata()
.getResulttype()
.getClassid();
switch (resulttypeid) {
case "publication":
return new AtomicAction<>(Publication.class, (Publication) oaf);
case "software":
return new AtomicAction<>(Software.class, (Software) oaf);
case "other":
return new AtomicAction<>(
OtherResearchProduct.class, (OtherResearchProduct) oaf);
case "dataset":
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
default:
// can be an update, where the resulttype is not specified
return new AtomicAction<>(Result.class, (Result) oaf);
}
default:
throw new IllegalArgumentException(
"invalid entity type: " + proto_oaf.getEntity().getType());
}
case relation:
return new AtomicAction<>(Relation.class, (Relation) oaf);
default:
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
}
}
private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
String XQUERY =
"collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
return isLookUp.getResourceProfileByQuery(XQUERY);
}
}

View File

@ -0,0 +1,56 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "is",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "sn",
"paramLongName": "sourceNameNode",
"paramDescription": "nameNode of the source cluster",
"paramRequired": true
},
{
"paramName": "tn",
"paramLongName": "targetNameNode",
"paramDescription": "namoNode of the target cluster",
"paramRequired": true
},
{
"paramName": "w",
"paramLongName": "workingDirectory",
"paramDescription": "working directory",
"paramRequired": true
},
{
"paramName": "nm",
"paramLongName": "distcp_num_maps",
"paramDescription": "maximum number of map tasks used in the distcp process",
"paramRequired": true
},
{
"paramName": "mm",
"paramLongName": "distcp_memory_mb",
"paramDescription": "memory for distcp action copying actionsets from remote cluster",
"paramRequired": true
},
{
"paramName": "tt",
"paramLongName": "distcp_task_timeout",
"paramDescription": "timeout for distcp copying actions from remote cluster",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transform_only",
"paramDescription": "activate tranform-only mode. Only apply transformation step",
"paramRequired": true
}
]

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "is",
"paramLongName": "isLookupUrl",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "inputPaths",
"paramDescription": "URL of the isLookUp Service",
"paramRequired": true
}
]

View File

@ -10,7 +10,6 @@
</property> </property>
<property> <property>
<name>workingDirectory</name> <name>workingDirectory</name>
<value>/tmp/actionsets</value>
<description>working directory</description> <description>working directory</description>
</property> </property>
<property> <property>
@ -44,6 +43,20 @@
<name>sparkExecutorCores</name> <name>sparkExecutorCores</name>
<description>number of cores used by single executor</description> <description>number of cores used by single executor</description>
</property> </property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property> <property>
<name>spark2YarnHistoryServerAddress</name> <name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description> <description>spark 2.* yarn history server address</description>
@ -66,23 +79,27 @@
<name>oozie.launcher.mapred.job.queue.name</name> <name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value> <value>${oozieLauncherQueueName}</value>
</property> </property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration> </configuration>
</global> </global>
<start to='migrate_actionsets' /> <start to="migrate_actionsets"/>
<action name='migrate_actionsets'> <action name="migrate_actionsets">
<java> <java>
<main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class> <main-class>eu.dnetlib.dhp.actionmanager.migration.MigrateActionSet</main-class>
<java-opt>-Dmapred.task.timeout=${distcp_task_timeout}</java-opt> <java-opt>-Dmapred.task.timeout=${distcp_task_timeout}</java-opt>
<arg>-is</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>-sn</arg><arg>${sourceNN}</arg> <arg>--sourceNameNode</arg><arg>${sourceNN}</arg>
<arg>-tn</arg><arg>${nameNode}</arg> <arg>--targetNameNode</arg><arg>${nameNode}</arg>
<arg>-w</arg><arg>${workingDirectory}</arg> <arg>--workingDirectory</arg><arg>${workingDirectory}</arg>
<arg>-nm</arg><arg>${distcp_num_maps}</arg> <arg>--distcp_num_maps</arg><arg>${distcp_num_maps}</arg>
<arg>-mm</arg><arg>${distcp_memory_mb}</arg> <arg>--distcp_memory_mb</arg><arg>${distcp_memory_mb}</arg>
<arg>-tt</arg><arg>${distcp_task_timeout}</arg> <arg>--distcp_task_timeout</arg><arg>${distcp_task_timeout}</arg>
<arg>-tr</arg><arg>${transform_only}</arg> <arg>--transform_only</arg><arg>${transform_only}</arg>
<capture-output/> <capture-output/>
</java> </java>
<ok to="transform_actions" /> <ok to="transform_actions" />
@ -94,19 +111,18 @@
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>transform_actions</name> <name>transform_actions</name>
<class>eu.dnetlib.dhp.migration.actions.TransformActions</class> <class>eu.dnetlib.dhp.actionmanager.migration.TransformActions</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-actionmanager-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores ${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
--executor-memory ${sparkExecutorMemory} --executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory} --driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>-is</arg><arg>${isLookupUrl}</arg>
<arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg> <arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg>
</spark> </spark>
<ok to="end"/> <ok to="end"/>

View File

@ -1,213 +0,0 @@
package eu.dnetlib.dhp.migration.actions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public class TransformActions implements Serializable {
private static final Log log = LogFactory.getLog(TransformActions.class);
private static final String SEPARATOR = "/";
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
IOUtils.toString(
MigrateActionSet.class.getResourceAsStream(
"/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json")));
parser.parseArgument(args);
new TransformActions().run(parser);
}
private void run(ArgumentApplicationParser parser) throws ISLookUpException, IOException {
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: " + isLookupUrl);
final String inputPaths = parser.get("inputPaths");
if (StringUtils.isBlank(inputPaths)) {
throw new RuntimeException("empty inputPaths");
}
log.info("inputPaths: " + inputPaths);
final String targetBaseDir = getTargetBaseDir(isLookupUrl);
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
LinkedList<String> pathQ =
Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
final String rawset = pathQ.pollLast();
final String actionSetDirectory = pathQ.pollLast();
final Path targetDirectory =
new Path(
targetBaseDir
+ SEPARATOR
+ actionSetDirectory
+ SEPARATOR
+ rawset);
if (fs.exists(targetDirectory)) {
log.info(String.format("found target directory '%s", targetDirectory));
fs.delete(targetDirectory, true);
log.info(String.format("deleted target directory '%s", targetDirectory));
}
log.info(
String.format(
"transforming actions from '%s' to '%s'",
sourcePath, targetDirectory));
sc.sequenceFile(sourcePath, Text.class, Text.class)
.map(
a ->
eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
a._2().toString()))
.map(a -> doTransform(a))
.filter(Objects::isNull)
.filter(a -> a.getPayload() == null)
.map(a -> new ObjectMapper().writeValueAsString(a))
.saveAsTextFile(targetDirectory.toString(), GzipCodec.class);
}
}
}
private Text transformAction(eu.dnetlib.actionmanager.actions.AtomicAction aa)
throws InvalidProtocolBufferException, JsonProcessingException {
final Text out = new Text();
final ObjectMapper mapper = new ObjectMapper();
if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
out.set(mapper.writeValueAsString(doTransform(aa)));
}
return out;
}
private AtomicAction<Relation> getRelationAtomicAction(String atomicaActionId) {
final String[] splitId = atomicaActionId.split("@");
String source = splitId[0];
String target = splitId[2];
String[] relSemantic = splitId[1].split("_");
Relation rel = new Relation();
rel.setSource(source);
rel.setTarget(target);
rel.setRelType(relSemantic[0]);
rel.setSubRelType(relSemantic[1]);
rel.setRelClass(relSemantic[2]);
DataInfo d = new DataInfo();
d.setDeletedbyinference(false);
d.setInferenceprovenance("deduplication");
d.setInferred(true);
d.setInvisible(false);
Qualifier provenanceaction = new Qualifier();
provenanceaction.setClassid("deduplication");
provenanceaction.setClassname("deduplication");
provenanceaction.setSchemeid("dnet:provenanceActions");
provenanceaction.setSchemename("dnet:provenanceActions");
d.setProvenanceaction(provenanceaction);
rel.setDataInfo(d);
return new AtomicAction<>(Relation.class, rel);
}
private AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
throws InvalidProtocolBufferException {
final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
final Oaf oaf = ProtoConverter.convert(proto_oaf);
switch (proto_oaf.getKind()) {
case entity:
switch (proto_oaf.getEntity().getType()) {
case datasource:
return new AtomicAction<>(Datasource.class, (Datasource) oaf);
case organization:
return new AtomicAction<>(Organization.class, (Organization) oaf);
case project:
return new AtomicAction<>(Project.class, (Project) oaf);
case result:
final String resulttypeid =
proto_oaf
.getEntity()
.getResult()
.getMetadata()
.getResulttype()
.getClassid();
switch (resulttypeid) {
case "publication":
return new AtomicAction<>(Publication.class, (Publication) oaf);
case "software":
return new AtomicAction<>(Software.class, (Software) oaf);
case "other":
return new AtomicAction<>(
OtherResearchProduct.class, (OtherResearchProduct) oaf);
case "dataset":
return new AtomicAction<>(Dataset.class, (Dataset) oaf);
default:
// can be an update, where the resulttype is not specified
return new AtomicAction<>(Result.class, (Result) oaf);
}
default:
throw new IllegalArgumentException(
"invalid entity type: " + proto_oaf.getEntity().getType());
}
case relation:
return new AtomicAction<>(Relation.class, (Relation) oaf);
default:
throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
}
}
private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
String XQUERY =
"collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
return isLookUp.getResourceProfileByQuery(XQUERY);
}
private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
SparkConf conf = new SparkConf();
return SparkSession.builder()
.appName(TransformActions.class.getSimpleName())
.master(parser.get("master"))
.config(conf)
.enableHiveSupport()
.getOrCreate();
}
}

View File

@ -1,10 +0,0 @@
[
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"sn", "paramLongName":"sourceNameNode", "paramDescription": "nameNode of the source cluster", "paramRequired": true},
{"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true},
{"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true},
{"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true},
{"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true},
{"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true}
]

View File

@ -1,5 +0,0 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
{"paramName":"i", "paramLongName":"inputPaths", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}
]