diff --git a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java index 9a8df9d3e7..fec4a08ceb 100644 --- a/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java +++ b/dhp-workflows/dhp-propagation/src/main/java/eu/dnetlib/dhp/countrypropagation/PrepareResultCountrySet.java @@ -1,4 +1,134 @@ + package eu.dnetlib.dhp.countrypropagation; +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.*; + public class PrepareResultCountrySet { + private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkCountryPropagationJob2.class + .getResourceAsStream( + "/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String datasourcecountrypath = parser.get("preparedInfoPath"); + log.info("preparedInfoPath: {}", datasourcecountrypath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + Class resultClazz = (Class) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + getPotentialResultToUpdate( + spark, + inputPath, + datasourcecountrypath, + resultClazz); + }); + + } + + private static void getPotentialResultToUpdate( + SparkSession spark, + String inputPath, + String datasourcecountrypath, + Class resultClazz) { + + Dataset result = readPathEntity(spark, inputPath, resultClazz); + result.createOrReplaceTempView("result"); + // log.info("number of results: {}", result.count()); + createCfHbforresult(spark); + Dataset datasourcecountryassoc = readAssocDatasourceCountry(spark, datasourcecountrypath); + countryPropagationAssoc(spark, datasourcecountryassoc) + .map((MapFunction) value -> { + R ret = resultClazz.newInstance(); + ret.setId(value.getResultId()); + ret + .setCountry( + value + .getCountrySet() + .stream() + .map(c -> getCountry(c.getClassid(), c.getClassname())) + .collect(Collectors.toList())); + return ret; + }, Encoders.bean(resultClazz)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Append) + .json(inputPath); + } + + private static Dataset countryPropagationAssoc( + SparkSession spark, + Dataset datasource_country) { + + // Dataset datasource_country = broadcast_datasourcecountryassoc.value(); + datasource_country.createOrReplaceTempView("datasource_country"); + log.info("datasource_country number : {}", datasource_country.count()); + + String query = "SELECT id resultId, collect_set(country) countrySet " + + "FROM ( SELECT id, country " + + "FROM datasource_country " + + "JOIN cfhb " + + " ON cf = dataSourceId " + + "UNION ALL " + + "SELECT id , country " + + "FROM datasource_country " + + "JOIN cfhb " + + " ON hb = dataSourceId ) tmp " + + "GROUP BY id"; + Dataset potentialUpdates = spark + .sql(query) + .as(Encoders.bean(ResultCountrySet.class)); + // log.info("potential update number : {}", potentialUpdates.count()); + return potentialUpdates; + } + + private static Dataset readAssocDatasourceCountry( + SparkSession spark, String relationPath) { + return spark + .read() + .textFile(relationPath) + .map( + value -> OBJECT_MAPPER.readValue(value, DatasourceCountry.class), + Encoders.bean(DatasourceCountry.class)); + } } diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json index 912c887432..984b40774e 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json @@ -9,7 +9,7 @@ "paramName":"h", "paramLongName":"hive_metastore_uris", "paramDescription": "the hive metastore uris", - "paramRequired": true + "paramRequired": false }, { "paramName":"sg", @@ -33,7 +33,7 @@ "paramName": "p", "paramLongName": "preparedInfoPath", "paramDescription": "the path where prepared info have been stored", - "paramRequired": true + "paramRequired": false }, { "paramName": "ssm", diff --git a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml index 70ac77434c..469aa25623 100644 --- a/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-propagation/src/main/resources/eu/dnetlib/dhp/countrypropagation/oozie_app/workflow.xml @@ -27,6 +27,10 @@ + + + + @@ -45,6 +49,10 @@ + + + + @@ -89,6 +97,50 @@ + + + ${jobTracker} + ${nameNode} + ${nameNode}/${sourcePath}/publication + ${nameNode}/${workingDir}/publication + + + + + + + + ${jobTracker} + ${nameNode} + ${nameNode}/${sourcePath}/dataset + ${nameNode}/${workingDir}/dataset + + + + + + + + ${jobTracker} + ${nameNode} + ${nameNode}/${sourcePath}/otherresearchproduct + ${nameNode}/${workingDir}/otherresearchproduct + + + + + + + + ${jobTracker} + ${nameNode} + ${nameNode}/${sourcePath}/software + ${nameNode}/${workingDir}/software + + + + + @@ -113,9 +165,139 @@ --hive_metastore_uris${hive_metastore_uris} --outputPath${workingDir}/preparedInfo - + + + + + + + + + + + + yarn + cluster + prepareResultCountry-Publication + eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet + dhp-propagation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + + --sourcePath${workingDir}/publication + --hive_metastore_uris${hive_metastore_uris} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication + --preparedInfoPath${workingDir}/preparedInfo + + + + + + + + yarn + cluster + prepareResultCountry-Dataset + eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet + dhp-propagation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + + --sourcePath${workingDir}/dataset + --hive_metastore_uris${hive_metastore_uris} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset + --preparedInfoPath${workingDir}/preparedInfo + + + + + + + + yarn + cluster + prepareResultCountry-ORP + eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet + dhp-propagation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + + --sourcePath${workingDir}/otherresearchproduct + --hive_metastore_uris${hive_metastore_uris} + --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --preparedInfoPath${workingDir}/preparedInfo + + + + + + + + yarn + cluster + prepareResultCountry-Software + eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet + dhp-propagation-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.dynamicAllocation.enabled=true + --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + + --sourcePath${workingDir}/software + --hive_metastore_uris${hive_metastore_uris} + --resultTableNameeu.dnetlib.dhp.schema.oaf.Software + --preparedInfoPath${workingDir}/preparedInfo + + + + + + + @@ -128,26 +310,27 @@ yarn cluster countryPropagationForPublications - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 dhp-propagation-${projectVersion}.jar - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} + + --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=3840 - --sourcePath${sourcePath}/publication - --hive_metastore_uris${hive_metastore_uris} + --sourcePath${workingDir}/publication --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Publication --outputPath${outputPath}/publication - --preparedInfoPath${workingDir}/preparedInfo @@ -158,7 +341,7 @@ yarn cluster countryPropagationForDataset - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -171,13 +354,13 @@ --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false - --sourcePath${sourcePath}/dataset - --hive_metastore_uris${hive_metastore_uris} + --sourcePath${workingDir}/dataset --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Dataset --outputPath${outputPath}/dataset - --preparedInfoPath${workingDir}/preparedInfo @@ -188,7 +371,7 @@ yarn cluster countryPropagationForORP - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -201,13 +384,13 @@ --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false - --sourcePath${sourcePath}/otherresearchproduct - --hive_metastore_uris${hive_metastore_uris} + --sourcePath${workingDir}/otherresearchproduct --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct --outputPath${outputPath}/otherresearchproduct - --preparedInfoPath${workingDir}/preparedInfo @@ -218,7 +401,7 @@ yarn cluster countryPropagationForSoftware - eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob2 + eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob3 dhp-propagation-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -231,13 +414,13 @@ --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false - --sourcePath${sourcePath}/software - --hive_metastore_uris${hive_metastore_uris} + --sourcePath${workingDir}/software --saveGraph${saveGraph} --resultTableNameeu.dnetlib.dhp.schema.oaf.Software --outputPath${outputPath}/software - --preparedInfoPath${workingDir}/preparedInfo