first attempt to extend the model adding the result type in the relation
This commit is contained in:
parent
669e5b645c
commit
6f065371b6
|
@ -25,6 +25,17 @@ public class Relation implements Serializable {
|
||||||
@JsonSchema(description = "The reason why OpenAIRE holds the relation ")
|
@JsonSchema(description = "The reason why OpenAIRE holds the relation ")
|
||||||
private Provenance provenance;
|
private Provenance provenance;
|
||||||
|
|
||||||
|
@JsonSchema(description = "The result type of the target for this relation")
|
||||||
|
private String targetType;
|
||||||
|
|
||||||
|
public String getTargetType() {
|
||||||
|
return targetType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetType(String targetType) {
|
||||||
|
this.targetType = targetType;
|
||||||
|
}
|
||||||
|
|
||||||
public String getSource() {
|
public String getSource() {
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,36 +78,46 @@ public class SparkSelectRelation implements Serializable {
|
||||||
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
|
(FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference()
|
||||||
&& !removeSet.contains(r.getRelClass()));
|
&& !removeSet.contains(r.getRelClass()));
|
||||||
|
|
||||||
Dataset<String> resultIds = Utils
|
Dataset<Tuple2<String, String>> resultIds = Utils
|
||||||
.readPath(spark, outputPath + "/publication", Result.class)
|
.readPath(spark, outputPath + "/publication", Result.class)
|
||||||
|
|
||||||
.map((MapFunction<Result, String>) p -> p.getId(), Encoders.STRING())
|
.map(
|
||||||
|
(MapFunction<Result, Tuple2<String, String>>) p -> new Tuple2<>(p.getId(), p.getType()),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
|
||||||
.union(
|
.union(
|
||||||
Utils
|
Utils
|
||||||
.readPath(spark, outputPath + "/dataset", Result.class)
|
.readPath(spark, outputPath + "/dataset", Result.class)
|
||||||
|
|
||||||
.map((MapFunction<Result, String>) d -> d.getId(), Encoders.STRING()))
|
.map(
|
||||||
|
(MapFunction<Result, Tuple2<String, String>>) d -> new Tuple2<>(d.getId(), d.getType()),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
|
||||||
.union(
|
.union(
|
||||||
Utils
|
Utils
|
||||||
.readPath(spark, outputPath + "/software", Result.class)
|
.readPath(spark, outputPath + "/software", Result.class)
|
||||||
|
|
||||||
.map((MapFunction<Result, String>) s -> s.getId(), Encoders.STRING()))
|
.map(
|
||||||
|
(MapFunction<Result, Tuple2<String, String>>) s -> new Tuple2<>(s.getId(), s.getType()),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING())))
|
||||||
.union(
|
.union(
|
||||||
Utils
|
Utils
|
||||||
.readPath(spark, outputPath + "/otherresearchproduct", Result.class)
|
.readPath(spark, outputPath + "/otherresearchproduct", Result.class)
|
||||||
|
|
||||||
.map((MapFunction<Result, String>) o -> o.getId(), Encoders.STRING()));
|
.map(
|
||||||
|
(MapFunction<Result, Tuple2<String, String>>) o -> new Tuple2<>(o.getId(), o.getType()),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.STRING())));
|
||||||
|
|
||||||
// select result -> result relations
|
// select result -> result relations
|
||||||
Dataset<Relation> relResultResult = relation
|
Dataset<Relation> relResultResult = relation
|
||||||
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("value")))
|
.joinWith(resultIds, relation.col("source").equalTo(resultIds.col("_1")))
|
||||||
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class));
|
.map(
|
||||||
|
(MapFunction<Tuple2<Relation, Tuple2<String, String>>, Relation>) t2 -> t2._1(),
|
||||||
|
Encoders.bean(Relation.class));
|
||||||
|
|
||||||
relResultResult
|
relResultResult
|
||||||
.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("value")))
|
.joinWith(resultIds, relResultResult.col("target").equalTo(resultIds.col("_1")))
|
||||||
.map((MapFunction<Tuple2<Relation, String>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
|
.map((MapFunction<Tuple2<Relation, Tuple2<String, String>>, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> {
|
||||||
.map((MapFunction<Relation, eu.dnetlib.dhp.eosc.model.Relation>) rel -> {
|
|
||||||
eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
|
eu.dnetlib.dhp.eosc.model.Relation relNew = new eu.dnetlib.dhp.eosc.model.Relation();
|
||||||
|
Relation rel = t2._1();
|
||||||
relNew
|
relNew
|
||||||
.setSource(
|
.setSource(
|
||||||
|
|
||||||
|
@ -124,6 +134,7 @@ public class SparkSelectRelation implements Serializable {
|
||||||
.newInstance(
|
.newInstance(
|
||||||
rel.getRelClass(),
|
rel.getRelClass(),
|
||||||
rel.getSubRelType()));
|
rel.getSubRelType()));
|
||||||
|
relNew.setTargetType(t2._2()._2());
|
||||||
|
|
||||||
Optional<DataInfo> odInfo = Optional.ofNullable(rel.getDataInfo());
|
Optional<DataInfo> odInfo = Optional.ofNullable(rel.getDataInfo());
|
||||||
if (odInfo.isPresent()) {
|
if (odInfo.isPresent()) {
|
||||||
|
|
Loading…
Reference in New Issue