From 9a06a552c4d285cd6603f82e7a9cc62156ad772e Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 17 Nov 2023 16:49:06 +0100 Subject: [PATCH] fixed last issues --- ...ExtendEoscResultWithOrganizationStep2.java | 66 ++++++++++++++----- .../dump/eosc/SparkUpdateProjectInfo.java | 5 +- ...ganizationprojectrelations_parameters.json | 6 ++ 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 422ba5c..ab8b368 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -10,7 +10,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.ForeachFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; @@ -20,8 +19,7 @@ import org.apache.spark.sql.SparkSession; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; +import org.spark_project.jetty.util.StringUtil; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.eosc.model.Affiliation; @@ -132,13 +130,14 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id"))) .map( (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> { - if (isToBeDumpedOrg(t2._2())) - return eu.dnetlib.dhp.eosc.model.Relation - .newInstance(t2._1().getSource(), t2._1().getTarget()); - return null; + if (isToBeRemovedOrg(t2._2())) + return new eu.dnetlib.dhp.eosc.model.Relation(); + return eu.dnetlib.dhp.eosc.model.Relation + .newInstance(t2._1().getSource(), t2._1().getTarget()); + }, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) - .filter(Objects::nonNull) + .filter((FilterFunction) r -> StringUtil.isNotBlank(r.getSource())) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -163,19 +162,54 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .filter((FilterFunction) o -> !o.getDataInfo().getDeletedbyinference()); Dataset resultOrganization = relations - .joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left") - .map((MapFunction, ResultOrganizations>) t2 -> getResultOrganizations(t2), Encoders.bean(ResultOrganizations.class)) - .filter(Objects::nonNull); + .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) + .map((MapFunction, ResultOrganizations>) t2 -> { + ResultOrganizations rOrg = new ResultOrganizations(); - System.out.println(resultOrganization.count()); + if (t2._2() != null) { + + rOrg.setResultId(t2._1().getTarget()); + Affiliation org = new Affiliation(); + org.setId(t2._2().getId()); + if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) { + org.setName(t2._2().getLegalname().getValue()); + } else { + org.setName(""); + } + HashMap> organizationPids = new HashMap<>(); + if (Optional.ofNullable(t2._2().getPid()).isPresent()) + t2._2().getPid().forEach(p -> { + if (!organizationPids.containsKey(p.getQualifier().getClassid())) + organizationPids.put(p.getQualifier().getClassid(), new HashSet<>()); + organizationPids.get(p.getQualifier().getClassid()).add(p.getValue()); + }); + List pids = new ArrayList<>(); + for (String key : organizationPids.keySet()) { + for (String value : organizationPids.get(key)) { + OrganizationPid pid = new OrganizationPid(); + pid.setValue(value); + pid.setType(key); + pids.add(pid); + } + } + org.setPid(pids); + rOrg.setAffiliation(org); + return rOrg; + } + return rOrg; + + }, Encoders.bean(ResultOrganizations.class)) + .filter((FilterFunction) ro -> ro.getResultId() != null); + + // resultOrganization.count(); results .joinWith(resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left") .groupByKey( (MapFunction, String>) t2 -> t2._1().getId(), Encoders.STRING()) .mapGroups( - (MapGroupsFunction, Result>) (s, it) -> addAffiliation(it) - , Encoders.bean(Result.class)) + (MapGroupsFunction, Result>) (s, it) -> addAffiliation(it), + Encoders.bean(Result.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -242,7 +276,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) { - if (isToBeDumpedOrg(org)) + if (isToBeRemovedOrg(org)) return null; eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization(); @@ -298,7 +332,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { return organization; } - private static boolean isToBeDumpedOrg(Organization org) { + private static boolean isToBeRemovedOrg(Organization org) { if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference())) return true; if (!Optional.ofNullable(org.getLegalname()).isPresent() diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java index 6127a88..340c1b6 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java @@ -112,7 +112,7 @@ public class SparkUpdateProjectInfo implements Serializable { Dataset project = Utils.readPath(spark, inputPath + "/project", Project.class); Dataset projectIds = result - .joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId")), "left") + .joinWith(resultProject, result.col("id").equalTo(resultProject.col("resultId"))) .flatMap( (FlatMapFunction, String>) t2 -> t2 ._2() @@ -136,8 +136,7 @@ public class SparkUpdateProjectInfo implements Serializable { result .joinWith( - resultProject, result.col("id").equalTo(resultProject.col("resultId")), - "left") + resultProject, result.col("id").equalTo(resultProject.col("resultId"))) .map( (MapFunction, ResultProject>) t2 -> t2._2(), Encoders.bean(ResultProject.class)) diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_organizationprojectrelations_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_organizationprojectrelations_parameters.json index 763e0df..81d594b 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_organizationprojectrelations_parameters.json +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_organizationprojectrelations_parameters.json @@ -19,6 +19,12 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the path used to store temporary output files", "paramRequired": false + }, + { + "paramName": "wp", + "paramLongName": "workingPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true } ]