diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java
index f88f7457f..76e1d57a1 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java
@@ -9,6 +9,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
@@ -42,6 +43,12 @@ public class GraphHiveTableImporterJob {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
+		int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(-1);
+		log.info("numPartitions: {}", numPartitions);
+
 		String inputPath = parser.get("inputPath");
 		log.info("inputPath: {}", inputPath);
 
@@ -60,16 +67,21 @@ public class GraphHiveTableImporterJob {
 		conf.set("hive.metastore.uris", hiveMetastoreUris);
 
 		runWithSparkHiveSession(
-			conf, isSparkSessionManaged, spark -> loadGraphTable(spark, inputPath, hiveDbName, clazz));
+			conf, isSparkSessionManaged, spark -> loadGraphTable(spark, inputPath, hiveDbName, clazz, numPartitions));
 	}
 
 	// protected for testing
 	private static <T extends Oaf> void loadGraphTable(SparkSession spark, String inputPath, String hiveDbName,
-		Class<T> clazz) {
+		Class<T> clazz, int numPartitions) {
 
-		spark
-			.read()
-			.textFile(inputPath)
+		Dataset<String> dataset = spark.read().textFile(inputPath);
+
+		if (numPartitions > 0) {
+			log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions);
+			dataset = dataset.repartition(numPartitions);
+		}
+
+		dataset
 			.map((MapFunction<String, T>) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz))
 			.write()
 			.mode(SaveMode.Overwrite)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
index 8566d7667..09930336a 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml
@@ -282,6 +282,7 @@
                 <arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
                 <arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
                 <arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+                <arg>--numPartitions</arg><arg>100</arg>
             </spark>
             <ok to="join_import"/>
             <error to="Kill"/>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_table_importer_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_table_importer_parameters.json
index 5b5b0743c..f38a0412c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_table_importer_parameters.json
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive_table_importer_parameters.json
@@ -5,6 +5,12 @@
     "paramDescription": "when true will stop SparkSession after job execution",
     "paramRequired": false
  },
+  {
+    "paramName": "np",
+    "paramLongName": "numPartitions",
+    "paramDescription": "number of dataset partitions",
+    "paramRequired": false
+  },
   {
     "paramName": "in",
     "paramLongName": "inputPath",
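
Note: the standalone sketch below illustrates the repartition-before-write pattern this patch adds to loadGraphTable. It is not part of the patch; the class name, app name, input/output paths, and the argument handling are hypothetical. The guard mirrors the patch: the default of -1 leaves the input partitioning untouched, while a positive value (such as the 100 configured for the project table) triggers a shuffle into exactly that many partitions.

	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.SaveMode;
	import org.apache.spark.sql.SparkSession;

	public class RepartitionBeforeWriteExample {

		public static void main(String[] args) {
			SparkSession spark = SparkSession
				.builder()
				.appName("repartition-before-write")
				.master("local[*]") // hypothetical: local run for illustration
				.getOrCreate();

			// Default mirrors the patch: -1 means "do not repartition".
			int numPartitions = args.length > 0 ? Integer.parseInt(args[0]) : -1;

			// Read raw JSON lines, as GraphHiveTableImporterJob does via textFile().
			Dataset<String> dataset = spark.read().textFile("/tmp/example/input"); // hypothetical path

			// Same guard as the patch: only a positive value triggers the shuffle.
			if (numPartitions > 0) {
				dataset = dataset.repartition(numPartitions);
			}

			// Each resulting partition is written as one output file, so
			// numPartitions also controls the number of files produced.
			dataset
				.write()
				.mode(SaveMode.Overwrite)
				.text("/tmp/example/output"); // hypothetical path

			spark.stop();
		}
	}

Since repartition(n) forces a full shuffle, fixing the count to a moderate value like 100 bounds the output file count and evens out task sizes, at the cost of one extra shuffle stage before the Hive table is written.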