diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java index c83de6db3a..f11e641e18 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java @@ -44,15 +44,12 @@ public class QueryCommunityAPI { } - public static String communityDatasource(String id, String baseURL) throws IOException { return get(baseURL + id + "/datasources"); } - - public static String communityPropagationOrganization(String id, String baseURL) throws IOException { return get(baseURL + id + "/propagationOrganizations"); @@ -86,16 +83,21 @@ public class QueryCommunityAPI { return body; } - public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL) throws IOException { + public static String subcommunityDatasource(String communityId, String subcommunityId, String baseURL) + throws IOException { return get(baseURL + communityId + "/subcommunities/datasources?subCommunityId=" + subcommunityId); } - public static String subcommunityPropagationOrganization(String communityId, String subcommunityId , String baseURL) throws IOException { + public static String subcommunityPropagationOrganization(String communityId, String subcommunityId, String baseURL) + throws IOException { return get(baseURL + communityId + "/subcommunities/propagationOrganizations?subCommunityId=" + subcommunityId); } - public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size, String baseURL) throws IOException { - return get(baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId=" + subcommunityId); + public static String subcommunityProjects(String communityId, String subcommunityId, String page, String size, + String baseURL) throws IOException { + return get( + baseURL + communityId + "/subcommunities/projects/" + page + "/" + size + "?subCommunityId=" + + subcommunityId); } public static String propagationDatasourceCommunityMap(String baseURL) throws IOException { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index ae36c0f627..a9004f4bd0 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -6,10 +6,10 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Maps; @@ -38,69 +38,69 @@ public class Utils implements Serializable { } @FunctionalInterface - private interface DatasourceQueryFunction{ + private interface DatasourceQueryFunction { String query(); } - //PROJECT METHODS + // PROJECT METHODS public static CommunityEntityMap getProjectCommunityMap(String baseURL) throws IOException { CommunityEntityMap projectMap = new CommunityEntityMap(); getValidCommunities(baseURL) - .forEach(community -> { - addRelevantProjects(community.getId(), baseURL, projectMap); - try { - List subcommunities = getSubcommunities(community.getId(), 
baseURL); - subcommunities.forEach(sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + .forEach(community -> { + addRelevantProjects(community.getId(), baseURL, projectMap); + try { + List subcommunities = getSubcommunities(community.getId(), baseURL); + subcommunities + .forEach( + sc -> addRelevantProjects(community.getId(), sc.getSubCommunityId(), baseURL, projectMap)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); return projectMap; } private static void addRelevantProjects( - String communityId, - String baseURL, - CommunityEntityMap communityEntityMap - ) { + String communityId, + String baseURL, + CommunityEntityMap communityEntityMap) { fetchAndProcessProjects( - (page, size) -> { - try { - return QueryCommunityAPI.communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - communityId, - communityEntityMap - ); + (page, size) -> { + try { + return QueryCommunityAPI + .communityProjects(communityId, String.valueOf(page), String.valueOf(size), baseURL); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + communityId, + communityEntityMap); } private static void addRelevantProjects( - String communityId, - String subcommunityId, - String baseURL, - CommunityEntityMap communityEntityMap - ) { + String communityId, + String subcommunityId, + String baseURL, + CommunityEntityMap communityEntityMap) { fetchAndProcessProjects( - (page, size) -> { - try { - return QueryCommunityAPI.subcommunityProjects(communityId, subcommunityId, String.valueOf(page), String.valueOf(size), baseURL); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - communityId, - communityEntityMap - ); + (page, size) -> { + try { + return QueryCommunityAPI + .subcommunityProjects( + communityId, subcommunityId, String.valueOf(page), String.valueOf(size), baseURL); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + communityId, + communityEntityMap); } private static void fetchAndProcessProjects( - ProjectQueryFunction projectQueryFunction, - String communityId, - CommunityEntityMap communityEntityMap - ) { + ProjectQueryFunction projectQueryFunction, + String communityId, + CommunityEntityMap communityEntityMap) { int page = 0; final int size = 100; ContentModel contentModel; @@ -111,9 +111,13 @@ public class Utils implements Serializable { contentModel = MAPPER.readValue(response, ContentModel.class); if (!contentModel.getContent().isEmpty()) { - contentModel.getContent().forEach(project ->communityEntityMap.add( - ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(), communityId) - ); + contentModel + .getContent() + .forEach( + project -> communityEntityMap + .add( + ModelSupport.getIdPrefix(Project.class) + "|" + project.getOpenaireId(), + communityId)); } } catch (IOException e) { throw new RuntimeException("Error processing projects for community: " + communityId, e); @@ -123,28 +127,28 @@ public class Utils implements Serializable { } private static List getCommunityContentProviders( - DatasourceQueryFunction datasourceQueryFunction - ) { - try { - String response = datasourceQueryFunction.query(); - List datasourceList = MAPPER.readValue(response, new TypeReference>() { + DatasourceQueryFunction datasourceQueryFunction) { + try { + String response = datasourceQueryFunction.query(); + List 
datasourceList = MAPPER + .readValue(response, new TypeReference>() { }); - return datasourceList.stream().map(d -> { - if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) - return null; - Provider p = new Provider(); - p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId()); - p.setSelectionConstraints(d.getSelectioncriteria()); - if (p.getSelectionConstraints() != null) - p.getSelectionConstraints().setSelection(resolver); - return p; - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException("Error processing datasource information: " + e); - } + return datasourceList.stream().map(d -> { + if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) + return null; + Provider p = new Provider(); + p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId()); + p.setSelectionConstraints(d.getSelectioncriteria()); + if (p.getSelectionConstraints() != null) + p.getSelectionConstraints().setSelection(resolver); + return p; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new RuntimeException("Error processing datasource information: " + e); + } } @@ -156,13 +160,14 @@ public class Utils implements Serializable { */ public static List getValidCommunities(String baseURL) throws IOException { List listCommunity = MAPPER - .readValue(QueryCommunityAPI.communities(baseURL), new TypeReference>() { - }); - return listCommunity.stream() - .filter( - community -> !community.getStatus().equals("hidden") && - (community.getType().equals("ri") || community.getType().equals("community"))) - .collect(Collectors.toList()); + .readValue(QueryCommunityAPI.communities(baseURL), new TypeReference>() { + }); + return listCommunity + .stream() + .filter( + community -> !community.getStatus().equals("hidden") && + (community.getType().equals("ri") || community.getType().equals("community"))) + .collect(Collectors.toList()); } /** @@ -172,16 +177,16 @@ public class Utils implements Serializable { * @return the community set with information from the community model and for the content providers */ private static Community getCommunity(String baseURL, CommunityModel communityModel) { - Community community = getCommunity(communityModel); - community.setProviders(getCommunityContentProviders(()->{ - try { - return QueryCommunityAPI.communityDatasource(community.getId(),baseURL); - } catch (IOException e) { - throw new RuntimeException(e); - } - })); + Community community = getCommunity(communityModel); + community.setProviders(getCommunityContentProviders(() -> { + try { + return QueryCommunityAPI.communityDatasource(community.getId(), baseURL); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); - return community; + return community; } /** @@ -191,9 +196,10 @@ public class Utils implements Serializable { * @param sc * @return */ - private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, SubCommunityModel sc) { + private static @NotNull Community getSubCommunityConfiguration(String baseURL, String communityId, + SubCommunityModel sc) { Community c = getCommunity(sc); - c.setProviders(getCommunityContentProviders(()->{ + c.setProviders(getCommunityContentProviders(() -> { try { return QueryCommunityAPI.subcommunityDatasource(communityId, sc.getSubCommunityId(), baseURL); } catch (IOException e) { @@ -210,12 +216,13 @@ public class Utils implements Serializable { * 
@param baseURL * @return */ - private static List getSubCommunity(String communityId, String baseURL){ + private static List getSubCommunity(String communityId, String baseURL) { try { List subcommunities = getSubcommunities(communityId, baseURL); - return subcommunities.stream().map(sc -> - getSubCommunityConfiguration(baseURL, communityId, sc)) - .collect(Collectors.toList()); + return subcommunities + .stream() + .map(sc -> getSubCommunityConfiguration(baseURL, communityId, sc)) + .collect(Collectors.toList()); } catch (IOException e) { throw new RuntimeException(e); } @@ -244,22 +251,21 @@ public class Utils implements Serializable { return new CommunityConfiguration(communities); } - /** * filles the common fields in the community model for both the communityconfiguration and the subcommunityconfiguration * @param input * @return * @param */ - private static Community getCommonConfiguration(C input){ + private static Community getCommonConfiguration(C input) { Community c = new Community(); c.setZenodoCommunities(input.getOtherZenodoCommunities()); if (StringUtils.isNotBlank(input.getZenodoCommunity())) c.getZenodoCommunities().add(input.getZenodoCommunity()); c.setSubjects(input.getSubjects()); - if(input.getFos() != null) + if (input.getFos() != null) c.getSubjects().addAll(input.getFos()); - if(input.getSdg()!=null) + if (input.getSdg() != null) c.getSubjects().addAll(input.getSdg()); if (input.getAdvancedConstraints() != null) { c.setConstraints(input.getAdvancedConstraints()); @@ -287,38 +293,49 @@ public class Utils implements Serializable { } public static List getSubcommunities(String communityId, String baseURL) throws IOException { - return MAPPER.readValue(QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference>() { - }); + return MAPPER + .readValue( + QueryCommunityAPI.subcommunities(communityId, baseURL), new TypeReference>() { + }); } - public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException { - return MAPPER.readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class); + public static CommunityEntityMap getOrganizationCommunityMap(String baseURL) throws IOException { + return MAPPER + .readValue(QueryCommunityAPI.propagationOrganizationCommunityMap(baseURL), CommunityEntityMap.class); } - public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException { + public static CommunityEntityMap getDatasourceCommunityMap(String baseURL) throws IOException { return MAPPER.readValue(QueryCommunityAPI.propagationDatasourceCommunityMap(baseURL), CommunityEntityMap.class); } - private static void getRelatedOrganizations(String communityId, String baseURL, CommunityEntityMap communityEntityMap){ + private static void getRelatedOrganizations(String communityId, String baseURL, + CommunityEntityMap communityEntityMap) { try { List associatedOrgs = MAPPER - .readValue( - QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL), EntityIdentifierList.class); - associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId )); + .readValue( + QueryCommunityAPI.communityPropagationOrganization(communityId, baseURL), + EntityIdentifierList.class); + associatedOrgs + .forEach( + o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId)); } catch (IOException e) { throw new RuntimeException(e); } } - private static void getRelatedOrganizations(String 
communityId, String subcommunityId, String baseURL, CommunityEntityMap communityEntityMap){ + private static void getRelatedOrganizations(String communityId, String subcommunityId, String baseURL, + CommunityEntityMap communityEntityMap) { try { List associatedOrgs = MAPPER - .readValue( - QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), EntityIdentifierList.class); - associatedOrgs.forEach(o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId )); + .readValue( + QueryCommunityAPI.subcommunityPropagationOrganization(communityId, subcommunityId, baseURL), + EntityIdentifierList.class); + associatedOrgs + .forEach( + o -> communityEntityMap.add(ModelSupport.getIdPrefix(Organization.class) + "|" + o, communityId)); } catch (IOException e) { throw new RuntimeException(e); } @@ -331,37 +348,39 @@ public class Utils implements Serializable { public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException { CommunityEntityMap organizationMap = new CommunityEntityMap(); List communityList = getValidCommunities(baseURL); - communityList.forEach(community -> { - getRelatedOrganizations(community.getId(), baseURL, organizationMap ); - try { - List subcommunities = getSubcommunities(community.getId(), baseURL); - subcommunities.forEach(sc -> getRelatedOrganizations(community.getId(), sc.getSubCommunityId(), baseURL, organizationMap)); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + communityList.forEach(community -> { + getRelatedOrganizations(community.getId(), baseURL, organizationMap); + try { + List subcommunities = getSubcommunities(community.getId(), baseURL); + subcommunities + .forEach( + sc -> getRelatedOrganizations( + community.getId(), sc.getSubCommunityId(), baseURL, organizationMap)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); return organizationMap; } - public static List getCommunityIdList(String baseURL) throws IOException { return getValidCommunities(baseURL) .stream() - .flatMap(communityModel -> { - List communityIds = new ArrayList<>(); - communityIds.add(communityModel.getId()); - try { - Utils.getSubcommunities(communityModel.getId(), baseURL).forEach(sc -> communityIds.add(sc.getSubCommunityId())); - } catch (IOException e) { - throw new RuntimeException(e); - } - return communityIds.stream(); - }) + .flatMap(communityModel -> { + List communityIds = new ArrayList<>(); + communityIds.add(communityModel.getId()); + try { + Utils + .getSubcommunities(communityModel.getId(), baseURL) + .forEach(sc -> communityIds.add(sc.getSubCommunityId())); + } catch (IOException e) { + throw new RuntimeException(e); + } + return communityIds.stream(); + }) .collect(Collectors.toList()); } - - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommonConfigurationModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommonConfigurationModel.java index 2aaa51e44d..a9fa97eb1e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommonConfigurationModel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommonConfigurationModel.java @@ -1,74 +1,76 @@ -package eu.dnetlib.dhp.api.model; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import eu.dnetlib.dhp.bulktag.community.SelectionConstraints; +package eu.dnetlib.dhp.api.model; import java.io.Serializable; import java.util.List; +import 
com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +import eu.dnetlib.dhp.bulktag.community.SelectionConstraints; + @JsonIgnoreProperties(ignoreUnknown = true) public class CommonConfigurationModel implements Serializable { - private String zenodoCommunity; - private List subjects; - private List otherZenodoCommunities; - private List fos; - private List sdg; - private SelectionConstraints advancedConstraints; - private SelectionConstraints removeConstraints; + private String zenodoCommunity; + private List subjects; + private List otherZenodoCommunities; + private List fos; + private List sdg; + private SelectionConstraints advancedConstraints; + private SelectionConstraints removeConstraints; - public String getZenodoCommunity() { - return zenodoCommunity; - } + public String getZenodoCommunity() { + return zenodoCommunity; + } - public void setZenodoCommunity(String zenodoCommunity) { - this.zenodoCommunity = zenodoCommunity; - } + public void setZenodoCommunity(String zenodoCommunity) { + this.zenodoCommunity = zenodoCommunity; + } - public List getSubjects() { - return subjects; - } + public List getSubjects() { + return subjects; + } - public void setSubjects(List subjects) { - this.subjects = subjects; - } + public void setSubjects(List subjects) { + this.subjects = subjects; + } - public List getOtherZenodoCommunities() { - return otherZenodoCommunities; - } + public List getOtherZenodoCommunities() { + return otherZenodoCommunities; + } - public void setOtherZenodoCommunities(List otherZenodoCommunities) { - this.otherZenodoCommunities = otherZenodoCommunities; - } + public void setOtherZenodoCommunities(List otherZenodoCommunities) { + this.otherZenodoCommunities = otherZenodoCommunities; + } - public List getFos() { - return fos; - } + public List getFos() { + return fos; + } - public void setFos(List fos) { - this.fos = fos; - } + public void setFos(List fos) { + this.fos = fos; + } - public List getSdg() { - return sdg; - } + public List getSdg() { + return sdg; + } - public void setSdg(List sdg) { - this.sdg = sdg; - } + public void setSdg(List sdg) { + this.sdg = sdg; + } - public SelectionConstraints getRemoveConstraints() { - return removeConstraints; - } + public SelectionConstraints getRemoveConstraints() { + return removeConstraints; + } - public void setRemoveConstraints(SelectionConstraints removeConstraints) { - this.removeConstraints = removeConstraints; - } + public void setRemoveConstraints(SelectionConstraints removeConstraints) { + this.removeConstraints = removeConstraints; + } - public SelectionConstraints getAdvancedConstraints() { - return advancedConstraints; - } + public SelectionConstraints getAdvancedConstraints() { + return advancedConstraints; + } - public void setAdvancedConstraints(SelectionConstraints advancedConstraints) { - this.advancedConstraints = advancedConstraints; - } + public void setAdvancedConstraints(SelectionConstraints advancedConstraints) { + this.advancedConstraints = advancedConstraints; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java index ce7a59b09e..982741fb37 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/CommunityEntityMap.java @@ -19,12 +19,11 @@ public class CommunityEntityMap extends HashMap> { return super.get(key); } - public void 
add(String key, String value){ - if(!super.containsKey(key)){ + public void add(String key, String value) { + if (!super.containsKey(key)) { super.put(key, new ArrayList<>()); } super.get(key).add(value); } - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/SubCommunityModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/SubCommunityModel.java index e24861c81a..9541da38dd 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/SubCommunityModel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/SubCommunityModel.java @@ -1,18 +1,19 @@ -package eu.dnetlib.dhp.api.model; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +package eu.dnetlib.dhp.api.model; import java.io.Serializable; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + @JsonIgnoreProperties(ignoreUnknown = true) public class SubCommunityModel extends CommonConfigurationModel implements Serializable { - private String subCommunityId; + private String subCommunityId; - public String getSubCommunityId() { - return subCommunityId; - } + public String getSubCommunityId() { + return subCommunityId; + } - public void setSubCommunityId(String subCommunityId) { - this.subCommunityId = subCommunityId; - } + public void setSubCommunityId(String subCommunityId) { + this.subCommunityId = subCommunityId; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 291691b0bc..dca4f891f3 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -8,8 +8,6 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB; -import eu.dnetlib.dhp.common.action.model.MasterDuplicate; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -31,6 +29,8 @@ import eu.dnetlib.dhp.api.model.CommunityEntityMap; import eu.dnetlib.dhp.api.model.EntityCommunities; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bulktag.community.*; +import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB; +import eu.dnetlib.dhp.common.action.model.MasterDuplicate; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -96,7 +96,6 @@ public class SparkBulkTagJob { log.info("dbPassword: {}", dbPassword); final String hdfsPath = outputPath + "masterDuplicate"; log.info("hdfsPath: {}", hdfsPath); - SparkConf conf = new SparkConf(); CommunityConfiguration cc; @@ -123,69 +122,85 @@ public class SparkBulkTagJob { spark, inputPath, outputPath, protoMap, cc); execEntityTag( spark, inputPath + "organization", outputPath + "organization", - mapWithRepresentativeOrganization(spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)), - Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION, + mapWithRepresentativeOrganization( + spark, inputPath + "relation", Utils.getOrganizationCommunityMap(baseURL)), + Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION, TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION); execEntityTag( spark, inputPath 
+ "project", outputPath + "project", - Utils.getProjectCommunityMap(baseURL), + Utils.getProjectCommunityMap(baseURL), Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT); execEntityTag( - spark, inputPath + "datasource", outputPath + "datasource", - mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)), - Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE); + spark, inputPath + "datasource", outputPath + "datasource", + mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunityMap(baseURL)), + Datasource.class, TaggingConstants.CLASS_ID_DATASOURCE, + TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE); }); } - private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath, CommunityEntityMap datasourceCommunityMap) { - //load master-duplicate relations - Dataset masterDuplicate = spark.read().schema(Encoders.bean(MasterDuplicate.class).schema()) - .json(masterDuplicatePath).as(Encoders.bean(MasterDuplicate.class)); - //list of id for the communities related entities + private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath, + CommunityEntityMap datasourceCommunityMap) { + // load master-duplicate relations + Dataset masterDuplicate = spark + .read() + .schema(Encoders.bean(MasterDuplicate.class).schema()) + .json(masterDuplicatePath) + .as(Encoders.bean(MasterDuplicate.class)); + // list of id for the communities related entities List idList = entityIdList(ModelSupport.idPrefixMap.get(Datasource.class), datasourceCommunityMap); - //find the mapping with the representative entity if any + // find the mapping with the representative entity if any Dataset datasourceIdentifiers = spark.createDataset(idList, Encoders.STRING()); - List mappedKeys = datasourceIdentifiers.join(masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")), "left_semi") - .selectExpr("masterId as source", "duplicateId as target").collectAsList(); + List mappedKeys = datasourceIdentifiers + .join( + masterDuplicate, datasourceIdentifiers.col("_1").equalTo(masterDuplicate.col("duplicateId")), + "left_semi") + .selectExpr("masterId as source", "duplicateId as target") + .collectAsList(); - //remap the entity with its corresponding representative - return remapCommunityEntityMap(datasourceCommunityMap,mappedKeys); + // remap the entity with its corresponding representative + return remapCommunityEntityMap(datasourceCommunityMap, mappedKeys); } private static List entityIdList(String idPrefixMap, CommunityEntityMap datasourceCommunityMap) { final String prefix = idPrefixMap + "|"; - return datasourceCommunityMap.keySet() - .stream() - .map(key -> prefix + key) - .collect(Collectors.toList()); + return datasourceCommunityMap + .keySet() + .stream() + .map(key -> prefix + key) + .collect(Collectors.toList()); } - private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath - , CommunityEntityMap organizationCommunityMap) { - Dataset mergesRel = spark.read().schema(Encoders.bean(Relation.class).schema()) - .json(relationPath) - .filter("datainfo.deletedbyinference != true and relClass = 'merges") - .select("source", "target"); + private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath, + CommunityEntityMap organizationCommunityMap) { + Dataset mergesRel = spark + .read() + 
.schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .filter("datainfo.deletedbyinference != true and relClass = 'merges'") + .select("source", "target"); List idList = entityIdList(ModelSupport.idPrefixMap.get(Organization.class), organizationCommunityMap); Dataset organizationIdentifiers = spark.createDataset(idList, Encoders.STRING()); - List mappedKeys = organizationIdentifiers.join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi") - .select("source", "target").collectAsList(); + List mappedKeys = organizationIdentifiers + .join(mergesRel, organizationIdentifiers.col("_1").equalTo(mergesRel.col("target")), "left_semi") + .select("source", "target") + .collectAsList(); return remapCommunityEntityMap(organizationCommunityMap, mappedKeys); } - private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap, List mappedKeys) { + private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap, + List mappedKeys) { for (Row mappedEntry : mappedKeys) { String oldKey = mappedEntry.getAs("target"); String newKey = mappedEntry.getAs("source"); - //inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which - //will be used as the newValue parameter of the BiFunction - entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) ->{ + // inserts the newKey in the map while removing the oldKey. The remove produces the value in the Map, which + // will be used as the newValue parameter of the BiFunction + entityCommunityMap.merge(newKey, entityCommunityMap.remove(oldKey), (existing, newValue) -> { existing.addAll(newValue); return existing; }); @@ -255,7 +270,6 @@ public class SparkBulkTagJob { .json(inputPath); } - private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) { @@ -293,11 +307,6 @@ public class SparkBulkTagJob { ProtoMap protoMappingParams, CommunityConfiguration communityConfiguration) { - try { - System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams)); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } ModelSupport.entityTypes .keySet() .parallelStream() diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java index ebbc8bd2b1..146b846eea 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/Community.java @@ -43,7 +43,7 @@ public class Community implements Serializable { } public void setSubjects(List subjects) { - if(subjects != null) + if (subjects != null) this.subjects = subjects; } @@ -60,7 +60,7 @@ public class Community implements Serializable { } public void setZenodoCommunities(List zenodoCommunities) { - if(zenodoCommunities!=null) + if (zenodoCommunities != null) this.zenodoCommunities = zenodoCommunities; } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java index 6b627f4664..f381234ac0 100644 --- 
a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttocommunityfromorganization/PrepareResultCommunitySet.java @@ -52,7 +52,7 @@ public class PrepareResultCommunitySet { log.info("baseURL: {}", baseURL); final CommunityEntityMap organizationMap = Utils.getCommunityOrganization(baseURL); - //final CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL); + // final CommunityEntityMap organizationMap = Utils.getOrganizationCommunityMap(baseURL); log.info("organizationMap: {}", new Gson().toJson(organizationMap)); SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 52c2cafcee..ad794b0da0 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -28,4 +28,7 @@ blacklist=empty allowedpids=orcid;orcid_pending baseURL = https://services.openaire.eu/openaire/community/ iterations=1 +dbUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus +dbUser=dnet +dbPassword=dnetPwd diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml index 4351cd595c..cc21c7e429 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml @@ -170,6 +170,18 @@ pathMap ${pathMap} + + dbUrl + ${dbUrl} + + + dbUser + ${dbUser} + + + dbPassword + ${dbPassword} + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index 71799fbc5e..b74ed48a97 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -17,15 +17,15 @@ undelete - dbUrl> + dbUrl - dbUser> + dbUser - dbPassword> + dbPassword diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 6131eb852b..7119891754 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -8,10 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import com.fasterxml.jackson.core.JsonProcessingException; -import eu.dnetlib.dhp.api.Utils; -import eu.dnetlib.dhp.api.model.SubCommunityModel; -import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -31,9 +27,13 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import 
com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import eu.dnetlib.dhp.api.Utils; +import eu.dnetlib.dhp.api.model.SubCommunityModel; +import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; import eu.dnetlib.dhp.bulktag.community.ProtoMap; import eu.dnetlib.dhp.schema.oaf.*; @@ -1957,13 +1957,13 @@ public class BulkTagJobTest { List subcommunities = Utils.getSubcommunities("clarin", baseURL); CommunityConfiguration tmp = Utils.getCommunityConfiguration(baseURL); - tmp.getCommunities().keySet().forEach(c -> { - try { - System.out.println(new ObjectMapper().writeValueAsString(tmp.getCommunities().get(c))); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - }); + tmp.getCommunities().keySet().forEach(c -> { + try { + System.out.println(new ObjectMapper().writeValueAsString(tmp.getCommunities().get(c))); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); System.out.println(new ObjectMapper().writeValueAsString(Utils.getOrganizationCommunityMap(baseURL))); }
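
Not part of the patch: a minimal, Spark-free sketch of the add/merge-remap pattern introduced above by CommunityEntityMap.add(...) and SparkBulkTagJob.remapCommunityEntityMap(...). The class names RemapSketch and EntityMap, the identifiers and the community labels are made up for illustration; only the add/merge logic mirrors the patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Illustrative sketch only: shows how a duplicate entity key is folded into its
// representative using remove(...) + merge(...), as done in remapCommunityEntityMap.
public class RemapSketch {

	// stand-in for eu.dnetlib.dhp.api.model.CommunityEntityMap
	static class EntityMap extends HashMap<String, List<String>> {
		void add(String key, String value) {
			if (!containsKey(key)) {
				put(key, new ArrayList<>());
			}
			get(key).add(value);
		}
	}

	public static void main(String[] args) {
		EntityMap map = new EntityMap();
		map.add("20|dup_org", "community-a");    // duplicate organization id (example value)
		map.add("20|master_org", "community-b"); // its representative, already mapped

		// remap the duplicate onto its representative: remove(oldKey) yields the old
		// value, which merge(...) passes to the BiFunction as newValue
		map.merge("20|master_org", map.remove("20|dup_org"), (existing, newValue) -> {
			existing.addAll(newValue);
			return existing;
		});

		System.out.println(map); // {20|master_org=[community-b, community-a]}
	}
}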