Compare commits

...

8 Commits

12 changed files with 343 additions and 298 deletions

View File

@ -8,6 +8,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;
/**
@ -37,12 +38,6 @@ public class QueryCommunityAPI {
}
/**
 * Requests the resource addressed by {@code baseURL + id}.
 *
 * @param id the community identifier, appended verbatim to {@code baseURL}
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String community(String id, String baseURL) throws IOException {
	final String endpoint = baseURL + id;
	return get(endpoint);
}
public static String subcommunities(String communityId, String baseURL) throws IOException {
return get(baseURL + communityId + "/subcommunities");
@ -56,6 +51,8 @@ public class QueryCommunityAPI {
}
public static String communityPropagationOrganization(String id, String baseURL) throws IOException {
return get(baseURL + id + "/propagationOrganizations");
@ -68,6 +65,10 @@ public class QueryCommunityAPI {
}
/**
 * Requests the {@code propagationOrganizationCommunityMap} resource.
 * The URL is built from the part of {@code baseURL} that precedes the text
 * {@code "community"}, followed by the resource name.
 *
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String propagationOrganizationCommunityMap(String baseURL) throws IOException {
	final String apiRoot = StringUtils.substringBefore(baseURL, "community");
	final String endpoint = apiRoot + "propagationOrganizationCommunityMap";
	return get(endpoint);
}
@NotNull
private static String getBody(HttpURLConnection conn) throws IOException {
String body = "{}";
@ -88,4 +89,16 @@ public class QueryCommunityAPI {
/**
 * Requests the datasources of a sub-community.
 *
 * @param communityId the identifier of the parent community, used as a path segment
 * @param subcommunityId the sub-community identifier, sent as the {@code subCommunityId} query parameter
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL) throws IOException {
	final String resource = baseURL + communityId + "/subcommunities/datasources";
	final String endpoint = resource + "?subCommunityId=" + subcommunityId;
	return get(endpoint);
}
/**
 * Requests the propagation organizations of a sub-community.
 *
 * @param communityId the identifier of the parent community, used as a path segment
 * @param subcommunityId the sub-community identifier, sent as the {@code subCommunityId} query parameter
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String subcommunityPropagationOrganization(String communityId, String subcommunityId, String baseURL) throws IOException {
	final String resource = baseURL + communityId + "/subcommunities/propagationOrganizations";
	final String endpoint = resource + "?subCommunityId=" + subcommunityId;
	return get(endpoint);
}
/**
 * Requests one page of the projects of a sub-community.
 *
 * @param communityId the identifier of the parent community, used as a path segment
 * @param subcommunityId the sub-community identifier, sent as the {@code subCommunityId} query parameter
 * @param page the page to request, used as a path segment
 * @param size the page size, used as a path segment
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size, String baseURL) throws IOException {
	final String pagedResource = baseURL + communityId + "/subcommunities/projects/" + page + "/" + size;
	final String endpoint = pagedResource + "?subCommunityId=" + subcommunityId;
	return get(endpoint);
}
/**
 * Requests the {@code propagationDatasourceCommunityMap} resource, addressed as
 * {@code baseURL + "/propagationDatasourceCommunityMap"}.
 *
 * NOTE(review): {@code propagationOrganizationCommunityMap} in this class drops
 * everything from "community" onward out of {@code baseURL} before appending its
 * resource name, while this method appends to the full {@code baseURL} with an
 * extra "/". Confirm which form the service actually exposes.
 *
 * @param baseURL the base URL of the community API
 * @return whatever this class's {@code get} helper returns for the composed URL
 * @throws IOException propagated from {@code get}
 */
public static String propagationDatasourceCommunityMap(String baseURL) throws IOException {
	final String endpoint = baseURL + "/propagationDatasourceCommunityMap";
	return get(endpoint);
}
}

View File

