diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java index 8d106ef..be78482 100644 --- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java +++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/ExtendEoscResultWithOrganizationStep2.java @@ -76,13 +76,13 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, workingPath + resultType + "extendedaffiliation"); - addOrganizations(spark, inputPath, workingPath, outputPath, resultType); - dumpOrganizationAndRelations(spark, inputPath, workingPath, outputPath, resultType); + addOrganizations(spark, inputPath, workingPath, resultType); + dumpOrganizationAndRelations(spark, inputPath, workingPath, resultType); }); } private static void dumpOrganizationAndRelations(SparkSession spark, String inputPath, String workingPath, - String outputPath, String resultType) { + String resultType) { Dataset relation = Utils .readPath(spark, inputPath + "/relation", Relation.class) .filter( @@ -102,36 +102,46 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { .map((MapFunction, Relation>) t2 -> t2._2(), Encoders.bean(Relation.class)); // from eoscRelation select the organization - eoscRelation + Dataset organizationIds = eoscRelation .joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id"))) .map( - (MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( + (MapFunction, String>) t2 -> t2._2().getId(), + Encoders.STRING()) + .distinct(); + + organizationIds + .joinWith(organization, organizationIds.col("value").equalTo(organization.col("id"))) + .map( + (MapFunction, eu.dnetlib.dhp.eosc.model.Organization>) t2 -> mapOrganization( t2._2()), 
Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) - .groupByKey((MapFunction) o -> o.getId(), Encoders.STRING()) - .mapGroups( - (MapGroupsFunction) ( - k, v) -> v.next(), - Encoders.bean(eu.dnetlib.dhp.eosc.model.Organization.class)) + + .filter(Objects::nonNull) + .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "organization"); + .json(workingPath + resultType + "/organization"); eoscRelation .joinWith(organization, eoscRelation.col("target").equalTo(organization.col("id"))) .map( - (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> eu.dnetlib.dhp.eosc.model.Relation - .newInstance(t2._1().getSource(), t2._1().getTarget()), + (MapFunction, eu.dnetlib.dhp.eosc.model.Relation>) t2 -> { /* isToBeDumpedOrg(...) is true for the organizations that mapOrganization drops (deleted by inference, or neither legal name nor legal short name), so the relation is kept only when it is false; null results are removed by the filter below. */ + if (!isToBeDumpedOrg(t2._2())) + return eu.dnetlib.dhp.eosc.model.Relation + .newInstance(t2._1().getSource(), t2._1().getTarget()); + return null; + }, Encoders.bean(eu.dnetlib.dhp.eosc.model.Relation.class)) + .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") - .json(outputPath + "resultOrganization"); + .json(workingPath + resultType + "/resultOrganization"); } - private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String outputPath, + private static void addOrganizations(SparkSession spark, String inputPath, String workingPath, String resultType) { Dataset results = Utils @@ -219,10 +229,7 @@ public class ExtendEoscResultWithOrganizationStep2 implements Serializable { private static eu.dnetlib.dhp.eosc.model.Organization mapOrganization(Organization org) { - if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference())) - return null; - if (!Optional.ofNullable(org.getLegalname()).isPresent() - && !Optional.ofNullable(org.getLegalshortname()).isPresent()) + if (isToBeDumpedOrg(org)) return null; eu.dnetlib.dhp.eosc.model.Organization organization = new eu.dnetlib.dhp.eosc.model.Organization(); @@ -278,4 +285,13 @@ public class 
ExtendEoscResultWithOrganizationStep2 implements Serializable { return organization; } + /** Returns true when the organization is flagged deleted-by-inference, or when it has neither a legal name nor a legal short name; returns false otherwise. NOTE(review): despite the name, a true result marks an organization that mapOrganization skips (it returns null for it), so callers must treat true as "exclude from the dump". */ private static boolean isToBeDumpedOrg(Organization org) { + if (Boolean.TRUE.equals(org.getDataInfo().getDeletedbyinference())) + return true; + if (!Optional.ofNullable(org.getLegalname()).isPresent() + && !Optional.ofNullable(org.getLegalshortname()).isPresent()) + return true; + return false; + } + } diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/job.properties b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/job.properties index 10c14b2..612a16e 100644 --- a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/job.properties +++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/eosc/job.properties @@ -12,4 +12,5 @@ depositionId=6616871 removeSet=merges;isMergedIn postgresURL=jdbc:postgresql://postgresql.services.openaire.eu:5432/dnet_openaireplus postgresUser=dnet -postgresPassword=dnetPwd \ No newline at end of file +postgresPassword=dnetPwd +isLookUpUrl=http://services.openaire.eu:8280/is/services/isLookUp?wsdl \ No newline at end of file