1
0
Fork 0

adjusting workflow definition

This commit is contained in:
Miriam Baglioni 2023-12-09 15:20:11 +01:00
parent 616622d2bb
commit d4eedada71
11 changed files with 66 additions and 41 deletions

View File

@ -169,7 +169,9 @@ public class Utils implements Serializable {
} }
public static List<String> getCommunityIdList(String baseURL) throws IOException { public static List<String> getCommunityIdList(String baseURL) throws IOException {
return getValidCommunities(baseURL).stream() return getValidCommunities(baseURL)
.map(community -> community.getId()).collect(Collectors.toList()); .stream()
.map(community -> community.getId())
.collect(Collectors.toList());
} }
} }

View File

@ -105,7 +105,6 @@ public class SparkBulkTagJob {
Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap(); Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
for (String ds : datasources.collectAsList()) { for (String ds : datasources.collectAsList()) {
// final String dsId = ds.substring(3);
if (!dsm.containsKey(ds)) { if (!dsm.containsKey(ds)) {
ArrayList<Pair<String, SelectionConstraints>> eoscList = new ArrayList<>(); ArrayList<Pair<String, SelectionConstraints>> eoscList = new ArrayList<>();
dsm.put(ds, eoscList); dsm.put(ds, eoscList);
@ -116,13 +115,11 @@ public class SparkBulkTagJob {
private static boolean isOKDatasource(Datasource ds) { private static boolean isOKDatasource(Datasource ds) {
final String compatibility = ds.getOpenairecompatibility().getClassid(); final String compatibility = ds.getOpenairecompatibility().getClassid();
boolean isOk = (compatibility.equalsIgnoreCase(OPENAIRE_3) || return (compatibility.equalsIgnoreCase(OPENAIRE_3) ||
compatibility.equalsIgnoreCase(OPENAIRE_4) || compatibility.equalsIgnoreCase(OPENAIRE_4) ||
compatibility.equalsIgnoreCase(OPENAIRE_CRIS) || compatibility.equalsIgnoreCase(OPENAIRE_CRIS) ||
compatibility.equalsIgnoreCase(OPENAIRE_DATA)) && compatibility.equalsIgnoreCase(OPENAIRE_DATA)) &&
ds.getCollectedfrom().stream().anyMatch(cf -> cf.getKey().equals(EOSC)); ds.getCollectedfrom().stream().anyMatch(cf -> cf.getKey().equals(EOSC));
return isOk;
} }
private static <R extends Result> void execBulkTag( private static <R extends Result> void execBulkTag(
@ -151,12 +148,12 @@ public class SparkBulkTagJob {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + e.name());//writing the tagging in the working dir for entity .json(outputPath + e.name());// writing the tagging in the working dir for entity
readPath(spark, outputPath + e.name(), resultClazz) //copy the tagging in the actual result output path readPath(spark, outputPath + e.name(), resultClazz) // copy the tagging in the actual result output path
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath + e.name()); .json(inputPath + e.name());
}); });

View File

@ -66,7 +66,7 @@ public class PrepareDatasourceCountryAssociation {
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
//removeOutputDir(spark, outputPath); // removeOutputDir(spark, outputPath);
prepareDatasourceCountryAssociation( prepareDatasourceCountryAssociation(
spark, spark,
Arrays.asList(parser.get("whitelist").split(";")), Arrays.asList(parser.get("whitelist").split(";")),

View File

@ -100,7 +100,7 @@ public class SparkCountryPropagationJob {
readPath(spark, outputPath, resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(sourcePath); .json(sourcePath);
} }

View File

@ -96,7 +96,7 @@ public class SparkResultToCommunityFromOrganizationJob {
readPath(spark, outputPath + e.name(), resultClazz) readPath(spark, outputPath + e.name(), resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath + e.name()); .json(inputPath + e.name());
} }
}); });

View File

@ -106,7 +106,7 @@ public class SparkResultToCommunityFromProject implements Serializable {
readPath(spark, outputPath + e.name(), resultClazz) readPath(spark, outputPath + e.name(), resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath + e.name()); .json(inputPath + e.name());
} }
}); });

View File

@ -8,7 +8,6 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import eu.dnetlib.dhp.api.Utils;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
@ -17,6 +16,7 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson; import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList; import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;

View File

@ -104,7 +104,7 @@ public class SparkResultToCommunityThroughSemRelJob {
readPath(spark, outputPath, resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath);
} }

View File

