From 8c05f496659c48558973cdc25bf1b8a58813b64d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 9 May 2023 10:48:34 +0200 Subject: [PATCH] moved the version as it was before the change --- .../PrepareResultCommunitySet.java | 45 +++++++++---------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index 19b985964..0fc8cb390 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -72,33 +72,28 @@ public class PrepareResultCommunitySet { String outputPath, OrganizationMap organizationMap) { - Dataset relationAffiliation = readPath(spark, inputPath, Relation.class) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equalsIgnoreCase(ModelConstants.HAS_AUTHOR_INSTITUTION)); + Dataset relation = readPath(spark, inputPath, Relation.class); + relation.createOrReplaceTempView("relation"); - Dataset relationOrganization = readPath(spark, inputPath, Relation.class) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equalsIgnoreCase(ModelConstants.MERGES)); + String query = "SELECT result_organization.source resultId, result_organization.target orgId, org_set merges " + + "FROM (SELECT source, target " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + " AND lower(relClass) = '" + + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + + "') result_organization " + + "LEFT JOIN (SELECT source, collect_set(target) org_set " + + " FROM relation " + + " WHERE datainfo.deletedbyinference = false " + + " AND lower(relClass) = '" + + ModelConstants.MERGES.toLowerCase() + + "' " + + " GROUP BY source) organization_organization " + + "ON result_organization.target = organization_organization.source "; - Dataset result_organizationset = relationAffiliation - .joinWith( - relationOrganization, - relationAffiliation.col("target").equalTo(relationOrganization.col("source")), - "left") - .groupByKey((MapFunction, String>) t2 -> t2._2().getSource(), Encoders.STRING()) - .mapGroups((MapGroupsFunction, ResultOrganizations>) (k, it) -> { - ResultOrganizations rOrgs = new ResultOrganizations(); - rOrgs.setOrgId(k); - Tuple2 first = it.next(); - rOrgs.setResultId(first._1().getSource()); - ArrayList merges = new ArrayList<>(); - merges.add(first._2().getTarget()); - it.forEachRemaining(t -> merges.add(t._2().getTarget())); - rOrgs.setMerges(merges); - return rOrgs; - }, Encoders.bean(ResultOrganizations.class)); + Dataset result_organizationset = spark + .sql(query) + .as(Encoders.bean(ResultOrganizations.class)); result_organizationset .map(mapResultCommunityFn(organizationMap), Encoders.bean(ResultCommunityList.class))