diff --git a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java
index 31ee0ac..91ce474 100644
--- a/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java
+++ b/dump-schema/src/main/java/eu/dnetlib/dhp/eosc/model/Relation.java
@@ -25,6 +25,17 @@ public class Relation implements Serializable {
 	@JsonSchema(description = "The reason why OpenAIRE holds the relation ")
 	private Provenance provenance;
 
+	@JsonSchema(description = "The result type of the target for this relation")
+	private String targetType;
+
+	public String getTargetType() {
+		return targetType;
+	}
+
+	public void setTargetType(String targetType) {
+		this.targetType = targetType;
+	}
+
 	public String getSource() {
 		return source;
 	}
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java
index d7be673..6f02871 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkSelectRelation.java
@@ -78,36 +78,46 @@ public class SparkSelectRelation implements Serializable {
 				(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
 					&& !removeSet.contains(r.getRelClass()));
 
-		Dataset<String> resultIds = Utils
+		Dataset<Tuple2<String, String>> resultIds = Utils
 			.readPath(spark, outputPath + "/publication", Result.class)
-			.map((MapFunction<Result, String>) p -> p.getId(), Encoders.STRING())
+			.map(
+				(MapFunction<Result, Tuple2<String, String>>) p -> new Tuple2<>(p.getId(), p.getType()),
+				Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/dataset", Result.class)
-					.map((MapFunction<Result, String>) d -> d.getId(), Encoders.STRING()))
+					.map(
+						(MapFunction<Result, Tuple2<String, String>>) d -> new Tuple2<>(d.getId(), d.getType()),
+						Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/software", Result.class)
-					.map((MapFunction<Result, String>) s -> s.getId(), Encoders.STRING()))
+					.map(
+						(MapFunction<Result, Tuple2<String, String>>) s -> new Tuple2<>(s.getId(), s.getType()),
+						Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
 			.union(
 				Utils
 					.readPath(spark, outputPath + "/otherresearchproduct", Result.class)
-					.map((MapFunction<Result, String>) o -> o.getId(), Encoders.STRING()));
+					.map(
+						(MapFunction<Result, Tuple2<String, String>>) o -> new Tuple2<>(o.getId(), o.getType()),
+						Encoders.tuple(Encoders.STRING(), Encoders.STRING())));
 
 		// select result -> result relations
 		Dataset<Relation> relResultResult = relation
-			.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
-			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
+			.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("_1")))
+			.map(
+				(MapFunction<Tuple2<Relation, Tuple2<String, String>>, Relation>) t2 -> t2._1(),
+				Encoders.bean(Relation.class));
 
 		relResultResult
-			.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
-			.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
-			.map((MapFunction<Relation, eu.dnetlib.dhp.eosc.model.Relation>) rel -> {
+			.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("_1")))
+			.map((MapFunction<Tuple2<Relation, Tuple2<String, String>>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> {
 				eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
+				Relation rel = t2._1();
 				relNew
 					.setSource(
@@ -124,6 +134,7 @@ public class SparkSelectRelation implements Serializable {
 						.newInstance(
 							rel.getRelClass(),
 							rel.getSubRelType()));
+				relNew.setTargetType(t2._2()._2());
 
 				Optional<DataInfo> odInfo = Optional.ofNullable(rel.getDataInfo());
 				if (odInfo.isPresent()) {
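
Note (not part of the patch): the key change above is joining the relations against a Dataset of (id, result type) pairs instead of plain ids, so the second tuple component can be copied into the new targetType field. A minimal, self-contained sketch of that Tuple2 join pattern follows; the class name, the in-memory data and the names "resultIds" and "targets" are made up for illustration, only the Encoders.tuple / col("_1") mechanics mirror the patch.

	import java.util.Arrays;

	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;

	import scala.Tuple2;

	public class TargetTypeJoinSketch {
		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

			// (id, result type) pairs; Encoders.tuple exposes the components as columns "_1" and "_2"
			Dataset<Tuple2<String, String>> resultIds = spark
				.createDataset(
					Arrays.asList(new Tuple2<>("50|id1", "publication"), new Tuple2<>("50|id2", "dataset")),
					Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

			// relation targets to resolve (the column of a plain String Dataset is named "value")
			Dataset<String> targets = spark.createDataset(Arrays.asList("50|id1", "50|id3"), Encoders.STRING());

			// inner join on the id component, then read the type from the second component of the pair
			targets
				.joinWith(resultIds, targets.col("value").equalTo(resultIds.col("_1")))
				.map(
					(MapFunction<Tuple2<String, Tuple2<String, String>>, String>) t2 -> t2._1() + " -> " + t2._2()._2(),
					Encoders.STRING())
				.show(false);
		}
	}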