From 24be522e7c8be704f015132e2ba519536a459a76 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 7 Aug 2023 13:56:58 +0200 Subject: [PATCH] fixed NPE, moved class to generate tar --- .../dhp/{ => oa}/common/MakeTarArchive.java | 2 +- .../eu/dnetlib/dhp/oa/graph/dump/MakeTar.java | 2 +- .../dhp/oa/graph/dump/ResultMapper.java | 20 ++++++--- .../dump/complete/QueryInformationSystem.java | 4 +- .../SparkSelectValidRelationsJob.java | 44 ++++++++++++------- .../ProjectsSubsetSparkJob.java | 11 ++--- 6 files changed, 50 insertions(+), 33 deletions(-) rename dump/src/main/java/eu/dnetlib/dhp/{ => oa}/common/MakeTarArchive.java (99%) diff --git a/dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java b/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java similarity index 99% rename from dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java rename to dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java index 43e61e5..20058bc 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/common/MakeTarArchive.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/common/MakeTarArchive.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.common; +package eu.dnetlib.dhp.oa.common; import java.io.BufferedInputStream; import java.io.IOException; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java index cb2e29b..44f4ef6 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/MakeTar.java @@ -15,7 +15,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.MakeTarArchive; +import eu.dnetlib.dhp.oa.common.MakeTarArchive; public class MakeTar implements Serializable { diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java index 52584f9..75f01a6 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ResultMapper.java @@ -515,14 +515,20 @@ public class ResultMapper implements Serializable { setCommonValue(i, instance); - instance - .setCollectedfrom( - CfHbKeyValue - .newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue())); + if (Optional.ofNullable(i.getCollectedfrom()).isPresent() && + Optional.ofNullable(i.getCollectedfrom().getKey()).isPresent() && + StringUtils.isNotBlank(i.getCollectedfrom().getKey())) + instance + .setCollectedfrom( + CfHbKeyValue + .newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue())); - instance - .setHostedby( - CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue())); + if (Optional.ofNullable(i.getHostedby()).isPresent() && + Optional.ofNullable(i.getHostedby().getKey()).isPresent() && + StringUtils.isNotBlank(i.getHostedby().getKey())) + instance + .setHostedby( + CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue())); return instance; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java index 262fe1d..b982b26 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/QueryInformationSystem.java @@ -219,9 +219,9 @@ public class QueryInformationSystem { } if (funding.toLowerCase().contains("h2020")) { nsp = "corda__h2020::"; - } else if (funding.toLowerCase().contains("he")){ + } else if (funding.toLowerCase().contains("he")) { nsp = "corda_____he::"; - }else{ + } else { nsp = "corda_______::"; } break; diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java index d188685..12c2c00 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java @@ -9,12 +9,9 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; - import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; - import org.apache.spark.sql.*; - import org.apache.spark.sql.types.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,23 +66,36 @@ public class SparkSelectValidRelationsJob implements Serializable { } - private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath){ - final StructType structureSchema = new StructType().fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>"); + private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath) { + final StructType structureSchema = new StructType() + .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>"); - org.apache.spark.sql.Dataset df =spark.createDataFrame(new ArrayList(), structureSchema); - List entities = Arrays.asList("publication", "dataset","otherresearchproduct","software","organization","project","datasource"); - for(String e : entities) - df = df.union(spark.read().schema(structureSchema).json(inputPath + "/" + e).filter("dataInfo.deletedbyinference != true")); + org.apache.spark.sql.Dataset df = spark.createDataFrame(new ArrayList(), structureSchema); + List entities = Arrays + .asList( + "publication", "dataset", "otherresearchproduct", "software", "organization", "project", "datasource"); + for (String e : entities) + df = df + .union( + spark + .read() + .schema(structureSchema) + .json(inputPath + "/" + e) + .filter("dataInfo.deletedbyinference != true and dataInfo.invisible != true")); - org.apache.spark.sql.Dataset relations = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputPath + "/relation") - .filter("dataInfo.deletedbyinference == false"); + org.apache.spark.sql.Dataset relations = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(inputPath + "/relation") + .filter("dataInfo.deletedbyinference == false"); - relations.join(df, relations.col("source").equalTo(df.col("id")), "leftsemi") - .join(df, relations.col("target").equalTo(df.col("id")), "leftsemi") - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath); + relations + .join(df, relations.col("source").equalTo(df.col("id")), "leftsemi") + .join(df, relations.col("target").equalTo(df.col("id")), "leftsemi") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java index 7f83d3c..b95312f 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/projectssubset/ProjectsSubsetSparkJob.java @@ -58,11 +58,12 @@ public class ProjectsSubsetSparkJob implements Serializable { String projectListPath) { Dataset projectList = spark.read().textFile(projectListPath); Dataset projects; - projects = Utils.readPath(spark, inputPath, Project.class) - .map((MapFunction) p -> { - p.setId("40|" + p.getId()); - return p; - }, Encoders.bean(Project.class)); + projects = Utils + .readPath(spark, inputPath, Project.class) + .map((MapFunction) p -> { + p.setId("40|" + p.getId()); + return p; + }, Encoders.bean(Project.class)); projects .joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left") .map((MapFunction, Project>) t2 -> {