1
0
Fork 0

reverted CreateRelatedEntitiesJob_phase1 to its previous state

This commit is contained in:
Claudio Atzori 2020-07-13 22:54:04 +02:00
parent 8e97598eb4
commit 7d6e269b40
1 changed files with 2 additions and 15 deletions

View File

@ -116,23 +116,10 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache(); .cache();
final String relatedEntityPath = outputPath + "_relatedEntity"; Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false") .filter("dataInfo.invisible == false")
.map( .map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz), (MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
Encoders.kryo(RelatedEntity.class))
.repartition(5000)
.write()
.mode(SaveMode.Overwrite)
.parquet(relatedEntityPath);
Dataset<Tuple2<String, RelatedEntity>> entities = spark
.read()
.load(relatedEntityPath)
.as(Encoders.kryo(RelatedEntity.class))
.map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
.cache(); .cache();