enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
6 changed files with 8 additions and 78 deletions
Showing only changes of commit d13e3d3f68 - Show all commits

View File

@ -44,9 +44,6 @@ public class GenerateEventsJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
@ -70,7 +67,7 @@ public class GenerateEventsJob {
ClusterUtils.removeDir(spark, eventsPath); ClusterUtils.removeDir(spark, eventsPath);
final Dataset<ResultGroup> groups = ClusterUtils final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, graphPath + "/relation", ResultGroup.class); .readPath(spark, workingPath + "/relation", ResultGroup.class);
final Dataset<Event> events = groups final Dataset<Event> events = groups
.map( .map(

View File

@ -43,9 +43,6 @@ public class JoinEntitiesJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
@ -59,16 +56,16 @@ public class JoinEntitiesJob {
ClusterUtils.removeDir(spark, joinedEntitiesPath); ClusterUtils.removeDir(spark, joinedEntitiesPath);
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
.readPath(spark, graphPath + "/simpleEntities", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<OaBrokerMainEntity> r1 = join( final Dataset<OaBrokerMainEntity> r1 = join(
r0, ClusterUtils.readPath(spark, graphPath + "/relatedProjects", RelatedProject.class)); r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class));
final Dataset<OaBrokerMainEntity> r2 = join( final Dataset<OaBrokerMainEntity> r2 = join(
r1, ClusterUtils.readPath(spark, graphPath + "/relatedDatasets", RelatedDataset.class)); r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class));
final Dataset<OaBrokerMainEntity> r3 = join( final Dataset<OaBrokerMainEntity> r3 = join(
r2, ClusterUtils.readPath(spark, graphPath + "/relatedPublications", RelatedPublication.class)); r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class));
final Dataset<OaBrokerMainEntity> r4 = join( final Dataset<OaBrokerMainEntity> r4 = join(
r3, ClusterUtils.readPath(spark, graphPath + "/relatedSoftwares", RelatedSoftware.class)); r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class));
r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath); r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);

View File

@ -58,7 +58,7 @@ public class PrepareGroupsJob {
ClusterUtils.removeDir(spark, groupsPath); ClusterUtils.removeDir(spark, groupsPath);
final Dataset<OaBrokerMainEntity> results = ClusterUtils final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, graphPath + "/joinedEntities", OaBrokerMainEntity.class); .readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)

View File

@ -283,7 +283,6 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>

View File

@ -1,10 +1,4 @@
[ [
{
"paramName": "g",
"paramLongName": "graphPath",
"paramDescription": "the path where there the graph is stored",
"paramRequired": true
},
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "workingPath",

View File

@ -73,68 +73,12 @@
</configuration> </configuration>
</global> </global>
<start to="ensure_working_path"/> <start to="join_entities"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="ensure_working_path">
<fs>
<mkdir path='${workingPath}'/>
</fs>
<ok to="prepare_related_publications"/>
<error to="Kill"/>
</action>
<action name="prepare_related_publications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedPublicationsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_related_datasets"/>
<error to="Kill"/>
</action>
<action name="prepare_related_datasets">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedDatasetsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="join_entities"/>
<error to="Kill"/>
</action>
<action name="join_entities"> <action name="join_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -201,7 +145,6 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>