From 9c7092416a9a535792389ac72206868ccc4eabc5 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 1 Apr 2020 19:07:30 +0200
Subject: [PATCH] dataset based provision WIP

---
 .../dhp/oa/provision/GraphJoiner_v2.java      | 25 ++++-------------
 .../SparkXmlRecordBuilderJob_v2.java          | 10 +++----
 .../input_params_build_adjacency_lists.json   |  3 +-
 .../oa/provision/oozie_app/config-default.xml |  6 +---
 .../dhp/oa/provision/oozie_app/workflow.xml   | 28 ++++++++++++-------
 5 files changed, 32 insertions(+), 40 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
index 236ef93ec..d9f79a967 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java
@@ -45,19 +45,12 @@ import static eu.dnetlib.dhp.oa.provision.utils.GraphMappingUtils.asRelatedEntit
  */
 public class GraphJoiner_v2 implements Serializable {
 
-    public static final int LIMIT = 1000000;
-
     private Map<String, LongAccumulator> accumulators = Maps.newHashMap();
 
     public static final int MAX_RELS = 100;
 
     public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
 
-    private static final StructType KV_SCHEMA = StructType$.MODULE$.apply(
-            Arrays.asList(
-                    StructField$.MODULE$.apply("key", DataTypes.StringType, false, Metadata.empty()),
-                    StructField$.MODULE$.apply("value", DataTypes.StringType, false, Metadata.empty())
-            ));
 
     private SparkSession spark;
 
     private ContextMapper contextMapper;
@@ -105,7 +98,6 @@ public class GraphJoiner_v2 implements Serializable {
                         value.getId(),
                         value),
                 Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
-                .limit(LIMIT)
                 .cache();
 
         System.out.println("Entities schema:");
@@ -115,7 +107,6 @@
         Dataset<Relation> rels = readPathRelation(jsc, getInputPath())
                 .groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
                 .flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class))
-                .limit(LIMIT)
                 .cache();
 
         System.out.println("Relation schema:");
@@ -169,7 +160,6 @@
         final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
         grouped
                 .map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING())
-                .limit(LIMIT)
                 .write()
                 .text(getOutPath() + "/xml");
         /*
@@ -245,13 +235,11 @@
      * @return the JavaPairRDD indexed by entity identifier
      */
     private Dataset<TypedRow> readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) {
-        RDD<Row> rdd = sc.textFile(inputPath + "/" + type)
-                .map((Function<String, Row>) s -> RowFactory.create("", s))
+        RDD<String> rdd = sc.textFile(inputPath + "/" + type)
                 .rdd();
 
-        return getSpark().createDataFrame(rdd, KV_SCHEMA)
-                .map((MapFunction<Row, TypedRow>) row -> {
-                    final String s = row.getAs("value");
+        return getSpark().createDataset(rdd, Encoders.STRING())
+                .map((MapFunction<String, TypedRow>) s -> {
                     final DocumentContext json = JsonPath.parse(s);
                     final TypedRow t = new TypedRow();
                     t.setId(json.read("$.id"));
@@ -270,12 +258,11 @@
      * @return the JavaRDD containing all the relationships
      */
     private Dataset<Relation> readPathRelation(final JavaSparkContext sc, final String inputPath) {
-        final RDD<Row> rdd = sc.textFile(inputPath + "/relation")
-                .map((Function<String, Row>) s -> RowFactory.create("", s))
+        final RDD<String> rdd = sc.textFile(inputPath + "/relation")
                 .rdd();
 
-        return getSpark().createDataFrame(rdd, KV_SCHEMA)
-                .map((MapFunction<Row, Relation>) value -> new ObjectMapper().readValue(value.getAs("value"), Relation.class), Encoders.bean(Relation.class));
+        return getSpark().createDataset(rdd, Encoders.STRING())
+                .map((MapFunction<String, Relation>) s -> new ObjectMapper().readValue(s, Relation.class), Encoders.bean(Relation.class));
     }
 
     private ObjectMapper getObjectMapper() {
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
index 3b119cebb..e4124e52f 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlRecordBuilderJob_v2.java
@@ -17,23 +17,23 @@ public class SparkXmlRecordBuilderJob_v2 {
                         SparkXmlRecordBuilderJob_v2.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
         parser.parseArgument(args);
 
-        final String master = parser.get("master");
-        try(SparkSession spark = getSession(master)) {
+        try(SparkSession spark = getSession(parser)) {
 
             final String inputPath = parser.get("sourcePath");
             final String outputPath = parser.get("outputPath");
             final String isLookupUrl = parser.get("isLookupUrl");
             final String otherDsTypeId = parser.get("otherDsTypeId");
 
+
             new GraphJoiner_v2(spark, ContextMapper.fromIS(isLookupUrl), otherDsTypeId, inputPath, outputPath)
                     .adjacencyLists();
         }
     }
 
-    private static SparkSession getSession(String master) {
+    private static SparkSession getSession(ArgumentApplicationParser parser) {
         final SparkConf conf = new SparkConf();
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.set("spark.sql.shuffle.partitions", "500");
+        conf.set("spark.sql.shuffle.partitions", parser.get("sparkSqlShufflePartitions"));
         conf.registerKryoClasses(new Class[]{
                 Author.class,
                 Context.class,
@@ -74,7 +74,7 @@ public class SparkXmlRecordBuilderJob_v2 {
                 .builder()
                 .config(conf)
                 .appName(SparkXmlRecordBuilderJob_v2.class.getSimpleName())
-                .master(master)
+                .master(parser.get("master"))
                 .getOrCreate();
     }
 
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
index a5d20a55f..bbac579fe 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json
@@ -3,5 +3,6 @@
   {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
   {"paramName":"o", "paramLongName":"outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true},
   {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequence file to read", "paramRequired": true},
-  {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true}
datasourcetypeui", "paramRequired": true} + {"paramName":"t", "paramLongName":"otherDsTypeId", "paramDescription": "list of datasource types to populate field datasourcetypeui", "paramRequired": true}, + {"paramName":"sp", "paramLongName":"sparkSqlShufflePartitions", "paramDescription": "Configures the number of partitions to use when shuffling data for joins or aggregations", "paramRequired": true} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml index 624d3ea76..c0364c2cf 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml @@ -19,13 +19,9 @@ hive_metastore_uris thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 - - hive_db_name - openaire - spark2YarnHistoryServerAddress - http://iis-cdh5-test-gw.ocean.icm.edu.pl:18088 + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 spark2EventLogDir diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index e981c450e..f4bd3f19e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -2,19 +2,27 @@ - hive_db_name - the target hive database name - - - sparkDriverMemory + sparkDriverMemoryForJoining memory for driver process - sparkExecutorMemory + sparkExecutorMemoryForJoining memory for individual executor - sparkExecutorCores + sparkExecutorCoresForJoining + number of cores used by single executor + + + sparkDriverMemoryForIndexing + memory for driver process + + + sparkExecutorMemoryForIndexing + memory for individual executor + + + sparkExecutorCoresForIndexing number of cores used by single executor @@ -75,13 +83,13 @@ --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.network.timeout=10000000 -mt yarn -is ${isLookupUrl} -t ${otherDsTypeId} - --sourcePath${sourcePath} - --outputPath${outputPath} + -s${sourcePath} + -o${outputPath} + -sp${sparkSqlShufflePartitions}