[Subcommunities] added remapping to representative organization

This commit is contained in:
parent 0656ed568d
commit ab96983647
@@ -8,6 +8,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -65,7 +66,7 @@ public class QueryCommunityAPI {
 	}
 
 	public static String propagationOrganizationCommunityMap(String baseURL) throws IOException {
-		return get(baseURL + "/propagationOrganizationCommunityMap");
+		return get(StringUtils.substringBefore(baseURL, "community") + "propagationOrganizationCommunityMap");
 	}
 
 	@NotNull
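For clarity, a minimal standalone sketch of what the substringBefore rewrite does to the request URL; the base URL below is an invented example, not the production endpoint:

import org.apache.commons.lang.StringUtils;

public class SubstringBeforeDemo {
	public static void main(String[] args) {
		// hypothetical base URL; the real value arrives as a job parameter
		String baseURL = "https://example.org/openaire/community/";

		// before: the path was appended to the community endpoint itself
		System.out.println(baseURL + "/propagationOrganizationCommunityMap");
		// -> https://example.org/openaire/community//propagationOrganizationCommunityMap

		// after: everything from "community" onwards is stripped first,
		// so the request targets the service root instead
		System.out.println(StringUtils.substringBefore(baseURL, "community") + "propagationOrganizationCommunityMap");
		// -> https://example.org/openaire/propagationOrganizationCommunityMap
	}
}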
@@ -127,7 +127,8 @@ public class Utils implements Serializable {
 		) {
 			try {
 				String response = datasourceQueryFunction.query();
-				DatasourceList datasourceList = MAPPER.readValue(response, DatasourceList.class);
+				List<CommunityContentprovider> datasourceList = MAPPER.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
+				});
 
 				return datasourceList.stream().map(d -> {
 					if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
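A self-contained sketch of the deserialization change: Jackson's TypeReference captures the generic element type despite erasure, which is what makes the DatasourceList wrapper (deleted further down) unnecessary. The JSON payload and the stand-in bean are invented for illustration:

import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TypeReferenceDemo {
	// stand-in for eu.dnetlib.dhp.api.model.CommunityContentprovider
	public static class CommunityContentprovider {
		public String openaireId;
		public Boolean enabled;
	}

	public static void main(String[] args) throws Exception {
		String response = "[{\"openaireId\":\"ds-1\",\"enabled\":true}]";

		// the anonymous TypeReference subclass pins List<CommunityContentprovider>
		// at runtime, so no dedicated ArrayList subclass is needed
		List<CommunityContentprovider> datasources = new ObjectMapper()
			.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
			});

		System.out.println(datasources.get(0).openaireId); // ds-1
	}
}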
@@ -256,7 +257,9 @@ public class Utils implements Serializable {
 		if (StringUtils.isNotBlank(input.getZenodoCommunity()))
 			c.getZenodoCommunities().add(input.getZenodoCommunity());
 		c.setSubjects(input.getSubjects());
+		if (input.getFos() != null)
 			c.getSubjects().addAll(input.getFos());
+		if (input.getSdg() != null)
 			c.getSubjects().addAll(input.getSdg());
 		if (input.getAdvancedConstraints() != null) {
 			c.setConstraints(input.getAdvancedConstraints());
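The two new guards exist because List.addAll throws on a null argument; a minimal illustration with invented values (the same reasoning applies to the Community setters guarded later in this commit):

import java.util.ArrayList;
import java.util.List;

public class NullGuardDemo {
	public static void main(String[] args) {
		List<String> subjects = new ArrayList<>(List.of("subject-1"));
		List<String> fos = null; // e.g. a community profile that defines no FoS terms

		if (fos != null)
			subjects.addAll(fos); // without the guard: NullPointerException

		System.out.println(subjects); // [subject-1]
	}
}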
@@ -1,13 +0,0 @@
-
-package eu.dnetlib.dhp.api.model;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-
-import eu.dnetlib.dhp.api.model.CommunityContentprovider;
-
-public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
-	public DatasourceList() {
-		super();
-	}
-}
@@ -15,10 +15,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +111,7 @@ public class SparkBulkTagJob {
 			spark, inputPath, outputPath, protoMap, cc);
 		execEntityTag(
 			spark, inputPath + "organization", outputPath + "organization",
-			Utils.getOrganizationCommunityMap(baseURL),
+			mapWithRepresentative(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
 			Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
 			TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
 		execEntityTag(
@@ -128,6 +126,38 @@ public class SparkBulkTagJob {
 			});
 	}
 
+	private static CommunityEntityMap mapWithRepresentative(SparkSession spark, String relationPath,
+		CommunityEntityMap organizationCommunityMap) {
+		Dataset<Row> mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema())
+			.json(relationPath)
+			.filter("datainfo.deletedbyinference != true and relClass = 'merges'")
+			.select("source", "target");
+
+		ArrayList<String> idList = organizationCommunityMap.keySet()
+			.stream()
+			.map(k -> ModelSupport.idPrefixMap.get(Organization.class) + "|" + k)
+			.collect(Collectors.toCollection(ArrayList::new));
+
+		Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());
+		// the merges relation sits on the left so that "source" and "target" survive the left_semi join
+		List<Row> mappedKeys = mergesRel
+			.join(organizationIdentifiers, mergesRel.col("target").equalTo(organizationIdentifiers.col("value")), "left_semi")
+			.select("source", "target").collectAsList();
+
+		for (Row mappedEntry : mappedKeys) {
+			String oldKey = mappedEntry.getAs("target");
+			String newKey = mappedEntry.getAs("source");
+			// inserts the newKey in the map while removing the oldKey. The remove produces
+			// the value in the Map, which will be used as the newValue parameter of the BiFunction
+			organizationCommunityMap.merge(newKey, organizationCommunityMap.remove(oldKey), (existing, newValue) -> {
+				existing.addAll(newValue);
+				return existing;
+			});
+		}
+
+		return organizationCommunityMap;
+	}
+
 	private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath, String outputPath,
 		CommunityEntityMap communityEntity, Class<E> entityClass,
 		String classID, String calssName) {
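The merge/remove idiom at the heart of mapWithRepresentative is compact but easy to misread, so here is a standalone sketch with a plain HashMap; the organization and community ids are invented:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemapDemo {
	public static void main(String[] args) {
		Map<String, List<String>> organizationCommunityMap = new HashMap<>();
		organizationCommunityMap.put("org-merged", new ArrayList<>(List.of("community-a")));
		organizationCommunityMap.put("org-representative", new ArrayList<>(List.of("community-b")));

		// remove() yields the communities of the merged organization; merge() re-keys
		// them onto the representative, concatenating via the BiFunction when the
		// representative already carries communities of its own
		organizationCommunityMap.merge(
			"org-representative",
			organizationCommunityMap.remove("org-merged"),
			(existing, moved) -> {
				existing.addAll(moved);
				return existing;
			});

		System.out.println(organizationCommunityMap);
		// {org-representative=[community-b, community-a]}
	}
}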
@@ -43,6 +43,7 @@ public class Community implements Serializable {
 	}
 
 	public void setSubjects(List<String> subjects) {
+		if (subjects != null)
 			this.subjects = subjects;
 	}
 
@@ -59,6 +60,7 @@ public class Community implements Serializable {
 	}
 
 	public void setZenodoCommunities(List<String> zenodoCommunities) {
+		if (zenodoCommunities != null)
 			this.zenodoCommunities = zenodoCommunities;
 	}
 
@@ -1964,6 +1964,8 @@ public class BulkTagJobTest {
 				throw new RuntimeException(e);
 			}
 		});
+
+		System.out.println(new ObjectMapper().writeValueAsString(Utils.getOrganizationCommunityMap(baseURL)));
 	}
 
 }