[Enrichment WF] third attempt to make it run in a single step

This commit is contained in:
Miriam Baglioni 2022-04-11 09:58:39 +02:00
parent 9bd5310112
commit a6c26a9e0e
43 changed files with 78 additions and 49 deletions

View File

@@ -113,10 +113,10 @@ public class SparkBulkTagJob {
.json(outputPath); .json(outputPath);
readPath(spark, outputPath, resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath);
} }

View File

@@ -97,11 +97,11 @@ public class SparkCountryPropagationJob {
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(outputPath); .json(outputPath);
readPath(spark,outputPath,resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(sourcePath); .json(sourcePath);
} }

View File

@@ -56,12 +56,6 @@ public class SparkOrcidToResultFromSemRelJob {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("saveGraph: {}", saveGraph);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName); Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@@ -72,9 +66,7 @@ public class SparkOrcidToResultFromSemRelJob {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
if (saveGraph) { execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
}
}); });
} }

View File

@@ -100,11 +100,11 @@ public class SparkResultToCommunityFromOrganizationJob {
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath); .json(outputPath);
readPath(spark,outputPath,resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath);
} }
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() { private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() {

View File

@@ -102,10 +102,10 @@ public class SparkResultToCommunityThroughSemRelJob {
.json(outputPath); .json(outputPath);
readPath(spark, outputPath, resultClazz) readPath(spark, outputPath, resultClazz)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath); .json(inputPath);
} }
private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() { private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {

View File

@@ -124,10 +124,10 @@ public class SparkResultToOrganizationFromIstRepoJob {
.json(outputPath); .json(outputPath);
readPath(spark, outputPath, Relation.class) readPath(spark, outputPath, Relation.class)
.write() .write()
.mode(SaveMode.Append) .mode(SaveMode.Append)
.option("compression","gzip") .option("compression", "gzip")
.json(inputPath.substring(0, inputPath.indexOf("/") + 1) + "relation"); .json(inputPath.substring(0, inputPath.indexOf("/") + 1) + "relation");
} }
private static FlatMapFunction<Tuple2<KeyValueSet, KeyValueSet>, Relation> createRelationFn() { private static FlatMapFunction<Tuple2<KeyValueSet, KeyValueSet>, Relation> createRelationFn() {

View File

@@ -5,12 +5,6 @@
"paramDescription": "the path of the sequencial file to read", "paramDescription": "the path of the sequencial file to read",
"paramRequired": true "paramRequired": true
}, },
{
"paramName":"sg",
"paramLongName":"saveGraph",
"paramDescription": "true if the new version of the graph must be saved",
"paramRequired": false
},
{ {
"paramName":"h", "paramName":"h",
"paramLongName":"hive_metastore_uris", "paramLongName":"hive_metastore_uris",

View File

@@ -2,7 +2,7 @@
orcid_propagation classpath eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app orcid_propagation classpath eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app
bulk_tagging classpath eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app bulk_tagging classpath eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app
affiliation_inst_repo classpath eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app affiliation_inst_repo classpath eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfrominstrepo/oozie_app
affiliation_semantic_relation classpath eu/dnetlib/dhp/wf/subworkflows/resuttoorganizationfromsemrel/oozie_app affiliation_semantic_relation classpath eu/dnetlib/dhp/wf/subworkflows/resulttoorganizationfromsemrel/oozie_app
community_organization classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app community_organization classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/oozie_app
result_project classpath eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app result_project classpath eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app
community_sem_rel classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app community_sem_rel classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app

View File

@@ -1,4 +1,4 @@
<workflow-app name="dump_graph" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="enrichment_main" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
@@ -6,7 +6,7 @@
<description>the source path</description> <description>the source path</description>
</property> </property>
<property> <property>
<name>allowedsemrelsorginstrepo</name> <name>allowedsemrelsorcidprop</name>
<description>the semantic relationships allowed for propagation</description> <description>the semantic relationships allowed for propagation</description>
</property> </property>
<property> <property>
@@ -18,7 +18,7 @@
<description>the semantic relationships allowed for propagation</description> <description>the semantic relationships allowed for propagation</description>
</property> </property>
<property> <property>
<name>whitelist</name> <name>datasourceWhitelistForCountryPropagation</name>
<description>the white list</description> <description>the white list</description>
</property> </property>
<property> <property>
@@ -118,7 +118,13 @@
<decision name="resumeFrom"> <decision name="resumeFrom">
<switch> <switch>
<case to="send_zenodo">${wf:conf('onlyUpload') eq true}</case> <case to="bulk_tagging">${wf:conf('resumeFrom') eq 'BulkTagging'}</case>
<case to="affiliation_inst_repo">${wf:conf('resumeFrom') eq 'AffiliationInstitutionalRepository'}</case>
<case to="affiliation_semantic_relation">${wf:conf('resumeFrom') eq 'AffiliationSemanticRelation'}</case>
<case to="community_organization">${wf:conf('resumeFrom') eq 'CommunityOrganization'}</case>
<case to="result_project">${wf:conf('resumeFrom') eq 'ResultProject'}</case>
<case to="community_sem_rel">${wf:conf('resumeFrom') eq 'CommunitySemanticRelation'}</case>
<case to="country_propagation">${wf:conf('resumeFrom') eq 'CountryPropagation'}</case>
<default to="orcid_propagation"/> <default to="orcid_propagation"/>
</switch> </switch>
</decision> </decision>
@@ -136,7 +142,7 @@
</property> </property>
<property> <property>
<name>allowedsemrels</name> <name>allowedsemrels</name>
<value>${allowedsemrelsorginstrepo}</value> <value>${allowedsemrelsorcidprop}</value>
</property> </property>
<property> <property>
<name>outputPath</name> <name>outputPath</name>
@@ -308,7 +314,7 @@
</property> </property>
<property> <property>
<name>whitelist</name> <name>whitelist</name>
<value>${whitelist}</value> <value>${datasourceWhitelistForCountryPropagation}</value>
</property> </property>
<property> <property>
<name>allowedtypes</name> <name>allowedtypes</name>

View File

@@ -36,7 +36,49 @@
<path start="copy_datasources"/> <path start="copy_datasources"/>
</fork> </fork>
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_prepare_assoc_step1"/> <join name="copy_wait" to="fork_prepare_assoc_step1"/>

View File

@@ -80,7 +80,6 @@ public class OrcidPropagationJobTest {
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath, "-sourcePath", sourcePath,
"-hive_metastore_uris", "", "-hive_metastore_uris", "",
"-saveGraph", "true",
"-resultTableName", Dataset.class.getCanonicalName(), "-resultTableName", Dataset.class.getCanonicalName(),
"-outputPath", workingDir.toString() + "/dataset", "-outputPath", workingDir.toString() + "/dataset",
"-possibleUpdatesPath", possibleUpdatesPath "-possibleUpdatesPath", possibleUpdatesPath
@@ -125,8 +124,6 @@ public class OrcidPropagationJobTest {
.getPath(), .getPath(),
"-hive_metastore_uris", "-hive_metastore_uris",
"", "",
"-saveGraph",
"true",
"-resultTableName", "-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", "-outputPath",
@@ -193,8 +190,6 @@ public class OrcidPropagationJobTest {
.getPath(), .getPath(),
"-hive_metastore_uris", "-hive_metastore_uris",
"", "",
"-saveGraph",
"true",
"-resultTableName", "-resultTableName",
"eu.dnetlib.dhp.schema.oaf.Dataset", "eu.dnetlib.dhp.schema.oaf.Dataset",
"-outputPath", "-outputPath",