[CommunityAPI] use the new access points to directly get the organizationCommunityMap and the datasourceCommunityMap for all communities and subcommunities. To be switched on in the propagation code once the endpoints are implemented in the APIs

This commit is contained in:
Miriam Baglioni 2024-11-20 17:44:33 +01:00
parent 3081cad1d3
commit 896de42598
10 changed files with 174 additions and 202 deletions
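A minimal sketch of the intended use, assuming a deployment of the community API that already exposes the new propagation endpoints (class name and wiring below are illustrative, not part of this commit):

import java.io.IOException;

import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;

public class PropagationMapsSketch {
    public static void main(String[] args) throws IOException {
        String baseURL = args[0]; // base URL of the community API
        // one call each, instead of querying every community and subcommunity in turn
        CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL);
        CommunityEntityMap datasourceMap = Utils.getDatasourceCommunityMap(baseURL);
        // keys are prefixed entity ids, values the list of associated community ids
        organizationMap.forEach((entityId, communityIds) -> System.out.println(entityId + " -> " + communityIds));
        System.out.println(datasourceMap.size() + " datasources mapped");
    }
}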

View File

@@ -37,12 +37,6 @@ public class QueryCommunityAPI {
}
public static String community(String id, String baseURL) throws IOException {
return get(baseURL + id);
}
public static String subcommunities(String communityId, String baseURL) throws IOException {
return get(baseURL + communityId + "/subcommunities");
@@ -56,6 +50,8 @@ public class QueryCommunityAPI {
}
public static String communityPropagationOrganization(String id, String baseURL) throws IOException {
return get(baseURL + id + "/propagationOrganizations");
@@ -68,6 +64,10 @@ public class QueryCommunityAPI {
}
public static String propagationOrganizationCommunityMap(String baseURL) throws IOException {
return get(baseURL + "/propagationOrganizationCommunityMap");
}
@NotNull
private static String getBody(HttpURLConnection conn) throws IOException {
String body = "{}";
@@ -96,4 +96,8 @@ public class QueryCommunityAPI {
public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size, String baseURL) throws IOException {
return get(baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId=" + subcommunityId);
}
public static String propagationDatasourceCommunityMap(String baseURL) throws IOException {
return get(baseURL + "/propagationDatasourceCommunityMap");
}
}
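The two new endpoints reply with a plain JSON object mapping prefixed entity ids to lists of community ids, so the raw string deserializes directly into CommunityEntityMap, as Utils does below; a hedged example of the expected payload (ids are made up):

// hypothetical reply of GET <baseURL>/propagationOrganizationCommunityMap
// { "20|openorgs____::abc123": ["dh-ch", "enermaps"] }
CommunityEntityMap organizationMap = new com.fasterxml.jackson.databind.ObjectMapper()
    .readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class);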

View File

@@ -6,6 +6,7 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
@@ -42,7 +43,7 @@ public class Utils implements Serializable {
}
//PROJECT METHODS
public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
public static CommunityEntityMap getProjectCommunityMap(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
getValidCommunities(baseURL)
@@ -110,13 +111,8 @@ public class Utils implements Serializable {
contentModel = MAPPER.readValue(response, ContentModel.class);
if (!contentModel.getContent().isEmpty()) {
contentModel.getContent().forEach(project ->
updateEntityMap(
communityId,
project.getOpenaireId(),
communityEntityMap,
ModelSupport.getIdPrefix(Project.class)
)
contentModel.getContent().forEach(project -> communityEntityMap.add(
ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(), communityId)
);
}
} catch (IOException e) {
@@ -126,7 +122,7 @@ public class Utils implements Serializable {
} while (!contentModel.getLast());
}
private static List<Provider> addRelevantDatasources(
private static List<Provider> getCommunityContentProviders(
DatasourceQueryFunction datasourceQueryFunction
) {
try {
@@ -151,39 +147,92 @@ public class Utils implements Serializable {
}
/**
* Selects the communities whose status is not hidden
* @param baseURL the base URL of the API to be queried
* @return the list of communities as CommunityModel instances
* @throws IOException
*/
public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
List<CommunityModel> listCommunity = MAPPER
.readValue(QueryCommunityAPI.communities(baseURL), new TypeReference<List<CommunityModel>>() {
});
return listCommunity.stream()
.filter(
community -> !community.getStatus().equals("hidden") &&
(community.getType().equals("ri") || community.getType().equals("community")))
.collect(Collectors.toList());
}
/**
* Sets the community information from the replies of the community APIs
* @param baseURL the base URL of the API to be queried
* @param communityModel the community model as replied by the APIs
* @return the community, filled with information from the community model and with its content providers
*/
private static Community getCommunity(String baseURL, CommunityModel communityModel) {
Community community = getCommunity(communityModel);
community.setProviders(getCommunityContentProviders(() -> {
try {
return QueryCommunityAPI.communityDatasource(community.getId(), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
return community;
}
/**
* Extends the community configuration for the subcommunity by adding the content providers
* @param baseURL the base URL of the API to be queried
* @param communityId the identifier of the parent community
* @param sc the subcommunity model
* @return the community configuration for the subcommunity
*/
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) {
Community c = getCommunity(sc);
c.setProviders(getCommunityContentProviders(() -> {
try {
return QueryCommunityAPI.subcommunityDatasource(communityId, sc.getSubCommunityId(), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
return c;
}
/**
* Gets all the subcommunities for a given community identifier
* @param communityId the identifier of the community
* @param baseURL the base URL of the API to be queried
* @return the list of community configurations for the subcommunities
*/
private static List<Community> getSubCommunity(String communityId, String baseURL) {
try {
List<SubCommunityModel> subcommunities = getSubcommunities(communityId, baseURL);
return subcommunities.stream().map(sc ->
getSubCommunityConfiguration(baseURL, communityId, sc))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* Prepares the configuration for the communities and sub-communities
* @param baseURL the base URL of the API to be queried
* @return the community configuration
* @throws IOException
*/
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
final Map<String, Community> communities = Maps.newHashMap();
List<CommunityModel> communityList = getValidCommunities(baseURL);
List<Community> validCommunities = new ArrayList<>();
        communityList
            .forEach(community -> {
                try {
                    CommunityModel cm = MAPPER
                        .readValue(QueryCommunityAPI.community(community.getId(), baseURL), CommunityModel.class);
                    validCommunities.add(getCommunity(cm));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        validCommunities.forEach(community -> community.setProviders(addRelevantDatasources(() -> {
            try {
                return QueryCommunityAPI.communityDatasource(community.getId(), baseURL);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        })));
        // add subcommunities information if any
        communityList.forEach(community -> {
            try {
                List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
                subcommunities.forEach(sc -> validCommunities.add(getSubCommunityConfiguration(baseURL, community.getId(), sc)));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        communityList
            .forEach(community -> {
                validCommunities.add(getCommunity(baseURL, community));
                validCommunities.addAll(getSubCommunity(community.getId(), baseURL));
            });
validCommunities.forEach(community -> {
@@ -191,23 +240,16 @@ public class Utils implements Serializable {
communities.put(community.getId(), community);
});
return new CommunityConfiguration(communities);
}
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) {
Community c = getCommunity(sc);
c.setProviders(addRelevantDatasources(()->{
try {
return QueryCommunityAPI.subcommunityDatasource(communityId, sc.getSubCommunityId(), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
return c;
}
/**
* Fills the common fields of the community model for both the community configuration and the subcommunity configuration
* @param input the configuration model to read from
* @return the community with the common fields set
* @param <C> the concrete type of the configuration model
*/
private static <C extends CommonConfigurationModel> Community getCommonConfiguration(C input){
Community c = new Community();
c.setZenodoCommunities(input.getOtherZenodoCommunities());
@@ -241,18 +283,17 @@ public class Utils implements Serializable {
return c;
}
    public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
        return MAPPER
            .readValue(QueryCommunityAPI.communities(baseURL), CommunitySummary.class)
            .stream()
            .filter(
                community -> !community.getStatus().equals("hidden") &&
                    (community.getType().equals("ri") || community.getType().equals("community")))
            .collect(Collectors.toList());
    }

    public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
        return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), SubCommunitySummary.class);
    }

    public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
        return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference<List<SubCommunityModel>>() {
        });
    }

    public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException {
        return MAPPER.readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class);
    }

    public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException {
        return MAPPER.readValue(QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL), CommunityEntityMap.class);
    }
private static void getRelatedOrganizations(String communityId, String baseURL, CommunityEntityMap communityEntityMap){
@@ -260,8 +301,8 @@
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL), OrganizationList.class);
associatedOrgs.forEach(o -> updateEntityMap(communityId, o, communityEntityMap, ModelSupport.getIdPrefix(Organization.class)));
QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL), EntityIdentifierList.class);
associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -273,24 +314,14 @@
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), OrganizationList.class);
associatedOrgs.forEach(o -> updateEntityMap(communityId, o, communityEntityMap, ModelSupport.getIdPrefix(Organization.class)));
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), EntityIdentifierList.class);
associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static void updateEntityMap(String communityId, String entityId, CommunityEntityMap communityEntityMap, String entityPrefix){
if (!communityEntityMap
.containsKey(entityPrefix + "|" + entityId))
communityEntityMap.put(entityPrefix + "|" + entityId, new ArrayList<>());
communityEntityMap.get(entityPrefix + "|" + entityId).add(communityId);
}
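For a prefix such as ModelSupport.getIdPrefix(Organization.class), the multimap helper on CommunityEntityMap produces the same map state as the helper above; a one-line equivalence (variable names illustrative):

// two equivalent ways of associating entityId with communityId
updateEntityMap(communityId, entityId, communityEntityMap, prefix);
communityEntityMap.add(prefix + "|" + entityId, communityId);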
/**
* Returns, for each organization, the list of associated communities
*/
@@ -310,6 +341,7 @@ public class Utils implements Serializable {
return organizationMap;
}
public static List<String> getCommunityIdList(String baseURL) throws IOException {
return getValidCommunities(baseURL)
.stream()
@@ -317,32 +349,45 @@ public class Utils implements Serializable {
.collect(Collectors.toList());
}
    public static List<EntityCommunities> getDatasourceCommunities(String baseURL) throws IOException {
        List<CommunityModel> validCommunities = getValidCommunities(baseURL);
        HashMap<String, Set<String>> map = new HashMap<>();
        String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|";
        validCommunities.forEach(c -> {
            try {
                new ObjectMapper()
                    .readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
                    .forEach(d -> {
                        if (!map.containsKey(d.getOpenaireId()))
                            map.put(d.getOpenaireId(), new HashSet<>());
                        map.get(d.getOpenaireId()).add(c.getId());
                    });
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        return map
            .keySet()
            .stream()
            .map(k -> EntityCommunities.newInstance(entityPrefix + k, new ArrayList<>(map.get(k))))
            .collect(Collectors.toList());
    }

    public static CommunityEntityMap getCommunityDatasource(String baseURL) throws IOException {
        CommunityEntityMap datasourceMap = new CommunityEntityMap();
        getValidCommunities(baseURL)
            .forEach(community -> {
                getRelatedDatasource(community.getId(), baseURL, datasourceMap);
                try {
                    List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
                    subcommunities
                        .forEach(sc -> getRelatedDatasource(community.getId(), sc.getSubCommunityId(), baseURL, datasourceMap));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        return datasourceMap;
    }

    private static void getRelatedDatasource(String communityId, String baseURL, CommunityEntityMap communityEntityMap) {
        try {
            List<String> associatedDatasources = MAPPER
                .readValue(
                    QueryCommunityAPI.communityDatasource(communityId, baseURL), EntityIdentifierList.class);
            associatedDatasources
                .forEach(d -> communityEntityMap.add(ModelSupport.getIdPrefix(Datasource.class) + "|" + d, communityId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void getRelatedDatasource(String communityId, String subcommunityId, String baseURL, CommunityEntityMap communityEntityMap) {
        try {
            List<String> associatedDatasources = MAPPER
                .readValue(
                    QueryCommunityAPI.subcommunityDatasource(communityId, subcommunityId, baseURL), EntityIdentifierList.class);
            associatedDatasources
                .forEach(d -> communityEntityMap.add(ModelSupport.getIdPrefix(Datasource.class) + "|" + d, communityId));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

View File

@@ -18,4 +18,13 @@ public class CommunityEntityMap extends HashMap<String, List<String>> {
}
return super.get(key);
}
    public void add(String key, String value) {
        if (!super.containsKey(key)) {
            super.put(key, new ArrayList<>());
        }
        super.get(key).add(value);
    }
}
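A quick illustration of the new add helper (the ids are made up; keys elsewhere in this commit are built as ModelSupport.getIdPrefix(...) + "|" + entityId, e.g. the "20|" organization prefix):

CommunityEntityMap map = new CommunityEntityMap();
// repeated keys accumulate community ids in the backing list; no deduplication is performed
map.add("20|someOrganizationId", "community-a");
map.add("20|someOrganizationId", "community-b");
map.get("20|someOrganizationId"); // -> [community-a, community-b]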

View File

@@ -1,15 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
public CommunitySummary() {
super();
}
}

View File

@@ -8,9 +8,9 @@ import java.util.ArrayList;
* @author miriam.baglioni
* @Date 09/10/23
*/
public class OrganizationList extends ArrayList<String> implements Serializable {
public class EntityIdentifierList extends ArrayList<String> implements Serializable {
public OrganizationList() {
public EntityIdentifierList() {
super();
}
}

View File

@@ -1,10 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
public class SubCommunitySummary extends ArrayList<SubCommunityModel> implements Serializable {
public SubCommunitySummary(){
super();
}
}

View File

@@ -113,12 +113,19 @@ public class SparkBulkTagJob {
spark, inputPath, outputPath, protoMap, cc);
execEntityTag(
spark, inputPath + "organization", outputPath + "organization",
Utils.getCommunityOrganization(baseURL), Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
Utils.getCommunityOrganization(baseURL),
//Utils.getOrganizationCommunityMap(baseURL),
Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
execEntityTag(
spark, inputPath + "project", outputPath + "project", Utils.getCommunityProjects(baseURL),
spark, inputPath + "project", outputPath + "project",
Utils.getProjectCommunityMap(baseURL),
Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
execEntityTag(
spark, inputPath + "datasource", outputPath + "datasource",
Utils.getCommunityDatasource(baseURL),
//Utils.getDatasourceCommunityMap(baseURL),
Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
});
}
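Once the propagation endpoints are live, the tagging calls above can presumably switch to the single-call maps hinted at by the commented lines, along these lines (a sketch, not part of this commit):

execEntityTag(
    spark, inputPath + "organization", outputPath + "organization",
    Utils.getOrganizationCommunityMap(baseURL), // instead of Utils.getCommunityOrganization(baseURL)
    Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
    TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
execEntityTag(
    spark, inputPath + "datasource", outputPath + "datasource",
    Utils.getDatasourceCommunityMap(baseURL), // instead of Utils.getCommunityDatasource(baseURL)
    Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);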
@@ -184,62 +191,6 @@ public class SparkBulkTagJob {
.json(inputPath);
}
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,
List<EntityCommunities> datasourceCommunities) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
Dataset<EntityCommunities> dc = spark
.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
datasource
.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
Datasource ds = t2._1();
if (t2._2() != null) {
List<String> context = Optional
.ofNullable(ds.getContext())
.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
.orElse(new ArrayList<>());
if (!Optional.ofNullable(ds.getContext()).isPresent())
ds.setContext(new ArrayList<>());
t2._2().getCommunitiesId().forEach(c -> {
if (!context.contains(c)) {
Context con = new Context();
con.setId(c);
con
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"1")));
ds.getContext().add(con);
}
});
}
return ds;
}, Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "datasource");
readPath(spark, outputPath + "datasource", Datasource.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "datasource");
}
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
CommunityConfiguration cc) {

View File

@@ -52,6 +52,7 @@ public class PrepareResultCommunitySet {
log.info("baseURL: {}", baseURL);
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(baseURL);
//final CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL);
log.info("organizationMap: {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf();

View File

@@ -2,13 +2,11 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -18,16 +16,10 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
@@ -55,7 +47,7 @@ public class PrepareResultCommunitySet {
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(baseURL);
final CommunityEntityMap projectsMap = Utils.getProjectCommunityMap(baseURL);
SparkConf conf = new SparkConf();

View File

@@ -6,15 +6,11 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.dhp.api.QueryCommunityAPI;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityModel;
import eu.dnetlib.dhp.api.model.SubCommunityModel;
import eu.dnetlib.dhp.api.model.SubCommunitySummary;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@@ -25,7 +21,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;