[SKG-IF] fixing issue in deserialization
This commit is contained in:
parent b176bbef1d
commit 187b91a699
@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -19,6 +20,9 @@ import org.apache.spark.sql.Dataset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.skgif.EmitFromEntities;
 import eu.dnetlib.dhp.oa.graph.dump.skgif.Utils;
@@ -71,9 +75,9 @@ public class SelectConnectedEntities implements Serializable {
 
	private static <R extends Result> void selectConnectedEntities(SparkSession spark, String inputPath,
		String filterPath,
-		String workingDir) {
+		String workingDir) throws JsonProcessingException {
 
-		Dataset<Identifiers> resultIds = spark.emptyDataset(Encoders.bean(Identifiers.class));
+		Dataset<String> resultIds = spark.emptyDataset(Encoders.STRING());
		for (EntityType entity : ModelSupport.entityTypes.keySet())
			if (ModelSupport.isResult(entity))
				resultIds = resultIds
@@ -81,8 +85,11 @@ public class SelectConnectedEntities implements Serializable {
					spark
						.read()
						.parquet(filterPath + entity.name() + "_ids")
-						.as(Encoders.bean(Identifiers.class)));
+						.select("id")
+						.as(Encoders.STRING()));
 
+		// log.info("Number of identifiers in the result {}", resultIds.count());
+		log.info("Deserialization of the first id {}", new ObjectMapper().writeValueAsString(resultIds.first()));
		Dataset<Relation> relation = Utils
			.readPath(spark, inputPath + "relation", Relation.class)
			.filter((FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference());
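
The two hunks above carry the substance of the fix: instead of deserializing the *_ids parquet into Identifiers beans, the identifiers are read as a plain typed Dataset<String>, and the first element is serialized through Jackson as a debug probe. A minimal standalone sketch of that read pattern follows; the select("id")/Encoders.STRING() combination is the one from the hunk, while the local master and the parquet path are assumptions made only for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ReadIdsSketch {

	public static void main(String[] args) throws Exception {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		// Assumed layout: the *_ids parquet exposes one string column named "id".
		// Selecting that column and applying Encoders.STRING() yields a flat typed
		// dataset, sidestepping the bean deserialization that targeted Identifiers.
		Dataset<String> ids = spark
			.read()
			.parquet("/tmp/publication_ids") // hypothetical path
			.select("id")
			.as(Encoders.STRING());

		// Same debug probe as in the commit: serialize the first id with Jackson.
		System.out.println(new ObjectMapper().writeValueAsString(ids.first()));

		spark.stop();
	}
}
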
@@ -104,17 +111,25 @@ public class SelectConnectedEntities implements Serializable {
 
		// select relations having source in the set of identifiers selected for eosc
		Dataset<Relation> resultSource = resultIds
-			.joinWith(relation, resultIds.col("id").equalTo(relation.col("source")))
-			.map((MapFunction<Tuple2<Identifiers, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
+			.joinWith(relation, resultIds.col("_1").equalTo(relation.col("source")))
+			.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class));
 
		// write relations having source and target in the set
		resultIds
-			.joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target")))
-			.map((MapFunction<Tuple2<Identifiers, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class))
+			.joinWith(resultSource, resultIds.col("_1").equalTo(resultSource.col("target")))
+			.map((MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(workingDir + "relation");
+		log
+			.info(
+				"Number of relations in the result {}", resultIds
+					.joinWith(resultSource, resultIds.col("id").equalTo(resultSource.col("target")))
+					.map(
+						(MapFunction<Tuple2<String, Relation>, Relation>) t2 -> t2._2(),
+						Encoders.bean(Relation.class))
+					.count());
 
		// write relations between results and organizations
		resultSource
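
After the switch, the join conditions reference the id column by name on the typed dataset (col("_1") in this commit). When in doubt about which name Spark assigns to the single column of a Dataset<String>, and therefore what resultIds.col(...) has to reference, printing the schema settles it; a short sketch with made-up ids:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class ColumnNameSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		// Hypothetical ids, just to materialize a typed Dataset<String>.
		Dataset<String> ids = spark
			.createDataset(Arrays.asList("id1", "id2"), Encoders.STRING());

		// Prints the dataset schema; the single column name shown here is the
		// one a joinWith(...) condition on this dataset must use.
		ids.printSchema();

		spark.stop();
	}
}
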
@@ -125,7 +140,6 @@ public class SelectConnectedEntities implements Serializable {
			.option("compression", "gzip")
			.json(workingDir + "relation");
 
-		// write relations between results and projects
		resultSource
			.joinWith(projects, resultSource.col("target").equalTo(projects.col("id")))
			.map((MapFunction<Tuple2<Relation, Project>, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class))
@@ -212,7 +212,7 @@
						</spark-opts>
						<arg>--sourcePath</arg><arg>${sourcePath}</arg>
						<!-- <arg>--workingDir</arg><arg>${workingDir}/graph/</arg>-->
-						<arg>--workingDir</arg><arg>${nameNode}/user/miriam.baglioni/oa/graph/dump/temp/graph/</arg>
+						<arg>--workingDir</arg><arg>/user/miriam.baglioni/oa/graph/dump/temp/graph/</arg>
						<arg>--filterPath</arg><arg>${filterPath}</arg>
					</spark>
					<!-- <ok to="emit_from_result"/>-->
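
The workflow hunk drops the ${nameNode} prefix from the working directory argument. A scheme-less path such as /user/... is qualified against the cluster's default filesystem (fs.defaultFS) by the Hadoop client anyway, which is what the shortened value appears to rely on; a minimal sketch of that resolution, using the path from the hunk:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyPathSketch {

	public static void main(String[] args) throws Exception {
		// With fs.defaultFS pointing at the cluster namenode, a scheme-less
		// path is qualified against it, making an explicit namenode prefix
		// redundant for consumers that resolve paths through Hadoop.
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		Path workingDir = new Path("/user/miriam.baglioni/oa/graph/dump/temp/graph/");
		System.out.println(fs.makeQualified(workingDir)); // e.g. hdfs://<namenode>/user/...
	}
}
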