diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index b2abbc156..a33a45517 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -101,13 +101,22 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); - final Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) + readPathEntity(spark, inputEntityPath, clazz) .filter("dataInfo.invisible == false") .map( (MapFunction>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)), Encoders .tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) - .cache(); + .write() + .mode(SaveMode.Overwrite) + .save("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName()); + + final Dataset> entities = spark + .read() + .load("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName()) + .as( + Encoders + .tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))); relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") @@ -140,7 +149,8 @@ public class CreateRelatedEntitiesJob_phase1 { re.setId(entity.getId()); re.setType(EntityType.fromClass(clazz).name()); - re.setPid(entity.getPid()); + if (entity.getPid() != null) + re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList())); re.setCollectedfrom(entity.getCollectedfrom()); switch (EntityType.fromClass(clazz)) {