diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index f3188007a..b08e593f7 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -116,23 +116,10 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); - final String relatedEntityPath = outputPath + "_relatedEntity"; - readPathEntity(spark, inputEntityPath, clazz) + Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) .filter("dataInfo.invisible == false") .map( - (MapFunction) value -> asRelatedEntity(value, clazz), - Encoders.kryo(RelatedEntity.class)) - .repartition(5000) - .write() - .mode(SaveMode.Overwrite) - .parquet(relatedEntityPath); - - Dataset> entities = spark - .read() - .load(relatedEntityPath) - .as(Encoders.kryo(RelatedEntity.class)) - .map( - (MapFunction>) e -> new Tuple2<>(e.getId(), e), + (MapFunction>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) .cache();