@ -6,10 +6,9 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
@ -33,78 +32,105 @@ public class Utils implements Serializable {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final VerbResolver resolver = VerbResolverFactory.newInstance();
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
final Map<String, Community> communities = Maps.newHashMap();
List<CommunityModel> communityList = getValidCommunities(baseURL);
List<Community> validCommunities = new ArrayList<>();
communityList
/**
 * Callback producing the textual response for one page of a paged project query.
 * The string it returns is deserialized as a {@code ContentModel} by
 * {@code fetchAndProcessProjects}.
 */
@FunctionalInterface
private interface ProjectQueryFunction {
/**
 * @param page the page to request
 * @param size the number of entries per page
 * @return the response for the requested page
 */
String query(int page, int size);
}
/**
 * Callback producing the textual response of a datasource query.
 * The string it returns is deserialized as a list of {@code CommunityContentprovider}
 * by {@code getCommunityContentProviders}.
 */
@FunctionalInterface
private interface DatasourceQueryFunction{
/**
 * @return the response of the datasource query
 */
String query();
}
//PROJECT METHODS
public static CommunityEntityMap getProjectCommunityMap(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
getValidCommunities(baseURL)
.forEach(community -> {
try {
CommunityModel cm = MAPPER
.readValue(QueryCommunityAPI.community(community.getId(), baseURL), CommunityModel.class);
validCommunities.add(getCommunity(cm));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
validCommunities.forEach(community -> {
try {
DatasourceList dl = MAPPER
.readValue(
QueryCommunityAPI.communityDatasource(community.getId(), baseURL), DatasourceList.class);
community.setProviders(dl.stream().map(d -> {
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
return null;
Provider p = new Provider();
p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId());
p.setSelectionConstraints(d.getSelectioncriteria());
if (p.getSelectionConstraints() != null)
p.getSelectionConstraints().setSelection(resolver);
return p;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
//add subcommunities information if any
communityList.forEach(community -> {
addRelevantProjects(community.getId(), baseURL, projectMap);
try {
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
subcommunities.forEach(sc ->
validCommunities.add(getSubCommunityConfiguration(baseURL, community.getId(), sc)));
subcommunities.forEach(sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
validCommunities.forEach(community -> {
if (community.isValid())
communities.put(community.getId(), community);
});
return new CommunityConfiguration(communities);
return projectMap;
}
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) {
Community c = getCommunity(sc);
c.setProviders(getSubcommunityDatasources(baseURL, communityId, sc.getSubCommunityId()));
return c;
}
private static List<Provider> getSubcommunityDatasources(String baseURL, String communityId, String subcommunityId) {
private static void addRelevantProjects(
String communityId,
String baseURL,
CommunityEntityMap communityEntityMap
) {
fetchAndProcessProjects(
(page, size) -> {
try {
DatasourceList dl = null;
dl = MAPPER
.readValue(
QueryCommunityAPI.subcommunityDatasource(communityId, subcommunityId, baseURL), DatasourceList.class);
return dl.stream().map(d -> {
return QueryCommunityAPI.communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
communityId,
communityEntityMap
);
}
/**
 * Adds the projects of a sub-community to the given map.
 * Pages are fetched through {@link QueryCommunityAPI#subcommunityProjects}; the
 * projects are registered under {@code communityId} (the parent community), since
 * that is the identifier handed to {@code fetchAndProcessProjects}.
 *
 * @param communityId the identifier of the parent community
 * @param subcommunityId the identifier of the sub-community whose projects are queried
 * @param baseURL the base URL of the community API
 * @param communityEntityMap the map receiving the project/community associations
 */
private static void addRelevantProjects(
	String communityId,
	String subcommunityId,
	String baseURL,
	CommunityEntityMap communityEntityMap) {
	final ProjectQueryFunction subcommunityPageQuery = (pageNumber, pageSize) -> {
		try {
			return QueryCommunityAPI
				.subcommunityProjects(
					communityId, subcommunityId, String.valueOf(pageNumber), String.valueOf(pageSize), baseURL);
		} catch (IOException e) {
			// the functional interface cannot declare checked exceptions
			throw new RuntimeException(e);
		}
	};
	fetchAndProcessProjects(subcommunityPageQuery, communityId, communityEntityMap);
}
/**
 * Walks through all the pages returned by the given query, starting from page 0
 * with a page size of 100, and associates every project found with {@code communityId}.
 * The walk stops after the first page whose {@code getLast()} is true.
 *
 * @param projectQueryFunction the query returning the response for a given page and size
 * @param communityId the community identifier stored as value for each project
 * @param communityEntityMap the map receiving the project/community associations
 * @throws RuntimeException wrapping the {@link IOException} raised while reading a page
 */
private static void fetchAndProcessProjects(
	ProjectQueryFunction projectQueryFunction,
	String communityId,
	CommunityEntityMap communityEntityMap) {
	final int size = 100;
	for (int page = 0;; page++) {
		final ContentModel currentPage;
		try {
			currentPage = MAPPER.readValue(projectQueryFunction.query(page, size), ContentModel.class);
		} catch (IOException e) {
			throw new RuntimeException("Error processing projects for community: " + communityId, e);
		}
		// iterating an empty content is a no-op, so no explicit emptiness check is needed
		currentPage
			.getContent()
			.forEach(
				project -> communityEntityMap
					.add(ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(), communityId));
		if (currentPage.getLast()) {
			break;
		}
	}
}
private static List<Provider> getCommunityContentProviders(
DatasourceQueryFunction datasourceQueryFunction
) {
try {
String response = datasourceQueryFunction.query();
List<CommunityContentprovider> datasourceList = MAPPER.readValue(response, new TypeReference<List<CommunityContentprovider>>() {
});
return datasourceList.stream().map(d -> {
if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
return null;
Provider p = new Provider();
@ -116,18 +142,124 @@ public class Utils implements Serializable {
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException("Error processing datasource information: " + e);
}
}
/**
 * Selects the communities whose status is not "hidden" and whose type is either
 * "ri" or "community".
 *
 * @param baseURL the base url of the API to be queried
 * @return the selected communities, as instances of the CommunityModel class
 * @throws IOException if the API response cannot be obtained or deserialized
 */
public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
	final String response = QueryCommunityAPI.communities(baseURL);
	final List<CommunityModel> allCommunities = MAPPER.readValue(response, new TypeReference<List<CommunityModel>>() {
	});
	final List<CommunityModel> selected = new ArrayList<>();
	for (CommunityModel community : allCommunities) {
		if (community.getStatus().equals("hidden")) {
			continue;
		}
		if (community.getType().equals("ri") || community.getType().equals("community")) {
			selected.add(community);
		}
	}
	return selected;
}
/**
 * Builds the Community for the given model and sets its providers with the
 * content providers obtained from the community datasource API.
 *
 * @param baseURL the base url of the API to be queried
 * @param communityModel the communityModel as replied by the APIs
 * @return the community built from the model, with its providers set
 */
private static Community getCommunity(String baseURL, CommunityModel communityModel) {
	final Community result = getCommunity(communityModel);
	final DatasourceQueryFunction datasourceQuery = () -> {
		try {
			return QueryCommunityAPI.communityDatasource(result.getId(), baseURL);
		} catch (IOException e) {
			// the functional interface cannot declare checked exceptions
			throw new RuntimeException(e);
		}
	};
	result.setProviders(getCommunityContentProviders(datasourceQuery));
	return result;
}
/**
 * Builds the Community for the given sub-community model and sets its providers
 * with the content providers obtained from the sub-community datasource API.
 *
 * @param baseURL the base url of the API to be queried
 * @param communityId the identifier of the parent community
 * @param sc the sub-community model as replied by the APIs
 * @return the community built from the sub-community model, with its providers set
 */
private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) {
	final Community configuration = getCommunity(sc);
	final DatasourceQueryFunction datasourceQuery = () -> {
		try {
			return QueryCommunityAPI.subcommunityDatasource(communityId, sc.getSubCommunityId(), baseURL);
		} catch (IOException e) {
			// the functional interface cannot declare checked exceptions
			throw new RuntimeException(e);
		}
	};
	configuration.setProviders(getCommunityContentProviders(datasourceQuery));
	return configuration;
}
/**
 * Gets the configuration of all the sub-communities of a given community.
 *
 * @param communityId the identifier of the parent community
 * @param baseURL the base url of the API to be queried
 * @return one Community per sub-community, in the order returned by the API
 * @throws RuntimeException wrapping the {@link IOException} raised while listing the sub-communities
 */
private static List<Community> getSubCommunity(String communityId, String baseURL) {
	final List<SubCommunityModel> subcommunities;
	try {
		subcommunities = getSubcommunities(communityId, baseURL);
	} catch (IOException e) {
		throw new RuntimeException(e);
	}
	final List<Community> configurations = new ArrayList<>();
	for (SubCommunityModel sc : subcommunities) {
		configurations.add(getSubCommunityConfiguration(baseURL, communityId, sc));
	}
	return configurations;
}
/**
 * Prepares the configuration for the communities and their sub-communities.
 * Every selected community contributes itself followed by its sub-communities;
 * only the entries for which {@code isValid()} holds are kept, indexed by identifier.
 *
 * @param baseURL the base url of the API to be queried
 * @return the configuration wrapping the map from identifier to valid community
 * @throws IOException if the list of communities cannot be obtained
 */
public static CommunityConfiguration getCommunityConfiguration(String baseURL) throws IOException {
	final List<Community> candidates = new ArrayList<>();
	for (CommunityModel model : getValidCommunities(baseURL)) {
		candidates.add(getCommunity(baseURL, model));
		candidates.addAll(getSubCommunity(model.getId(), baseURL));
	}

	final Map<String, Community> communities = Maps.newHashMap();
	for (Community candidate : candidates) {
		if (candidate.isValid()) {
			communities.put(candidate.getId(), candidate);
		}
	}
	return new CommunityConfiguration(communities);
}
/**
* Fills the fields of the community model that are common to both the community configuration and the sub-community configuration
* @param input
* @return
* @param <C>
*/
private static <C extends CommonConfigurationModel> Community getCommonConfiguration(C input){
Community c = new Community();
c.setZenodoCommunities(input.getOtherZenodoCommunities());
if (StringUtils.isNotBlank(input.getZenodoCommunity()))
c.getZenodoCommunities().add(input.getZenodoCommunity());
c.setSubjects(input.getSubjects());
if(input.getFos() != null)
c.getSubjects().addAll(input.getFos());
if(input.getSdg()!=null)
c.getSubjects().addAll(input.getSdg());
if (input.getAdvancedConstraints() != null) {
c.setConstraints(input.getAdvancedConstraints());
@ -147,8 +279,6 @@ public class Utils implements Serializable {
return c;
}
private static Community getCommunity(CommunityModel cm) {
Community c = getCommonConfiguration(cm);
c.setId(cm.getId());
@ -156,18 +286,43 @@ public class Utils implements Serializable {
return c;
}
public static List<CommunityModel> getValidCommunities(String baseURL) throws IOException {
return MAPPER
.readValue(QueryCommunityAPI.communities(baseURL), CommunitySummary.class)
.stream()
.filter(
community -> !community.getStatus().equals("hidden") &&
(community.getType().equals("ri") || community.getType().equals("community")))
.collect(Collectors.toList());
public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference<List<SubCommunityModel>>() {
});
}
/**
 * Reads the organization/community map exposed by the API.
 *
 * @param baseURL the base url of the API to be queried
 * @return the response of {@link QueryCommunityAPI#propagationOrganizationCommunityMap}
 *         deserialized as a CommunityEntityMap
 * @throws IOException if the response cannot be obtained or deserialized
 */
public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException {
	final String response = QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL);
	return MAPPER.readValue(response, CommunityEntityMap.class);
}
/**
 * Reads the datasource/community map exposed by the API.
 *
 * @param baseURL the base url of the API to be queried
 * @return the response of {@link QueryCommunityAPI#propagationDatasourceCommunityMap}
 *         deserialized as a CommunityEntityMap
 * @throws IOException if the response cannot be obtained or deserialized
 */
public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException {
	final String response = QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL);
	return MAPPER.readValue(response, CommunityEntityMap.class);
}
/**
 * Associates the propagation organizations of a community with that community.
 * Each identifier returned by the API is prefixed with the Organization id prefix
 * followed by "|" before being used as key of the map.
 *
 * @param communityId the identifier of the community
 * @param baseURL the base url of the API to be queried
 * @param communityEntityMap the map receiving the organization/community associations
 * @throws RuntimeException wrapping the {@link IOException} raised while querying the API
 */
private static void getRelatedOrganizations(String communityId, String baseURL, CommunityEntityMap communityEntityMap) {
	final List<String> organizationIds;
	try {
		organizationIds = MAPPER
			.readValue(
				QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL),
				EntityIdentifierList.class);
	} catch (IOException e) {
		throw new RuntimeException(e);
	}
	for (String organizationId : organizationIds) {
		communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + organizationId, communityId);
	}
}
private static void getRelatedOrganizations(String communityId, String subcommunityId, String baseURL, CommunityEntityMap communityEntityMap){
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), EntityIdentifierList.class);
associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId ));
} catch (IOException e) {
throw new RuntimeException(e);
}
public static List<SubCommunityModel> getSubcommunities(String communityId, String baseURL) throws IOException {
return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), SubCommunitySummary.class);
}
/**
@ -175,22 +330,12 @@ public class Utils implements Serializable {
*/
public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException {
CommunityEntityMap organizationMap = new CommunityEntityMap();
String entityPrefix = ModelSupport.getIdPrefix(Organization.class);
getValidCommunities(baseURL)
.forEach(community -> {
String id = community.getId();
List<CommunityModel> communityList = getValidCommunities(baseURL);
communityList.forEach(community -> {
getRelatedOrganizations(community.getId(), baseURL, organizationMap );
try {
List<String> associatedOrgs = MAPPER
.readValue(
QueryCommunityAPI.communityPropagationOrganization(id, baseURL), OrganizationList.class);
associatedOrgs.forEach(o -> {
if (!organizationMap
.keySet()
.contains(
entityPrefix + "|" + o))
organizationMap.put(entityPrefix + "|" + o, new ArrayList<>());
organizationMap.get(entityPrefix + "|" + o).add(community.getId());
});
List<SubCommunityModel> subcommunities = getSubcommunities(community.getId(), baseURL);
subcommunities.forEach(sc -> getRelatedOrganizations(community.getId(), sc.getSubCommunityId(), baseURL, organizationMap));
} catch (IOException e) {
throw new RuntimeException(e);
}
@ -199,79 +344,24 @@ public class Utils implements Serializable {
return organizationMap;
}
public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
CommunityEntityMap projectMap = new CommunityEntityMap();
String entityPrefix = ModelSupport.getIdPrefix(Project.class);
getValidCommunities(baseURL)
.forEach(community -> {
int page = -1;
int size = 100;
ContentModel cm = new ContentModel();
do {
page++;
try {
cm = MAPPER
.readValue(
QueryCommunityAPI
.communityProjects(
community.getId(), String.valueOf(page), String.valueOf(size), baseURL),
ContentModel.class);
if (cm.getContent().size() > 0) {
cm.getContent().forEach(p -> {
if (!projectMap.keySet().contains(entityPrefix + "|" + p.getOpenaireId()))
projectMap.put(entityPrefix + "|" + p.getOpenaireId(), new ArrayList<>());
projectMap.get(entityPrefix + "|" + p.getOpenaireId()).add(community.getId());
});
}
} catch (IOException e) {
throw new RuntimeException(e);
}
} while (!cm.getLast());
});
return projectMap;
}
public static List<String> getCommunityIdList(String baseURL) throws IOException {
return getValidCommunities(baseURL)
.stream()
.map(community -> community.getId())
.collect(Collectors.toList());
}
public static List<EntityCommunities> getDatasourceCommunities(String baseURL) throws IOException {
List<CommunityModel> validCommunities = getValidCommunities(baseURL);
HashMap<String, Set<String>> map = new HashMap<>();
String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|";
validCommunities.forEach(c -> {
.flatMap(communityModel -> {
List<String> communityIds = new ArrayList<>();
communityIds.add(communityModel.getId());
try {
new ObjectMapper()
.readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
.forEach(d -> {
if (!map.keySet().contains(d.getOpenaireId()))
map.put(d.getOpenaireId(), new HashSet<>());
map.get(d.getOpenaireId()).add(c.getId());
});
Utils.getSubcommunities(communityModel.getId(), baseURL).forEach(sc -> communityIds.add(sc.getSubCommunityId()));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return communityIds.stream();
})
List<EntityCommunities> temp = map
.keySet()
.stream()
.map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map)))
.collect(Collectors.toList());
return temp;
}
@NotNull
private static List<String> getCollect(String k, HashMap<String, Set<String>> map) {
List<String> temp = map.get(k).stream().collect(Collectors.toList());
return temp;
}
}

View File

@ -18,4 +18,13 @@ public class CommunityEntityMap extends HashMap<String, List<String>> {
}
return super.get(key);
}
/**
 * Appends {@code value} to the list associated with {@code key}, creating the
 * list first when the key has no list yet.
 *
 * @param key the entity identifier
 * @param value the identifier to append to the list of {@code key}
 */
public void add(String key, String value) {
	// single lookup; also covers a key that is present but mapped to null
	super.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
}
}

View File

@ -1,15 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
/**
* @author miriam.baglioni
* @Date 06/10/23
*/
public class CommunitySummary extends ArrayList<CommunityModel> implements Serializable {
public CommunitySummary() {
super();
}
}

View File

@ -1,13 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
import eu.dnetlib.dhp.api.model.CommunityContentprovider;
public class DatasourceList extends ArrayList<CommunityContentprovider> implements Serializable {
public DatasourceList() {
super();
}
}

View File

@ -8,9 +8,9 @@ import java.util.ArrayList;
* @author miriam.baglioni
* @Date 09/10/23
*/
public class OrganizationList extends ArrayList<String> implements Serializable {
public class EntityIdentifierList extends ArrayList<String> implements Serializable {
public OrganizationList() {
public EntityIdentifierList() {
super();
}
}

View File

@ -1,10 +0,0 @@
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.ArrayList;
public class SubCommunitySummary extends ArrayList<SubCommunityModel> implements Serializable {
public SubCommunitySummary(){
super();
}
}

View File

@ -15,10 +15,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -113,16 +111,53 @@ public class SparkBulkTagJob {
spark, inputPath, outputPath, protoMap, cc);
execEntityTag(
spark, inputPath + "organization", outputPath + "organization",
Utils.getCommunityOrganization(baseURL), Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
mapWithRepresentative(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)),
Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
execEntityTag(
spark, inputPath + "project", outputPath + "project", Utils.getCommunityProjects(baseURL),
spark, inputPath + "project", outputPath + "project",
Utils.getProjectCommunityMap(baseURL),
Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
execEntityTag(
spark, inputPath + "datasource", outputPath + "datasource",
Utils.getDatasourceCommunityMap(baseURL),
Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
});
}
/**
 * Re-keys the organization/community map so that the communities of an organization
 * appearing as target of a "merges" relation are moved under the source of that
 * relation. When the source already has an entry, the two lists are concatenated.
 *
 * @param spark the active Spark session
 * @param relationPath the path of the relations, read as JSON with the Relation schema
 * @param organizationCommunityMap the map to update; it is modified in place
 * @return the same {@code organizationCommunityMap} instance, after the re-keying
 */
private static CommunityEntityMap mapWithRepresentative(SparkSession spark, String relationPath
	, CommunityEntityMap organizationCommunityMap) {
	// the 'merges' literal must be closed, otherwise the filter expression cannot be parsed
	Dataset<Row> mergesRel = spark
		.read()
		.schema(Encoders.bean(Relation.class).schema())
		.json(relationPath)
		.filter("datainfo.deletedbyinference != true and relClass = 'merges'")
		.select("source", "target");

	ArrayList<String> idList = organizationCommunityMap
		.keySet()
		.stream()
		.map(k -> ModelSupport.idPrefixMap.get(Organization.class) + "|" + k)
		.collect(Collectors.toCollection(ArrayList::new));

	// a Dataset<String> built with Encoders.STRING() exposes its only column as "value"
	Dataset<String> organizationIdentifiers = spark.createDataset(idList, Encoders.STRING());

	// a left_semi join returns only the columns of its left side: mergesRel has to be the
	// left side for "source" and "target" to be selectable afterwards
	List<Row> mappedKeys = mergesRel
		.join(
			organizationIdentifiers,
			mergesRel.col("target").equalTo(organizationIdentifiers.col("value")),
			"left_semi")
		.select("source", "target")
		.collectAsList();

	for (Row mappedEntry : mappedKeys) {
		String oldKey = mappedEntry.getAs("target");
		String newKey = mappedEntry.getAs("source");
		// removes the oldKey from the map; the removed list is then merged under the newKey
		List<String> communities = organizationCommunityMap.remove(oldKey);
		if (communities == null) {
			// Map.merge rejects a null value with a NullPointerException.
			// NOTE(review): oldKey carries the Organization prefix added when building idList,
			// while the entries are looked up with the map's own keys — confirm whether the
			// keys of organizationCommunityMap are expected to be already prefixed.
			continue;
		}
		organizationCommunityMap.merge(newKey, communities, (existing, newValue) -> {
			existing.addAll(newValue);
			return existing;
		});
	}
	return organizationCommunityMap;
}
private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath, String outputPath,
CommunityEntityMap communityEntity, Class<E> entityClass,
String classID, String calssName) {
@ -184,62 +219,6 @@ public class SparkBulkTagJob {
.json(inputPath);
}
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,
List<EntityCommunities> datasourceCommunities) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
Dataset<EntityCommunities> dc = spark
.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
datasource
.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
Datasource ds = t2._1();
if (t2._2() != null) {
List<String> context = Optional
.ofNullable(ds.getContext())
.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
.orElse(new ArrayList<>());
if (!Optional.ofNullable(ds.getContext()).isPresent())
ds.setContext(new ArrayList<>());
t2._2().getCommunitiesId().forEach(c -> {
if (!context.contains(c)) {
Context con = new Context();
con.setId(c);
con
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"1")));
ds.getContext().add(con);
}
});
}
return ds;
}, Encoders.bean(Datasource.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "datasource");
readPath(spark, outputPath + "datasource", Datasource.class)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + "datasource");
}
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
CommunityConfiguration cc) {

View File

@ -43,6 +43,7 @@ public class Community implements Serializable {
}
/**
 * Replaces the subjects of the community; a {@code null} argument is ignored
 * and leaves the current value untouched.
 *
 * @param subjects the new list of subjects, or {@code null} to keep the current one
 */
public void setSubjects(List<String> subjects) {
	if (subjects == null) {
		return;
	}
	this.subjects = subjects;
}
@ -59,6 +60,7 @@ public class Community implements Serializable {
}
/**
 * Replaces the Zenodo communities of the community; a {@code null} argument is
 * ignored and leaves the current value untouched.
 *
 * @param zenodoCommunities the new list of Zenodo communities, or {@code null} to keep the current one
 */
public void setZenodoCommunities(List<String> zenodoCommunities) {
	if (zenodoCommunities == null) {
		return;
	}
	this.zenodoCommunities = zenodoCommunities;
}

View File

@ -52,6 +52,7 @@ public class PrepareResultCommunitySet {
log.info("baseURL: {}", baseURL);
final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(baseURL);
//final CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL);
log.info("organizationMap: {}", new Gson().toJson(organizationMap));
SparkConf conf = new SparkConf();

View File

@ -2,13 +2,11 @@
package eu.dnetlib.dhp.resulttocommunityfromproject;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
@ -18,16 +16,10 @@ import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultOrganizations;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareResultCommunitySet {
@ -55,7 +47,7 @@ public class PrepareResultCommunitySet {
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
final CommunityEntityMap projectsMap = Utils.getCommunityProjects(baseURL);
final CommunityEntityMap projectsMap = Utils.getProjectCommunityMap(baseURL);
SparkConf conf = new SparkConf();

View File

@ -6,15 +6,11 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.dhp.api.QueryCommunityAPI;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityModel;
import eu.dnetlib.dhp.api.model.SubCommunityModel;
import eu.dnetlib.dhp.api.model.SubCommunitySummary;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
@ -25,7 +21,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@ -1969,6 +1964,8 @@ public class BulkTagJobTest {
throw new RuntimeException(e);
}
});
System.out.println(new ObjectMapper().writeValueAsString(Utils.getOrganizationCommunityMap(baseURL)));
}
}