@ -0,0 +1,15 @@
sourcePath=/tmp/beta_provision/graph/09_graph_dedup_enriched
resumeFrom=OrcidPropagation
allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo
allowedsemrelsresultproject=isSupplementedBy;isSupplementTo
allowedsemrelscommunitysemrel=isSupplementedBy;isSupplementTo
datasourceWhitelistForCountryPropagation=10|openaire____::3795d6478e30e2c9f787d427ff160944;10|opendoar____::16e6a3326dd7d868cbc926602a61e4d0;10|eurocrisdris::fe4903425d9040f680d8610d9079ea14
allowedtypes=pubsrepository::institutional
outputPath=/tmp/miriam/enrichment_one_step
organizationtoresultcommunitymap={"20|corda__h2020::3fb05a9524c3f790391261347852f638":["mes","euromarine"], "20|corda__h2020::e8dbe14cca9bf6fce09d468872f813f8":["mes","euromarine"], "20|snsf________::9b253f265e3bef5cae6d881fdf61aceb":["mes","euromarine"],"20|ukri________::e054eea0a47665af8c3656b5785ccf76":["mes","euromarine"],"20|corda__h2020::edc18d67c9b11fb616ca9f6e1db1b151":["mes","euromarine"],"20|ukri________::d5736d9da90521ddcdc7828a05a85e9a":["mes","euromarine"],"20|corda__h2020::f5d418d3aa1cf817ddefcc3fdc039f27":["mes","euromarine"],"20|snsf________::8fa091f8f25a846779acb4ea97b50aef":["mes","euromarine"],"20|corda__h2020::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|corda_______::81e020977211c2c40fae2e1a50bffd71":["mes","euromarine"],"20|snsf________::31d0a100e54e3cdb3c6f52d91e638c78":["mes","euromarine"],"20|corda__h2020::ea379ef91b8cc86f9ac5edc4169292db":["mes","euromarine"],"20|corda__h2020::f75ee2ee48e5cb0ec8c8d30aaa8fef70":["mes","euromarine"],"20|ukri________::e16010089551a1a9182a94604fc0ea59":["mes","euromarine"],"20|corda__h2020::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|corda_______::38531a2cce7c5c347ffc439b07c1f43b":["mes","euromarine"],"20|grid________::b2cbbf5eadbbf87d534b022bad3191d7":["mes","euromarine"],"20|snsf________::74730ef1439d7f7636a8be58a6b471b8":["mes","euromarine"],"20|nsf_________::ad72e19043a5a467e35f9b444d11563e":["mes","euromarine"],"20|ukri________::0fc3e92500290902a2d38ec2445e74c3":["mes","euromarine"],"20|grid________::ad2c29905da0eb3c06b3fa80cacd89ea":["mes","euromarine"],"20|corda__h2020::30b53e4d63d3724f00acb9cbaca40860":["mes","euromarine"],"20|corda__h2020::f60f84bee14ad93f0db0e49af1d5c317":["mes","euromarine"], "20|corda__h2020::7bf251ac3765b5e89d82270a1763d09f":["mes","euromarine"], "20|corda__h2020::65531bd11be9935948c7f2f4db1c1832":["mes","euromarine"], "20|corda__h2020::e0e98f86bbc76638bbb72a8fe2302946":["mes","euromarine"], "20|snsf________::3eb43582ac27601459a8d8b3e195724b":["mes","euromarine"], "20|corda__h2020::af2481dab65d06c8ea0ae02b5517b9b6":["mes","euromarine"], "20|corda__h2020::c19d05cfde69a50d3ebc89bd0ee49929":["mes","euromarine"], "20|corda__h2020::af0bfd9fc09f80d9488f56d71a9832f0":["mes","euromarine"], "20|ukri________::f33c02afb0dc66c49d0ed97ca5dd5cb0":["beopen"], "20|grid________::a867f78acdc5041b34acfe4f9a349157":["beopen"], "20|grid________::7bb116a1a9f95ab812bf9d2dea2be1ff":["beopen"], "20|corda__h2020::6ab0e0739dbe625b99a2ae45842164ad":["beopen"], "20|corda__h2020::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda_______::8ba50792bc5f4d51d79fca47d860c602":["beopen"], "20|corda__h2020::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::e70e9114979e963eef24666657b807c3":["beopen"], "20|corda_______::15911e01e9744d57205825d77c218737":["beopen"], "20|opendoar____::056a41e24e2a9a67215e87bbee6a80ab":["beopen"], "20|opendoar____::7f67f2e6c6fbb0628f8160fcd3d92ae3":["beopen"], "20|grid________::a8ecfd7c084e561168bcbe6bf0daf3e3":["beopen"], "20|corda_______::7bbe6cc5d8ec1864739a04b0d020c9e9":["beopen"], "20|corda_______::3ff558e30c2e434d688539548300b050":["beopen"], "20|corda__h2020::5ffee5b3b83b33a8cf0e046877bd3a39":["beopen"], "20|corda__h2020::5187217e2e806a6df3579c46f82401bc":["beopen"], "20|grid________::5fa7e2709bcd945e26bfa18689adeec1":["beopen"], "20|corda_______::d8696683c53027438031a96ad27c3c07":["beopen"], "20|corda__h2020::d8696683c53027438031a96ad27c3c07":["beopen"], "20|ukri________::23a79ebdfa59790864e4a485881568c1":["beopen"], "20|corda__h2020::b76cf8fe49590a966953c37e18608af9":["beopen"], "20|grid________::d2f0204126ee709244a488a4cd3b91c2":["beopen"], "20|corda__h2020::05aba9d2ed17533d15221e5655ac11e6":["beopen"], "20|grid________::802401579481dc32062bdee69f5e6a34":["beopen"], "20|corda__h2020::3f6d9d54cac975a517ba6b252c81582d":["beopen"], "20|openorgs____::d11f981828c485cd23d93f7f24f24db1":["eut"], "20|openorgs____::e66fe5dd092752e1dd6fd29fc699933a":["eut"], "20|openorgs____::526468206bca24c1c90da6a312295cf4":["eut"], "20|openorgs____::08e311e656e65ccb32e07c66b15b6ff7":["eut"], "20|openorgs____::55a1f889758964b77682904218fdb298":["eut"], "20|openorgs____::530092b6970d60a5329beb9f39e8d7d4":["eut"], "20|openorgs____::aadafa39392b3e200102596a3a4aad9d":["eut"], "20|openorgs____::c3fe999c74fad308132b8a5971367dce":["eut"], "20|openorgs____::1624ff7c01bb641b91f4518539a0c28a":["aurora"], "20|openorgs____::cdda7cfe17c89eb50628ec2eb1f8acd2":["aurora"], "20|openorgs____::818b75030e0e40612d69e049843ede7e":["aurora"], "20|openorgs____::0b0102bae51f4f4ef5ba57fbe1523b92":["aurora"], "20|openorgs____::ed47496b44722f0e9d7b98898189be0d":["aurora"], "20|openorgs____::eb0669daa9efeb898a3090d8aac7c953":["aurora"], "20|openorgs____::eb391317ed0dc684aa81ac16265de041":["aurora"], "20|openorgs____::f7cfcc98245e22c7d6e321cde930e746":["aurora"], "20|openorgs____::f33179d3306ba2599f7a898b056b604f":["aurora"], "20|pending_org_::75c41e6dd18466709ef359323d96fa05":["aurora"]}
pathMap={"author" : "$['author'][*]['fullname']", "title" : "$['title'][*]['value']", "orcid":"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']", "contributor" : "$['contributor'][*]['value']", "description" : "$['description'][*]['value']"}
blacklist=empty
allowedpids=orcid;orcid_pending
baseURL = https://services.openaire.eu/openaire/community/

