forked from D-Net/dnet-hadoop
Merge remote-tracking branch 'origin/master' into pubmed_to_production
This commit is contained in:
commit
96cd96f3c2
|
@ -14,6 +14,7 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -84,19 +85,26 @@ public class SparkCountryPropagationJob {
|
|||
Dataset<R> res = readPath(spark, sourcePath, resultClazz);
|
||||
|
||||
log.info("Reading prepared info: {}", preparedInfoPath);
|
||||
Dataset<ResultCountrySet> prepared = spark
|
||||
final Dataset<Row> preparedInfoRaw = spark
|
||||
.read()
|
||||
.json(preparedInfoPath)
|
||||
.as(Encoders.bean(ResultCountrySet.class));
|
||||
|
||||
res
|
||||
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
|
||||
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
.json(preparedInfoPath);
|
||||
|
||||
if (!preparedInfoRaw.isEmpty()) {
|
||||
final Dataset<ResultCountrySet> prepared = preparedInfoRaw.as(Encoders.bean(ResultCountrySet.class));
|
||||
res
|
||||
.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
|
||||
.map(getCountryMergeFn(), Encoders.bean(resultClazz))
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
} else {
|
||||
res
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(outputPath);
|
||||
}
|
||||
}
|
||||
|
||||
private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
|
||||
|
|
|
@ -105,13 +105,13 @@
|
|||
<join name="copy_wait" to="fork_exec_bulktag"/>
|
||||
|
||||
<fork name="fork_exec_bulktag">
|
||||
<path start="join_bulktag_publication"/>
|
||||
<path start="join_bulktag_dataset"/>
|
||||
<path start="join_bulktag_otherresearchproduct"/>
|
||||
<path start="join_bulktag_software"/>
|
||||
<path start="bulktag_publication"/>
|
||||
<path start="bulktag_dataset"/>
|
||||
<path start="bulktag_otherresearchproduct"/>
|
||||
<path start="bulktag_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="join_bulktag_publication">
|
||||
<action name="bulktag_publication">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
|
@ -138,7 +138,7 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_bulktag_dataset">
|
||||
<action name="bulktag_dataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
|
@ -165,7 +165,7 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_bulktag_otherresearchproduct">
|
||||
<action name="bulktag_otherresearchproduct">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
|
@ -192,7 +192,7 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_bulktag_software">
|
||||
<action name="bulktag_software">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
|
@ -269,7 +269,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<name>EOSC tagging publication</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
@ -296,7 +296,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<name>EOSC tagging dataset</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
@ -322,7 +322,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<name>EOSC tagging software</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
@ -348,7 +348,7 @@
|
|||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>EOSC_tagging</name>
|
||||
<name>EOSC tagging ORP</name>
|
||||
<class>eu.dnetlib.dhp.bulktag.eosc.SparkEoscBulkTag</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
|
|
|
@ -582,201 +582,7 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_clean_country" to="should_patch_datasource_ids"/>
|
||||
|
||||
<decision name="should_patch_datasource_ids">
|
||||
<switch>
|
||||
<case to="get_ds_master_duplicate">${wf:conf('shouldClean') eq true}</case>
|
||||
<default to="End"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="get_ds_master_duplicate">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.oa.graph.clean.MasterDuplicateAction</main-class>
|
||||
<arg>--postgresUrl</arg><arg>${postgresURL}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
<arg>--hdfsPath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
</java>
|
||||
<ok to="fork_patch_cfhb"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="fork_patch_cfhb">
|
||||
<path start="patch_publication_cfhb"/>
|
||||
<path start="patch_dataset_cfhb"/>
|
||||
<path start="patch_otherresearchproduct_cfhb"/>
|
||||
<path start="patch_software_cfhb"/>
|
||||
</fork>
|
||||
|
||||
<action name="patch_publication_cfhb">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>patch publication cfhb</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/publication</arg>
|
||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/publication</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/publication</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_cfhb"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="patch_dataset_cfhb">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>patch dataset cfhb</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/dataset</arg>
|
||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/dataset</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/dataset</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_cfhb"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="patch_otherresearchproduct_cfhb">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>patch otherresearchproduct cfhb</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/otherresearchproduct</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_cfhb"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="patch_software_cfhb">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>patch software cfhb</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.cfhb.CleanCfHbSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphOutputPath}/software</arg>
|
||||
<arg>--resolvedPath</arg><arg>${workingDir}/cfHbResolved/software</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/cfHbPatched/software</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean_cfhb"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_clean_cfhb" to="fork_copy_cfhb_patched_results"/>
|
||||
|
||||
<fork name="fork_copy_cfhb_patched_results">
|
||||
<path start="copy_cfhb_patched_publication"/>
|
||||
<path start="copy_cfhb_patched_dataset"/>
|
||||
<path start="copy_cfhb_patched_otherresearchproduct"/>
|
||||
<path start="copy_cfhb_patched_software"/>
|
||||
</fork>
|
||||
|
||||
<action name="copy_cfhb_patched_publication">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<prepare>
|
||||
<delete path="${graphOutputPath}/publication"/>
|
||||
</prepare>
|
||||
<arg>${workingDir}/cfHbPatched/publication</arg>
|
||||
<arg>${graphOutputPath}/publication</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_cfhb_patched_dataset">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<prepare>
|
||||
<delete path="${graphOutputPath}/dataset"/>
|
||||
</prepare>
|
||||
<arg>${workingDir}/cfHbPatched/dataset</arg>
|
||||
<arg>${graphOutputPath}/dataset</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_cfhb_patched_otherresearchproduct">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<prepare>
|
||||
<delete path="${graphOutputPath}/otherresearchproduct"/>
|
||||
</prepare>
|
||||
<arg>${workingDir}/cfHbPatched/otherresearchproduct</arg>
|
||||
<arg>${graphOutputPath}/otherresearchproduct</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_cfhb_patched_software">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<prepare>
|
||||
<delete path="${graphOutputPath}/software"/>
|
||||
</prepare>
|
||||
<arg>${workingDir}/cfHbPatched/software</arg>
|
||||
<arg>${graphOutputPath}/software</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="copy_wait" to="End"/>
|
||||
<join name="wait_clean_country" to="End"/>
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
|
|
Loading…
Reference in New Issue