From ff30f99c65c04bad95af6dc887797781f6d322df Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 15 Apr 2020 16:16:20 +0200 Subject: [PATCH 1/3] using newline delimited json files for the raw graph materialization. Introduced contentPath parameter --- .../raw/DispatchEntitiesApplication.java | 6 ++-- .../oa/graph/raw/MergeClaimsApplication.java | 10 +++++- .../oa/graph/raw_all/oozie_app/workflow.xml | 32 +++++++++++-------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java index 0b47db588..4812f1c30 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/DispatchEntitiesApplication.java @@ -64,6 +64,7 @@ public class DispatchEntitiesApplication { log.info(String.format("Processing entities (%s) in file: %s", type, sourcePath)); + /* spark.read() .textFile(sourcePath) .filter((FilterFunction) value -> isEntityType(value, type)) @@ -73,14 +74,13 @@ public class DispatchEntitiesApplication { .mode(SaveMode.Overwrite) .parquet(targetPath + "/" + type); - /* + */ + JavaSparkContext.fromSparkContext(spark.sparkContext()) .textFile(sourcePath) .filter(l -> isEntityType(l, type)) .map(l -> StringUtils.substringAfter(l, "|")) .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ??? - - */ } private static boolean isEntityType(final String line, final String type) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java index 85e4f3663..4b209c68a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java @@ -100,16 +100,24 @@ public class MergeClaimsApplication { return opRaw.isPresent() ? opRaw.get()._2() : opClaim.isPresent() ? opClaim.get()._2() : null; }, Encoders.bean(clazz)) .filter(Objects::nonNull) + .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) .write() .mode(SaveMode.Overwrite) - .parquet(outPath); + .option("compression", "gzip") + .text(outPath); } private static Dataset readFromPath(SparkSession spark, String path, Class clazz) { + return spark.read() + .textFile(path) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)) + .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); + /* return spark.read() .load(path) .as(Encoders.bean(clazz)) .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); + */ } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index d33bb0211..9f91380ab 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -10,6 +10,10 @@ false should import content from the aggregator or reuse a previous version + + contentPath + path location to store (or reuse) content from the aggregator + postgresURL the postgres URL to access to the database @@ -108,10 +112,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - -p${workingDir}/db_claims + -p${contentPath}/db_claims -pgurl${postgresURL} -pguser${postgresUser} -pgpasswd${postgresPassword} @@ -124,10 +128,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/odf_claims + -p${contentPath}/odf_claims -mongourl${mongoURL} -mongodb${mongoDb} -fODF @@ -141,10 +145,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/oaf_claims + -p${contentPath}/oaf_claims -mongourl${mongoURL} -mongodb${mongoDb} -fOAF @@ -158,10 +162,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication - -p${workingDir}/db_records + -p${contentPath}/db_records -pgurl${postgresURL} -pguser${postgresUser} -pgpasswd${postgresPassword} @@ -173,10 +177,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/odf_records + -p${contentPath}/odf_records -mongourl${mongoURL} -mongodb${mongoDb} -fODF @@ -190,10 +194,10 @@ - + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication - -p${workingDir}/oaf_records + -p${contentPath}/oaf_records -mongourl${mongoURL} -mongodb${mongoDb} -fOAF @@ -227,7 +231,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -s${workingDir}/db_claims,${workingDir}/oaf_claims,${workingDir}/odf_claims + -s${contentPath}/db_claims,${contentPath}/oaf_claims,${contentPath}/odf_claims -t${workingDir}/entities_claim -pgurl${postgresURL} -pguser${postgresUser} @@ -276,7 +280,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - -s${workingDir}/db_records,${workingDir}/oaf_records,${workingDir}/odf_records + -s${contentPath}/db_records,${contentPath}/oaf_records,${contentPath}/odf_records -t${workingDir}/entities -pgurl${postgresURL} -pguser${postgresUser} From c439d0c6bbeeb527c675d8e816b55719ee6b3d07 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 15 Apr 2020 16:18:33 +0200 Subject: [PATCH 2/3] PromoteActionPayloadForGraphTableJob reads directly the content pointed by the input path, adjusted promote action tests (ISLookup mock) --- .../dnetlib/dhp/actionmanager/ISClient.java | 12 +++-- .../PartitionActionSetsByPayloadTypeJob.java | 46 +++++++++++------ .../PromoteActionPayloadForGraphTableJob.java | 8 +++ ...rtitionActionSetsByPayloadTypeJobTest.java | 51 +++++++++++++++---- 4 files changed, 88 insertions(+), 29 deletions(-) diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java index 1cb1eb4bc..994108b1c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/ISClient.java @@ -29,17 +29,21 @@ public class ISClient implements Serializable { private static final String INPUT_ACTION_SET_ID_SEPARATOR = ","; - public static List getLatestRawsetPaths(String isLookupUrl, String setIds) { + private ISLookUpService isLookup; + + public ISClient(String isLookupUrl) { + isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); + } + + public List getLatestRawsetPaths(String setIds) { - ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl); - ISClient isClient = new ISClient(); List ids = Lists.newArrayList(Splitter.on(INPUT_ACTION_SET_ID_SEPARATOR) .omitEmptyStrings() .trimResults() .split(setIds)); return ids.stream() - .map(id -> isClient.getSet(isLookup, id)) + .map(id -> getSet(isLookup, id)) .map(as -> as.getPathToLatest()) .collect(Collectors.toCollection(ArrayList::new)); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java index 8ba331626..31a4da190 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.java @@ -41,6 +41,15 @@ public class PartitionActionSetsByPayloadTypeJob { StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty()) )); + private ISClient isClient; + + public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) { + this.isClient = new ISClient(isLookupUrl); + } + + public PartitionActionSetsByPayloadTypeJob() { + } + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils.toString( PromoteActionPayloadForGraphTableJob.class @@ -63,7 +72,12 @@ public class PartitionActionSetsByPayloadTypeJob { String isLookupUrl = parser.get("isLookupUrl"); logger.info("isLookupUrl: {}", isLookupUrl); - List inputActionSetPaths = ISClient.getLatestRawsetPaths(isLookupUrl, inputActionSetIds); + new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath); + } + + protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) { + + List inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds); logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths)); SparkConf conf = new SparkConf(); @@ -95,21 +109,15 @@ public class PartitionActionSetsByPayloadTypeJob { String path) { logger.info("Reading actions from path: {}", path); - List files = HdfsSupport.listFiles(path, spark.sparkContext().hadoopConfiguration()); - logger.info("Found files: {}", String.join(",", files)); - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return files - .stream() - .map(file -> { - JavaRDD rdd = sc - .sequenceFile(file, Text.class, Text.class) - .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); - return spark.createDataFrame(rdd, KV_SCHEMA) - .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA)) - .select(expr("atomic_action.*")); - }) - .reduce(spark.createDataFrame(Collections.emptyList(), ATOMIC_ACTION_SCHEMA), Dataset::union); + + JavaRDD rdd = sc + .sequenceFile(path, Text.class, Text.class) + .map(x -> RowFactory.create(x._1().toString(), x._2().toString())); + + return spark.createDataFrame(rdd, KV_SCHEMA) + .withColumn("atomic_action", from_json(col("value"), ATOMIC_ACTION_SCHEMA)) + .select(expr("atomic_action.*")); } private static void saveActions(Dataset actionDS, @@ -121,4 +129,12 @@ public class PartitionActionSetsByPayloadTypeJob { .mode(SaveMode.Append) .parquet(path); } + + public ISClient getIsClient() { + return isClient; + } + + public void setIsClient(ISClient isClient) { + this.isClient = isClient; + } } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 68bb35c2b..20b75842c 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -119,10 +120,17 @@ public class PromoteActionPayloadForGraphTableJob { String path, Class rowClazz) { logger.info("Reading graph table from path: {}", path); + + return spark.read() + .textFile(path) + .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz)); + + /* return spark .read() .parquet(path) .as(Encoders.bean(rowClazz)); + */ } private static Dataset readActionPayload(SparkSession spark, diff --git a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java index 2761a7c89..bd5dc9a5d 100644 --- a/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java +++ b/dhp-workflows/dhp-actionmanager/src/test/java/eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.java @@ -1,6 +1,8 @@ package eu.dnetlib.dhp.actionmanager.partition; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import eu.dnetlib.dhp.actionmanager.ISClient; import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest; import eu.dnetlib.dhp.schema.oaf.*; import org.apache.hadoop.conf.Configuration; @@ -15,7 +17,11 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.*; import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; import scala.Tuple2; import scala.collection.mutable.Seq; @@ -31,6 +37,7 @@ import static org.apache.spark.sql.functions.*; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static scala.collection.JavaConversions.mutableSeqAsJavaList; +@ExtendWith(MockitoExtension.class) public class PartitionActionSetsByPayloadTypeJobTest { private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader(); @@ -64,20 +71,29 @@ public class PartitionActionSetsByPayloadTypeJobTest { @Nested class Main { + @Mock + private ISClient isClient; + @Test public void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception { // given - Path inputActionSetsDir = workingDir.resolve("input").resolve("action_sets"); + Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets"); Path outputDir = workingDir.resolve("output"); - Map> oafsByClassName = createActionSets(inputActionSetsDir); + Map> oafsByClassName = createActionSets(inputActionSetsBaseDir); + + List inputActionSetsPaths = resolveInputActionSetPaths(inputActionSetsBaseDir); // when - PartitionActionSetsByPayloadTypeJob.main(new String[]{ - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-inputActionSetPaths", inputActionSetsDir.toString(), - "-outputPath", outputDir.toString() - }); + Mockito.when(isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(inputActionSetsPaths); + + PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob(); + job.setIsClient(isClient); + job.run( + Boolean.FALSE, + "", // it can be empty we're mocking the response from isClient to resolve the paths + outputDir.toString() + ); // then Files.exists(outputDir); @@ -94,10 +110,19 @@ public class PartitionActionSetsByPayloadTypeJobTest { } } + private List resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException { + Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir(); + return Files + .list(inputActionSetJsonDumpsDir) + .map(path -> { + String inputActionSetId = path.getFileName().toString(); + return inputActionSetsBaseDir.resolve(inputActionSetId).toString(); + }) + .collect(Collectors.toCollection(ArrayList::new)); + } + private static Map> createActionSets(Path inputActionSetsDir) throws IOException { - Path inputActionSetJsonDumpsDir = Paths - .get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/")) - .getFile()); + Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir(); Map> oafsByType = new HashMap<>(); Files @@ -138,6 +163,12 @@ public class PartitionActionSetsByPayloadTypeJobTest { return oafsByType; } + private static Path getInputActionSetJsonDumpsDir() { + return Paths + .get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/")) + .getFile()); + } + private static Dataset readActionsFromJsonDump(String path) { return spark .read() From da019495d76ae9221b1c0556a8260420047fa1d0 Mon Sep 17 00:00:00 2001 From: pjacewicz Date: Wed, 15 Apr 2020 17:56:57 +0200 Subject: [PATCH 3/3] [dhp-actionmanager] target dir removal added for distcp actions --- .../dhp/actionmanager/wf/dataset/oozie_app/workflow.xml | 6 ++++++ .../dhp/actionmanager/wf/datasource/oozie_app/workflow.xml | 3 +++ .../actionmanager/wf/organization/oozie_app/workflow.xml | 3 +++ .../wf/otherresearchproduct/oozie_app/workflow.xml | 6 ++++++ .../dhp/actionmanager/wf/project/oozie_app/workflow.xml | 3 +++ .../dhp/actionmanager/wf/publication/oozie_app/workflow.xml | 6 ++++++ .../dhp/actionmanager/wf/relation/oozie_app/workflow.xml | 3 +++ .../dhp/actionmanager/wf/software/oozie_app/workflow.xml | 6 ++++++ 8 files changed, 36 insertions(+) diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml index 27c272aea..f95349935 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/dataset/oozie_app/workflow.xml @@ -118,6 +118,9 @@ + + + -pb ${inputGraphRootPath}/dataset ${workingDir}/dataset @@ -166,6 +169,9 @@ + + + -pb ${workingDir}/dataset ${outputGraphRootPath}/dataset diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml index 800f29b17..c85ba4ac1 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/datasource/oozie_app/workflow.xml @@ -113,6 +113,9 @@ + + + -pb ${inputGraphRootPath}/datasource ${outputGraphRootPath}/datasource diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml index a6c6220e4..412cad70b 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/organization/oozie_app/workflow.xml @@ -113,6 +113,9 @@ + + + -pb ${inputGraphRootPath}/organization ${outputGraphRootPath}/organization diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml index 4b65de3df..0deb1b945 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/otherresearchproduct/oozie_app/workflow.xml @@ -117,6 +117,9 @@ + + + -pb ${inputGraphRootPath}/otherresearchproduct ${workingDir}/otherresearchproduct @@ -165,6 +168,9 @@ + + + -pb ${workingDir}/otherresearchproduct ${outputGraphRootPath}/otherresearchproduct diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml index c73fea96e..daf48e9d7 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/project/oozie_app/workflow.xml @@ -113,6 +113,9 @@ + + + -pb ${inputGraphRootPath}/project ${outputGraphRootPath}/project diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml index 35be128da..70400a123 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app/workflow.xml @@ -118,6 +118,9 @@ + + + -pb ${inputGraphRootPath}/publication ${workingDir}/publication @@ -166,6 +169,9 @@ + + + -pb ${workingDir}/publication ${outputGraphRootPath}/publication diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml index f64301c70..a7dce8f2f 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app/workflow.xml @@ -114,6 +114,9 @@ + + + -pb ${inputGraphRootPath}/relation ${outputGraphRootPath}/relation diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml index 397184c91..396e27721 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/software/oozie_app/workflow.xml @@ -117,6 +117,9 @@ + + + -pb ${inputGraphRootPath}/software ${workingDir}/software @@ -165,6 +168,9 @@ + + + -pb ${workingDir}/software ${outputGraphRootPath}/software