View File

@ -51,7 +51,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg> <arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--workingPath</arg><arg>${workingDir}/bulktag/</arg> <arg>--outputPath</arg><arg>${workingDir}/bulktag/</arg>
<arg>--pathMap</arg><arg>${pathMap}</arg> <arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--baseURL</arg><arg>${baseURL}</arg> <arg>--baseURL</arg><arg>${baseURL}</arg>
</spark> </spark>

View File

@ -80,7 +80,14 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="copy_wait" to="prepare_relations"/> <join name="copy_wait" to="fork_prepare_assoc_step1"/>
<fork name="fork_prepare_assoc_step1">
<path start="join_prepare_publication"/>
<path start="join_prepare_dataset"/>
<path start="join_prepare_otherresearchproduct"/>
<path start="join_prepare_software"/>
</fork>
<action name="join_prepare_publication"> <action name="join_prepare_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -258,6 +265,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/publication</arg> <arg>--sourcePath</arg><arg>${sourcePath}/publication</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${outputPath}/publication</arg> <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -288,6 +296,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg> <arg>--sourcePath</arg><arg>${sourcePath}/dataset</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg> <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -318,6 +327,7 @@
<arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg> <arg>--sourcePath</arg><arg>${sourcePath}/otherresearchproduct</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg> <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
@ -348,21 +358,22 @@
<arg>--sourcePath</arg><arg>${sourcePath}/software</arg> <arg>--sourcePath</arg><arg>${sourcePath}/software</arg>
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg> <arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${outputPath}/software</arg> <arg>--outputPath</arg><arg>${outputPath}/software</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
</spark> </spark>
<ok to="wait2"/> <ok to="wait2"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<join name="wait2" to="reset_workingDir"/> <join name="wait2" to="End"/>
<action name="reset_workingDir"> <!-- <action name="reset_workingDir">-->
<fs> <!-- <fs>-->
<delete path="${workingDir}"/> <!-- <delete path="${workingDir}"/>-->
<mkdir path="${workingDir}"/> <!-- <mkdir path="${workingDir}"/>-->
</fs> <!-- </fs>-->
<ok to="End"/> <!-- <ok to="End"/>-->
<error to="Kill"/> <!-- <error to="Kill"/>-->
</action> <!-- </action>-->
<end name="End"/> <end name="End"/>