From ab37953332755ed53ebf95655547a4736d8f7395 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 14 May 2020 10:25:41 +0200 Subject: [PATCH] added global properties in wf definitions to avoid repeating name-node and job-tracker in the (many) distcp actions; reintroduced output directory removal at the beginning of each spark action --- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 2 + .../SparkCountryPropagationJob.java | 17 +- .../PrepareResultOrcidAssociationStep1.java | 4 +- .../PrepareResultOrcidAssociationStep2.java | 4 +- .../SparkOrcidToResultFromSemRelJob.java | 7 +- .../PrepareProjectResultsAssociation.java | 2 + .../PrepareResultCommunitySet.java | 4 +- ...kResultToCommunityFromOrganizationJob.java | 7 +- .../PrepareResultInstRepoAssociation.java | 43 +++-- ...arkResultToOrganizationFromIstRepoJob.java | 7 +- .../dhp/bulktag/oozie_app/workflow.xml | 27 ++- .../countrypropagation/oozie_app/workflow.xml | 20 ++- .../oozie_app/workflow.xml | 7 +- .../projecttoresult/oozie_app/workflow.xml | 29 ++-- .../oozie_app/workflow.xml | 29 ++-- .../oozie_app/workflow.xml | 52 +++--- .../raw/AbstractMdRecordToOafMapper.java | 164 +++++++++--------- 17 files changed, 213 insertions(+), 212 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 75d85e2ba..1c65e8ade 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.bulktag; +import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Optional; @@ -84,6 +85,7 @@ public class SparkBulkTagJob { conf, isSparkSessionManaged, spark -> { + removeOutputDir(spark, outputPath); execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 9dc17701b..974b3a3b1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -69,13 +69,16 @@ public class SparkCountryPropagationJob { runWithSparkSession( conf, isSparkSessionManaged, - spark -> execPropagation( - spark, - sourcePath, - preparedInfoPath, - outputPath, - resultClazz, - saveGraph)); + spark -> { + removeOutputDir(spark, outputPath); + execPropagation( + spark, + sourcePath, + preparedInfoPath, + outputPath, + resultClazz, + saveGraph); + }); } private static void execPropagation( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java index 3e16b4b4b..400c8d8ef 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep1.java @@ -74,9 +74,7 @@ public class PrepareResultOrcidAssociationStep1 { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); prepareInfo( spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel); }); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java index 65d8811bc..2cea32e58 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/PrepareResultOrcidAssociationStep2.java @@ -50,9 +50,7 @@ public class PrepareResultOrcidAssociationStep2 { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); mergeInfo(spark, inputPath, outputPath); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java index ebb75a5a6..b34b29c48 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/orcidtoresultfromsemrel/SparkOrcidToResultFromSemRelJob.java @@ -70,11 +70,10 @@ public class SparkOrcidToResultFromSemRelJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java index 05dcdc692..c27da4258 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/projecttoresult/PrepareProjectResultsAssociation.java @@ -60,6 +60,8 @@ public class PrepareProjectResultsAssociation { conf, isSparkSessionManaged, spark -> { + removeOutputDir(spark, potentialUpdatePath); + removeOutputDir(spark, alreadyLinkedPath); prepareResultProjProjectResults( spark, inputPath, diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index e2d4d5687..90eb54e5f 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -55,9 +55,7 @@ public class PrepareResultCommunitySet { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } + removeOutputDir(spark, outputPath); prepareInfo(spark, inputPath, outputPath, organizationMap); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 71275cc7f..66297e177 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -68,11 +68,10 @@ public class SparkResultToCommunityFromOrganizationJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index f8fe1668f..5f549be53 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -58,30 +58,15 @@ public class PrepareResultInstRepoAssociation { isSparkSessionManaged, spark -> { readNeededResources(spark, inputPath); + + removeOutputDir(spark, datasourceOrganizationPath); prepareDatasourceOrganization(spark, datasourceOrganizationPath); + + removeOutputDir(spark, alreadyLinkedPath); prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath); }); } - private static void prepareAlreadyLinkedAssociation( - SparkSession spark, String alreadyLinkedPath) { - String query = "Select source resultId, collect_set(target) organizationSet " - + "from relation " - + "where datainfo.deletedbyinference = false " - + "and relClass = '" - + RELATION_RESULT_ORGANIZATION_REL_CLASS - + "' " - + "group by source"; - - spark - .sql(query) - .as(Encoders.bean(ResultOrganizationSet.class)) - // TODO retry to stick with datasets - .toJavaRDD() - .map(r -> OBJECT_MAPPER.writeValueAsString(r)) - .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); - } - private static void readNeededResources(SparkSession spark, String inputPath) { Dataset datasource = readPath(spark, inputPath + "/datasource", Datasource.class); datasource.createOrReplaceTempView("datasource"); @@ -119,4 +104,24 @@ public class PrepareResultInstRepoAssociation { .option("compression", "gzip") .json(datasourceOrganizationPath); } + + private static void prepareAlreadyLinkedAssociation( + SparkSession spark, String alreadyLinkedPath) { + String query = "Select source resultId, collect_set(target) organizationSet " + + "from relation " + + "where datainfo.deletedbyinference = false " + + "and relClass = '" + + RELATION_RESULT_ORGANIZATION_REL_CLASS + + "' " + + "group by source"; + + spark + .sql(query) + .as(Encoders.bean(ResultOrganizationSet.class)) + // TODO retry to stick with datasets + .toJavaRDD() + .map(r -> OBJECT_MAPPER.writeValueAsString(r)) + .saveAsTextFile(alreadyLinkedPath, GzipCodec.class); + } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 86634d43f..13577fa7c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -83,10 +83,8 @@ public class SparkResultToOrganizationFromIstRepoJob { conf, isSparkSessionManaged, spark -> { - if (isTest(parser)) { - removeOutputDir(spark, outputPath); - } - if (saveGraph) + removeOutputDir(spark, outputPath); + if (saveGraph) { execPropagation( spark, datasourceorganization, @@ -94,6 +92,7 @@ public class SparkResultToOrganizationFromIstRepoJob { inputPath, outputPath, resultClazz); + } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml index 754aba4f2..f019f8413 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/bulktag/oozie_app/workflow.xml @@ -18,6 +18,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -42,8 +53,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -53,8 +62,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -64,8 +71,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -75,8 +80,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -95,8 +98,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-publication @@ -124,8 +125,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-dataset @@ -153,8 +152,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-orp @@ -182,8 +179,6 @@ - ${jobTracker} - ${nameNode} yarn-cluster cluster bulkTagging-software diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index fc877071d..85116e4cc 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -19,6 +19,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -43,8 +54,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -54,18 +63,15 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -75,8 +81,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml index e4429b710..5ddc5fedf 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -57,6 +57,7 @@ + ${jobTracker} @@ -81,7 +82,6 @@ - @@ -230,8 +230,8 @@ - + @@ -271,6 +271,7 @@ + yarn @@ -302,6 +303,7 @@ + yarn @@ -333,6 +335,7 @@ + yarn diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml index 24e1d3b7f..9e91c06fb 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/projecttoresult/oozie_app/workflow.xml @@ -14,6 +14,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -42,8 +53,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -53,8 +62,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/publication ${nameNode}/${outputPath}/publication @@ -64,8 +71,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/dataset ${nameNode}/${outputPath}/dataset @@ -75,8 +80,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/otherresearchproduct ${nameNode}/${outputPath}/otherresearchproduct @@ -86,28 +89,24 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/software ${nameNode}/${outputPath}/software + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization + - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -117,8 +116,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml index d481cad05..6a329fdc4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttocommunityfromorganization/oozie_app/workflow.xml @@ -14,6 +14,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -38,8 +49,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -49,8 +58,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -60,8 +67,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -71,8 +76,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -101,8 +104,8 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/relation - --hive_metastore_uris${hive_metastore_uris} --outputPath${workingDir}/preparedInfo/resultCommunityList + --hive_metastore_uris${hive_metastore_uris} --organizationtoresultcommunitymap${organizationtoresultcommunitymap} @@ -136,9 +139,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/publication + --outputPath${outputPath}/publication --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${outputPath}/publication --saveGraph${saveGraph} @@ -165,9 +168,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/dataset + --outputPath${outputPath}/dataset --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${outputPath}/dataset --saveGraph${saveGraph} @@ -194,9 +197,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/otherresearchproduct + --outputPath${outputPath}/otherresearchproduct --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${outputPath}/otherresearchproduct --saveGraph${saveGraph} @@ -223,9 +226,9 @@ --preparedInfoPath${workingDir}/preparedInfo/resultCommunityList --sourcePath${sourcePath}/software + --outputPath${outputPath}/software --hive_metastore_uris${hive_metastore_uris} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${outputPath}/software --saveGraph${saveGraph} diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml index a1b7f4ad7..e0563abae 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/oozie_app/workflow.xml @@ -10,6 +10,17 @@ + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + @@ -38,8 +49,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/relation ${nameNode}/${outputPath}/relation @@ -49,8 +58,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/publication ${nameNode}/${outputPath}/publication @@ -60,8 +67,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/dataset ${nameNode}/${outputPath}/dataset @@ -71,8 +76,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/otherresearchproduct ${nameNode}/${outputPath}/otherresearchproduct @@ -82,8 +85,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/software ${nameNode}/${outputPath}/software @@ -93,8 +94,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/organization ${nameNode}/${outputPath}/organization @@ -104,8 +103,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/project ${nameNode}/${outputPath}/project @@ -115,8 +112,6 @@ - ${jobTracker} - ${nameNode} ${nameNode}/${sourcePath}/datasource ${nameNode}/${outputPath}/datasource @@ -125,6 +120,7 @@ + yarn @@ -176,12 +172,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/publication - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication @@ -206,12 +202,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/dataset - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset @@ -236,12 +232,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/otherresearchproduct - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct @@ -266,12 +262,12 @@ --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --sourcePath${sourcePath}/software - --hive_metastore_uris${hive_metastore_uris} - --saveGraph${saveGraph} - --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/relation --datasourceOrganizationPath${workingDir}/preparedInfo/datasourceOrganization --alreadyLinkedPath${workingDir}/preparedInfo/alreadyLinked + --hive_metastore_uris${hive_metastore_uris} + --saveGraph${saveGraph} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Software diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index be0b91022..b9c4e6c80 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -64,7 +64,7 @@ public abstract class AbstractMdRecordToOafMapper { } protected static final Qualifier MAIN_TITLE_QUALIFIER = qualifier( - "main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); + "main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title"); protected AbstractMdRecordToOafMapper(final Map code2name) { this.code2name = code2name; @@ -75,20 +75,20 @@ public abstract class AbstractMdRecordToOafMapper { DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); final Document doc = DocumentHelper - .parseText( - xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)); + .parseText( + xml.replaceAll(DATACITE_SCHEMA_KERNEL_4, DATACITE_SCHEMA_KERNEL_3)); final String type = doc.valueOf("//dr:CobjCategory/@type"); final KeyValue collectedFrom = getProvenanceDatasource( - doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name"); + doc, "//oaf:collectedFrom/@id", "//oaf:collectedFrom/@name"); if (collectedFrom == null) { return null; } final KeyValue hostedBy = StringUtils.isBlank(doc.valueOf("//oaf:hostedBy/@id")) - ? collectedFrom - : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name"); + ? collectedFrom + : getProvenanceDatasource(doc, "//oaf:hostedBy/@id", "//oaf:hostedBy/@name"); if (hostedBy == null) { return null; @@ -112,17 +112,17 @@ public abstract class AbstractMdRecordToOafMapper { } return keyValue( - createOpenaireId(10, dsId, true), - dsName); + createOpenaireId(10, dsId, true), + dsName); } protected List createOafs( - final Document doc, - final String type, - final KeyValue collectedFrom, - final KeyValue hostedBy, - final DataInfo info, - final long lastUpdateTimestamp) { + final Document doc, + final String type, + final KeyValue collectedFrom, + final KeyValue hostedBy, + final DataInfo info, + final long lastUpdateTimestamp) { final List oafs = new ArrayList<>(); @@ -179,10 +179,10 @@ public abstract class AbstractMdRecordToOafMapper { } private List addProjectRels( - final Document doc, - final KeyValue collectedFrom, - final DataInfo info, - final long lastUpdateTimestamp) { + final Document doc, + final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp) { final List res = new ArrayList<>(); @@ -196,15 +196,15 @@ public abstract class AbstractMdRecordToOafMapper { final String projectId = createOpenaireId(40, originalId, true); res - .add( - getRelation( - docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info, - lastUpdateTimestamp)); + .add( + getRelation( + docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, collectedFrom, info, + lastUpdateTimestamp)); res - .add( - getRelation( - projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info, - lastUpdateTimestamp)); + .add( + getRelation( + projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, collectedFrom, info, + lastUpdateTimestamp)); } } @@ -212,7 +212,7 @@ public abstract class AbstractMdRecordToOafMapper { } protected Relation getRelation(String source, String target, String relType, String subRelType, String relClass, - KeyValue collectedFrom, DataInfo info, long lastUpdateTimestamp) { + KeyValue collectedFrom, DataInfo info, long lastUpdateTimestamp) { final Relation rel = new Relation(); rel.setRelType(relType); rel.setSubRelType(subRelType); @@ -226,27 +226,27 @@ public abstract class AbstractMdRecordToOafMapper { } protected abstract List addOtherResultRels( - final Document doc, - final KeyValue collectedFrom, - final DataInfo info, - final long lastUpdateTimestamp); + final Document doc, + final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp); private void populateResultFields( - final Result r, - final Document doc, - final KeyValue collectedFrom, - final KeyValue hostedBy, - final DataInfo info, - final long lastUpdateTimestamp) { + final Result r, + final Document doc, + final KeyValue collectedFrom, + final KeyValue hostedBy, + final DataInfo info, + final long lastUpdateTimestamp) { r.setDataInfo(info); r.setLastupdatetimestamp(lastUpdateTimestamp); r.setId(createOpenaireId(50, doc.valueOf("//dri:objIdentifier"), false)); r.setOriginalId(Arrays.asList(doc.valueOf("//dri:objIdentifier"))); r.setCollectedfrom(Arrays.asList(collectedFrom)); r - .setPid( - prepareListStructProps( - doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info)); + .setPid( + prepareListStructProps( + doc, "//oaf:identifier", "@identifierType", "dnet:pid_types", "dnet:pid_types", info)); r.setDateofcollection(doc.valueOf("//dr:dateOfCollection")); r.setDateoftransformation(doc.valueOf("//dr:dateOfTransformation")); r.setExtraInfo(new ArrayList<>()); // NOT PRESENT IN MDSTORES @@ -289,7 +289,7 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract Qualifier prepareResourceType(Document doc, DataInfo info); protected abstract List prepareInstances( - Document doc, DataInfo info, KeyValue collectedfrom, KeyValue hostedby); + Document doc, DataInfo info, KeyValue collectedfrom, KeyValue hostedby); protected abstract List> prepareSources(Document doc, DataInfo info); @@ -314,13 +314,13 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract List prepareAuthors(Document doc, DataInfo info); protected abstract List> prepareOtherResearchProductTools( - Document doc, DataInfo info); + Document doc, DataInfo info); protected abstract List> prepareOtherResearchProductContactGroups( - Document doc, DataInfo info); + Document doc, DataInfo info); protected abstract List> prepareOtherResearchProductContactPersons( - Document doc, DataInfo info); + Document doc, DataInfo info); protected abstract Qualifier prepareSoftwareProgrammingLanguage(Document doc, DataInfo info); @@ -329,7 +329,7 @@ public abstract class AbstractMdRecordToOafMapper { protected abstract List prepareSoftwareLicenses(Document doc, DataInfo info); protected abstract List> prepareSoftwareDocumentationUrls( - Document doc, DataInfo info); + Document doc, DataInfo info); protected abstract List prepareDatasetGeoLocations(Document doc, DataInfo info); @@ -359,37 +359,37 @@ public abstract class AbstractMdRecordToOafMapper { final String edition = n.valueOf("@edition"); if (StringUtils.isNotBlank(name)) { return journal( - name, - issnPrinted, - issnOnline, - issnLinking, - ep, - iss, - sp, - vol, - edition, - null, - null, - info); + name, + issnPrinted, + issnOnline, + issnLinking, + ep, + iss, + sp, + vol, + edition, + null, + null, + info); } } return null; } protected Qualifier prepareQualifier( - final Node node, final String xpath, final String schemeId, final String schemeName) { + final Node node, final String xpath, final String schemeId, final String schemeName) { final String classId = node.valueOf(xpath); final String className = code2name.get(classId); return qualifier(classId, className, schemeId, schemeName); } protected List prepareListStructProps( - final Node node, - final String xpath, - final String xpathClassId, - final String schemeId, - final String schemeName, - final DataInfo info) { + final Node node, + final String xpath, + final String xpathClassId, + final String schemeId, + final String schemeName, + final DataInfo info) { final List res = new ArrayList<>(); for (final Object o : node.selectNodes(xpath)) { final Node n = (Node) o; @@ -401,7 +401,7 @@ public abstract class AbstractMdRecordToOafMapper { } protected List prepareListStructProps( - final Node node, final String xpath, final Qualifier qualifier, final DataInfo info) { + final Node node, final String xpath, final Qualifier qualifier, final DataInfo info) { final List res = new ArrayList<>(); for (final Object o : node.selectNodes(xpath)) { final Node n = (Node) o; @@ -411,19 +411,19 @@ public abstract class AbstractMdRecordToOafMapper { } protected List prepareListStructProps( - final Node node, final String xpath, final DataInfo info) { + final Node node, final String xpath, final DataInfo info) { final List res = new ArrayList<>(); for (final Object o : node.selectNodes(xpath)) { final Node n = (Node) o; res - .add( - structuredProperty( - n.getText(), - n.valueOf("@classid"), - n.valueOf("@classname"), - n.valueOf("@schemeid"), - n.valueOf("@schemename"), - info)); + .add( + structuredProperty( + n.getText(), + n.valueOf("@classid"), + n.valueOf("@classname"), + n.valueOf("@schemeid"), + n.valueOf("@schemename"), + info)); } return res; } @@ -450,7 +450,7 @@ public abstract class AbstractMdRecordToOafMapper { if (n == null) { return dataInfo( - false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9"); + false, null, false, false, REPOSITORY_PROVENANCE_ACTIONS, "0.9"); } final String paClassId = n.valueOf("./oaf:provenanceaction/@classid"); @@ -464,12 +464,12 @@ public abstract class AbstractMdRecordToOafMapper { final String trust = n.valueOf("./oaf:trust"); return dataInfo( - deletedbyinference, - inferenceprovenance, - inferred, - false, - qualifier(paClassId, paClassName, paSchemeId, paSchemeName), - trust); + deletedbyinference, + inferenceprovenance, + inferred, + false, + qualifier(paClassId, paClassName, paSchemeId, paSchemeName), + trust); } protected Field prepareField(final Node node, final String xpath, final DataInfo info) { @@ -477,7 +477,7 @@ public abstract class AbstractMdRecordToOafMapper { } protected List> prepareListFields( - final Node node, final String xpath, final DataInfo info) { + final Node node, final String xpath, final DataInfo info) { return listFields(info, prepareListString(node, xpath)); }