diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index a33a45517..48e5945c0 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -52,8 +52,11 @@ public class CreateRelatedEntitiesJob_phase1 {
 
 		final String jsonConfiguration = IOUtils
 			.toString(
-				PrepareRelationsJob.class
-					.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"));
+				Objects
+					.requireNonNull(
+						CreateRelatedEntitiesJob_phase1.class
+							.getResourceAsStream(
+								"/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")));
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
 
@@ -75,6 +78,7 @@ public class CreateRelatedEntitiesJob_phase1 {
 		final String graphTableClassName = parser.get("graphTableClassName");
 		log.info("graphTableClassName: {}", graphTableClassName);
 
+		@SuppressWarnings("unchecked")
 		final Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
 
 		final SparkConf conf = new SparkConf();
@@ -101,22 +105,12 @@ public class CreateRelatedEntitiesJob_phase1 {
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
 			.cache();
 
-		readPathEntity(spark, inputEntityPath, clazz)
+		final Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
 			.filter("dataInfo.invisible == false")
 			.map(
 				(MapFunction<E, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)),
-				Encoders
-					.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
-			.write()
-			.mode(SaveMode.Overwrite)
-			.save("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName());
-
-		final Dataset<Tuple2<String, RelatedEntity>> entities = spark
-			.read()
-			.load("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName())
-			.as(
-				Encoders
-					.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)));
+				Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
+			.cache();
 
 		relsByTarget
 			.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
@@ -149,8 +143,10 @@ public class CreateRelatedEntitiesJob_phase1 {
 		re.setId(entity.getId());
 		re.setType(EntityType.fromClass(clazz).name());
 
-		if (entity.getPid() != null)
+		// TODO move the max number of PIDs to eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits
+		if (Objects.nonNull(entity.getPid())) {
 			re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList()));
+		}
 		re.setCollectedfrom(entity.getCollectedfrom());
 
 		switch (EntityType.fromClass(clazz)) {
@@ -212,7 +208,7 @@ public class CreateRelatedEntitiesJob_phase1 {
 
 				final List<Field<String>> f = p.getFundingtree();
 				if (!f.isEmpty()) {
-					re.setFundingtree(f.stream().map(s -> s.getValue()).collect(Collectors.toList()));
+					re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList()));
 				}
 				break;
 			}
@@ -227,15 +223,16 @@ public class CreateRelatedEntitiesJob_phase1 {
 		return Optional
 			.ofNullable(f)
 			.filter(Objects::nonNull)
-			.map(x -> x.getValue())
+			.map(Field::getValue)
 			.orElse(defaultValue);
 	}
 
 	/**
-	 * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file,
+	 * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
+	 * file
 	 *
-	 * @param spark
-	 * @param relationPath
+	 * @param spark the SparkSession
+	 * @param relationPath the path storing the relation objects
 	 * @return the Dataset containing all the relationships
 	 */
	private static Dataset<Relation> readPathRelation(
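
Note on the central change (the @@ -101,22 +105,12 @@ hunk): instead of materialising the intermediate (id, RelatedEntity) tuples under a hard-coded /tmp scratch path and reading them back, the typed Dataset is kept and cache()d, so the downstream join and the final write share one computation. A minimal, self-contained sketch of the pattern; class name, data, and transformation are illustrative and not taken from this codebase:

    import java.util.Arrays;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class CacheInsteadOfRoundTrip {

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("cache-instead-of-round-trip")
                .master("local[*]")
                .getOrCreate();

            Dataset<String> input = spark.createDataset(Arrays.asList("a", "b", "c"), Encoders.STRING());

            // One transformation consumed by two actions: without cache() the map
            // would be recomputed for each action; with cache() the first action
            // materialises the result and the second one reuses it.
            Dataset<String> mapped = input
                .map((MapFunction<String, String>) String::toUpperCase, Encoders.STRING())
                .cache();

            System.out.println("count = " + mapped.count()); // triggers computation, fills the cache
            mapped.show(); // served from the cache

            spark.stop();
        }
    }

For Datasets, cache() defaults to the MEMORY_AND_DISK storage level, so cached partitions spill to local disk rather than failing when executor memory is tight; compared with the removed write/read round trip, this also drops the extra serialisation pass and the shared scratch directory.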
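The Objects.requireNonNull wrapper in the first hunk makes a missing classpath resource fail immediately with a descriptive message, instead of surfacing later as an opaque NullPointerException inside IOUtils.toString. A small sketch of the idiom, assuming a hypothetical resource path and using plain JDK I/O (Java 9+) in place of commons-io:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Objects;

    public class FailFastResource {

        public static void main(String[] args) throws IOException {
            final String path = "/config/input_params.json"; // hypothetical resource name
            // getResourceAsStream returns null when the resource is absent;
            // requireNonNull turns that into an immediate, descriptive failure.
            try (InputStream is = Objects
                .requireNonNull(
                    FailFastResource.class.getResourceAsStream(path),
                    "resource not found on classpath: " + path)) {
                String json = new String(is.readAllBytes(), StandardCharsets.UTF_8);
                System.out.println(json);
            }
        }
    }

The hunk also repoints the lookup from PrepareRelationsJob.class to CreateRelatedEntitiesJob_phase1.class; with an absolute resource path this is functionally equivalent, but it keeps the job self-contained.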