From 6e1f383e4a06a0a8abb923134808013c29d56e58 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Tue, 13 Feb 2024 16:37:14 +0100
Subject: [PATCH] [Tagging Projects and Datasource] first extension of
 bulktagging to add the context to projects and datasource

---
 .../eu/dnetlib/dhp/api/QueryCommunityAPI.java |  2 +
 .../main/java/eu/dnetlib/dhp/api/Utils.java   | 53 ++++++++----
 .../dhp/api/model/EntityCommunities.java      | 39 +++++++++
 .../dnetlib/dhp/bulktag/SparkBulkTagJob.java  | 82 ++++++++++++++++++-
 pom.xml                                       |  2 +-
 5 files changed, 161 insertions(+), 17 deletions(-)
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java
index cf33c65099..28110549c4 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java
@@ -6,6 +6,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.List;
 
 import org.jetbrains.annotations.NotNull;
 
@@ -77,4 +78,5 @@ public class QueryCommunityAPI {
 		return body;
 	}
 
+
 }
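Note on the Utils.java changes that follow: the hard-coded OpenAIRE id
prefixes ("10|", "20|", "40|") are replaced with values derived from the
model classes via ModelSupport.getIdPrefix, so the prefixes stay in sync
with the dhp-schemas library instead of being repeated as string literals.
A minimal sketch of the idea, assuming dhp-schemas is on the classpath
(PrefixExample is a hypothetical class name, not part of the patch):

	import eu.dnetlib.dhp.schema.common.ModelSupport;
	import eu.dnetlib.dhp.schema.oaf.Datasource;
	import eu.dnetlib.dhp.schema.oaf.Organization;
	import eu.dnetlib.dhp.schema.oaf.Project;

	public class PrefixExample {
		public static void main(String[] args) {
			// expected to print the numeric prefixes the patch replaces:
			// 10 for datasources, 20 for organizations, 40 for projects
			System.out.println(ModelSupport.getIdPrefix(Datasource.class));
			System.out.println(ModelSupport.getIdPrefix(Organization.class));
			System.out.println(ModelSupport.getIdPrefix(Project.class));
		}
	}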
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java
index 06d0f95c25..bbdfbb96e1 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java
@@ -3,14 +3,13 @@ package eu.dnetlib.dhp.api;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import javax.management.Query;
-
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.Project;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +23,6 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
 import eu.dnetlib.dhp.bulktag.community.Provider;
 import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
 import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
-import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob;
 
 /**
  * @author miriam.baglioni
@@ -58,7 +56,7 @@ public class Utils implements Serializable {
 			if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled()))
 				return null;
 			Provider p = new Provider();
-			p.setOpenaireId("10|" + d.getOpenaireId());
+			p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId());
 			p.setSelectionConstraints(d.getSelectioncriteria());
 			if (p.getSelectionConstraints() != null)
 				p.getSelectionConstraints().setSelection(resolver);
@@ -113,6 +111,7 @@
 	 */
 	public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException {
 		CommunityEntityMap organizationMap = new CommunityEntityMap();
+		String entityPrefix = ModelSupport.getIdPrefix(Organization.class);
 		getValidCommunities(baseURL)
 			.forEach(community -> {
 				String id = community.getId();
@@ -124,9 +123,9 @@
 					if (!organizationMap
 						.keySet()
 						.contains(
-							"20|" + o))
-						organizationMap.put("20|" + o, new ArrayList<>());
-					organizationMap.get("20|" + o).add(community.getId());
+							entityPrefix + "|" + o))
+						organizationMap.put(entityPrefix + "|" + o, new ArrayList<>());
+					organizationMap.get(entityPrefix + "|" + o).add(community.getId());
 				});
 			} catch (IOException e) {
 				throw new RuntimeException(e);
 			}
@@ -138,7 +137,7 @@
 
 	public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException {
 		CommunityEntityMap projectMap = new CommunityEntityMap();
-
+		String entityPrefix = ModelSupport.getIdPrefix(Project.class);
 		getValidCommunities(baseURL)
 			.forEach(community -> {
 				int page = -1;
@@ -155,9 +154,9 @@
 							ContentModel.class);
 					if (cm.getContent().size() > 0) {
 						cm.getContent().forEach(p -> {
-							if (!projectMap.keySet().contains("40|" + p.getOpenaireId()))
-								projectMap.put("40|" + p.getOpenaireId(), new ArrayList<>());
-							projectMap.get("40|" + p.getOpenaireId()).add(community.getId());
+							if (!projectMap.keySet().contains(entityPrefix + "|" + p.getOpenaireId()))
+								projectMap.put(entityPrefix + "|" + p.getOpenaireId(), new ArrayList<>());
+							projectMap.get(entityPrefix + "|" + p.getOpenaireId()).add(community.getId());
 						});
 					}
 				} catch (IOException e) {
@@ -174,4 +173,28 @@
 			.map(community -> community.getId())
 			.collect(Collectors.toList());
 	}
+
+	public static List<EntityCommunities> getDatasourceCommunities(String baseURL) throws IOException {
+		List<CommunityModel> validCommunities = getValidCommunities(baseURL);
+		HashMap<String, Set<String>> map = new HashMap<>();
+		validCommunities.forEach(c -> {
+			try {
+				new ObjectMapper()
+					.readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
+					.forEach(d -> {
+						if (!map.keySet().contains(d.getOpenaireId()))
+							map.put(d.getOpenaireId(), new HashSet<>());
+
+						map.get(d.getOpenaireId()).add(c.getId());
+					});
+			} catch (IOException e) {
+				throw new RuntimeException(e);
+			}
+		});
+
+		return map
+			.keySet()
+			.stream()
+			.map(k -> EntityCommunities.newInstance(k, map.get(k).stream().collect(Collectors.toList())))
+			.collect(Collectors.toList());
+	}
+
 }
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java
new file mode 100644
index 0000000000..3af396a0af
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java
@@ -0,0 +1,39 @@
+package eu.dnetlib.dhp.api.model;
+
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @author miriam.baglioni
+ * @Date 13/02/24
+ */
+public class EntityCommunities implements Serializable {
+	private String entityId;
+	private List<String> communitiesId;
+
+	public String getEntityId() {
+		return entityId;
+	}
+
+	public void setEntityId(String entityId) {
+		this.entityId = entityId;
+	}
+
+	public List<String> getCommunitiesId() {
+		return communitiesId;
+	}
+
+	public void setCommunitiesId(List<String> communitiesId) {
+		this.communitiesId = communitiesId;
+	}
+
+	public static EntityCommunities newInstance(String dsid, List<String> csid) {
+		EntityCommunities dsc = new EntityCommunities();
+		dsc.entityId = dsid;
+		dsc.communitiesId = csid;
+		return dsc;
+	}
+}
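The new getDatasourceCommunities helper inverts the community -> datasource
mapping served by the API into one EntityCommunities record per datasource.
A self-contained sketch of the same grouping with plain collections (the
class name and the community/datasource ids below are hypothetical):

	import java.util.*;

	public class GroupingExample {
		public static void main(String[] args) {
			// hypothetical (community, datasource) pairs as the API returns them
			List<String[]> pairs = Arrays.asList(
				new String[] { "dh-ch", "10|doajarticles::1234" },
				new String[] { "enermaps", "10|doajarticles::1234" },
				new String[] { "dh-ch", "10|re3data_____::abcd" });

			// group the community ids by datasource id, deduplicating as we go
			Map<String, Set<String>> byDatasource = new HashMap<>();
			pairs.forEach(p -> byDatasource.computeIfAbsent(p[1], k -> new HashSet<>()).add(p[0]));

			// one entry per datasource, mirroring EntityCommunities.newInstance
			byDatasource.forEach((ds, communities) -> System.out.println(ds + " -> " + communities));
		}
	}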
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
index e20fcb081a..4bd7707221 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java
@@ -5,7 +5,14 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.util.*;
+import java.util.stream.Collectors;
+import eu.dnetlib.dhp.api.model.CommunityEntityMap;
+import eu.dnetlib.dhp.api.model.EntityCommunities;
+import eu.dnetlib.dhp.api.model.DatasourceCommunitiesList;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Context;
+import eu.dnetlib.dhp.schema.oaf.Project;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
@@ -23,11 +30,11 @@ import com.google.gson.Gson;
 import eu.dnetlib.dhp.api.Utils;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.bulktag.community.*;
-import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
 
 public class SparkBulkTagJob {
 
@@ -89,9 +96,80 @@
 			spark -> {
 				extendCommunityConfigurationForEOSC(spark, inputPath, cc);
 				execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
+				execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
+				execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL));
 			});
 	}
 
+	private static void execProjectTag(SparkSession spark, String inputPath, String outputPath,
+		CommunityEntityMap communityProjects) {
+		Dataset<Project> projects = readPath(spark, inputPath + "project", Project.class);
+		Dataset<EntityCommunities> pc = spark
+			.createDataset(
+				communityProjects
+					.keySet()
+					.stream()
+					.map(k -> EntityCommunities.newInstance(k, communityProjects.get(k)))
+					.collect(Collectors.toList()),
+				Encoders.bean(EntityCommunities.class));
+
+		projects
+			.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left")
+			.map((MapFunction<Tuple2<Project, EntityCommunities>, Project>) t2 -> {
+				Project ds = t2._1();
+				if (t2._2() != null) {
+					List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
+					t2._2().getCommunitiesId().forEach(c -> {
+						if (!context.contains(c)) {
+							Context con = new Context();
+							con.setId(c);
+							con
+								.setDataInfo(
+									Arrays
+										.asList(
+											OafMapperUtils
+												.dataInfo(
+													false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
+													OafMapperUtils
+														.qualifier(
+															TaggingConstants.CLASS_ID_DATASOURCE,
+															TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
+															ModelConstants.DNET_PROVENANCE_ACTIONS,
+															ModelConstants.DNET_PROVENANCE_ACTIONS),
+													"1")));
+							ds.getContext().add(con);
+						}
+					});
+				}
+				return ds;
+			}, Encoders.bean(Project.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "project");
+
+		readPath(spark, outputPath + "project", Project.class)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(inputPath + "project");
+	}
+
+	private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,
+		List<EntityCommunities> datasourceCommunities) {
+		Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
+
+		Dataset<EntityCommunities> dc = spark
+			.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
+
+		datasource
+			.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
+			.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
+				Datasource ds = t2._1();
+				if (t2._2() != null) {
+					List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
+					t2._2().getCommunitiesId().forEach(c -> {
+						if (!context.contains(c)) {
+							Context con = new Context();
+							con.setId(c);
+							con
+								.setDataInfo(
+									Arrays
+										.asList(
+											OafMapperUtils
+												.dataInfo(
+													false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
+													OafMapperUtils
+														.qualifier(
+															TaggingConstants.CLASS_ID_DATASOURCE,
+															TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
+															ModelConstants.DNET_PROVENANCE_ACTIONS,
+															ModelConstants.DNET_PROVENANCE_ACTIONS),
+													"1")));
+							ds.getContext().add(con);
+						}
+					});
+				}
+				return ds;
+			}, Encoders.bean(Datasource.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "datasource");
+
+		readPath(spark, outputPath + "datasource", Datasource.class)
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(inputPath + "datasource");
+	}
+
+
 	private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) {
 
@@ -182,4 +260,6 @@ public class SparkBulkTagJob {
 		};
 	}
 
+
 }
+
diff --git a/pom.xml b/pom.xml
index 3b6e3a82be..c8514a3cb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -888,7 +888,7 @@
 		3.3.3
 		3.4.2
 		[2.12,3.0)
-		[4.17.3]
+		[5.17.3]
 		[4.0.3]
 		[6.0.5]
 		[3.1.6]
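For reference, the tagging step in execDatasourceTag / execProjectTag is a
left join between the entity Dataset and the per-entity community lists,
followed by a map that appends any community contexts not already present.
A self-contained sketch of the pattern, assuming a local Spark runtime;
Entity and Tag are hypothetical stand-ins for the OAF model classes, and
the dataInfo/provenance wiring is omitted:

	import java.io.Serializable;
	import java.util.ArrayList;
	import java.util.Arrays;
	import java.util.List;

	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.SparkSession;

	import scala.Tuple2;

	public class JoinTagExample {

		public static class Entity implements Serializable {
			private String id;
			private List<String> context = new ArrayList<>();

			public String getId() { return id; }
			public void setId(String id) { this.id = id; }
			public List<String> getContext() { return context; }
			public void setContext(List<String> context) { this.context = context; }
		}

		public static class Tag implements Serializable {
			private String entityId;
			private List<String> communitiesId;

			public String getEntityId() { return entityId; }
			public void setEntityId(String entityId) { this.entityId = entityId; }
			public List<String> getCommunitiesId() { return communitiesId; }
			public void setCommunitiesId(List<String> communitiesId) { this.communitiesId = communitiesId; }
		}

		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder().master("local[*]").appName("join-tag").getOrCreate();

			Entity e = new Entity();
			e.setId("10|ds::1");
			Tag t = new Tag();
			t.setEntityId("10|ds::1");
			t.setCommunitiesId(Arrays.asList("dh-ch", "enermaps"));

			Dataset<Entity> entities = spark.createDataset(Arrays.asList(e), Encoders.bean(Entity.class));
			Dataset<Tag> tags = spark.createDataset(Arrays.asList(t), Encoders.bean(Tag.class));

			// the left join keeps untagged entities; the map appends missing contexts only
			entities
				.joinWith(tags, entities.col("id").equalTo(tags.col("entityId")), "left")
				.map((MapFunction<Tuple2<Entity, Tag>, Entity>) t2 -> {
					Entity entity = t2._1();
					if (t2._2() != null)
						t2._2().getCommunitiesId().forEach(c -> {
							if (!entity.getContext().contains(c))
								entity.getContext().add(c);
						});
					return entity;
				}, Encoders.bean(Entity.class))
				.show(false);

			spark.stop();
		}
	}

Note also that the patch overwrites the input directory with the tagged
output via an intermediate copy, which is why each exec*Tag method ends
with a second read/write round-trip from outputPath back to inputPath.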