diff --git a/dhp-workflows/dhp-graph-mapper/pom.xml b/dhp-workflows/dhp-graph-mapper/pom.xml index c1abc68712..664de5ff17 100644 --- a/dhp-workflows/dhp-graph-mapper/pom.xml +++ b/dhp-workflows/dhp-graph-mapper/pom.xml @@ -32,11 +32,6 @@ 1.0.4-SNAPSHOT - - de.javakaffee - kryo-serializers - - diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java index ac1b0a860d..debe4436ad 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/graph/SparkGraphImporterJob.java @@ -3,11 +3,11 @@ package eu.dnetlib.dhp.graph; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import scala.Tuple2; @@ -19,29 +19,27 @@ public class SparkGraphImporterJob { parser.parseArgument(args); final SparkSession spark = SparkSession .builder() - .appName("ImportGraph") + .appName(SparkGraphImporterJob.class.getSimpleName()) .master(parser.get("master")) + .config("hive.metastore.uris", parser.get("hive.metastore.uris")) + .enableHiveSupport() .getOrCreate(); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final String inputPath = parser.get("sourcePath"); - final String outputPath = parser.get("targetPath"); - - final String filter = parser.get("filter"); // Read the input file and convert it into RDD of serializable object - final JavaRDD> inputRDD = sc.sequenceFile(inputPath, Text.class, Text.class) - .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); - GraphMappingUtils.types.forEach((name, clazz) -> { - if (StringUtils.isNotBlank(filter) || filter.toLowerCase().contains(name)) { - spark.createDataset(inputRDD - .filter(s -> s._1().equals(clazz.getName())) - .map(Tuple2::_2) - .map(s -> new ObjectMapper().readValue(s, clazz)) - .rdd(), Encoders.bean(clazz)) - .write() - .save(outputPath + "/" + name); - } + final JavaRDD> inputRDD = sc.sequenceFile(inputPath + "/" + name, Text.class, Text.class) + .map(item -> new Tuple2<>(item._1.toString(), item._2.toString())); + spark.createDataset(inputRDD + .filter(s -> s._1().equals(clazz.getName())) + .map(Tuple2::_2) + .map(s -> new ObjectMapper().readValue(s, clazz)) + .rdd(), Encoders.bean(clazz)) + .write() + .mode(SaveMode.Overwrite) + .saveAsTable("openaire." + name); }); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json index ca7283b755..7f76c064fc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/input_graph_parameters.json @@ -1,6 +1,5 @@ [ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, - {"paramName":"f", "paramLongName":"filter", "paramDescription": "csv of typology of dataframe to be generated", "paramRequired": false}, - {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path where store DataFrames on HDFS", "paramRequired": true} + {"paramName":"h", "paramLongName":"hive.metastore.uris","paramDescription": "the hive metastore uris", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml index b2b577712e..a5201a743a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/config-default.xml @@ -16,7 +16,7 @@ spark2 - filter - + hive.metastore.uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml index 8d90af1858..ed23690874 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/graph/oozie_app/workflow.xml @@ -4,10 +4,6 @@ sourcePath the source path - - targetPath - the target path - sparkDriverMemory memory for driver process @@ -20,10 +16,6 @@ sparkExecutorCores number of cores used by single executor - - filter - csv string to filter the entities to generate - @@ -41,12 +33,10 @@ MapGraphIntoDataFrame eu.dnetlib.dhp.graph.SparkGraphImporterJob dhp-graph-mapper-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" + --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mt yarn-cluster --sourcePath${sourcePath} - --targetPath${targetPath} - --filter${filter} - + --hive.metastore.uris${hive.metastore.uris} diff --git a/pom.xml b/pom.xml index 1b746c0bac..29f4e94273 100644 --- a/pom.xml +++ b/pom.xml @@ -164,12 +164,6 @@ 1.1.6 - - de.javakaffee - kryo-serializers - 0.45 - - net.schmizz sshj