From aa48d5270779fcdb3dfdc73cd7a9afc9cae47920 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 25 Oct 2023 15:18:44 +0200 Subject: [PATCH] fixing issues --- ...ExtendEoscResultWithOrganizationStep2.java | 36 ++++++++++--------- .../oa/graph/dump/eosc/oozie_app/workflow.xml | 4 +++ ...d_result_with_organization_parameters.json | 7 +++- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 1ca6f20..af4bb19 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -63,6 +63,9 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); + final String resultType = parser.get("resultType"); + log.info("resultType: {}", resultType); + SparkConf conf = new SparkConf(); runWithSparkSession( @@ -70,14 +73,14 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, workingPath + "publicationextendedaffiliation"); - addOrganizations(spark, inputPath, workingPath, outputPath); + addOrganizations(spark, inputPath, workingPath, outputPath, resultType); }); } - private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath) { + private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath, String resultType) { Dataset results = Utils - .readPath(spark, workingPath + "publication", Result.class); + .readPath(spark, workingPath + resultType, Result.class); Dataset relations = Utils .readPath(spark, inputPath + 
"/relation", Relation.class) @@ -155,20 +158,21 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .option("compression", "gzip") .json(workingPath + "publicationextendedaffiliation"); - relations - .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) - .map( - (MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( - t2._2()), - Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "organization"); + Dataset organizationWithAffiliation = relations + .joinWith(results, relations.col("target").equalTo(results.col("id"))) + .map((MapFunction, Relation>) t2 -> t2._1(), Encoders.bean(Relation.class)); - relations - .joinWith(organizations, relations.col("source").equalTo(organizations.col("id"))) + organizationWithAffiliation.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id"))) + .map((MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( + t2._2()), + Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "organization"); + + organizationWithAffiliation.joinWith(organizations, organizationWithAffiliation.col("source").equalTo(organizations.col("id"))) .map( (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation .newInstance(t2._1().getSource(), t2._1().getTarget()), diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml index 077bb4f..fd1bcdc 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml +++ 
b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/oozie_app/workflow.xml @@ -166,6 +166,7 @@ --sourcePath${sourcePath} --workingPath${workingDir}/dump/ --outputPath${outputPath}/dump/ + --resultTypepublication @@ -218,6 +219,7 @@ --sourcePath${sourcePath} --workingPath${workingDir}/dump/ --outputPath${outputPath}/dump/ + --resultTypedataset @@ -271,6 +273,7 @@ --sourcePath${sourcePath} --workingPath${workingDir}/dump/ --outputPath${outputPath}/dump/ + --resultTypeotherresearchproduct @@ -323,6 +326,7 @@ --sourcePath${sourcePath} --workingPath${workingDir}/dump/ --outputPath${outputPath}/dump/ + --resultTypesoftware diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json index 3a448b6..3bd3108 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc_extend_result_with_organization_parameters.json @@ -28,7 +28,12 @@ "paramLongName":"workingPath", "paramDescription": "The path to the community map", "paramRequired": false -} +}, + { + "paramName":"rt", + "paramLongName":"resultType", + "paramDescription": "The type of the result to extend (publication, dataset, otherresearchproduct or software)", + "paramRequired": false} ]