forked from D-Net/dnet-hadoop
[bulktag subcommunities] refactoring and addition of new properties
This commit is contained in:
parent
2570023590
commit
2d45f125a7
|
@ -44,15 +44,12 @@ public class QueryCommunityAPI {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static String communityDatasource(String id, String baseURL) throws IOException {
|
public static String communityDatasource(String id, String baseURL) throws IOException {
|
||||||
|
|
||||||
return get(baseURL + id + "/datasources");
|
return get(baseURL + id + "/datasources");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static String communityPropagationOrganization(String id, String baseURL) throws IOException {
|
public static String communityPropagationOrganization(String id, String baseURL) throws IOException {
|
||||||
|
|
||||||
return get(baseURL + id + "/propagationOrganizations");
|
return get(baseURL + id + "/propagationOrganizations");
|
||||||
|
@ -86,16 +83,21 @@ public class QueryCommunityAPI {
|
||||||
return body;
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL) throws IOException {
|
public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL)
|
||||||
|
throws IOException {
|
||||||
return get(baseURL + communityId + "/subcommunities/datasources?subCommunityId=" + subcommunityId);
|
return get(baseURL + communityId + "/subcommunities/datasources?subCommunityId=" + subcommunityId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String subcommunityPropagationOrganization(String communityId, String subcommunityId , String baseURL) throws IOException {
|
public static String subcommunityPropagationOrganization(String communityId, String subcommunityId, String baseURL)
|
||||||
|
throws IOException {
|
||||||
return get(baseURL + communityId + "/subcommunities/propagationOrganizations?subCommunityId=" + subcommunityId);
|
return get(baseURL + communityId + "/subcommunities/propagationOrganizations?subCommunityId=" + subcommunityId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size, String baseURL) throws IOException {
|
public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size,
|
||||||
return get(baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId=" + subcommunityId);
|
String baseURL) throws IOException {
|
||||||
|
return get(
|
||||||
|
baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId="
|
||||||
|
+ subcommunityId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String propagationDatasourceCommunityMap(String baseURL) throws IOException {
|
public static String propagationDatasourceCommunityMap(String baseURL) throws IOException {
|
||||||
|
|
|
@ -6,10 +6,10 @@ import java.io.Serializable;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
@ -51,7 +51,9 @@ public class Utils implements Serializable {
|
||||||
addRelevantProjects(community.getId(), baseURL, projectMap);
|
addRelevantProjects(community.getId(), baseURL, projectMap);
|
||||||
try {
|
try {
|
||||||
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
|
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
|
||||||
subcommunities.forEach(sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap));
|
subcommunities
|
||||||
|
.forEach(
|
||||||
|
sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -62,45 +64,43 @@ public class Utils implements Serializable {
|
||||||
private static void addRelevantProjects(
|
private static void addRelevantProjects(
|
||||||
String communityId,
|
String communityId,
|
||||||
String baseURL,
|
String baseURL,
|
||||||
CommunityEntityMap communityEntityMap
|
CommunityEntityMap communityEntityMap) {
|
||||||
) {
|
|
||||||
fetchAndProcessProjects(
|
fetchAndProcessProjects(
|
||||||
(page, size) -> {
|
(page, size) -> {
|
||||||
try {
|
try {
|
||||||
return QueryCommunityAPI.communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL);
|
return QueryCommunityAPI
|
||||||
|
.communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
communityId,
|
communityId,
|
||||||
communityEntityMap
|
communityEntityMap);
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void addRelevantProjects(
|
private static void addRelevantProjects(
|
||||||
String communityId,
|
String communityId,
|
||||||
String subcommunityId,
|
String subcommunityId,
|
||||||
String baseURL,
|
String baseURL,
|
||||||
CommunityEntityMap communityEntityMap
|
CommunityEntityMap communityEntityMap) {
|
||||||
) {
|
|
||||||
fetchAndProcessProjects(
|
fetchAndProcessProjects(
|
||||||
(page, size) -> {
|
(page, size) -> {
|
||||||
try {
|
try {
|
||||||
return QueryCommunityAPI.subcommunityProjects(communityId, subcommunityId, String.valueOf(page), String.valueOf(size), baseURL);
|
return QueryCommunityAPI
|
||||||
|
.subcommunityProjects(
|
||||||
|
communityId, subcommunityId, String.valueOf(page), String.valueOf(size), baseURL);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
communityId,
|
communityId,
|
||||||
communityEntityMap
|
communityEntityMap);
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void fetchAndProcessProjects(
|
private static void fetchAndProcessProjects(
|
||||||
ProjectQueryFunction projectQueryFunction,
|
ProjectQueryFunction projectQueryFunction,
|
||||||
String communityId,
|
String communityId,
|
||||||
CommunityEntityMap communityEntityMap
|
CommunityEntityMap communityEntityMap) {
|
||||||
) {
|
|
||||||
int page = 0;
|
int page = 0;
|
||||||
final int size = 100;
|
final int size = 100;
|
||||||
ContentModel contentModel;
|
ContentModel contentModel;
|
||||||
|
@ -111,9 +111,13 @@ public class Utils implements Serializable {
|
||||||
contentModel = MAPPER.readValue(response, ContentModel.class);
|
contentModel = MAPPER.readValue(response, ContentModel.class);
|
||||||
|
|
||||||
if (!contentModel.getContent().isEmpty()) {
|
if (!contentModel.getContent().isEmpty()) {
|
||||||
contentModel.getContent().forEach(project ->communityEntityMap.add(
|
contentModel
|
||||||
ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(), communityId)
|
.getContent()
|
||||||
);
|
.forEach(
|
||||||
|
project -> communityEntityMap
|
||||||
|
.add(
|
||||||
|
ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(),
|
||||||
|
communityId));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException("Error processing projects for community: " + communityId, e);
|
throw new RuntimeException("Error processing projects for community: " + communityId, e);
|
||||||
|
@ -123,11 +127,11 @@ public class Utils implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Provider> getCommunityContentProviders(
|
private static List<Provider> getCommunityContentProviders(
|
||||||
DatasourceQueryFunction datasourceQueryFunction
|
DatasourceQueryFunction datasourceQueryFunction) {
|
||||||
) {
|
|
||||||
try {
|
try {
|
||||||
String response = datasourceQueryFunction.query();
|
String response = datasourceQueryFunction.query();
|
||||||
List<CommunityContentprovider> datasourceList = MAPPER.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
|
List<CommunityContentprovider> datasourceList = MAPPER
|
||||||
|
.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
|
||||||
});
|
});
|
||||||
|
|
||||||
return datasourceList.stream().map(d -> {
|
return datasourceList.stream().map(d -> {
|
||||||
|
@ -158,7 +162,8 @@ public class Utils implements Serializable {
|
||||||
List<CommunityModel> listCommunity = MAPPER
|
List<CommunityModel> listCommunity = MAPPER
|
||||||
.readValue(QueryCommunityAPI.communities(baseURL), new TypeReference<List<CommunityModel>>() {
|
.readValue(QueryCommunityAPI.communities(baseURL), new TypeReference<List<CommunityModel>>() {
|
||||||
});
|
});
|
||||||
return listCommunity.stream()
|
return listCommunity
|
||||||
|
.stream()
|
||||||
.filter(
|
.filter(
|
||||||
community -> !community.getStatus().equals("hidden") &&
|
community -> !community.getStatus().equals("hidden") &&
|
||||||
(community.getType().equals("ri") || community.getType().equals("community")))
|
(community.getType().equals("ri") || community.getType().equals("community")))
|
||||||
|
@ -191,7 +196,8 @@ public class Utils implements Serializable {
|
||||||
* @param sc
|
* @param sc
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) {
|
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId,
|
||||||
|
SubCommunityModel sc) {
|
||||||
Community c = getCommunity(sc);
|
Community c = getCommunity(sc);
|
||||||
c.setProviders(getCommunityContentProviders(() -> {
|
c.setProviders(getCommunityContentProviders(() -> {
|
||||||
try {
|
try {
|
||||||
|
@ -213,8 +219,9 @@ public class Utils implements Serializable {
|
||||||
private static List<Community> getSubCommunity(String communityId, String baseURL) {
|
private static List<Community> getSubCommunity(String communityId, String baseURL) {
|
||||||
try {
|
try {
|
||||||
List<SubCommunityModel> subcommunities = getSubcommunities(communityId, baseURL);
|
List<SubCommunityModel> subcommunities = getSubcommunities(communityId, baseURL);
|
||||||
return subcommunities.stream().map(sc ->
|
return subcommunities
|
||||||
getSubCommunityConfiguration(baseURL, communityId, sc))
|
.stream()
|
||||||
|
.map(sc -> getSubCommunityConfiguration(baseURL, communityId, sc))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
|
@ -244,7 +251,6 @@ public class Utils implements Serializable {
|
||||||
return new CommunityConfiguration(communities);
|
return new CommunityConfiguration(communities);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* filles the common fields in the community model for both the communityconfiguration and the subcommunityconfiguration
|
* filles the common fields in the community model for both the communityconfiguration and the subcommunityconfiguration
|
||||||
* @param input
|
* @param input
|
||||||
|
@ -287,38 +293,49 @@ public class Utils implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
|
public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
|
||||||
return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference<List<SubCommunityModel>>() {
|
return MAPPER
|
||||||
|
.readValue(
|
||||||
|
QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference<List<SubCommunityModel>>() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException {
|
public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException {
|
||||||
return MAPPER.readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class);
|
return MAPPER
|
||||||
|
.readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException {
|
public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException {
|
||||||
return MAPPER.readValue(QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL), CommunityEntityMap.class);
|
return MAPPER.readValue(QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL), CommunityEntityMap.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void getRelatedOrganizations(String communityId, String baseURL, CommunityEntityMap communityEntityMap){
|
private static void getRelatedOrganizations(String communityId, String baseURL,
|
||||||
|
CommunityEntityMap communityEntityMap) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
List<String> associatedOrgs = MAPPER
|
List<String> associatedOrgs = MAPPER
|
||||||
.readValue(
|
.readValue(
|
||||||
QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL), EntityIdentifierList.class);
|
QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL),
|
||||||
associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId ));
|
EntityIdentifierList.class);
|
||||||
|
associatedOrgs
|
||||||
|
.forEach(
|
||||||
|
o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void getRelatedOrganizations(String communityId, String subcommunityId, String baseURL, CommunityEntityMap communityEntityMap){
|
private static void getRelatedOrganizations(String communityId, String subcommunityId, String baseURL,
|
||||||
|
CommunityEntityMap communityEntityMap) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
List<String> associatedOrgs = MAPPER
|
List<String> associatedOrgs = MAPPER
|
||||||
.readValue(
|
.readValue(
|
||||||
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), EntityIdentifierList.class);
|
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL),
|
||||||
associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId ));
|
EntityIdentifierList.class);
|
||||||
|
associatedOrgs
|
||||||
|
.forEach(
|
||||||
|
o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -335,7 +352,10 @@ public class Utils implements Serializable {
|
||||||
getRelatedOrganizations(community.getId(), baseURL, organizationMap);
|
getRelatedOrganizations(community.getId(), baseURL, organizationMap);
|
||||||
try {
|
try {
|
||||||
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
|
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
|
||||||
subcommunities.forEach(sc -> getRelatedOrganizations(community.getId(), sc.getSubCommunityId(), baseURL, organizationMap));
|
subcommunities
|
||||||
|
.forEach(
|
||||||
|
sc -> getRelatedOrganizations(
|
||||||
|
community.getId(), sc.getSubCommunityId(), baseURL, organizationMap));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -344,7 +364,6 @@ public class Utils implements Serializable {
|
||||||
return organizationMap;
|
return organizationMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static List<String> getCommunityIdList(String baseURL) throws IOException {
|
public static List<String> getCommunityIdList(String baseURL) throws IOException {
|
||||||
return getValidCommunities(baseURL)
|
return getValidCommunities(baseURL)
|
||||||
.stream()
|
.stream()
|
||||||
|
@ -352,7 +371,9 @@ public class Utils implements Serializable {
|
||||||
List<String> communityIds = new ArrayList<>();
|
List<String> communityIds = new ArrayList<>();
|
||||||
communityIds.add(communityModel.getId());
|
communityIds.add(communityModel.getId());
|
||||||
try {
|
try {
|
||||||
Utils.getSubcommunities(communityModel.getId(), baseURL).forEach(sc -> communityIds.add(sc.getSubCommunityId()));
|
Utils
|
||||||
|
.getSubcommunities(communityModel.getId(), baseURL)
|
||||||
|
.forEach(sc -> communityIds.add(sc.getSubCommunityId()));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -362,6 +383,4 @@ public class Utils implements Serializable {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
package eu.dnetlib.dhp.api.model;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
package eu.dnetlib.dhp.api.model;
|
||||||
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
public class CommonConfigurationModel implements Serializable {
|
public class CommonConfigurationModel implements Serializable {
|
||||||
private String zenodoCommunity;
|
private String zenodoCommunity;
|
||||||
|
|
|
@ -26,5 +26,4 @@ public class CommunityEntityMap extends HashMap<String, List<String>> {
|
||||||
super.get(key).add(value);
|
super.get(key).add(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.api.model;
|
package eu.dnetlib.dhp.api.model;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
public class SubCommunityModel extends CommonConfigurationModel implements Serializable {
|
public class SubCommunityModel extends CommonConfigurationModel implements Serializable {
|
||||||
private String subCommunityId;
|
private String subCommunityId;
|
||||||
|
|
|
@ -8,8 +8,6 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
|
|
||||||
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -31,6 +29,8 @@ import eu.dnetlib.dhp.api.model.CommunityEntityMap;
|
||||||
import eu.dnetlib.dhp.api.model.EntityCommunities;
|
import eu.dnetlib.dhp.api.model.EntityCommunities;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.bulktag.community.*;
|
import eu.dnetlib.dhp.bulktag.community.*;
|
||||||
|
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
|
||||||
|
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
@ -97,7 +97,6 @@ public class SparkBulkTagJob {
|
||||||
final String hdfsPath = outputPath + "masterDuplicate";
|
final String hdfsPath = outputPath + "masterDuplicate";
|
||||||
log.info("hdfsPath: {}", hdfsPath);
|
log.info("hdfsPath: {}", hdfsPath);
|
||||||
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
CommunityConfiguration cc;
|
CommunityConfiguration cc;
|
||||||
|
|
||||||
|
@ -123,7 +122,8 @@ public class SparkBulkTagJob {
|
||||||
spark, inputPath, outputPath, protoMap, cc);
|
spark, inputPath, outputPath, protoMap, cc);
|
||||||
execEntityTag(
|
execEntityTag(
|
||||||
spark, inputPath + "organization", outputPath + "organization",
|
spark, inputPath + "organization", outputPath + "organization",
|
||||||
mapWithRepresentativeOrganization(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
|
mapWithRepresentativeOrganization(
|
||||||
|
spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
|
||||||
Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
|
Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
|
||||||
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
|
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
|
||||||
execEntityTag(
|
execEntityTag(
|
||||||
|
@ -133,22 +133,31 @@ public class SparkBulkTagJob {
|
||||||
execEntityTag(
|
execEntityTag(
|
||||||
spark, inputPath + "datasource", outputPath + "datasource",
|
spark, inputPath + "datasource", outputPath + "datasource",
|
||||||
mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)),
|
mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)),
|
||||||
Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
|
Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE,
|
||||||
|
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath, CommunityEntityMap datasourceCommunityMap) {
|
private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath,
|
||||||
|
CommunityEntityMap datasourceCommunityMap) {
|
||||||
// load master-duplicate relations
|
// load master-duplicate relations
|
||||||
Dataset<MasterDuplicate> masterDuplicate = spark.read().schema(Encoders.bean(MasterDuplicate.class).schema())
|
Dataset<MasterDuplicate> masterDuplicate = spark
|
||||||
.json(masterDuplicatePath).as(Encoders.bean(MasterDuplicate.class));
|
.read()
|
||||||
|
.schema(Encoders.bean(MasterDuplicate.class).schema())
|
||||||
|
.json(masterDuplicatePath)
|
||||||
|
.as(Encoders.bean(MasterDuplicate.class));
|
||||||
// list of id for the communities related entities
|
// list of id for the communities related entities
|
||||||
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap);
|
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap);
|
||||||
|
|
||||||
// find the mapping with the representative entity if any
|
// find the mapping with the representative entity if any
|
||||||
Dataset<String> datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING());
|
Dataset<String> datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING());
|
||||||
List<Row> mappedKeys = datasourceIdentifiers.join(masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")), "left_semi")
|
List<Row> mappedKeys = datasourceIdentifiers
|
||||||
.selectExpr("masterId as source", "duplicateId as target").collectAsList();
|
.join(
|
||||||
|
masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")),
|
||||||
|
"left_semi")
|
||||||
|
.selectExpr("masterId as source", "duplicateId as target")
|
||||||
|
.collectAsList();
|
||||||
|
|
||||||
// remap the entity with its corresponding representative
|
// remap the entity with its corresponding representative
|
||||||
return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys);
|
return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys);
|
||||||
|
@ -156,15 +165,18 @@ public class SparkBulkTagJob {
|
||||||
|
|
||||||
private static List<String> entityIdList(String idPrefixMap, CommunityEntityMap datasourceCommunityMap) {
|
private static List<String> entityIdList(String idPrefixMap, CommunityEntityMap datasourceCommunityMap) {
|
||||||
final String prefix = idPrefixMap + "|";
|
final String prefix = idPrefixMap + "|";
|
||||||
return datasourceCommunityMap.keySet()
|
return datasourceCommunityMap
|
||||||
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.map(key -> prefix + key)
|
.map(key -> prefix + key)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath
|
private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath,
|
||||||
, CommunityEntityMap organizationCommunityMap) {
|
CommunityEntityMap organizationCommunityMap) {
|
||||||
Dataset<Row> mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema())
|
Dataset<Row> mergesRel = spark
|
||||||
|
.read()
|
||||||
|
.schema(Encoders.bean(Relation.class).schema())
|
||||||
.json(relationPath)
|
.json(relationPath)
|
||||||
.filter("datainfo.deletedbyinference != true and relClass = 'merges")
|
.filter("datainfo.deletedbyinference != true and relClass = 'merges")
|
||||||
.select("source", "target");
|
.select("source", "target");
|
||||||
|
@ -172,14 +184,17 @@ public class SparkBulkTagJob {
|
||||||
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap);
|
List<String> idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap);
|
||||||
|
|
||||||
Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());
|
Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());
|
||||||
List<Row> mappedKeys = organizationIdentifiers.join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi")
|
List<Row> mappedKeys = organizationIdentifiers
|
||||||
.select("source", "target").collectAsList();
|
.join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi")
|
||||||
|
.select("source", "target")
|
||||||
|
.collectAsList();
|
||||||
|
|
||||||
return remapCommunityEntityMap(organizationCommunityMap, mappedKeys);
|
return remapCommunityEntityMap(organizationCommunityMap, mappedKeys);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap, List<Row> mappedKeys) {
|
private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap,
|
||||||
|
List<Row> mappedKeys) {
|
||||||
for (Row mappedEntry : mappedKeys) {
|
for (Row mappedEntry : mappedKeys) {
|
||||||
String oldKey = mappedEntry.getAs("target");
|
String oldKey = mappedEntry.getAs("target");
|
||||||
String newKey = mappedEntry.getAs("source");
|
String newKey = mappedEntry.getAs("source");
|
||||||
|
@ -255,7 +270,6 @@ public class SparkBulkTagJob {
|
||||||
.json(inputPath);
|
.json(inputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
|
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
|
||||||
CommunityConfiguration cc) {
|
CommunityConfiguration cc) {
|
||||||
|
|
||||||
|
@ -293,11 +307,6 @@ public class SparkBulkTagJob {
|
||||||
ProtoMap protoMappingParams,
|
ProtoMap protoMappingParams,
|
||||||
CommunityConfiguration communityConfiguration) {
|
CommunityConfiguration communityConfiguration) {
|
||||||
|
|
||||||
try {
|
|
||||||
System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams));
|
|
||||||
} catch (JsonProcessingException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
ModelSupport.entityTypes
|
ModelSupport.entityTypes
|
||||||
.keySet()
|
.keySet()
|
||||||
.parallelStream()
|
.parallelStream()
|
||||||
|
|
|
@ -28,4 +28,7 @@ blacklist=empty
|
||||||
allowedpids=orcid;orcid_pending
|
allowedpids=orcid;orcid_pending
|
||||||
baseURL = https://services.openaire.eu/openaire/community/
|
baseURL = https://services.openaire.eu/openaire/community/
|
||||||
iterations=1
|
iterations=1
|
||||||
|
dbUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus
|
||||||
|
dbUser=dnet
|
||||||
|
dbPassword=dnetPwd
|
||||||
|
|
||||||
|
|
|
@ -170,6 +170,18 @@
|
||||||
<name>pathMap</name>
|
<name>pathMap</name>
|
||||||
<value>${pathMap}</value>
|
<value>${pathMap}</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>dbUrl</name>
|
||||||
|
<value>${dbUrl}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>dbUser</name>
|
||||||
|
<value>${dbUser}</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>dbPassword</name>
|
||||||
|
<value>${dbPassword}</value>
|
||||||
|
</property>
|
||||||
</configuration>
|
</configuration>
|
||||||
</sub-workflow>
|
</sub-workflow>
|
||||||
<ok to="affiliation_inst_repo" />
|
<ok to="affiliation_inst_repo" />
|
||||||
|
|
|
@ -17,15 +17,15 @@
|
||||||
<value>undelete</value>
|
<value>undelete</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>dbUrl></name>
|
<name>dbUrl</name>
|
||||||
|
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>dbUser></name>
|
<name>dbUser</name>
|
||||||
|
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>dbPassword></name>
|
<name>dbPassword</name>
|
||||||
|
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
|
@ -8,10 +8,6 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
||||||
import eu.dnetlib.dhp.api.Utils;
|
|
||||||
import eu.dnetlib.dhp.api.model.SubCommunityModel;
|
|
||||||
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -31,9 +27,13 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.api.Utils;
|
||||||
|
import eu.dnetlib.dhp.api.model.SubCommunityModel;
|
||||||
|
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
|
||||||
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
|
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue