Merge pull request 'Extend the relation inside the result' (#6) from eoscExtendRelation into eoscDump
Reviewed-on: #6
This commit is contained in:
commit
64f10b6d31
|
@@ -25,6 +25,17 @@ public class Relation implements Serializable {
|
|||
@JsonSchema(description = "The reason why OpenAIRE holds the relation ")
|
||||
private Provenance provenance;
|
||||
|
||||
@JsonSchema(description = "The result type of the target for this relation")
|
||||
private String targetType;
|
||||
|
||||
public String getTargetType() {
|
||||
return targetType;
|
||||
}
|
||||
|
||||
public void setTargetType(String targetType) {
|
||||
this.targetType = targetType;
|
||||
}
|
||||
|
||||
public String getSource() {
|
||||
return source;
|
||||
}
|
||||
|
|
|
@@ -78,36 +78,46 @@ public class SparkSelectRelation implements Serializable {
|
|||
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
|
||||
&& !removeSet.contains(r.getRelClass()));
|
||||
|
||||
Dataset<String> resultIds = Utils
|
||||
Dataset<Tuple2<String, String>> resultIds = Utils
|
||||
.readPath(spark, outputPath + "/publication", Result.class)
|
||||
|
||||
.map((MapFunction<Result, String>) p -> p.getId(), Encoders.STRING())
|
||||
.map(
|
||||
(MapFunction<Result, Tuple2<String, String>>) p -> new Tuple2<>(p.getId(), p.getType()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||
.union(
|
||||
Utils
|
||||
.readPath(spark, outputPath + "/dataset", Result.class)
|
||||
|
||||
.map((MapFunction<Result, String>) d -> d.getId(), Encoders.STRING()))
|
||||
.map(
|
||||
(MapFunction<Result, Tuple2<String, String>>) d -> new Tuple2<>(d.getId(), d.getType()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
|
||||
.union(
|
||||
Utils
|
||||
.readPath(spark, outputPath + "/software", Result.class)
|
||||
|
||||
.map((MapFunction<Result, String>) s -> s.getId(), Encoders.STRING()))
|
||||
.map(
|
||||
(MapFunction<Result, Tuple2<String, String>>) s -> new Tuple2<>(s.getId(), s.getType()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
|
||||
.union(
|
||||
Utils
|
||||
.readPath(spark, outputPath + "/otherresearchproduct", Result.class)
|
||||
|
||||
.map((MapFunction<Result, String>) o -> o.getId(), Encoders.STRING()));
|
||||
.map(
|
||||
(MapFunction<Result, Tuple2<String, String>>) o -> new Tuple2<>(o.getId(), o.getType()),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.STRING())));
|
||||
|
||||
// select result -> result relations
|
||||
Dataset<Relation> relResultResult = relation
|
||||
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
|
||||
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
|
||||
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("_1")))
|
||||
.map(
|
||||
(MapFunction<Tuple2<Relation, Tuple2<String, String>>, Relation>) t2 -> t2._1(),
|
||||
Encoders.bean(Relation.class));
|
||||
|
||||
relResultResult
|
||||
.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
|
||||
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
|
||||
.map((MapFunction<Relation, eu.dnetlib.dhp.eosc.model.Relation>) rel -> {
|
||||
.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("_1")))
|
||||
.map((MapFunction<Tuple2<Relation, Tuple2<String, String>>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> {
|
||||
eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
|
||||
Relation rel = t2._1();
|
||||
relNew
|
||||
.setSource(
|
||||
|
||||
|
@@ -124,6 +134,7 @@ public class SparkSelectRelation implements Serializable {
|
|||
.newInstance(
|
||||
rel.getRelClass(),
|
||||
rel.getSubRelType()));
|
||||
relNew.setTargetType(t2._2()._2());
|
||||
|
||||
Optional<DataInfo> odInfo = Optional.ofNullable(rel.getDataInfo());
|
||||
if (odInfo.isPresent()) {
|
||||
|
|
Loading…
Reference in New Issue