From db388ebc21c2dddec06d907845ccce94385248ec Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 16 Nov 2023 12:17:42 +0100 Subject: [PATCH] - --- ...ExtendEoscResultWithOrganizationStep2.java | 127 ++++++++++-------- .../dump/eosc/SelectEoscResultsJobStep1.java | 2 + .../eosc/SparkDumpOrganizationProject.java | 74 +++++++++- .../dump/eosc/SparkUpdateProjectInfo.java | 4 +- .../oa/graph/dump/eosc/oozie_app/workflow.xml | 1 + 5 files changed, 145 insertions(+), 63 deletions(-) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index be78482..422ba5c 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -17,6 +17,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,10 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, workingPath + resultType + "extendedaffiliation"); + Utils.removeOutputDir(spark, workingPath + resultType + "organization"); + Utils.removeOutputDir(spark, workingPath + resultType + "resultOrganization"); addOrganizations(spark, inputPath, workingPath, resultType); - dumpOrganizationAndRelations(spark, inputPath, workingPath, resultType); + dumpOrganizationAndRelations(spark, inputPath, workingPath, resultType); }); } @@ -101,6 +104,8 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .joinWith(relation, result.col("id").equalTo(relation.col("source"))) .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)); + log.info("Number of affiliation relation for " + resultType + " = " + eoscRelation.count()); + // from eoscRelation select the organization Dataset organizationIds = eoscRelation .joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id"))) @@ -121,7 +126,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath + resultType + "/organization"); + .json(workingPath + resultType + "organization"); eoscRelation .joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id"))) @@ -137,7 +142,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(workingPath + resultType + "/resultOrganization"); + .json(workingPath + resultType + "resultOrganization"); } @@ -159,67 +164,18 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { Dataset resultOrganization = relations .joinWith(organizations, relations.col("source").equalTo(organizations.col("id")), "left") - .map((MapFunction, ResultOrganizations>) t2 -> { - if (t2._2() != null) { - ResultOrganizations rOrg = new ResultOrganizations(); - rOrg.setResultId(t2._1().getTarget()); - Affiliation org = new Affiliation(); - org.setId(t2._2().getId()); - if (Optional.ofNullable(t2._2().getLegalname()).isPresent()) { - org.setName(t2._2().getLegalname().getValue()); - } else { - org.setName(""); - } - HashMap> organizationPids = new HashMap<>(); - if (Optional.ofNullable(t2._2().getPid()).isPresent()) - t2._2().getPid().forEach(p -> { - if (!organizationPids.containsKey(p.getQualifier().getClassid())) - organizationPids.put(p.getQualifier().getClassid(), new HashSet<>()); - organizationPids.get(p.getQualifier().getClassid()).add(p.getValue()); - }); - List pids = new ArrayList<>(); - for (String key : organizationPids.keySet()) { - for (String value : organizationPids.get(key)) { - OrganizationPid pid = new OrganizationPid(); - pid.setValue(value); - pid.setType(key); - pids.add(pid); - } - } - org.setPid(pids); - rOrg.setAffiliation(org); - return rOrg; - } - return null; - - }, Encoders.bean(ResultOrganizations.class)) + .map((MapFunction, ResultOrganizations>) t2 -> getResultOrganizations(t2), Encoders.bean(ResultOrganizations.class)) .filter(Objects::nonNull); + System.out.println(resultOrganization.count()); + results .joinWith(resultOrganization, results.col("id").equalTo(resultOrganization.col("resultId")), "left") .groupByKey( (MapFunction, String>) t2 -> t2._1().getId(), Encoders.STRING()) .mapGroups( - (MapGroupsFunction, Result>) (s, it) -> { - Tuple2 first = it.next(); - if (first._2() == null) { - return first._1(); - } - Result ret = first._1(); - List affiliation = new ArrayList<>(); - Set alreadyInsertedAffiliations = new HashSet<>(); - affiliation.add(first._2().getAffiliation()); - alreadyInsertedAffiliations.add(first._2().getAffiliation().getId()); - it.forEachRemaining(res -> { - if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) { - affiliation.add(res._2().getAffiliation()); - alreadyInsertedAffiliations.add(res._2().getAffiliation().getId()); - } - - }); - ret.setAffiliation(affiliation); - return ret; - }, Encoders.bean(Result.class)) + (MapGroupsFunction, Result>) (s, it) -> addAffiliation(it) + , Encoders.bean(Result.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") @@ -227,6 +183,63 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { } + @Nullable + private static ResultOrganizations getResultOrganizations(Tuple2 t2) { + if (t2._2() != null) { + Organization organization = t2._2(); + ResultOrganizations rOrg = new ResultOrganizations(); + rOrg.setResultId(t2._1().getTarget()); + Affiliation org = new Affiliation(); + org.setId(organization.getId()); + if (Optional.ofNullable(organization.getLegalname()).isPresent()) { + org.setName(organization.getLegalname().getValue()); + } else { + org.setName(""); + } + HashMap> organizationPids = new HashMap<>(); + if (Optional.ofNullable(organization.getPid()).isPresent()) + organization.getPid().forEach(p -> { + if (!organizationPids.containsKey(p.getQualifier().getClassid())) + organizationPids.put(p.getQualifier().getClassid(), new HashSet<>()); + organizationPids.get(p.getQualifier().getClassid()).add(p.getValue()); + }); + List pids = new ArrayList<>(); + for (String key : organizationPids.keySet()) { + for (String value : organizationPids.get(key)) { + OrganizationPid pid = new OrganizationPid(); + pid.setValue(value); + pid.setType(key); + pids.add(pid); + } + } + org.setPid(pids); + rOrg.setAffiliation(org); + return rOrg; + } + return null; + } + + private static Result addAffiliation(Iterator> it) { + Tuple2 first = it.next(); + if (first._2() == null) { + return first._1(); + } + Result ret = first._1(); + List affiliation = new ArrayList<>(); + Set alreadyInsertedAffiliations = new HashSet<>(); + affiliation.add(first._2().getAffiliation()); + alreadyInsertedAffiliations.add(first._2().getAffiliation().getId()); + it.forEachRemaining(res -> { + if (!alreadyInsertedAffiliations.contains(res._2().getAffiliation().getId())) { + affiliation.add(res._2().getAffiliation()); + alreadyInsertedAffiliations.add(res._2().getAffiliation().getId()); + } + + }); + ret.setAffiliation(affiliation); + return ret; + } + private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) { if (isToBeDumpedOrg(org)) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java index 65fcd54..d7f0ece 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java @@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -95,6 +96,7 @@ public class SelectEoscResultsJobStep1 implements Serializable { (MapFunction) r -> (Result) ResultMapper .map(r, communityMap, df), Encoders.bean(Result.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java index 9f94535..57211e3 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkDumpOrganizationProject.java @@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -57,6 +58,9 @@ public class SparkDumpOrganizationProject implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final String workingPath = parser.get("workingPath"); + log.info("workingPath: {}", workingPath); + SparkConf conf = new SparkConf(); runWithSparkSession( @@ -64,16 +68,42 @@ public class SparkDumpOrganizationProject implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath + "/organizationProject"); - dumpRelation(spark, inputPath, outputPath); + dumpRelation(spark, inputPath, outputPath, workingPath); }); } - private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) { - Dataset organization = Utils.readPath(spark, outputPath + "organization", Organization.class); + private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, String workingPath) { + Dataset organization = Utils + .readPath(spark, workingPath + "publicationorganization", Organization.class) + .union(Utils.readPath(spark, workingPath + "datasetorganization", Organization.class)) + .union(Utils.readPath(spark, workingPath + "softwareorganization", Organization.class)) + .union(Utils.readPath(spark, workingPath + "otherresearchproductorganization", Organization.class)) + .groupByKey((MapFunction) o -> o.getId(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, v) -> v.next(), + Encoders.bean(Organization.class)); - Dataset project = Utils.readPath(spark, outputPath + "project", Project.class); + Dataset project = Utils + .readPath(spark, workingPath + "publicationproject", Project.class) + .union(Utils.readPath(spark, workingPath + "datasetproject", Project.class)) + .union(Utils.readPath(spark, workingPath + "softwareproject", Project.class)) + .union(Utils.readPath(spark, workingPath + "otherresearchproductproject", Project.class)) + .groupByKey((MapFunction) o -> o.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, v) -> v.next(), Encoders.bean(Project.class)); + + organization + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "organization"); + + project + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "project"); Dataset relation = Utils .readPath(spark, inputPath + "/relation", Relation.class) @@ -96,6 +126,42 @@ public class SparkDumpOrganizationProject implements Serializable { .option("compression", "gzip") .json(outputPath + "organizationProject"); + Utils + .readPath(spark, workingPath + "publicationresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class) + .union( + Utils + .readPath( + spark, workingPath + "datasetresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class)) + .union( + Utils + .readPath( + spark, workingPath + "softwareresultOrganization", eu.dnetlib.dhp.eosc.model.Relation.class)) + .union( + Utils + .readPath( + spark, workingPath + "otherresearchproductresultOrganization", + eu.dnetlib.dhp.eosc.model.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "resultOrganization"); + + Utils + .readPath(spark, workingPath + "publicationresultProject", eu.dnetlib.dhp.eosc.model.Relation.class) + .union( + Utils.readPath(spark, workingPath + "datasetresultProject", eu.dnetlib.dhp.eosc.model.Relation.class)) + .union( + Utils.readPath(spark, workingPath + "softwareresultProject", eu.dnetlib.dhp.eosc.model.Relation.class)) + .union( + Utils + .readPath( + spark, workingPath + "otherresearchproductresultProject", + eu.dnetlib.dhp.eosc.model.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "resultProject"); + } } diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java index 0f0859e..6127a88 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SparkUpdateProjectInfo.java @@ -132,7 +132,7 @@ public class SparkUpdateProjectInfo implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "project"); + .json(workingPath + resultType + "project"); result .joinWith( @@ -152,7 +152,7 @@ public class SparkUpdateProjectInfo implements Serializable { .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "resultProject"); + .json(workingPath + resultType + "resultProject"); } private static eu.dnetlib.dhp.eosc.model.Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml index 1513c70..9b34569 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml @@ -632,6 +632,7 @@ --sourcePath${sourcePath} --outputPath${outputPath}/dump/ + --workingPath${workingDir}/dump/