copy or process the person records throughout the graph pipeline

Claudio Atzori 2024-07-30 14:25:31 +02:00
parent 64740475d0
commit 9486e21a44
8 changed files with 215 additions and 0 deletions

View File

@@ -363,6 +363,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
} else if (value instanceof Person) {
// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
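For context, the cleanup entry point is a plain instanceof chain over the graph entity types, and Person now joins Datasource and Project as types that pass through untouched. A self-contained toy version of the dispatch pattern (toy Person/Organization classes and an "UNKNOWN" default stand in for the real eu.dnetlib.dhp.schema.oaf model):

    // Toy illustration of the dispatch in GraphCleaningFunctions.cleanup:
    // entity types without cleaning rules fall through unchanged.
    class Person { }
    class Organization { String country; }

    static Object cleanup(Object value) {
        if (value instanceof Person) {
            // nothing to clean here
        } else if (value instanceof Organization) {
            Organization o = (Organization) value;
            if (o.country == null || o.country.isBlank()) {
                o.country = "UNKNOWN"; // stand-in for the real default country qualifier
            }
        }
        return value;
    }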

View File

@@ -7,3 +7,4 @@ promote_action_payload_for_project_table classpath eu/dnetlib/dhp/actionmanager/
promote_action_payload_for_publication_table classpath eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app
promote_action_payload_for_relation_table classpath eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app
promote_action_payload_for_software_table classpath eu/dnetlib/dhp/actionmanager/wf/software/oozie_app
promote_action_payload_for_person_table classpath eu/dnetlib/dhp/actionmanager/wf/person/oozie_app

View File

@@ -148,6 +148,7 @@
<path start="PromoteActionPayloadForPublicationTable"/>
<path start="PromoteActionPayloadForRelationTable"/>
<path start="PromoteActionPayloadForSoftwareTable"/>
<path start="PromoteActionPayloadForPersonTable"/>
</fork>
<action name="PromoteActionPayloadForDatasetTable">
@@ -270,6 +271,21 @@
<error to="Kill"/>
</action>
<action name="PromoteActionPayloadForPersonTable">
<sub-workflow>
<app-path>${wf:appPath()}/promote_action_payload_for_person_table</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>inputActionPayloadRootPath</name>
<value>${workingDir}/action_payload_by_type</value>
</property>
</configuration>
</sub-workflow>
<ok to="JoinPromote"/>
<error to="Kill"/>
</action>
<join name="JoinPromote" to="End"/>
<end name="End"/>
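Note the Oozie wiring here: the new path joins the existing fork, so the person payload is promoted in parallel with the other entity tables and the branches synchronize at JoinPromote. The <propagate-configuration/> element forwards the parent workflow's configuration into the sub-workflow, with inputActionPayloadRootPath set explicitly on top of it.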

View File

@@ -0,0 +1,130 @@
<workflow-app name="promote_action_payload_for_person_table" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>activePromotePersonActionPayload</name>
<description>when true will promote actions with eu.dnetlib.dhp.schema.oaf.Person payload</description>
</property>
<property>
<name>inputGraphRootPath</name>
<description>root location of input materialized graph</description>
</property>
<property>
<name>inputActionPayloadRootPath</name>
<description>root location of action payloads to promote</description>
</property>
<property>
<name>outputGraphRootPath</name>
<description>root location for output materialized graph</description>
</property>
<property>
<name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="DecisionPromotePersonActionPayload"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="DecisionPromotePersonActionPayload">
<switch>
<case to="PromotePersonActionPayloadForPersonTable">
${(activePromotePersonActionPayload eq "true") and
(fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputGraphRootPath')),'/'),'person')) eq "true") and
(fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")}
</case>
<default to="SkipPromotePersonActionPayloadForPersonTable"/>
</switch>
</decision>
<action name="PromotePersonActionPayloadForPersonTable">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>PromotePersonActionPayloadForPersonTable</name>
<class>eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob</class>
<jar>dhp-actionmanager-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--inputActionPayloadPath</arg><arg>${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/person</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="SkipPromotePersonActionPayloadForPersonTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/person"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/person</arg>
<arg>${outputGraphRootPath}/person</arg>
</distcp>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
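The decision node above only runs the Spark promotion when the activePromotePersonActionPayload switch is on and both the person graph table and the clazz=eu.dnetlib.dhp.schema.oaf.Person payload directory exist; otherwise the distcp fallback copies the input table to the output graph unchanged. The promotion job joins graph-table records with their action payloads and combines each pair according to mergeAndGetStrategy. A toy sketch of the two strategies named in the parameters (assumed semantics, with a simplified record in place of the real OAF classes):

    // Toy model of the two merge strategies accepted by mergeAndGetStrategy.
    record Rec(String id, long lastUpdate, String fields) { }

    // MERGE_FROM_AND_GET: enrich the existing table record from the payload
    static Rec mergeFromAndGet(Rec table, Rec payload) {
        return new Rec(table.id(),
                Math.max(table.lastUpdate(), payload.lastUpdate()),
                payload.fields() != null ? payload.fields() : table.fields());
    }

    // SELECT_NEWER_AND_GET: keep whichever record was updated more recently
    static Rec selectNewerAndGet(Rec table, Rec payload) {
        return payload.lastUpdate() > table.lastUpdate() ? payload : table;
    }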

View File

@@ -63,6 +63,7 @@
<path start="copy_software"/>
<path start="copy_datasource"/>
<path start="copy_project"/>
<path start="copy_person"/>
<path start="copy_organization"/>
</fork>
@@ -120,6 +121,15 @@
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/person</arg>
<arg>${nameNode}/${outputPath}/person</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>
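Each copy_* action is just a recursive directory copy on HDFS, which distcp executes as a distributed MapReduce job. A single-process sketch of the same copy with the Hadoop FileSystem API (the CopyPerson class name and argument handling are illustrative, not part of the commit):

    // Single-process equivalent of the distcp copy above (illustrative only;
    // distcp performs the same recursive copy, distributed across the cluster).
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class CopyPerson {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path src = new Path(args[0]); // e.g. ${sourcePath}/person
            Path dst = new Path(args[1]); // e.g. ${outputPath}/person
            // recursive copy, keeping the source (deleteSource = false)
            FileUtil.copy(fs, src, fs, dst, false, conf);
        }
    }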

View File

@@ -34,6 +34,7 @@
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
<path start="copy_persons"/>
</fork>
<action name="copy_relation">
@@ -80,6 +81,17 @@
<error to="Kill"/>
</action>
<action name="copy_persons">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/person</arg>
<arg>${nameNode}/${outputPath}/person</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_prepare_assoc_step1"/>
<fork name="fork_prepare_assoc_step1">

View File

@@ -89,6 +89,14 @@
<arg>${nameNode}/${graphPath}/project</arg>
<arg>${nameNode}/${targetPath}/project</arg>
</distcp>
<ok to="copy_person"/>
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphPath}/person</arg>
<arg>${nameNode}/${targetPath}/person</arg>
</distcp>
<ok to="copy_relation"/>
<error to="Kill"/>
</action>

View File

@@ -142,6 +142,7 @@
<path start="clean_datasource"/>
<path start="clean_organization"/>
<path start="clean_project"/>
<path start="clean_person"/>
<path start="clean_relation"/>
</fork>
@@ -390,6 +391,41 @@
<error to="Kill"/>
</action>
<action name="clean_person">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean person</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=2000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--deepClean</arg><arg>${shouldClean}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
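The clean_person action follows the same template as the other entity tables: CleanGraphSparkJob reads ${graphInputPath}/person, applies the cleaning functions (currently a no-op for Person, per the GraphCleaningFunctions branch at the top of this commit), and writes the cleaned table to ${graphOutputPath}/person.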