Introduced parameter 'numPartitions', driving the Hive DB table data partitioning. Currently specified only for table 'project'

Claudio Atzori 2020-07-23 08:54:10 +02:00
parent 9ab594ccf6
commit 56bbfdc65d
3 changed files with 24 additions and 5 deletions


@@ -9,6 +9,7 @@ import java.util.Optional;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
@@ -42,6 +43,12 @@ public class GraphHiveTableImporterJob {
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
+		int numPartitions = Optional
+			.ofNullable(parser.get("numPartitions"))
+			.map(Integer::valueOf)
+			.orElse(-1);
+		log.info("numPartitions: {}", numPartitions);
+
 		String inputPath = parser.get("inputPath");
 		log.info("inputPath: {}", inputPath);
 
@@ -60,16 +67,21 @@ public class GraphHiveTableImporterJob {
 		conf.set("hive.metastore.uris", hiveMetastoreUris);
 
 		runWithSparkHiveSession(
-			conf, isSparkSessionManaged, spark -> loadGraphTable(spark, inputPath, hiveDbName, clazz));
+			conf, isSparkSessionManaged, spark -> loadGraphTable(spark, inputPath, hiveDbName, clazz, numPartitions));
 	}
 
 	// protected for testing
 	private static <T extends Oaf> void loadGraphTable(SparkSession spark, String inputPath, String hiveDbName,
-		Class<T> clazz) {
+		Class<T> clazz, int numPartitions) {
 
-		spark
-			.read()
-			.textFile(inputPath)
+		Dataset<String> dataset = spark.read().textFile(inputPath);
+
+		if (numPartitions > 0) {
+			log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions);
+			dataset = dataset.repartition(numPartitions);
+		}
+
+		dataset
 			.map((MapFunction<String, T>) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz))
 			.write()
 			.mode(SaveMode.Overwrite)
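
Taken together, the hunk above reduces to a simple pattern: read the text dump (one JSON document per line), repartition it only when a positive partition count was supplied, then map each line onto the bean class and overwrite the Hive table. A minimal, self-contained sketch of that pattern, assuming a saveAsTable() call where the diff is truncated (the class and table names below are illustrative, not taken from the repository):

	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SaveMode;
	import org.apache.spark.sql.SparkSession;

	import com.fasterxml.jackson.databind.ObjectMapper;

	public class HiveImportSketch {

		private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

		public static <T> void loadTable(
			SparkSession spark, String inputPath, String hiveDbName, Class<T> clazz, int numPartitions) {

			// one JSON document per line
			Dataset<String> dataset = spark.read().textFile(inputPath);

			// numPartitions <= 0 (the default, -1) leaves the input partitioning untouched
			if (numPartitions > 0) {
				dataset = dataset.repartition(numPartitions);
			}

			dataset
				.map((MapFunction<String, T>) s -> OBJECT_MAPPER.readValue(s, clazz), Encoders.bean(clazz))
				.write()
				.mode(SaveMode.Overwrite)
				// the diff is truncated above; writing to <db>.<table> via saveAsTable is an assumption
				.saveAsTable(hiveDbName + "." + clazz.getSimpleName().toLowerCase());
		}
	}

Repartitioning immediately before the write is what controls how many files the Hive table is materialized into; that is the lever the new parameter exposes.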


@@ -282,6 +282,7 @@
 					<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
 					<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
 					<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
+					<arg>--numPartitions</arg><arg>100</arg>
 				</spark>
 				<ok to="join_import"/>
 				<error to="Kill"/>
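
In the workflow, the hard-coded value of 100 is passed only to the project table import; the other table imports omit the flag and fall back to the default of -1, keeping their input partitioning. A quick, hypothetical check of what the value does to the data layout (the local session setup and input path are assumptions, not from the repository):

	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.SparkSession;

	public class PartitionCheck {
		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder()
				.appName("partition-check")
				.master("local[*]") // local run for illustration only
				.getOrCreate();

			// hypothetical dump location; the real path comes from ${inputPath} in the workflow
			Dataset<String> projects = spark.read().textFile("/tmp/graph/project");

			System.out.println(projects.rdd().getNumPartitions());                  // dictated by the input files
			System.out.println(projects.repartition(100).rdd().getNumPartitions()); // always 100

			spark.stop();
		}
	}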


@@ -5,6 +5,12 @@
 		"paramDescription": "when true will stop SparkSession after job execution",
 		"paramRequired": false
 	},
+	{
+		"paramName": "np",
+		"paramLongName": "numPartitions",
+		"paramDescription": "number of dataset partitions",
+		"paramRequired": false
+	},
 	{
 		"paramName": "in",
 		"paramLongName": "inputPath",