dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-04 17:41:31 +02:00
parent 3d1b637cab
commit eb2f5f3198
2 changed files with 151 additions and 9 deletions

View File

@ -119,7 +119,7 @@ public class CreateRelatedEntitiesJob_phase2 {
return re; return re;
}, Encoders.bean(EntityRelEntity.class)) }, Encoders.bean(EntityRelEntity.class))
.write() .write()
.mode(SaveMode.Append) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);
} }

View File

@ -98,10 +98,20 @@
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg> <arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg> <arg>--outputPath</arg><arg>${workingDir}/relation</arg>
</spark> </spark>
<ok to="prepare_publication_table"/> <ok to="fork_join_related_entities"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<fork name="fork_join_related_entities">
<path start="join_relation_publication"/>
<path start="join_relation_dataset"/>
<path start="join_relation_otherresearchproduct"/>
<path start="join_relation_software"/>
<path start="join_relation_datasource"/>
<path start="join_relation_organization"/>
<path start="join_relation_project"/>
</fork>
<action name="join_relation_publication"> <action name="join_relation_publication">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -124,7 +134,7 @@
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg> <arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark> </spark>
<ok to="join_relation_dataset"/> <ok to="join_relation"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
@ -150,15 +160,147 @@
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg> <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg> <arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark> </spark>
<ok to="join_all_entities"/> <ok to="join_relation"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="join_relation_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = otherresearchproduct.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<error to="Kill"/>
</action>
<action name="join_relation_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = software.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<error to="Kill"/>
</action>
<action name="join_relation_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = datasource.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<error to="Kill"/>
</action>
<action name="join_relation_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = organization.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<error to="Kill"/>
</action>
<action name="join_relation_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = project.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relations</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
</spark>
<ok to="join_relation"/>
<error to="Kill"/>
</action>
<join name="join_relation" to="join_all_entities"/>
<action name="join_all_entities"> <action name="join_all_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Join[relation.target = dataset.id]</name> <name>Join[entities.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class> <class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar> <jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -171,8 +313,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}</arg> <arg>--inputEntityPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg> <arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
</spark> </spark>
<ok to="adjancency_lists"/> <ok to="adjancency_lists"/>
@ -196,7 +338,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--inputPath</arg> <arg>${${workingDir}/join_entities</arg> <arg>--inputPath</arg> <arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/joined</arg> <arg>--outputPath</arg><arg>${workingDir}/joined</arg>
</spark> </spark>
<ok to="convert_to_xml"/> <ok to="convert_to_xml"/>
@ -220,7 +362,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${${workingDir}/joined</arg> <arg>--inputPath</arg><arg>${workingDir}/joined</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg> <arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg> <arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
@ -246,8 +388,8 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--isLookupUrl</arg> <arg>${isLookupUrl}</arg>
<arg>--inputPath</arg><arg>${workingDir}/xml</arg> <arg>--inputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg> <arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg> <arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg> <arg>--batchSize</arg><arg>${batchSize}</arg>
</spark> </spark>