From c0729ac279db8df75d63474c805b4586998220c7 Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Thu, 21 Nov 2024 14:36:26 +0100
Subject: [PATCH] [Subcommunities] added remapping to master datasource

---
 .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 72 ++++++++++++++-----
 1 file changed, 54 insertions(+), 18 deletions(-)

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
index 1f3072ca1..e3dc2d140 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
@@ -8,6 +8,8 @@
 import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.stream.Collectors;
+import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
+import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -86,6 +88,15 @@
         log.info("protoMap: {}", temp);
         ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class);
         log.info("pathMap: {}", new Gson().toJson(protoMap));
+        final String dbUrl = parser.get("dbUrl");
+        log.info("dbUrl: {}", dbUrl);
+        final String dbUser = parser.get("dbUser");
+        log.info("dbUser: {}", dbUser);
+        final String dbPassword = parser.get("dbPassword");
+        log.info("dbPassword: {}", dbPassword);
+        final String hdfsPath = parser.get("hdfsPath");
+        log.info("hdfsPath: {}", hdfsPath);
+
 
         SparkConf conf = new SparkConf();
         CommunityConfiguration cc;
@@ -107,11 +118,12 @@
             isSparkSessionManaged,
             spark -> {
                 extendCommunityConfigurationForEOSC(spark, inputPath, cc);
+                ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
                 execBulkTag(
                     spark, inputPath, outputPath, protoMap, cc);
                 execEntityTag(
                     spark, inputPath + "organization", outputPath + "organization",
-                    mapWithRepresentative(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
+                    mapWithRepresentativeOrganization(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
                     Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
                     TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
                 execEntityTag(
@@ -120,42 +132,66 @@
                     Project.class, TaggingConstants.CLASS_ID_PROJECT,
                     TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
                 execEntityTag(
                     spark, inputPath + "datasource", outputPath + "datasource",
-                    Utils.getDatasourceCommunityMap(baseURL),
+                    mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)),
                     Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE,
                     TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
             });

     }

-    private static CommunityEntityMap mapWithRepresentative(SparkSession spark, String relationPath
+    private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath, CommunityEntityMap datasourceCommunityMap) {
+        // load the master-duplicate relations
+        Dataset<MasterDuplicate> masterDuplicate = spark.read().schema(Encoders.bean(MasterDuplicate.class).schema())
+            .json(masterDuplicatePath).as(Encoders.bean(MasterDuplicate.class));
+        // ids of the datasource entities related to the communities
+        List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap);
+
+        // find the mapping to the representative (master) entity, if any
+        Dataset<String> datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING());
+        List<Row> mappedKeys = datasourceIdentifiers.join(masterDuplicate, datasourceIdentifiers.col("value").equalTo(masterDuplicate.col("duplicateId")), "inner")
+            .selectExpr("masterId as source", "duplicateId as target").collectAsList();
+
+        // remap each duplicate entity to its corresponding representative
+        return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys);
+    }
+
+    private static List<String> entityIdList(String idPrefix, CommunityEntityMap entityCommunityMap) {
+        final String prefix = idPrefix + "|";
+        return entityCommunityMap.keySet()
+            .stream()
+            .map(key -> prefix + key)
+            .collect(Collectors.toList());
+    }
+
+    private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath
         , CommunityEntityMap organizationCommunityMap) {
         Dataset<Row> mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema())
             .json(relationPath)
             .filter("datainfo.deletedbyinference != true and relClass = 'merges'")
             .select("source", "target");

-        ArrayList<String> idList = organizationCommunityMap.keySet()
-            .stream()
-            .map(k -> ModelSupport.idPrefixMap.get(Organization.class) + "|" + k).collect(Collectors.toCollection(ArrayList::new));
+        List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap);

         Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());

         List<Row> mappedKeys = organizationIdentifiers.join(mergesRel, organizationIdentifiers.col("value").equalTo(mergesRel.col("target")), "inner")
             .select("source", "target").collectAsList();

-        for (Row mappedEntry : mappedKeys) {
-            String oldKey = mappedEntry.getAs("target");
-            String newKey = mappedEntry.getAs("source");
-            //inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which
-            //will be used as the newValue parameter of the BiFunction
-            organizationCommunityMap.merge(newKey, organizationCommunityMap.remove(oldKey), (existing, newValue) ->{
-                existing.addAll(newValue);
-                return existing;
-            });
+        return remapCommunityEntityMap(organizationCommunityMap, mappedKeys);

-        }
+    }

+    private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap, List<Row> mappedKeys) {
+        for (Row mappedEntry : mappedKeys) {
+            String oldKey = mappedEntry.getAs("target");
+            String newKey = mappedEntry.getAs("source");
+            // inserts the newKey in the map while removing the oldKey; the remove returns the value stored in the map,
+            // which is then passed as the newValue parameter of the BiFunction given to merge
+            entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) -> {
+                existing.addAll(newValue);
+                return existing;
+            });

-        return organizationCommunityMap;
-
+        }
+        return entityCommunityMap;
     }

     private static void execEntityTag(SparkSession spark, String inputPath, String outputPath,
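
Note for reviewers: below is a minimal, self-contained sketch of the remapping step performed by remapCommunityEntityMap above. The community list keyed by a duplicate id is moved onto the id of its master (representative) entity, merging with any list already stored under the master. The map type, the ids and the community names are hypothetical stand-ins (a plain HashMap instead of CommunityEntityMap); the sketch is illustrative only and not part of the patch.

import java.util.*;

public class RemapSketch {
    public static void main(String[] args) {
        // hypothetical community map keyed by datasource id; values must be mutable lists
        Map<String, List<String>> communityMap = new HashMap<>();
        communityMap.put("10|dup_____::1", new ArrayList<>(List.of("covid-19")));
        communityMap.put("10|master___::1", new ArrayList<>(List.of("dh-ch")));

        // (source = master id, target = duplicate id) pairs, as produced by the join in the patch
        List<String[]> mappedKeys = List.of(new String[] { "10|master___::1", "10|dup_____::1" });

        for (String[] pair : mappedKeys) {
            String newKey = pair[0]; // master / representative id
            String oldKey = pair[1]; // duplicate id
            // remove the duplicate entry and merge its communities into the master entry
            communityMap.merge(newKey, communityMap.remove(oldKey), (existing, incoming) -> {
                existing.addAll(incoming);
                return existing;
            });
        }

        System.out.println(communityMap); // {10|master___::1=[dh-ch, covid-19]}
    }
}

Map.merge keeps the remapping to a single call: when the master id is not yet a key, the duplicate's list is simply inserted under it; when it is, the two lists are concatenated.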