[Subcommunities] added remapping to master datasource
parent ab96983647
commit c0729ac279
@@ -8,6 +8,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
+import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -86,6 +88,15 @@ public class SparkBulkTagJob {
 		log.info("protoMap: {}", temp);
 		ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class);
 		log.info("pathMap: {}", new Gson().toJson(protoMap));
+
+		final String dbUrl = parser.get("dbUrl");
+		log.info("dbUrl: {}", dbUrl);
+		final String dbUser = parser.get("dbUser");
+		log.info("dbUser: {}", dbUser);
+		final String dbPassword = parser.get("dbPassword");
+		log.info("dbPassword: {}", dbPassword);
+		final String hdfsPath = parser.get("hdfsPath");
+		log.info("hdfsPath: {}", hdfsPath);
 
 		SparkConf conf = new SparkConf();
 		CommunityConfiguration cc;
@@ -107,11 +118,12 @@ public class SparkBulkTagJob {
 			isSparkSessionManaged,
 			spark -> {
 				extendCommunityConfigurationForEOSC(spark, inputPath, cc);
+				ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
 				execBulkTag(
 					spark, inputPath, outputPath, protoMap, cc);
 				execEntityTag(
 					spark, inputPath + "organization", outputPath + "organization",
-					mapWithRepresentative(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
+					mapWithRepresentativeOrganization(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
 					Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
 					TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
 				execEntityTag(
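The call added above dumps the datasource master/duplicate pairs to hdfsPath before tagging starts; mapWithMasterDatasource (next hunk) then joins the community-related datasource ids against that dump. Below is a minimal standalone sketch of that lookup step, with illustrative ids and an in-memory stand-in for the dump; the class name and sample values are hypothetical, "value" is Spark's default column name for a Dataset created with Encoders.STRING(), and an inner join is used here (instead of the left_semi join in the diff) so that both masterId and duplicateId are available in the collected rows.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MasterDuplicateJoinSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
			.master("local[*]")
			.appName("MasterDuplicateJoinSketch")
			.getOrCreate();

		// community-related datasource ids, already prefixed as entityIdList(...) would produce them
		Dataset<String> datasourceIdentifiers = spark
			.createDataset(Arrays.asList("10|duplicate::1", "10|standalone::2"), Encoders.STRING());

		// in-memory stand-in for the master-duplicate dump written to hdfsPath (field names as in the diff)
		StructType schema = new StructType()
			.add("masterId", DataTypes.StringType)
			.add("duplicateId", DataTypes.StringType);
		Dataset<Row> masterDuplicate = spark.createDataFrame(
			Arrays.asList(RowFactory.create("10|master::9", "10|duplicate::1")), schema);

		// inner join keeps columns from both sides, so masterId/duplicateId can be renamed to source/target
		List<Row> mappedKeys = datasourceIdentifiers
			.join(masterDuplicate,
				datasourceIdentifiers.col("value").equalTo(masterDuplicate.col("duplicateId")), "inner")
			.selectExpr("masterId as source", "duplicateId as target")
			.collectAsList();

		// prints: 10|duplicate::1 -> 10|master::9
		mappedKeys.forEach(r -> System.out.println(r.getAs("target") + " -> " + r.getAs("source")));
		spark.stop();
	}
}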
@@ -120,42 +132,66 @@ public class SparkBulkTagJob {
 					Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
 				execEntityTag(
 					spark, inputPath + "datasource", outputPath + "datasource",
-					Utils.getDatasourceCommunityMap(baseURL),
+					mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)),
 					Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
 
 			});
 	}
 
-	private static CommunityEntityMap mapWithRepresentative(SparkSession spark, String relationPath
+	private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath, CommunityEntityMap datasourceCommunityMap) {
+		// load the master-duplicate relations
+		Dataset<MasterDuplicate> masterDuplicate = spark.read().schema(Encoders.bean(MasterDuplicate.class).schema())
+			.json(masterDuplicatePath).as(Encoders.bean(MasterDuplicate.class));
+		// list of ids for the community-related entities
+		List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap);
+
+		// find the mapping with the representative entity, if any
+		Dataset<String> datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING());
+		List<Row> mappedKeys = datasourceIdentifiers.join(masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")), "left_semi")
+			.selectExpr("masterId as source", "duplicateId as target").collectAsList();
+
+		// remap each entity to its corresponding representative
+		return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys);
+	}
+
+	private static List<String> entityIdList(String idPrefix, CommunityEntityMap datasourceCommunityMap) {
+		final String prefix = idPrefix + "|";
+		return datasourceCommunityMap.keySet()
+			.stream()
+			.map(key -> prefix + key)
+			.collect(Collectors.toList());
+	}
+
+	private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath
 		, CommunityEntityMap organizationCommunityMap) {
 		Dataset<Row> mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema())
 			.json(relationPath)
 			.filter("datainfo.deletedbyinference != true and relClass = 'merges'")
 			.select("source", "target");
 
-		ArrayList<String> idList = organizationCommunityMap.keySet()
-			.stream()
-			.map(k -> ModelSupport.idPrefixMap.get(Organization.class) + "|" + k).collect(Collectors.toCollection(ArrayList::new));
+		List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap);
 
 		Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());
 		List<Row> mappedKeys = organizationIdentifiers.join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi")
 			.select("source", "target").collectAsList();
 
-		for (Row mappedEntry : mappedKeys) {
-			String oldKey = mappedEntry.getAs("target");
-			String newKey = mappedEntry.getAs("source");
-			// inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which
-			// will be used as the newValue parameter of the BiFunction
-			organizationCommunityMap.merge(newKey, organizationCommunityMap.remove(oldKey), (existing, newValue) -> {
-				existing.addAll(newValue);
-				return existing;
-			});
-
-		}
-
-		return organizationCommunityMap;
+		return remapCommunityEntityMap(organizationCommunityMap, mappedKeys);
+	}
+
+	private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap, List<Row> mappedKeys) {
+		for (Row mappedEntry : mappedKeys) {
+			String oldKey = mappedEntry.getAs("target");
+			String newKey = mappedEntry.getAs("source");
+			// inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which
+			// will be used as the newValue parameter of the BiFunction
+			entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) -> {
+				existing.addAll(newValue);
+				return existing;
+			});
+		}
+		return entityCommunityMap;
 	}
 
 	private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath, String outputPath,
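The remapping introduced by remapCommunityEntityMap relies on Map.merge combined with Map.remove: the communities recorded under a duplicate id are removed and either inserted under the master id or appended to the communities the master already has. A minimal self-contained sketch of that pattern follows; it assumes CommunityEntityMap behaves like a Map from entity id to a list of community ids, and the class name, ids, and community names are purely illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemapSketch {

	public static void main(String[] args) {
		// community lists keyed by entity id; CommunityEntityMap is assumed to behave like this map
		Map<String, List<String>> entityCommunityMap = new HashMap<>();
		entityCommunityMap.put("10|duplicate::1", new ArrayList<>(Arrays.asList("covid-19")));
		entityCommunityMap.put("10|master::9", new ArrayList<>(Arrays.asList("enermaps")));

		String oldKey = "10|duplicate::1"; // duplicate id (the join's "target")
		String newKey = "10|master::9";    // representative / master id (the join's "source")

		// remove(oldKey) yields the duplicate's communities; merge either inserts them under newKey
		// or, if the master already has an entry, passes them as newValue to the BiFunction and appends
		entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) -> {
			existing.addAll(newValue);
			return existing;
		});

		System.out.println(entityCommunityMap); // {10|master::9=[enermaps, covid-19]}
	}
}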