From d4eedada71436a7cae1a5ab154598503b8f36e91 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Sat, 9 Dec 2023 15:20:11 +0100 Subject: [PATCH] adjusting workflow definition --- .../main/java/eu/dnetlib/dhp/api/Utils.java | 6 ++-- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 17 +++++----- .../PrepareDatasourceCountryAssociation.java | 2 +- .../SparkCountryPropagationJob.java | 8 ++--- ...kResultToCommunityFromOrganizationJob.java | 8 ++--- .../SparkResultToCommunityFromProject.java | 8 ++--- .../PrepareResultCommunitySetStep1.java | 2 +- ...parkResultToCommunityThroughSemRelJob.java | 8 ++--- .../eu/dnetlib/dhp/wf/main/job.properties | 15 +++++++++ .../bulktag/oozie_app/workflow.xml | 2 +- .../oozie_app/workflow.xml | 31 +++++++++++++------ 11 files changed, 66 insertions(+), 41 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index bb30f55d6..06d0f95c2 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -169,7 +169,9 @@ public class Utils implements Serializable { } public static List getCommunityIdList(String baseURL) throws IOException { - return getValidCommunities(baseURL).stream() - .map(community -> community.getId()).collect(Collectors.toList()); + return getValidCommunities(baseURL) + .stream() + .map(community -> community.getId()) + .collect(Collectors.toList()); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 5745515ba..51307ccd1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -105,7 +105,6 @@ public class SparkBulkTagJob { Map>> dsm = cc.getEoscDatasourceMap(); for (String ds : datasources.collectAsList()) { - // final String dsId = ds.substring(3); if (!dsm.containsKey(ds)) { ArrayList> eoscList = new ArrayList<>(); dsm.put(ds, eoscList); @@ -116,13 +115,11 @@ public class SparkBulkTagJob { private static boolean isOKDatasource(Datasource ds) { final String compatibility = ds.getOpenairecompatibility().getClassid(); - boolean isOk = (compatibility.equalsIgnoreCase(OPENAIRE_3) || + return (compatibility.equalsIgnoreCase(OPENAIRE_3) || compatibility.equalsIgnoreCase(OPENAIRE_4) || compatibility.equalsIgnoreCase(OPENAIRE_CRIS) || compatibility.equalsIgnoreCase(OPENAIRE_DATA)) && ds.getCollectedfrom().stream().anyMatch(cf -> cf.getKey().equals(EOSC)); - - return isOk; } private static void execBulkTag( @@ -151,13 +148,13 @@ public class SparkBulkTagJob { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + e.name());//writing the tagging in the working dir for entity + .json(outputPath + e.name());// writing the tagging in the working dir for entity - readPath(spark, outputPath + e.name(), resultClazz) //copy the tagging in the actual result output path - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + e.name()); + readPath(spark, outputPath + e.name(), resultClazz) // copy the tagging in the actual result output path + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + e.name()); }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java index b1720d19d..2ffe6f36d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareDatasourceCountryAssociation.java @@ -66,7 +66,7 @@ public class PrepareDatasourceCountryAssociation { conf, isSparkSessionManaged, spark -> { - //removeOutputDir(spark, outputPath); + // removeOutputDir(spark, outputPath); prepareDatasourceCountryAssociation( spark, Arrays.asList(parser.get("whitelist").split(";")), diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java index 2b0dd7628..17247f812 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/countrypropagation/SparkCountryPropagationJob.java @@ -98,10 +98,10 @@ public class SparkCountryPropagationJob { .json(outputPath); readPath(spark, outputPath, resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(sourcePath); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(sourcePath); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java index 9152b1f5a..adb7feef7 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/SparkResultToCommunityFromOrganizationJob.java @@ -94,10 +94,10 @@ public class SparkResultToCommunityFromOrganizationJob { .json(outputPath + e.name()); readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + e.name()); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + e.name()); } }); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java index 547891584..229ac7e32 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromproject/SparkResultToCommunityFromProject.java @@ -104,10 +104,10 @@ public class SparkResultToCommunityFromProject implements Serializable { .json(outputPath + e.name()); readPath(spark, outputPath + e.name(), resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + e.name()); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + e.name()); } }); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java index 73c4e2d7c..40c074a6e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/PrepareResultCommunitySetStep1.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import eu.dnetlib.dhp.api.Utils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.sql.*; @@ -17,6 +16,7 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; +import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.schema.oaf.Relation; diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java index bb7ff1fb7..a10737849 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromsemrel/SparkResultToCommunityThroughSemRelJob.java @@ -102,10 +102,10 @@ public class SparkResultToCommunityThroughSemRelJob { .json(outputPath); readPath(spark, outputPath, resultClazz) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath); } private static MapFunction, R> contextUpdaterFn() { diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties new file mode 100644 index 000000000..6b9b5063f --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -0,0 +1,15 @@ +sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched +resumeFrom=OrcidPropagation +allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo +allowedsemrelsresultproject=isSupplementedBy;isSupplementTo +allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo +datasourceWhitelistForCountryPropagation=10|openaire____::3795d6478e30e2c9f787d427ff160944;10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14 +allowedtypes=pubsrepository::institutional +outputPath=/tmp/miriam/enrichment_one_step +organizationtoresultcommunitymap={"20|corda__h2020::3fb05a9524c3f790391261347852f638":["mes","euromarine"], "20|corda__h2020::e8dbe14cca9bf6fce09d468872f813f8":["mes","euromarine"], "20|snsf________::9b253f265e3bef5cae6d881fdf61aceb":["mes","euromarine"],"20|ukri________::e054eea0a47665af8c3656b5785ccf76":["mes","euromarine"],"20|corda__h2020::edc18d67c9b11fb616ca9f6e1db1b151":["mes","euromarine"],"20|ukri________::d5736d9da90521ddcdc7828a05a85e9a":["mes","euromarine"],"20|corda__h2020::f5d418d3aa1cf817ddefcc3fdc039f27":["mes","euromarine"],"20|snsf________::8fa091f8f25a846779acb4ea97b50aef":["mes","euromarine"],"20|corda__h2020::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|corda_______::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|snsf________::31d0a100e54e3cdb3c6f52d91e638c78":["mes","euromarine"],"20|corda__h2020::ea379ef91b8cc86f9ac5edc4169292db":["mes","euromarine"],"20|corda__h2020::f75ee2ee48e5cb0ec8c8d30aaa8fef70":["mes","euromarine"],"20|ukri________::e16010089551a1a9182a94604fc0ea59":["mes","euromarine"],"20|corda__h2020::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|corda_______::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|grid________::b2cbbf5eadbbf87d534b022bad3191d7":["mes","euromarine"],"20|snsf________::74730ef1439d7f7636a8be58a6b471b8":["mes","euromarine"],"20|nsf_________::ad72e19043a5a467e35f9b444d11563e":["mes","euromarine"],"20|ukri________::0fc3e92500290902a2d38ec2445e74c3":["mes","euromarine"],"20|grid________::ad2c29905da0eb3c06b3fa80cacd89ea":["mes","euromarine"],"20|corda__h2020::30b53e4d63d3724f00acb9cbaca40860":["mes","euromarine"],"20|corda__h2020::f60f84bee14ad93f0db0e49af1d5c317":["mes","euromarine"], "20|corda__h2020::7bf251ac3765b5e89d82270a1763d09f":["mes","euromarine"], "20|corda__h2020::65531bd11be9935948c7f2f4db1c1832":["mes","euromarine"], "20|corda__h2020::e0e98f86bbc76638bbb72a8fe2302946":["mes","euromarine"], "20|snsf________::3eb43582ac27601459a8d8b3e195724b":["mes","euromarine"], "20|corda__h2020::af2481dab65d06c8ea0ae02b5517b9b6":["mes","euromarine"], "20|corda__h2020::c19d05cfde69a50d3ebc89bd0ee49929":["mes","euromarine"], "20|corda__h2020::af0bfd9fc09f80d9488f56d71a9832f0":["mes","euromarine"], "20|ukri________::f33c02afb0dc66c49d0ed97ca5dd5cb0":["beopen"], "20|grid________::a867f78acdc5041b34acfe4f9a349157":["beopen"], "20|grid________::7bb116a1a9f95ab812bf9d2dea2be1ff":["beopen"], "20|corda__h2020::6ab0e0739dbe625b99a2ae45842164ad":["beopen"], "20|corda__h2020::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda_______::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda__h2020::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::15911e01e9744d57205825d77c218737":["beopen"], "20|opendoar____::056a41e24e2a9a67215e87bbee6a80ab":["beopen"], "20|opendoar____::7f67f2e6c6fbb0628f8160fcd3d92ae3":["beopen"], "20|grid________::a8ecfd7c084e561168bcbe6bf0daf3e3":["beopen"], "20|corda_______::7bbe6cc5d8ec1864739a04b0d020c9e9":["beopen"], "20|corda_______::3ff558e30c2e434d688539548300b050":["beopen"], "20|corda__h2020::5ffee5b3b83b33a8cf0e046877bd3a39":["beopen"], "20|corda__h2020::5187217e2e806a6df3579c46f82401bc":["beopen"], "20|grid________::5fa7e2709bcd945e26bfa18689adeec1":["beopen"], "20|corda_______::d8696683c53027438031a96ad27c3c07":["beopen"], "20|corda__h2020::d8696683c53027438031a96ad27c3c07":["beopen"], "20|ukri________::23a79ebdfa59790864e4a485881568c1":["beopen"], "20|corda__h2020::b76cf8fe49590a966953c37e18608af9":["beopen"], "20|grid________::d2f0204126ee709244a488a4cd3b91c2":["beopen"], "20|corda__h2020::05aba9d2ed17533d15221e5655ac11e6":["beopen"], "20|grid________::802401579481dc32062bdee69f5e6a34":["beopen"], "20|corda__h2020::3f6d9d54cac975a517ba6b252c81582d":["beopen"], "20|openorgs____::d11f981828c485cd23d93f7f24f24db1":["eut"], "20|openorgs____::e66fe5dd092752e1dd6fd29fc699933a":["eut"], "20|openorgs____::526468206bca24c1c90da6a312295cf4":["eut"], "20|openorgs____::08e311e656e65ccb32e07c66b15b6ff7":["eut"], "20|openorgs____::55a1f889758964b77682904218fdb298":["eut"], "20|openorgs____::530092b6970d60a5329beb9f39e8d7d4":["eut"], "20|openorgs____::aadafa39392b3e200102596a3a4aad9d":["eut"], "20|openorgs____::c3fe999c74fad308132b8a5971367dce":["eut"], "20|openorgs____::1624ff7c01bb641b91f4518539a0c28a":["aurora"], "20|openorgs____::cdda7cfe17c89eb50628ec2eb1f8acd2":["aurora"], "20|openorgs____::818b75030e0e40612d69e049843ede7e":["aurora"], "20|openorgs____::0b0102bae51f4f4ef5ba57fbe1523b92":["aurora"], "20|openorgs____::ed47496b44722f0e9d7b98898189be0d":["aurora"], "20|openorgs____::eb0669daa9efeb898a3090d8aac7c953":["aurora"], "20|openorgs____::eb391317ed0dc684aa81ac16265de041":["aurora"], "20|openorgs____::f7cfcc98245e22c7d6e321cde930e746":["aurora"], "20|openorgs____::f33179d3306ba2599f7a898b056b604f":["aurora"], "20|pending_org_::75c41e6dd18466709ef359323d96fa05":["aurora"]} +pathMap={"author" : "$['author'][*]['fullname']", "title" : "$['title'][*]['value']", "orcid":"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']", "contributor" : "$['contributor'][*]['value']", "description" : "$['description'][*]['value']"} +blacklist=empty +allowedpids=orcid;orcid_pending +baseURL = https://services.openaire.eu/openaire/community/ + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index a735e2b0e..307997d4c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -51,7 +51,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --sourcePath${sourcePath}/ - --workingPath${workingDir}/bulktag/ + --outputPath${workingDir}/bulktag/ --pathMap${pathMap} --baseURL${baseURL} diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml index 6d800d6e2..8e945ee5a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -80,7 +80,14 @@ - + + + + + + + + @@ -258,6 +265,7 @@ --sourcePath${sourcePath}/publication --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/publication + --hive_metastore_uris${hive_metastore_uris} @@ -288,6 +296,7 @@ --sourcePath${sourcePath}/dataset --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/dataset + --hive_metastore_uris${hive_metastore_uris} @@ -318,6 +327,7 @@ --sourcePath${sourcePath}/otherresearchproduct --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/otherresearchproduct + --hive_metastore_uris${hive_metastore_uris} @@ -348,21 +358,22 @@ --sourcePath${sourcePath}/software --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/software + --hive_metastore_uris${hive_metastore_uris} - + - - - - - - - - + + + + + + + +