diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index aa2078be5..22ca7504d 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -47,6 +47,16 @@
             <artifactId>jaxen</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-distcp</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-openaire-data-protos</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>eu.dnetlib.dhp</groupId>
             <artifactId>dhp-schemas</artifactId>
@@ -57,6 +67,44 @@
             <groupId>eu.dnetlib</groupId>
             <artifactId>dnet-actionmanager-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-actionmanager-common</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-openaireplus-mapping-utils</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>saxonica</groupId>
+                    <artifactId>saxon</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>saxonica</groupId>
+                    <artifactId>saxon-dom</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jgrapht</groupId>
+                    <artifactId>jgrapht</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.ehcache</groupId>
+                    <artifactId>ehcache</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.springframework</groupId>
+                    <artifactId>spring-test</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>apache</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
     </dependencies>
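Note: the two wildcard exclusions above (org.apache.*:* and apache:*) rely on Maven's wildcard-exclusion support (available since Maven 3.2.1); they keep dnet-openaireplus-mapping-utils from dragging legacy Hadoop/Apache artifacts onto the classpath next to the cluster-provided ones. A quick way to double-check what survives the pruning (illustrative invocation, adjust the module path to your checkout):

    mvn -pl dhp-workflows/dhp-actionmanager dependency:tree -Dincludes=org.apache.hadoop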
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java
similarity index 96%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java
rename to dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java
index 126a97e39..8998a090d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/LicenseComparator.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/LicenseComparator.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.migration.actions;
+package eu.dnetlib.dhp.actionmanager.migration;
 
 import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier;
 import java.util.Comparator;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
similarity index 88%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java
rename to dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
index 680f7759c..db1bed48a 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/MigrateActionSet.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/MigrateActionSet.java
@@ -1,4 +1,4 @@
-package eu.dnetlib.dhp.migration.actions;
+package eu.dnetlib.dhp.actionmanager.migration;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
@@ -9,35 +9,36 @@
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpOptions;
 import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MigrateActionSet {
 
-    private static final Log log = LogFactory.getLog(MigrateActionSet.class);
+    private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class);
 
     private static final String SEPARATOR = "/";
     private static final String TARGET_PATHS = "target_paths";
     private static final String RAWSET_PREFIX = "rawset_";
 
-    private static Boolean DEFAULT_TRANSFORM_ONLY = false;
-
     public static void main(String[] args) throws Exception {
         final ArgumentApplicationParser parser =
                 new ArgumentApplicationParser(
                         IOUtils.toString(
                                 MigrateActionSet.class.getResourceAsStream(
-                                        "/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json")));
+                                        "/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json")));
         parser.parseArgument(args);
 
         new MigrateActionSet().run(parser);
@@ -56,11 +57,11 @@
         final String transform_only_s = parser.get("transform_only");
 
-        log.info("transform only param: " + transform_only_s);
+        log.info("transform only param: {}", transform_only_s);
 
         final Boolean transformOnly = Boolean.valueOf(parser.get("transform_only"));
 
-        log.info("transform only: " + transformOnly);
+        log.info("transform only: {}", transformOnly);
 
         ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
@@ -79,22 +80,19 @@
         final List<Path> sourcePaths = getSourcePaths(sourceNN, isLookUp);
         log.info(
-                String.format(
-                        "paths to process:\n%s",
-                        sourcePaths.stream()
-                                .map(p -> p.toString())
-                                .collect(Collectors.joining("\n"))));
+                "paths to process:\n{}",
+                sourcePaths.stream().map(p -> p.toString()).collect(Collectors.joining("\n")));
 
         for (Path source : sourcePaths) {
 
             if (!sourceFS.exists(source)) {
-                log.warn(String.format("skipping unexisting path: %s", source));
+                log.warn("skipping nonexistent path: {}", source);
             } else {
 
                 LinkedList<String> pathQ =
                         Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
 
                 final String rawSet = pathQ.pollLast();
-                log.info(String.format("got RAWSET: %s", rawSet));
+                log.info("got RAWSET: {}", rawSet);
 
                 if (StringUtils.isNotBlank(rawSet) && rawSet.startsWith(RAWSET_PREFIX)) {
@@ -109,7 +107,7 @@
                                     + SEPARATOR
                                     + rawSet);
 
-                    log.info(String.format("using TARGET PATH: %s", targetPath));
+                    log.info("using TARGET PATH: {}", targetPath);
 
                     if (!transformOnly) {
                         if (targetFS.exists(targetPath)) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
similarity index 98%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java
rename to dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index 5188cf55a..adeffa9fc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -1,4 +1,9 @@
-package eu.dnetlib.dhp.migration.actions;
+package eu.dnetlib.dhp.actionmanager.migration;
+
+import static eu.dnetlib.data.proto.KindProtos.Kind.entity;
+import static eu.dnetlib.data.proto.KindProtos.Kind.relation;
+import static eu.dnetlib.data.proto.TypeProtos.*;
+import static eu.dnetlib.data.proto.TypeProtos.Type.*;
 
 import com.google.common.collect.Lists;
 import com.googlecode.protobuf.format.JsonFormat;
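Note: MigrateActionSet drives the copy from the source cluster through the programmatic DistCp API (see the org.apache.hadoop.tools imports above); the hunks shown here only cover the logging migration. The following is therefore a minimal sketch of such an invocation under the Hadoop 2.x DistCpOptions constructor; the helper name and the memory property are illustrative, not taken from the patch:

    // Sketch only: one per-rawset copy, bounded by distcp_num_maps / distcp_memory_mb
    import java.util.Collections;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class DistCpSketch {

        public static void copy(Configuration conf, Path source, Path target, int numMaps, int memoryMb)
                throws Exception {
            final DistCpOptions options = new DistCpOptions(Collections.singletonList(source), target);
            options.setMaxMaps(numMaps); // distcp_num_maps caps the copy parallelism
            conf.set("mapreduce.map.memory.mb", String.valueOf(memoryMb)); // distcp_memory_mb
            new DistCp(conf, options).execute(); // submits the MR copy job and waits for completion
        }
    }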
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java
new file mode 100644
index 000000000..3226361cb
--- /dev/null
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/TransformActions.java
@@ -0,0 +1,179 @@
+package eu.dnetlib.dhp.actionmanager.migration;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import eu.dnetlib.data.proto.OafProtos;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+public class TransformActions implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(TransformActions.class);
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final String SEPARATOR = "/";
+
+    public static void main(String[] args) throws Exception {
+        final ArgumentApplicationParser parser =
+                new ArgumentApplicationParser(
+                        IOUtils.toString(
+                                MigrateActionSet.class.getResourceAsStream(
+                                        "/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json")));
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged =
+                Optional.ofNullable(parser.get("isSparkSessionManaged"))
+                        .map(Boolean::valueOf)
+                        .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String isLookupUrl = parser.get("isLookupUrl");
+        log.info("isLookupUrl: {}", isLookupUrl);
+
+        final String inputPaths = parser.get("inputPaths");
+
+        if (StringUtils.isBlank(inputPaths)) {
+            throw new RuntimeException("empty inputPaths");
+        }
+        log.info("inputPaths: {}", inputPaths);
+
+        final String targetBaseDir = getTargetBaseDir(isLookupUrl);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+                conf,
+                isSparkSessionManaged,
+                spark -> transformActions(inputPaths, targetBaseDir, spark));
+    }
+
+    private static void transformActions(
+            String inputPaths, String targetBaseDir, SparkSession spark) throws IOException {
+        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
+
+        for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
+
+            LinkedList<String> pathQ =
+                    Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
+
+            final String rawset = pathQ.pollLast();
+            final String actionSetDirectory = pathQ.pollLast();
+
+            final Path targetDirectory =
+                    new Path(targetBaseDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawset);
+
+            if (fs.exists(targetDirectory)) {
+                log.info("found target directory '{}'", targetDirectory);
+                fs.delete(targetDirectory, true);
+                log.info("deleted target directory '{}'", targetDirectory);
+            }
+
+            log.info("transforming actions from '{}' to '{}'", sourcePath, targetDirectory);
+
+            sc.sequenceFile(sourcePath, Text.class, Text.class)
+                    .map(
+                            a ->
+                                    eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
+                                            a._2().toString()))
+                    .map(TransformActions::doTransform)
+                    .filter(Objects::nonNull)
+                    .mapToPair(
+                            a ->
+                                    new Tuple2<>(
+                                            a.getClazz().toString(),
+                                            OBJECT_MAPPER.writeValueAsString(a)))
+                    .mapToPair(t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
+                    .saveAsNewAPIHadoopFile(
+                            targetDirectory.toString(),
+                            Text.class,
+                            Text.class,
+                            SequenceFileOutputFormat.class,
+                            sc.hadoopConfiguration());
+        }
+    }
+
+    private static AtomicAction<?> doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
+            throws InvalidProtocolBufferException {
+
+        // dedup similarity relations had empty target value, don't migrate them
+        if (aa.getTargetValue().length == 0) {
+            return null;
+        }
+
+        final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
+        final Oaf oaf = ProtoConverter.convert(proto_oaf);
+        switch (proto_oaf.getKind()) {
+            case entity:
+                switch (proto_oaf.getEntity().getType()) {
+                    case datasource:
+                        return new AtomicAction<>(Datasource.class, (Datasource) oaf);
+                    case organization:
+                        return new AtomicAction<>(Organization.class, (Organization) oaf);
+                    case project:
+                        return new AtomicAction<>(Project.class, (Project) oaf);
+                    case result:
+                        final String resulttypeid =
+                                proto_oaf
+                                        .getEntity()
+                                        .getResult()
+                                        .getMetadata()
+                                        .getResulttype()
+                                        .getClassid();
+                        switch (resulttypeid) {
+                            case "publication":
+                                return new AtomicAction<>(Publication.class, (Publication) oaf);
+                            case "software":
+                                return new AtomicAction<>(Software.class, (Software) oaf);
+                            case "other":
+                                return new AtomicAction<>(
+                                        OtherResearchProduct.class, (OtherResearchProduct) oaf);
+                            case "dataset":
+                                return new AtomicAction<>(Dataset.class, (Dataset) oaf);
+                            default:
+                                // can be an update, where the resulttype is not specified
+                                return new AtomicAction<>(Result.class, (Result) oaf);
+                        }
+                    default:
+                        throw new IllegalArgumentException(
+                                "invalid entity type: " + proto_oaf.getEntity().getType());
+                }
+            case relation:
+                return new AtomicAction<>(Relation.class, (Relation) oaf);
+            default:
+                throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
+        }
+    }
+
+    private static String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
+        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
+        String XQUERY =
+                "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
+        return isLookUp.getResourceProfileByQuery(XQUERY);
+    }
+}
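Note: the rewritten job serializes each AtomicAction to JSON and writes it into a SequenceFile keyed by the payload class; since the key is produced with getClazz().toString(), it carries the "class eu.dnetlib.dhp.schema.oaf.Relation" form. A hedged sketch for eyeballing the output of a run (the output path is a placeholder):

    // Sketch: tally the transformed actions per payload class
    import java.util.Map;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class InspectActionsSketch {

        public static void countByClazz(JavaSparkContext sc, String path) {
            Map<String, Long> counts =
                    sc.sequenceFile(path, Text.class, Text.class)
                            // key format: AtomicAction.getClazz().toString(),
                            // e.g. "class eu.dnetlib.dhp.schema.oaf.Relation"
                            .mapToPair(kv -> new Tuple2<>(kv._1().toString(), kv._2().toString()))
                            .countByKey();
            counts.forEach((clazz, n) -> System.out.println(clazz + " -> " + n));
        }
    }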
"when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "is", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "sn", + "paramLongName": "sourceNameNode", + "paramDescription": "nameNode of the source cluster", + "paramRequired": true + }, + { + "paramName": "tn", + "paramLongName": "targetNameNode", + "paramDescription": "namoNode of the target cluster", + "paramRequired": true + }, + { + "paramName": "w", + "paramLongName": "workingDirectory", + "paramDescription": "working directory", + "paramRequired": true + }, + { + "paramName": "nm", + "paramLongName": "distcp_num_maps", + "paramDescription": "maximum number of map tasks used in the distcp process", + "paramRequired": true + }, + { + "paramName": "mm", + "paramLongName": "distcp_memory_mb", + "paramDescription": "memory for distcp action copying actionsets from remote cluster", + "paramRequired": true + }, + { + "paramName": "tt", + "paramLongName": "distcp_task_timeout", + "paramDescription": "timeout for distcp copying actions from remote cluster", + "paramRequired": true + }, + { + "paramName": "tr", + "paramLongName": "transform_only", + "paramDescription": "activate tranform-only mode. Only apply transformation step", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json new file mode 100644 index 000000000..85c39c5b3 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/migration/transform_actionsets_parameters.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "is", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "inputPaths", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/config-default.xml rename to dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml similarity index 63% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml rename to dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml index ed01c8de4..d8888de9d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml +++ 
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml
similarity index 63%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
rename to dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml
index ed01c8de4..d8888de9d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/migration/oozie_app/workflow.xml
@@ -10,7 +10,6 @@
         </property>
         <property>
             <name>workingDirectory</name>
-            <value>/tmp/actionsets</value>
            <description>working directory</description>
         </property>
         <property>
@@ -44,6 +43,20 @@
             <name>sparkExecutorCores</name>
             <description>number of cores used by single executor</description>
         </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
         <property>
             <name>spark2YarnHistoryServerAddress</name>
             <description>spark 2.* yarn history server address</description>
@@ -66,23 +79,27 @@
                 <property>
                     <name>oozie.launcher.mapred.job.queue.name</name>
                     <value>${oozieLauncherQueueName}</value>
                 </property>
+                <property>
+                    <name>oozie.action.sharelib.for.spark</name>
+                    <value>${oozieActionShareLibForSpark2}</value>
+                </property>
         </configuration>
     </global>
 
     <start to="migrate_actionsets"/>
 
     <action name="migrate_actionsets">
         <java>
-            <main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class>
+            <main-class>eu.dnetlib.dhp.actionmanager.migration.MigrateActionSet</main-class>
             <java-opts>-Dmapred.task.timeout=${distcp_task_timeout}</java-opts>
-            <arg>-is</arg><arg>${isLookupUrl}</arg>
-            <arg>-sn</arg><arg>${sourceNN}</arg>
-            <arg>-tn</arg><arg>${nameNode}</arg>
-            <arg>-w</arg><arg>${workingDirectory}</arg>
-            <arg>-nm</arg><arg>${distcp_num_maps}</arg>
-            <arg>-mm</arg><arg>${distcp_memory_mb}</arg>
-            <arg>-tt</arg><arg>${distcp_task_timeout}</arg>
-            <arg>-tr</arg><arg>${transform_only}</arg>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+            <arg>--sourceNameNode</arg><arg>${sourceNN}</arg>
+            <arg>--targetNameNode</arg><arg>${nameNode}</arg>
+            <arg>--workingDirectory</arg><arg>${workingDirectory}</arg>
+            <arg>--distcp_num_maps</arg><arg>${distcp_num_maps}</arg>
+            <arg>--distcp_memory_mb</arg><arg>${distcp_memory_mb}</arg>
+            <arg>--distcp_task_timeout</arg><arg>${distcp_task_timeout}</arg>
+            <arg>--transform_only</arg><arg>${transform_only}</arg>
@@ -94,19 +111,18 @@
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn</master>
             <mode>cluster</mode>
             <name>transform_actions</name>
-            <class>eu.dnetlib.dhp.migration.actions.TransformActions</class>
-            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <class>eu.dnetlib.dhp.actionmanager.migration.TransformActions</class>
+            <jar>dhp-actionmanager-${projectVersion}.jar</jar>
             <spark-opts>
-                --executor-cores ${sparkExecutorCores}
-                --executor-memory ${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
                 --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
-                --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>-mt</arg><arg>yarn</arg>
-            <arg>-is</arg><arg>${isLookupUrl}</arg>
+            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--inputPaths</arg><arg>${wf:actionData('migrate_actionsets')['target_paths']}</arg>
         </spark>
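Note: the hard-coded Cloudera listener classes survive only as defaults of the new spark2ExtraListeners / spark2SqlQueryExecutionListeners properties, and the launcher now pins oozie.action.sharelib.for.spark, so all three can be overridden at submission time on non-Cloudera clusters. Illustrative job.properties overrides (site-specific values, not part of the patch):

    oozieActionShareLibForSpark2=spark2
    spark2ExtraListeners=
    spark2SqlQueryExecutionListeners=
    spark2YarnHistoryServerAddress=http://historyserver.example.org:18089
    spark2EventLogDir=/user/spark/applicationHistory
    transform_only=false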
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
deleted file mode 100644
index 81c2e7705..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
+++ /dev/null
@@ -1,213 +0,0 @@
-package eu.dnetlib.dhp.migration.actions;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.protobuf.InvalidProtocolBufferException;
-import eu.dnetlib.data.proto.OafProtos;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.Objects;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.SparkSession;
-
-public class TransformActions implements Serializable {
-
-    private static final Log log = LogFactory.getLog(TransformActions.class);
-    private static final String SEPARATOR = "/";
-
-    public static void main(String[] args) throws Exception {
-        final ArgumentApplicationParser parser =
-                new ArgumentApplicationParser(
-                        IOUtils.toString(
-                                MigrateActionSet.class.getResourceAsStream(
-                                        "/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json")));
-        parser.parseArgument(args);
-
-        new TransformActions().run(parser);
-    }
-
-    private void run(ArgumentApplicationParser parser) throws ISLookUpException, IOException {
-
-        final String isLookupUrl = parser.get("isLookupUrl");
-        log.info("isLookupUrl: " + isLookupUrl);
-
-        final String inputPaths = parser.get("inputPaths");
-
-        if (StringUtils.isBlank(inputPaths)) {
-            throw new RuntimeException("empty inputPaths");
-        }
-        log.info("inputPaths: " + inputPaths);
-
-        final String targetBaseDir = getTargetBaseDir(isLookupUrl);
-
-        try (SparkSession spark = getSparkSession(parser)) {
-            final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-            final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
-
-            for (String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
-
-                LinkedList<String> pathQ =
-                        Lists.newLinkedList(Splitter.on(SEPARATOR).split(sourcePath));
-
-                final String rawset = pathQ.pollLast();
-                final String actionSetDirectory = pathQ.pollLast();
-
-                final Path targetDirectory =
-                        new Path(
-                                targetBaseDir
-                                        + SEPARATOR
-                                        + actionSetDirectory
-                                        + SEPARATOR
-                                        + rawset);
-
-                if (fs.exists(targetDirectory)) {
-                    log.info(String.format("found target directory '%s", targetDirectory));
-                    fs.delete(targetDirectory, true);
-                    log.info(String.format("deleted target directory '%s", targetDirectory));
-                }
-
-                log.info(
-                        String.format(
-                                "transforming actions from '%s' to '%s'",
-                                sourcePath, targetDirectory));
-
-                sc.sequenceFile(sourcePath, Text.class, Text.class)
-                        .map(
-                                a ->
-                                        eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(
-                                                a._2().toString()))
-                        .map(a -> doTransform(a))
-                        .filter(Objects::isNull)
-                        .filter(a -> a.getPayload() == null)
-                        .map(a -> new ObjectMapper().writeValueAsString(a))
-                        .saveAsTextFile(targetDirectory.toString(), GzipCodec.class);
-            }
-        }
-    }
-
-    private Text transformAction(eu.dnetlib.actionmanager.actions.AtomicAction aa)
-            throws InvalidProtocolBufferException, JsonProcessingException {
-        final Text out = new Text();
-        final ObjectMapper mapper = new ObjectMapper();
-        if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
-            out.set(mapper.writeValueAsString(doTransform(aa)));
-        }
-        return out;
-    }
-
-    private AtomicAction<Relation> getRelationAtomicAction(String atomicaActionId) {
-        final String[] splitId = atomicaActionId.split("@");
-
-        String source = splitId[0];
-        String target = splitId[2];
-
-        String[] relSemantic = splitId[1].split("_");
-
-        Relation rel = new Relation();
-        rel.setSource(source);
-        rel.setTarget(target);
-        rel.setRelType(relSemantic[0]);
-        rel.setSubRelType(relSemantic[1]);
-        rel.setRelClass(relSemantic[2]);
-
-        DataInfo d = new DataInfo();
-        d.setDeletedbyinference(false);
-        d.setInferenceprovenance("deduplication");
-        d.setInferred(true);
-        d.setInvisible(false);
-        Qualifier provenanceaction = new Qualifier();
-
-        provenanceaction.setClassid("deduplication");
-        provenanceaction.setClassname("deduplication");
-        provenanceaction.setSchemeid("dnet:provenanceActions");
-        provenanceaction.setSchemename("dnet:provenanceActions");
-
-        d.setProvenanceaction(provenanceaction);
-
-        rel.setDataInfo(d);
-
-        return new AtomicAction<>(Relation.class, rel);
-    }
-
-    private AtomicAction<?> doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa)
-            throws InvalidProtocolBufferException {
-        final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
-        final Oaf oaf = ProtoConverter.convert(proto_oaf);
-        switch (proto_oaf.getKind()) {
-            case entity:
-                switch (proto_oaf.getEntity().getType()) {
-                    case datasource:
-                        return new AtomicAction<>(Datasource.class, (Datasource) oaf);
-                    case organization:
-                        return new AtomicAction<>(Organization.class, (Organization) oaf);
-                    case project:
-                        return new AtomicAction<>(Project.class, (Project) oaf);
-                    case result:
-                        final String resulttypeid =
-                                proto_oaf
-                                        .getEntity()
-                                        .getResult()
-                                        .getMetadata()
-                                        .getResulttype()
-                                        .getClassid();
-                        switch (resulttypeid) {
-                            case "publication":
-                                return new AtomicAction<>(Publication.class, (Publication) oaf);
-                            case "software":
-                                return new AtomicAction<>(Software.class, (Software) oaf);
-                            case "other":
-                                return new AtomicAction<>(
-                                        OtherResearchProduct.class, (OtherResearchProduct) oaf);
-                            case "dataset":
-                                return new AtomicAction<>(Dataset.class, (Dataset) oaf);
-                            default:
-                                // can be an update, where the resulttype is not specified
-                                return new AtomicAction<>(Result.class, (Result) oaf);
-                        }
-                    default:
-                        throw new IllegalArgumentException(
-                                "invalid entity type: " + proto_oaf.getEntity().getType());
-                }
-            case relation:
-                return new AtomicAction<>(Relation.class, (Relation) oaf);
-            default:
-                throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
-        }
-    }
-
-    private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
-        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
-        String XQUERY =
-                "collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()";
-        return isLookUp.getResourceProfileByQuery(XQUERY);
-    }
-
-    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
-        SparkConf conf = new SparkConf();
-
-        return SparkSession.builder()
-                .appName(TransformActions.class.getSimpleName())
-                .master(parser.get("master"))
-                .config(conf)
-                .enableHiveSupport()
-                .getOrCreate();
-    }
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json
deleted file mode 100644
index c4910ec61..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_actionsets_parameters.json
+++ /dev/null
@@ -1,10 +0,0 @@
-[
-  {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
"paramDescription": "nameNode of the source cluster", "paramRequired": true}, - {"paramName":"tn", "paramLongName":"targetNameNode", "paramDescription": "namoNode of the target cluster", "paramRequired": true}, - {"paramName":"w", "paramLongName":"workingDirectory", "paramDescription": "working directory", "paramRequired": true}, - {"paramName":"nm", "paramLongName":"distcp_num_maps", "paramDescription": "maximum number of map tasks used in the distcp process", "paramRequired": true}, - {"paramName":"mm", "paramLongName":"distcp_memory_mb", "paramDescription": "memory for distcp action copying actionsets from remote cluster", "paramRequired": true}, - {"paramName":"tt", "paramLongName":"distcp_task_timeout", "paramDescription": "timeout for distcp copying actions from remote cluster", "paramRequired": true}, - {"paramName":"tr", "paramLongName":"transform_only", "paramDescription": "activate tranform-only mode. Only apply transformation step", "paramRequired": true} -] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json deleted file mode 100644 index ce72f53ca..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/transform_actionsets_parameters.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"i", "paramLongName":"inputPaths", "paramDescription": "URL of the isLookUp Service", "paramRequired": true} -]