diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java index e54f02bf3..c83de6db3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java @@ -8,6 +8,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.jetbrains.annotations.NotNull; /** @@ -65,7 +66,7 @@ public class QueryCommunityAPI { } public static String propagationOrganizationCommunityMap(String baseURL) throws IOException { - return get(baseURL + "/propagationOrganizationCommunityMap"); + return get(StringUtils.substringBefore(baseURL, "community") + "propagationOrganizationCommunityMap"); } @NotNull diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index 5b948f014..ae36c0f62 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -127,7 +127,8 @@ public class Utils implements Serializable { ) { try { String response = datasourceQueryFunction.query(); - DatasourceList datasourceList = MAPPER.readValue(response, DatasourceList.class); + List<CommunityContentprovider> datasourceList = MAPPER.readValue(response, new TypeReference<List<CommunityContentprovider>>() { + }); return datasourceList.stream().map(d -> { if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) @@ -256,8 +257,10 @@ public class Utils implements Serializable { if (StringUtils.isNotBlank(input.getZenodoCommunity())) c.getZenodoCommunities().add(input.getZenodoCommunity()); c.setSubjects(input.getSubjects()); - c.getSubjects().addAll(input.getFos()); - c.getSubjects().addAll(input.getSdg()); + if(input.getFos() != null) + 
c.getSubjects().addAll(input.getFos()); + if(input.getSdg()!=null) + c.getSubjects().addAll(input.getSdg()); if (input.getAdvancedConstraints() != null) { c.setConstraints(input.getAdvancedConstraints()); c.getConstraints().setSelection(resolver); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java deleted file mode 100644 index 30d0241c3..000000000 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/DatasourceList.java +++ /dev/null @@ -1,13 +0,0 @@ - -package eu.dnetlib.dhp.api.model; - -import java.io.Serializable; -import java.util.ArrayList; - -import eu.dnetlib.dhp.api.model.CommunityContentprovider; - -public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable { - public DatasourceList() { - super(); - } -} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 595f62529..1f3072ca1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -15,10 +15,8 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,7 +111,7 @@ public class SparkBulkTagJob { spark, inputPath, outputPath, protoMap, cc); execEntityTag( spark, inputPath + "organization", outputPath + "organization", - Utils.getOrganizationCommunityMap(baseURL), + 
mapWithRepresentative(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)), Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION, TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION); execEntityTag( @@ -128,6 +126,38 @@ public class SparkBulkTagJob { }); } + private static CommunityEntityMap mapWithRepresentative(SparkSession spark, String relationPath + , CommunityEntityMap organizationCommunityMap) { + Dataset<Row> mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .filter("datainfo.deletedbyinference != true and relClass = 'merges'") + .select("source", "target"); + + ArrayList<String> idList = organizationCommunityMap.keySet() + .stream() + .map(k -> ModelSupport.idPrefixMap.get(Organization.class) + "|" + k).collect(Collectors.toCollection(ArrayList::new)); + + Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING()); + List<Row> mappedKeys = mergesRel.join(organizationIdentifiers, mergesRel.col("target").equalTo(organizationIdentifiers.col("value")), "left_semi") + .select("source", "target").collectAsList(); + + for (Row mappedEntry : mappedKeys) { + String oldKey = mappedEntry.getAs("target"); + String newKey = mappedEntry.getAs("source"); + //inserts the newKey in the map while removing the oldKey. 
The remove produces the value in the Map, which + //will be used as the newValue parameter of the BiFunction + organizationCommunityMap.merge(newKey, organizationCommunityMap.remove(oldKey), (existing, newValue) ->{ + existing.addAll(newValue); + return existing; + }); + + } + + + return organizationCommunityMap; + + } + private static void execEntityTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityEntity, Class entityClass, String classID, String calssName) { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java index 9cd3a8f82..ebbc8bd2b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java @@ -43,7 +43,8 @@ public class Community implements Serializable { } public void setSubjects(List<String> subjects) { - this.subjects = subjects; + if(subjects != null) + this.subjects = subjects; } public List getProviders() { @@ -59,7 +60,8 @@ public class Community implements Serializable { } public void setZenodoCommunities(List<String> zenodoCommunities) { - this.zenodoCommunities = zenodoCommunities; + if(zenodoCommunities!=null) + this.zenodoCommunities = zenodoCommunities; } public SelectionConstraints getConstraints() { diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 41bea45dd..6131eb852 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -1964,6 +1964,8 @@ public class BulkTagJobTest { throw new RuntimeException(e); } }); + + System.out.println(new 
ObjectMapper().writeValueAsString(Utils.getOrganizationCommunityMap(baseURL))); } }