fixed NPE, moved class to generate tar

Miriam Baglioni 2023-08-07 13:56:58 +02:00
parent e9aca6b702
commit 24be522e7c
6 changed files with 50 additions and 33 deletions

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.common;
+package eu.dnetlib.dhp.oa.common;
 
 import java.io.BufferedInputStream;
 import java.io.IOException;

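The first file only moves the tar helper from eu.dnetlib.dhp.common to eu.dnetlib.dhp.oa.common; its behaviour is unchanged. For context, a minimal sketch of what a MakeTarArchive-style helper does, written against Apache commons-compress on the local filesystem (the real class operates on HDFS; the names and signature below are illustrative assumptions, not the project's API):

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.io.IOUtils;

// Hypothetical stand-in for a MakeTarArchive-style helper: packs every
// regular file of a directory into a single uncompressed .tar archive.
public class TarSketch {

	static void makeTar(File sourceDir, File tarFile) throws IOException {
		File[] files = sourceDir.listFiles();
		if (files == null)
			throw new IOException("not a directory: " + sourceDir);
		try (TarArchiveOutputStream tar = new TarArchiveOutputStream(new FileOutputStream(tarFile))) {
			// GNU long-name mode avoids failures on entry names over 100 chars
			tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
			for (File f : files) {
				if (!f.isFile())
					continue;
				tar.putArchiveEntry(new TarArchiveEntry(f, f.getName()));
				try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(f))) {
					IOUtils.copy(in, tar); // stream the file body into the archive
				}
				tar.closeArchiveEntry();
			}
		}
	}
}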
View File

@@ -15,7 +15,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.MakeTarArchive;
+import eu.dnetlib.dhp.oa.common.MakeTarArchive;
 
 public class MakeTar implements Serializable {

View File

@@ -515,14 +515,20 @@ public class ResultMapper implements Serializable {
 		setCommonValue(i, instance);
 
-		instance
-			.setCollectedfrom(
-				CfHbKeyValue
-					.newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue()));
+		if (Optional.ofNullable(i.getCollectedfrom()).isPresent() &&
+			Optional.ofNullable(i.getCollectedfrom().getKey()).isPresent() &&
+			StringUtils.isNotBlank(i.getCollectedfrom().getKey()))
+			instance
+				.setCollectedfrom(
+					CfHbKeyValue
+						.newInstance(i.getCollectedfrom().getKey().substring(3), i.getCollectedfrom().getValue()));
 
-		instance
-			.setHostedby(
-				CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue()));
+		if (Optional.ofNullable(i.getHostedby()).isPresent() &&
+			Optional.ofNullable(i.getHostedby().getKey()).isPresent() &&
+			StringUtils.isNotBlank(i.getHostedby().getKey()))
+			instance
+				.setHostedby(
+					CfHbKeyValue.newInstance(i.getHostedby().getKey().substring(3), i.getHostedby().getValue()));
 
 		return instance;

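The ResultMapper hunk is the NPE fix named in the commit message: collectedfrom and hostedby are now copied onto the instance only when the value exists and its key is non-blank. A self-contained sketch of the same guard, with a stand-in KeyValue type (the real schema classes are not reproduced here; substring(3) presumably drops a 3-character id prefix such as "10|"):

import java.util.Optional;

import org.apache.commons.lang3.StringUtils;

// Sketch of the null-safe mapping introduced above. KeyValue is a stand-in
// for the project's schema class (assumption), not the real type.
public class NullSafeMappingSketch {

	static class KeyValue {
		String key;
		String value;
	}

	// Returns the mapped (key, value) pair only when the key is present and
	// non-blank, mirroring the Optional + isNotBlank guard in ResultMapper.
	static Optional<String[]> toCfHb(KeyValue kv) {
		if (Optional.ofNullable(kv).isPresent() &&
			Optional.ofNullable(kv.key).isPresent() &&
			StringUtils.isNotBlank(kv.key)) {
			// substring(3) drops the id prefix (e.g. "10|") before the identifier
			return Optional.of(new String[] { kv.key.substring(3), kv.value });
		}
		return Optional.empty();
	}
}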
View File

@@ -219,9 +219,9 @@ public class QueryInformationSystem {
 				}
 				if (funding.toLowerCase().contains("h2020")) {
 					nsp = "corda__h2020::";
-				} else if (funding.toLowerCase().contains("he")){
+				} else if (funding.toLowerCase().contains("he")) {
 					nsp = "corda_____he::";
-				}else{
+				} else {
 					nsp = "corda_______::";
 				}
 				break;

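The QueryInformationSystem hunk is formatting only, but it shows the rule being applied: the funding stream selects a CORDA namespace prefix padded to 12 characters before the "::" separator. An equivalent standalone sketch (class and method names hypothetical):

// Hypothetical extraction of the prefix selection above.
public class CordaPrefixSketch {

	static String namespacePrefix(String funding) {
		String f = funding.toLowerCase();
		if (f.contains("h2020"))
			return "corda__h2020::"; // Horizon 2020
		if (f.contains("he"))
			return "corda_____he::"; // Horizon Europe
		return "corda_______::"; // default for the other CORDA streams
	}
}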
View File

@@ -9,12 +9,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,23 +66,36 @@ public class SparkSelectValidRelationsJob implements Serializable {
 	}
 
-	private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath){
-		final StructType structureSchema = new StructType().fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
+	private static void selectValidRelation2(SparkSession spark, String inputPath, String outputPath) {
+		final StructType structureSchema = new StructType()
+			.fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");
 
-		org.apache.spark.sql.Dataset<Row> df =spark.createDataFrame(new ArrayList<Row>(), structureSchema);
-		List<String> entities = Arrays.asList("publication", "dataset","otherresearchproduct","software","organization","project","datasource");
-		for(String e : entities)
-			df = df.union(spark.read().schema(structureSchema).json(inputPath + "/" + e).filter("dataInfo.deletedbyinference != true"));
+		org.apache.spark.sql.Dataset<Row> df = spark.createDataFrame(new ArrayList<Row>(), structureSchema);
+		List<String> entities = Arrays
+			.asList(
+				"publication", "dataset", "otherresearchproduct", "software", "organization", "project", "datasource");
+		for (String e : entities)
+			df = df
+				.union(
+					spark
+						.read()
+						.schema(structureSchema)
+						.json(inputPath + "/" + e)
+						.filter("dataInfo.deletedbyinference != true and dataInfo.invisible != true"));
 
-		org.apache.spark.sql.Dataset<Row> relations = spark.read().schema(Encoders.bean(Relation.class).schema()).json(inputPath + "/relation")
-			.filter("dataInfo.deletedbyinference == false");
+		org.apache.spark.sql.Dataset<Row> relations = spark
+			.read()
+			.schema(Encoders.bean(Relation.class).schema())
+			.json(inputPath + "/relation")
+			.filter("dataInfo.deletedbyinference == false");
 
-		relations.join(df, relations.col("source").equalTo(df.col("id")), "leftsemi")
-			.join(df, relations.col("target").equalTo(df.col("id")), "leftsemi")
-			.write()
-			.mode(SaveMode.Overwrite)
-			.option("compression","gzip")
-			.json(outputPath);
+		relations
+			.join(df, relations.col("source").equalTo(df.col("id")), "leftsemi")
+			.join(df, relations.col("target").equalTo(df.col("id")), "leftsemi")
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath);
 	}
 }

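The reworked selectValidRelation2 tightens the entity filter (dropping invisible records as well as deleted ones) and keeps a relation only when both its source and target ids survive that filter, via two successive leftsemi joins. A minimal, self-contained illustration of the join semantics (local data and names are hypothetical):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Demonstrates the double leftsemi join: only edges whose source AND target
// both appear in the set of valid ids are kept.
public class LeftSemiDemo {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("leftsemi-demo")
			.getOrCreate();

		// valid entity ids: a, b (c is "deleted" and therefore absent)
		Dataset<Row> ids = spark
			.createDataset(Arrays.asList("a", "b"), Encoders.STRING())
			.toDF("id");

		// two relations: a -> b (both endpoints valid) and a -> c (c invalid)
		Dataset<Row> rels = spark
			.createDataset(Arrays.asList("a,b", "a,c"), Encoders.STRING())
			.toDF("value")
			.selectExpr("split(value, ',')[0] as source", "split(value, ',')[1] as target");

		rels
			.join(ids, rels.col("source").equalTo(ids.col("id")), "leftsemi")
			.join(ids, rels.col("target").equalTo(ids.col("id")), "leftsemi")
			.show(); // prints only (a, b); (a, c) is dropped

		spark.stop();
	}
}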
View File

@@ -58,11 +58,12 @@ public class ProjectsSubsetSparkJob implements Serializable {
 			String projectListPath) {
 		Dataset<String> projectList = spark.read().textFile(projectListPath);
 		Dataset<Project> projects;
-		projects = Utils.readPath(spark, inputPath, Project.class)
-			.map((MapFunction<Project, Project>) p -> {
-				p.setId("40|" + p.getId());
-				return p;
-			}, Encoders.bean(Project.class));
+		projects = Utils
+			.readPath(spark, inputPath, Project.class)
+			.map((MapFunction<Project, Project>) p -> {
+				p.setId("40|" + p.getId());
+				return p;
+			}, Encoders.bean(Project.class));
 		projects
 			.joinWith(projectList, projects.col("id").equalTo(projectList.col("value")), "left")
 			.map((MapFunction<Tuple2<Project, String>, Project>) t2 -> {