From 815b9f4d56ec2f42f75fc3e93f2bd14b8e4b5c0b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 20 Apr 2021 17:24:45 +0200 Subject: [PATCH] [openorgs dedup] fixed workflow parameter declarations. Introduced support for resuming the execution from intermediate steps --- .../dhp/oa/dedup/SparkCreateMergeRels.java | 2 +- .../dhp/oa/dedup/SparkCreateSimRels.java | 2 +- .../dhp/oa/dedup/SparkPrepareNewOrgs.java | 21 -------- .../oa/dedup/openorgs/oozie_app/workflow.xml | 54 +++++++++++++++---- 4 files changed, 47 insertions(+), 32 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index 229214229..bfc605039 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -154,7 +154,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction { (FlatMapFunction) cc -> ccToMergeRel(cc, dedupConf), Encoders.bean(Relation.class)); - mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath); + mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelPath); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index 96693ebf0..884967364 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -109,7 +109,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { .rdd(), Encoders.bean(Relation.class)); - saveParquet(simRels, outputPath, SaveMode.Append); + saveParquet(simRels, outputPath, SaveMode.Overwrite); log.info("Generated " + simRels.count() + " Similarity Relations"); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java index 52ef4b39f..657d5a832 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -69,10 +69,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { .map(Integer::valueOf) .orElse(NUM_CONNECTIONS); - final String apiUrl = Optional - .ofNullable(parser.get("apiUrl")) - .orElse(""); - final String dbUrl = parser.get("dbUrl"); final String dbTable = parser.get("dbTable"); final String dbUser = parser.get("dbUser"); @@ -83,7 +79,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); log.info("numPartitions: '{}'", numConnections); - log.info("apiUrl: '{}'", apiUrl); log.info("dbUrl: '{}'", dbUrl); log.info("dbUser: '{}'", dbUser); log.info("table: '{}'", dbTable); @@ -106,10 +101,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { .write() .mode(SaveMode.Append) .jdbc(dbUrl, dbTable, connectionProperties); - - if (!apiUrl.isEmpty()) - updateSimRels(apiUrl); - } public static Dataset createNewOrgs( @@ -198,18 +189,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { } - private static String updateSimRels(final String apiUrl) throws IOException { - - log.info("Updating simrels on the portal"); - - final HttpGet req = new HttpGet(apiUrl); - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - return IOUtils.toString(response.getEntity().getContent()); - } - } - } - private static boolean filterRels(Relation rel, String entityType) { switch (entityType) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml index 060d979da..30442406c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml @@ -12,6 +12,22 @@ actionSetId id of the actionSet + + apiUrl + OpenOrgs API to finalise the suggestions import procedure + + + dbUrl + jdbc URL of the OpenOrgs database + + + dbUser + username to access the OpenOrgs database + + + dbPwd + password to access the OpenOrgs database + workingPath path for the working directory @@ -75,7 +91,17 @@ - + + + + + ${wf:conf('resumeFrom') eq 'CreateSimRels'} + ${wf:conf('resumeFrom') eq 'CreateMergeRels'} + ${wf:conf('resumeFrom') eq 'PrepareOrgRels'} + ${wf:conf('resumeFrom') eq 'update_openorgs'} + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -83,8 +109,8 @@ - - + + @@ -109,7 +135,7 @@ --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetIdOpenorgs} + --actionSetId${actionSetId} --workingPath${workingPath} --numPartitions1000 @@ -138,7 +164,7 @@ --graphBasePath${graphBasePath} --isLookUpUrl${isLookUpUrl} --workingPath${workingPath} - --actionSetId${actionSetIdOpenorgs} + --actionSetId${actionSetId} --numPartitions1000 @@ -165,7 +191,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetIdOpenorgs} + --actionSetId${actionSetId} --cutConnectedComponent${cutConnectedComponent} @@ -192,7 +218,7 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetIdOpenorgs} + --actionSetId${actionSetId} --dbUrl${dbUrl} --dbTable${dbTable} --dbUser${dbUser} @@ -223,14 +249,24 @@ --graphBasePath${graphBasePath} --workingPath${workingPath} --isLookUpUrl${isLookUpUrl} - --actionSetId${actionSetIdOpenorgs} - --apiUrl${apiUrl} + --actionSetId${actionSetId} --dbUrl${dbUrl} --dbTable${dbTable} --dbUser${dbUser} --dbPwd${dbPwd} --numConnections20 + + + + + + + ${jobTracker} + ${nameNode} + /usr/bin/curl + ${apiUrl} +