From daa26acc9d2863fda801ae17087ce8f68082978f Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 2 Apr 2020 16:15:50 +0200 Subject: [PATCH] dataset based provision WIP, fixed spark2EventLogDir --- .../dhp/oa/provision/GraphJoiner_v2.java | 48 +++++++++++-------- .../oa/provision/oozie_app/config-default.xml | 2 +- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java index d9f79a967..5eac12e5d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/GraphJoiner_v2.java @@ -86,24 +86,36 @@ public class GraphJoiner_v2 implements Serializable { Dataset publication = readPathEntity(jsc, getInputPath(), "publication"); // create the union between all the entities - Dataset> entities = - datasource - .union(organization) - .union(project) - .union(dataset) - .union(otherresearchproduct) - .union(software) - .union(publication) - .map((MapFunction>) value -> new Tuple2<>( - value.getId(), - value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) - .cache(); + datasource + .union(organization) + .union(project) + .union(dataset) + .union(otherresearchproduct) + .union(software) + .union(publication) + .repartition(20000) + .write() + .parquet(getOutPath() + "/entities"); + Dataset> entities = getSpark() + .read() + .load(getOutPath() + "/entities") + .map((MapFunction>) r -> { + TypedRow t = new TypedRow(); + t.setId(r.getAs("id")); + t.setDeleted(r.getAs("deleted")); + t.setType(r.getAs("type")); + t.setOaf(r.getAs("oaf")); + + return new Tuple2<>(t.getId(), t); + }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))); + + System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions()); System.out.println("Entities schema:"); entities.printSchema(); - // reads the relationships +/* + // reads the relationships Dataset rels = readPathRelation(jsc, getInputPath()) .groupByKey((MapFunction) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class)) .flatMapGroups((FlatMapGroupsFunction) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class)) @@ -126,7 +138,7 @@ public class GraphJoiner_v2 implements Serializable { e.setRelation(t._1()._2()); e.setTarget(asRelatedEntity(t._2()._2())); return e; - }, Encoders.bean(EntityRelEntity.class)) + }, Encoders.kryo(EntityRelEntity.class)) .map((MapFunction>) e -> new Tuple2<>(e.getRelation().getSource(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); @@ -160,14 +172,12 @@ public class GraphJoiner_v2 implements Serializable { final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId); grouped .map((MapFunction) value -> recordFactory.build(value), Encoders.STRING()) - .write() - .text(getOutPath() + "/xml"); - /* .javaRDD() .mapToPair((PairFunction, String, String>) t -> new Tuple2<>(t._1(), t._2())) .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); - */ + +*/ return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml index c0364c2cf..b1a494ac4 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/config-default.xml @@ -25,6 +25,6 @@ spark2EventLogDir - /user/spark/applicationHistory + /user/spark/spark2ApplicationHistory \ No newline at end of file