dataset based provision WIP, fixed spark2EventLogDir

Claudio Atzori 2020-04-02 16:15:50 +02:00
parent 9c7092416a
commit daa26acc9d
2 changed files with 30 additions and 20 deletions

@@ -86,7 +86,6 @@ public class GraphJoiner_v2 implements Serializable {
         Dataset<TypedRow> publication = readPathEntity(jsc, getInputPath(), "publication");
         // create the union between all the entities
-        Dataset<Tuple2<String, TypedRow>> entities =
         datasource
                 .union(organization)
                 .union(project)
@@ -94,16 +93,29 @@ public class GraphJoiner_v2 implements Serializable {
                 .union(otherresearchproduct)
                 .union(software)
                 .union(publication)
-                .map((MapFunction<TypedRow, Tuple2<String, TypedRow>>) value -> new Tuple2<>(
-                        value.getId(),
-                        value),
-                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
-                .cache();
+                .repartition(20000)
+                .write()
+                .parquet(getOutPath() + "/entities");
+        Dataset<Tuple2<String, TypedRow>> entities = getSpark()
+                .read()
+                .load(getOutPath() + "/entities")
+                .map((MapFunction<Row, Tuple2<String, TypedRow>>) r -> {
+                    TypedRow t = new TypedRow();
+                    t.setId(r.getAs("id"));
+                    t.setDeleted(r.getAs("deleted"));
+                    t.setType(r.getAs("type"));
+                    t.setOaf(r.getAs("oaf"));
+                    return new Tuple2<>(t.getId(), t);
+                }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)));
+        System.out.println("Entities, number of partitions: " + entities.rdd().getNumPartitions());
         System.out.println("Entities schema:");
         entities.printSchema();
-        // reads the relationships
+        /*
+        // reads the relationships
         Dataset<Relation> rels = readPathRelation(jsc, getInputPath())
                 .groupByKey((MapFunction<Relation, SortableRelationKey>) t -> SortableRelationKey.from(t), Encoders.kryo(SortableRelationKey.class))
                 .flatMapGroups((FlatMapGroupsFunction<SortableRelationKey, Relation, Relation>) (key, values) -> Iterators.limit(values, MAX_RELS), Encoders.bean(Relation.class))
@@ -126,7 +138,7 @@ public class GraphJoiner_v2 implements Serializable {
                     e.setRelation(t._1()._2());
                     e.setTarget(asRelatedEntity(t._2()._2()));
                     return e;
-                }, Encoders.bean(EntityRelEntity.class))
+                }, Encoders.kryo(EntityRelEntity.class))
                 .map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) e -> new Tuple2<>(e.getRelation().getSource(), e),
                         Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
@@ -160,13 +172,11 @@ public class GraphJoiner_v2 implements Serializable {
         final XmlRecordFactory recordFactory = new XmlRecordFactory(accumulators, contextMapper, false, schemaLocation, otherDsTypeId);
         grouped
                 .map((MapFunction<JoinedEntity, String>) value -> recordFactory.build(value), Encoders.STRING())
+                .write()
+                .text(getOutPath() + "/xml");
+        /*
                 .javaRDD()
                 .mapToPair((PairFunction<Tuple2<String, String>, String, String>) t -> new Tuple2<>(t._1(), t._2()))
                 .saveAsHadoopFile(getOutPath() + "/xml", Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
         */
         return this;
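
The substantive change above swaps an in-memory cache of the entity union for a parquet checkpoint: the union is repartitioned and written to HDFS, then read back and re-keyed by id before the join. Writing and re-reading cuts the Spark lineage, so downstream stages consume a compact columnar copy instead of recomputing (or pinning in memory) the whole union. Below is a minimal self-contained sketch of that pattern; the paths, the 200-partition count, and the trimmed-down TypedRow (the real bean also carries deleted and type) are illustrative assumptions, not code from this repository.

import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class ParquetCheckpointSketch {

    // trimmed-down stand-in for the repository's TypedRow bean
    public static class TypedRow implements Serializable {
        private String id;
        private String oaf;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getOaf() { return oaf; }
        public void setOaf(String oaf) { this.oaf = oaf; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("parquet-checkpoint-sketch")
                .master("local[*]")
                .getOrCreate();

        final String outPath = "/tmp/provision"; // hypothetical working directory

        // Stage 1: materialize the union on disk with an explicit partition
        // count; this cuts the lineage, so later stages re-read a columnar
        // copy instead of recomputing the union.
        Dataset<Row> union = spark.read().parquet(outPath + "/raw"); // stand-in for the entity union
        union.repartition(200)
                .write()
                .parquet(outPath + "/entities");

        // Stage 2: read the checkpoint back and re-key each record by id,
        // reproducing the Tuple2<String, TypedRow> shape the joiner expects.
        Dataset<Tuple2<String, TypedRow>> entities = spark.read()
                .parquet(outPath + "/entities")
                .map((MapFunction<Row, Tuple2<String, TypedRow>>) r -> {
                    TypedRow t = new TypedRow();
                    t.setId(r.getAs("id"));
                    t.setOaf(r.getAs("oaf"));
                    return new Tuple2<>(t.getId(), t);
                }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)));

        System.out.println("partitions: " + entities.rdd().getNumPartitions());
        spark.stop();
    }
}

The same commit also moves EntityRelEntity from Encoders.bean to Encoders.kryo: the kryo encoder stores each object as a single opaque binary column, which tolerates nested or null-heavy fields that the bean encoder cannot map to a flat schema, at the cost of losing a queryable schema for that column.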

@@ -25,6 +25,6 @@
     </property>
     <property>
         <name>spark2EventLogDir</name>
-        <value>/user/spark/applicationHistory</value>
+        <value>/user/spark/spark2ApplicationHistory</value>
     </property>
 </configuration>
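
The configuration fix points spark2EventLogDir at the directory the Spark 2 history server scans; the old value is the conventional Spark 1 history location, so Spark 2 event logs presumably landed where the Spark 2 history server never looks and finished applications did not appear in its UI. How the property reaches Spark is outside this diff; conventionally it is passed through to spark.eventLog.dir when the session is built. A hedged sketch of that wiring (assumed, not taken from this commit):

import org.apache.spark.sql.SparkSession;

public class EventLogDirSketch {
    public static void main(String[] args) {
        // in a real workflow spark2EventLogDir would be injected as a parameter;
        // the value here mirrors the one set by this commit
        final String spark2EventLogDir = "/user/spark/spark2ApplicationHistory";

        SparkSession spark = SparkSession.builder()
                .appName("event-log-dir-sketch")
                .master("local[*]")
                .config("spark.eventLog.enabled", "true")
                // must match the directory the Spark 2 history server watches
                .config("spark.eventLog.dir", "hdfs://" + spark2EventLogDir)
                .getOrCreate();

        spark.stop();
    }
}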