From 6e1f383e4a06a0a8abb923134808013c29d56e58 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 13 Feb 2024 16:37:14 +0100 Subject: [PATCH 1/6] [Tagging Projects and Datasource] first extension of bulktagging to add the context to projects and datasource --- .../eu/dnetlib/dhp/api/QueryCommunityAPI.java | 2 + .../main/java/eu/dnetlib/dhp/api/Utils.java | 53 ++++++++---- .../dhp/api/model/EntityCommunities.java | 39 +++++++++ .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 82 ++++++++++++++++++- pom.xml | 2 +- 5 files changed, 161 insertions(+), 17 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java index cf33c65099..28110549c4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; +import java.util.List; import org.jetbrains.annotations.NotNull; @@ -77,4 +78,5 @@ public class QueryCommunityAPI { return body; } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index 06d0f95c25..bbdfbb96e1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -3,14 +3,13 @@ package eu.dnetlib.dhp.api; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.stream.Collectors; -import javax.management.Query; - +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Project; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,7 +23,6 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; import eu.dnetlib.dhp.bulktag.community.Provider; import eu.dnetlib.dhp.bulktag.criteria.VerbResolver; import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory; -import eu.dnetlib.dhp.resulttocommunityfromorganization.SparkResultToCommunityFromOrganizationJob; /** * @author miriam.baglioni @@ -58,7 +56,7 @@ public class Utils implements Serializable { if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) return null; Provider p = new Provider(); - p.setOpenaireId("10|" + d.getOpenaireId()); + p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class)+"|" + d.getOpenaireId()); p.setSelectionConstraints(d.getSelectioncriteria()); if (p.getSelectionConstraints() != null) p.getSelectionConstraints().setSelection(resolver); @@ -113,6 +111,7 @@ public class Utils implements Serializable { */ public static CommunityEntityMap getCommunityOrganization(String baseURL) throws IOException { CommunityEntityMap organizationMap = new CommunityEntityMap(); + String entityPrefix = ModelSupport.getIdPrefix(Organization.class); getValidCommunities(baseURL) .forEach(community -> { String id = community.getId(); @@ -124,9 +123,9 @@ public class Utils implements Serializable { if (!organizationMap
.keySet() .contains( - "20|" + o)) - organizationMap.put("20|" + o, new ArrayList<>()); - organizationMap.get("20|" + o).add(community.getId()); + entityPrefix + "|" + o)) + organizationMap.put(entityPrefix + "|" + o, new ArrayList<>()); + organizationMap.get(entityPrefix + "|" + o).add(community.getId()); }); } catch (IOException e) { throw new RuntimeException(e); @@ -138,7 +137,7 @@ public class Utils implements Serializable { public static CommunityEntityMap getCommunityProjects(String baseURL) throws IOException { CommunityEntityMap projectMap = new CommunityEntityMap(); - + String entityPrefix = ModelSupport.getIdPrefix(Project.class); getValidCommunities(baseURL) .forEach(community -> { int page = -1; @@ -155,9 +154,9 @@ public class Utils implements Serializable { ContentModel.class); if (cm.getContent().size() > 0) { cm.getContent().forEach(p -> { - if (!projectMap.keySet().contains("40|" + p.getOpenaireId())) - projectMap.put("40|" + p.getOpenaireId(), new ArrayList<>()); - projectMap.get("40|" + p.getOpenaireId()).add(community.getId()); + if (!projectMap.keySet().contains(entityPrefix + "|" + p.getOpenaireId())) + projectMap.put(entityPrefix + "|" + p.getOpenaireId(), new ArrayList<>()); + projectMap.get(entityPrefix + "|" + p.getOpenaireId()).add(community.getId()); }); } } catch (IOException e) { @@ -174,4 +173,28 @@ public class Utils implements Serializable { .map(community -> community.getId()) .collect(Collectors.toList()); } + + public static List getDatasourceCommunities(String baseURL)throws IOException{ + List validCommunities = getValidCommunities(baseURL); + HashMap> map = new HashMap<>(); + validCommunities.forEach(c -> { + try { + new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) + .forEach(d -> { + if (!map.keySet().contains(d.getOpenaireId())) + map.put(d.getOpenaireId(), new HashSet<>()); + + map.get(d.getOpenaireId()).add(c.getId()); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + + return map.keySet().stream().map(k -> EntityCommunities.newInstance(k, map.get(k).stream().collect(Collectors.toList()))).collect(Collectors.toList()); + + } + + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java new file mode 100644 index 0000000000..3af396a0af --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java @@ -0,0 +1,39 @@ +package eu.dnetlib.dhp.api.model; + +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; + +import java.io.Serializable; +import java.util.List; + +/** + * @author miriam.baglioni + * @Date 13/02/24 + */ +public class EntityCommunities implements Serializable { + private String entityId; + private List communitiesId; + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + public List getCommunitiesId() { + return communitiesId; + } + + public void setCommunitiesId(List communitiesId) { + this.communitiesId = communitiesId; + } + + public static EntityCommunities newInstance(String dsid, List csid){ + EntityCommunities dsc = new EntityCommunities(); + dsc.entityId = dsid; + dsc.communitiesId = csid; + return dsc; + } +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java 
b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index e20fcb081a..4bd7707221 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -5,7 +5,14 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; +import java.util.stream.Collectors; +import eu.dnetlib.dhp.api.model.CommunityEntityMap; +import eu.dnetlib.dhp.api.model.EntityCommunities; +import eu.dnetlib.dhp.api.model.DatasourceCommunitiesList; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -23,11 +30,11 @@ import com.google.gson.Gson; import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bulktag.community.*; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; public class SparkBulkTagJob { @@ -89,9 +96,80 @@ public class SparkBulkTagJob { spark -> { extendCommunityConfigurationForEOSC(spark, inputPath, cc); execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc); + execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL)); + execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL)); }); } + private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityProjects) { + Dataset projects = readPath(spark, inputPath + "project", Project.class); + Dataset pc = spark.createDataset(communityProjects.keySet().stream().map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))).collect(Collectors.toList()), Encoders.bean(EntityCommunities.class)); + + projects.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") + .map((MapFunction, Project>) t2 -> { + Project ds = t2._1(); + if (t2._2() != null){ + List context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList()); + t2._2().getCommunitiesId().forEach(c -> { + if(!context.contains(c)){ + Context con = new Context(); + con.setId(c); + con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); + ds.getContext().add(con) + } + }); + } + return ds; + } ,Encoders.bean(Project.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "project"); + + readPath(spark, outputPath + "project", Datasource.class) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(inputPath + "project"); + } + + + private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, List datasourceCommunities) { + Dataset datasource = readPath(spark, inputPath + "datasource", Datasource.class); + + Dataset dc = 
spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); + + datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") + .map((MapFunction, Datasource>) t2 -> { + Datasource ds = t2._1(); + if (t2._2() != null){ + List context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList()); + t2._2().getCommunitiesId().forEach(c -> { + if(!context.contains(c)){ + Context con = new Context(); + con.setId(c); + con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); + ds.getContext().add(con) + } + }); + } + return ds; + } ,Encoders.bean(Datasource.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(outputPath + "datasource"); + + readPath(spark, outputPath + "datasource", Datasource.class) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(inputPath + "datasource"); + } + + private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) { @@ -182,4 +260,6 @@ public class SparkBulkTagJob { }; } + } + diff --git a/pom.xml b/pom.xml index 3b6e3a82be..c8514a3cb5 100644 --- a/pom.xml +++ b/pom.xml @@ -888,7 +888,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [4.17.3] + [5.17.3] [4.0.3] [6.0.5] [3.1.6] From 83bb97be8337a2c26ed5eaed1d5edeb8d5890b54 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 14 Feb 2024 11:23:47 +0100 Subject: [PATCH 2/6] [Tagging Projects and Datasource] added test to check datasource tagging. Fixed issue --- .../main/java/eu/dnetlib/dhp/api/Utils.java | 13 +++- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 25 ++++++-- .../bulktag/input_bulkTag_parameters.json | 2 +- .../dnetlib/dhp/bulktag/BulkTagJobTest.java | 61 ++++++++++++++++++- .../publication/update_datasource/datasource | 3 + .../publication/update_datasource/project | 0 6 files changed, 95 insertions(+), 9 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/project diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index bbdfbb96e1..c990d6ebe8 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -10,6 +10,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Organization; import eu.dnetlib.dhp.schema.oaf.Project; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -177,6 +178,8 @@ public class Utils implements Serializable { public static List getDatasourceCommunities(String baseURL)throws IOException{ List validCommunities = getValidCommunities(baseURL); HashMap> map = new HashMap<>(); + String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|" ; + validCommunities.forEach(c -> { try { new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) @@ -191,10 +194,18 @@ public class Utils implements Serializable { } }); + List temp = map.keySet().stream() + .map(k -> 
EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))).collect(Collectors.toList()); - return map.keySet().stream().map(k -> EntityCommunities.newInstance(k, map.get(k).stream().collect(Collectors.toList()))).collect(Collectors.toList()); + return temp; } + @NotNull + private static List getCollect(String k, HashMap> map) { + List temp = map.get(k).stream().collect(Collectors.toList()); + return temp; + } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 4bd7707221..33c416b0de 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -9,7 +9,6 @@ import java.util.stream.Collectors; import eu.dnetlib.dhp.api.model.CommunityEntityMap; import eu.dnetlib.dhp.api.model.EntityCommunities; -import eu.dnetlib.dhp.api.model.DatasourceCommunitiesList; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Project; @@ -109,14 +108,20 @@ public class SparkBulkTagJob { .map((MapFunction, Project>) t2 -> { Project ds = t2._1(); if (t2._2() != null){ - List context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList()); + List context = + Optional.ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); + + if(!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); t2._2().getCommunitiesId().forEach(c -> { if(!context.contains(c)){ Context con = new Context(); con.setId(c); con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con) + ds.getContext().add(con); } }); } @@ -139,19 +144,27 @@ public class SparkBulkTagJob { Dataset datasource = readPath(spark, inputPath + "datasource", Datasource.class); Dataset dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); - + datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") .map((MapFunction, Datasource>) t2 -> { Datasource ds = t2._1(); if (t2._2() != null){ - List context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList()); + + List context = + Optional.ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); + + if(!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); + t2._2().getCommunitiesId().forEach(c -> { if(!context.contains(c)){ Context con = new Context(); con.setId(c); con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con) + ds.getContext().add(con); } }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json 
b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json index ce1a8ecab6..f8fe91223e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json @@ -33,6 +33,6 @@ "paramName": "bu", "paramLongName": "baseURL", "paramDescription": "this parameter is to specify the api to be queried (beta or production)", - "paramRequired": false + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 01be4d0a3a..882e259cf6 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -325,7 +325,7 @@ public class BulkTagJobTest { "-taggingConf", taggingConf, "-outputPath", workingDir.toString() + "/", - + "-baseURL", "https://services.openaire.eu/openaire/community/", "-pathMap", pathMap }); @@ -349,6 +349,8 @@ public class BulkTagJobTest { org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + idExplodeCommunity.show(false); + Assertions.assertEquals(5, idExplodeCommunity.count()); Assertions .assertEquals( @@ -383,6 +385,63 @@ public class BulkTagJobTest { .count()); } + @Test + void datasourceTag() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") + .getPath(); + SparkBulkTagJob + .main( + new String[] { + + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + + "-outputPath", workingDir.toString() + "/", + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-pathMap", pathMap + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/datasource") + .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); + + Assertions.assertEquals(3, tmp.count()); + org.apache.spark.sql.Dataset verificationDataset = spark + .createDataset(tmp.rdd(), Encoders.bean(Datasource.class)); + + verificationDataset.createOrReplaceTempView("datasource"); + + String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " + + "from datasource " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; + + org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); + + idExplodeCommunity.show(false); + + Assertions.assertEquals(3, idExplodeCommunity.count()); + Assertions + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + Assertions + .assertEquals( + 3, + idExplodeCommunity + .filter("name = 'Bulktagging for Community - Datasource'") + .count()); + + Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count()); + Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count()); + + + } + @Test void bulktagByZenodoCommunityTest() throws Exception { final String sourcePath = getClass() diff --git 
a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/datasource b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/datasource index e69de29bb2..98cd3649ad 100644 --- a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/datasource +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/datasource @@ -0,0 +1,3 @@ +{"accessinfopackage":[],"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"consenttermsofuse":false,"contentpolicies":[{"classid":"Journal articles","classname":"Journal articles","schemeid":"eosc:contentpolicies","schemename":"eosc:contentpolicies"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"datasourcetype":{"classid":"pubsrepository::journal","classname":"Journal","schemeid":"dnet:datasource_typologies","schemename":"dnet:datasource_typologies"},"datasourcetypeui":{"classid":"Journal archive","classname":"Journal archive","schemeid":"dnet:datasource_typologies_ui","schemename":"dnet:datasource_typologies_ui"},"dateofcollection":"2019-07-26","englishname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"Tobacco Use Insights"},"eoscdatasourcetype":{"classid":"Journal archive","classname":"Journal Archive","schemeid":"dnet:eosc_datasource_types","schemename":"dnet:eosc_datasource_types"},"eosctype":{"classid":"Data Source","classname":"Data Source","schemeid":"","schemename":""},"extraInfo":[],"fulltextdownload":false,"id":"10|re3data_____::a507cdacc5bbcc08761c92185dee5cab","journal":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"issnOnline":"1179-173X","issnPrinted":"","name":"Tobacco Use 
Insights"},"languages":[],"lastupdatetimestamp":1680789947124,"latitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"longitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"namespaceprefix":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"doaj1179173X"},"odlanguages":[],"odnumberofitems":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"officialname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"Tobacco Use Insights"},"openairecompatibility":{"classid":"openaire3.0","classname":"collected from a compatible aggregator","schemeid":"dnet:datasourceCompatibilityLevel","schemename":"dnet:datasourceCompatibilityLevel"},"originalId":["doajarticles::1179-173X"],"pid":[],"policies":[],"researchentitytypes":["Literature"],"subjects":[{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"qualifier":{"classid":"keyword","classname":"keyword","schemeid":"dnet:subject_classification_typologies","schemename":"dnet:subject_classification_typologies"},"value":"Medicine: Public aspects of medicine"}],"thematic":false,"versioncontrol":false,"versioning":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":false},"websiteurl":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"https://journals.sagepub.com/home/tui"}} +{"accessinfopackage":[],"collectedfrom":[{"key":"10|openaire____::2e06c1122c7df43765fdcf91080824fa","value":"EOSC Service 
Catalogue"}],"consenttermsofuse":false,"contactemail":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eosc@ill.eu"},"contentpolicies":[],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"datasourcetype":{"classid":"service","classname":"service","schemeid":"dnet:datasource_typologies","schemename":"dnet:datasource_typologies"},"datasourcetypeui":{"classid":"service","classname":"service","schemeid":"dnet:datasource_typologies_ui","schemename":"dnet:datasource_typologies_ui"},"dateofcollection":"2022-07-13","description":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software Catalogue is a database of software used mainly for data analysis of neutron and photon experiments. This database can be freely consulted. It gives an overview of software available for neutron and photon experiments and their use with respect to instruments at experimental facilities."},"englishname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software 
Catalogue"},"eosctype":{"classid":"Service","classname":"Service","schemeid":"","schemename":""},"extraInfo":[],"fulltextdownload":false,"id":"10|doajarticles::c6cd4b532e12868c1d760a8d7cda6815","languages":["eng"],"lastupdatetimestamp":1680789947124,"latitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"logourl":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"https://software.pan-data.eu/bundles/app/images/pandata-logo.png"},"longitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"namespaceprefix":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eoscdbb03112"},"odlanguages":[{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eng"}],"odnumberofitems":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"officialname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software Catalogue"},"openairecompatibility":{"classid":"openaire3.0","classname":"Not yet registered","schemeid":"dnet:datasourceCompatibilityLevel","schemename":"dnet:datasourceCompatibilityLevel"},"originalId":["eosc________::ill::ill.panosc_software_catalogue"],"pid":[],"policies":[],"researchentitytypes":[],"researchproductaccesspolicies":[],"researchproductmetadataaccesspolicies":[],"subjects":[],"thematic":false,"versioncontrol":false,"versioning":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":false},"websiteurl":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"https://software.pan-data.eu/"}} +{"accessinfopackage":[],"collectedfrom":[{"key":"10|openaire____::2e06c1122c7df43765fdcf91080824fa","value":"EOSC Service 
Catalogue"}],"consenttermsofuse":false,"contactemail":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eosc@ill.eu"},"contentpolicies":[],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"datasourcetype":{"classid":"service","classname":"service","schemeid":"dnet:datasource_typologies","schemename":"dnet:datasource_typologies"},"datasourcetypeui":{"classid":"service","classname":"service","schemeid":"dnet:datasource_typologies_ui","schemename":"dnet:datasource_typologies_ui"},"dateofcollection":"2022-07-13","description":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software Catalogue is a database of software used mainly for data analysis of neutron and photon experiments. This database can be freely consulted. It gives an overview of software available for neutron and photon experiments and their use with respect to instruments at experimental facilities."},"englishname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software 
Catalogue"},"eosctype":{"classid":"Service","classname":"Service","schemeid":"","schemename":""},"extraInfo":[],"fulltextdownload":false,"id":"10|eosc________::7ef2576047f040612b983a27347471fc","languages":["eng"],"lastupdatetimestamp":1680789947124,"latitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"logourl":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"https://software.pan-data.eu/bundles/app/images/pandata-logo.png"},"longitude":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"namespaceprefix":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eoscdbb03112"},"odlanguages":[{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"eng"}],"odnumberofitems":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"0.0"},"officialname":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"PaNOSC Software Catalogue"},"openairecompatibility":{"classid":"openaire3.0","classname":"Not yet registered","schemeid":"dnet:datasourceCompatibilityLevel","schemename":"dnet:datasourceCompatibilityLevel"},"originalId":["eosc________::ill::ill.panosc_software_catalogue"],"pid":[],"policies":[],"researchentitytypes":[],"researchproductaccesspolicies":[],"researchproductmetadataaccesspolicies":[],"subjects":[],"thematic":false,"versioncontrol":false,"versioning":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":false},"websiteurl":{"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.900"},"value":"https://software.pan-data.eu/"}} \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/project 
b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/project new file mode 100644 index 0000000000..e69de29bb2 From 8dae10b442ac0d6a1207868615b49e95e21b18a3 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 14 Feb 2024 14:57:08 +0100 Subject: [PATCH 3/6] - --- .../main/java/eu/dnetlib/dhp/MoveResult.java | 2 +- .../eu/dnetlib/dhp/api/QueryCommunityAPI.java | 1 - .../main/java/eu/dnetlib/dhp/api/Utils.java | 35 ++-- .../dhp/api/model/EntityCommunities.java | 47 ++--- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 184 ++++++++++------- .../dnetlib/dhp/bulktag/actions/Action.java | 43 ++-- .../bulktag/actions/ExecSubstringAction.java | 65 +++--- .../dnetlib/dhp/bulktag/actions/MapModel.java | 30 +-- .../dhp/bulktag/actions/Parameters.java | 29 +-- .../dhp/bulktag/community/ProtoMap.java | 4 +- .../dhp/bulktag/community/ResultTagger.java | 36 ++-- .../eu/dnetlib/dhp/wf/main/job.properties | 31 +-- .../dnetlib/dhp/bulktag/BulkTagJobTest.java | 189 ++++++++++-------- 13 files changed, 388 insertions(+), 308 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java index c71ccb4391..6731f2332b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java @@ -30,7 +30,7 @@ public class MoveResult implements Serializable { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - MoveResult.class + MoveResult.class .getResourceAsStream( "/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json")); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java index 28110549c4..e56cdab725 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java @@ -78,5 +78,4 @@ public class QueryCommunityAPI { return body; } - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index c990d6ebe8..27fb37e5b6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -6,10 +6,6 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +20,10 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; import eu.dnetlib.dhp.bulktag.community.Provider; import eu.dnetlib.dhp.bulktag.criteria.VerbResolver; import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Project; /** * @author miriam.baglioni @@ -57,7 +57,7 @@ public class Utils implements Serializable { if (d.getEnabled() == null || 
Boolean.FALSE.equals(d.getEnabled())) return null; Provider p = new Provider(); - p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class)+"|" + d.getOpenaireId()); + p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId()); p.setSelectionConstraints(d.getSelectioncriteria()); if (p.getSelectionConstraints() != null) p.getSelectionConstraints().setSelection(resolver); @@ -175,27 +175,31 @@ public class Utils implements Serializable { .collect(Collectors.toList()); } - public static List getDatasourceCommunities(String baseURL)throws IOException{ + public static List getDatasourceCommunities(String baseURL) throws IOException { List validCommunities = getValidCommunities(baseURL); HashMap> map = new HashMap<>(); - String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|" ; + String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|"; validCommunities.forEach(c -> { try { - new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) - .forEach(d -> { - if (!map.keySet().contains(d.getOpenaireId())) - map.put(d.getOpenaireId(), new HashSet<>()); + new ObjectMapper() + .readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) + .forEach(d -> { + if (!map.keySet().contains(d.getOpenaireId())) + map.put(d.getOpenaireId(), new HashSet<>()); - map.get(d.getOpenaireId()).add(c.getId()); - }); + map.get(d.getOpenaireId()).add(c.getId()); + }); } catch (IOException e) { throw new RuntimeException(e); } }); - List temp = map.keySet().stream() - .map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))).collect(Collectors.toList()); + List temp = map + .keySet() + .stream() + .map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))) + .collect(Collectors.toList()); return temp; @@ -207,5 +211,4 @@ public class Utils implements Serializable { return temp; } - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java index 3af396a0af..cac02c0721 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java @@ -1,39 +1,40 @@ -package eu.dnetlib.dhp.api.model; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Datasource; +package eu.dnetlib.dhp.api.model; import java.io.Serializable; import java.util.List; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; + /** * @author miriam.baglioni * @Date 13/02/24 */ public class EntityCommunities implements Serializable { - private String entityId; - private List communitiesId; + private String entityId; + private List communitiesId; - public String getEntityId() { - return entityId; - } + public String getEntityId() { + return entityId; + } - public void setEntityId(String entityId) { - this.entityId = entityId; - } + public void setEntityId(String entityId) { + this.entityId = entityId; + } - public List getCommunitiesId() { - return communitiesId; - } + public List getCommunitiesId() { + return communitiesId; + } - public void setCommunitiesId(List communitiesId) { - this.communitiesId = communitiesId; - } + public void setCommunitiesId(List communitiesId) { + this.communitiesId = communitiesId; + } - public static 
EntityCommunities newInstance(String dsid, List csid){ - EntityCommunities dsc = new EntityCommunities(); - dsc.entityId = dsid; - dsc.communitiesId = csid; - return dsc; - } + public static EntityCommunities newInstance(String dsid, List csid) { + EntityCommunities dsc = new EntityCommunities(); + dsc.entityId = dsid; + dsc.communitiesId = csid; + return dsc; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 33c416b0de..10e9f6e0e2 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -7,11 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.api.model.CommunityEntityMap; -import eu.dnetlib.dhp.api.model.EntityCommunities; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Context; -import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -27,10 +22,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.api.Utils; +import eu.dnetlib.dhp.api.model.CommunityEntityMap; +import eu.dnetlib.dhp.api.model.EntityCommunities; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bulktag.community.*; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -53,6 +53,7 @@ public class SparkBulkTagJob { .getResourceAsStream( "/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json")); + log.info(args.toString()); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -71,7 +72,8 @@ public class SparkBulkTagJob { final String baseURL = parser.get("baseURL"); log.info("baseURL: {}", baseURL); - ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class); + log.info("pathMap: {}", parser.get("pathMap")); + ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class); log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); SparkConf conf = new SparkConf(); @@ -100,89 +102,123 @@ public class SparkBulkTagJob { }); } - private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityProjects) { + private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, + CommunityEntityMap communityProjects) { Dataset projects = readPath(spark, inputPath + "project", Project.class); - Dataset pc = spark.createDataset(communityProjects.keySet().stream().map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))).collect(Collectors.toList()), Encoders.bean(EntityCommunities.class)); + Dataset pc = spark + .createDataset( + communityProjects + .keySet() + .stream() + .map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))) + 
.collect(Collectors.toList()), + Encoders.bean(EntityCommunities.class)); - projects.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") - .map((MapFunction, Project>) t2 -> { - Project ds = t2._1(); - if (t2._2() != null){ - List context = - Optional.ofNullable(ds.getContext()) - .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) - .orElse(new ArrayList<>()); + projects + .joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") + .map((MapFunction, Project>) t2 -> { + Project ds = t2._1(); + if (t2._2() != null) { + List context = Optional + .ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); - if(!Optional.ofNullable(ds.getContext()).isPresent()) - ds.setContext(new ArrayList<>()); - t2._2().getCommunitiesId().forEach(c -> { - if(!context.contains(c)){ - Context con = new Context(); - con.setId(c); - con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, - OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con); - } - }); - } - return ds; - } ,Encoders.bean(Project.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "project"); + if (!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); + t2._2().getCommunitiesId().forEach(c -> { + if (!context.contains(c)) { + Context con = new Context(); + con.setId(c); + con + .setDataInfo( + Arrays + .asList( + OafMapperUtils + .dataInfo( + false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + TaggingConstants.CLASS_ID_DATASOURCE, + TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "1"))); + ds.getContext().add(con); + } + }); + } + return ds; + }, Encoders.bean(Project.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "project"); readPath(spark, outputPath + "project", Datasource.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + "project"); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + "project"); } - - private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, List datasourceCommunities) { + private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, + List datasourceCommunities) { Dataset datasource = readPath(spark, inputPath + "datasource", Datasource.class); - Dataset dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); + Dataset dc = spark + .createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); - datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") - .map((MapFunction, Datasource>) t2 -> { - Datasource ds = t2._1(); - if (t2._2() != null){ + datasource + .joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") + .map((MapFunction, Datasource>) t2 -> { + Datasource ds = t2._1(); + if (t2._2() != null) { - List context = - Optional.ofNullable(ds.getContext()) - .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) - .orElse(new ArrayList<>()); + List context 
= Optional + .ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); - if(!Optional.ofNullable(ds.getContext()).isPresent()) - ds.setContext(new ArrayList<>()); + if (!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); + + t2._2().getCommunitiesId().forEach(c -> { + if (!context.contains(c)) { + Context con = new Context(); + con.setId(c); + con + .setDataInfo( + Arrays + .asList( + OafMapperUtils + .dataInfo( + false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + TaggingConstants.CLASS_ID_DATASOURCE, + TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "1"))); + ds.getContext().add(con); + } + }); + } + return ds; + }, Encoders.bean(Datasource.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "datasource"); - t2._2().getCommunitiesId().forEach(c -> { - if(!context.contains(c)){ - Context con = new Context(); - con.setId(c); - con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, - OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con); - } - }); - } - return ds; - } ,Encoders.bean(Datasource.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "datasource"); - readPath(spark, outputPath + "datasource", Datasource.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + "datasource"); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + "datasource"); } - private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) { @@ -273,6 +309,4 @@ public class SparkBulkTagJob { }; } - } - diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java index 6c9e7694f3..987e7afef4 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,31 +9,31 @@ import java.util.List; * @Date 22/01/24 */ public class Action implements Serializable { - private String clazz; - private String method; - private List params; + private String clazz; + private String method; + private List params; - public String getClazz() { - return clazz; - } + public String getClazz() { + return clazz; + } - public void setClazz(String clazz) { - this.clazz = clazz; - } + public void setClazz(String clazz) { + this.clazz = clazz; + } - public String getMethod() { - return method; - } + public String getMethod() { + return method; + } - public void setMethod(String method) { - this.method = method; - } + public void setMethod(String method) { + this.method = method; + } - public List getParams() { - return params; - } + public List getParams() { + return params; + } - public void setParams(List params) { - this.params = params; - } + public void setParams(List params) { + this.params = params; + } } 
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java index fd3091fd0f..0ada4ebfbe 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,47 +9,47 @@ import java.io.Serializable; */ public class ExecSubstringAction implements Serializable { - private String value; - private String from; - private String to; + private String value; + private String from; + private String to; - public String getValue() { - return value; - } + public String getValue() { + return value; + } - public void setValue(String value) { - this.value = value; - } + public void setValue(String value) { + this.value = value; + } - public String getFrom() { - return from; - } + public String getFrom() { + return from; + } - public void setFrom(String from) { - this.from = from; - } + public void setFrom(String from) { + this.from = from; + } - public String getTo() { - return to; - } + public String getTo() { + return to; + } - public void setTo(String to) { - this.to = to; - } + public void setTo(String to) { + this.to = to; + } - public String execSubstring(){ - int to = Integer.valueOf(this.to); - int from = Integer.valueOf(this.from); + public String execSubstring() { + int to = Integer.valueOf(this.to); + int from = Integer.valueOf(this.from); - if(to < from || from > this.value.length()) - return ""; + if (to < from || from > this.value.length()) + return ""; - if(from < 0) - from = 0; - if (to > this.value.length()) - to = this.value.length(); + if (from < 0) + from = 0; + if (to > this.value.length()) + to = this.value.length(); - return this.value.substring(from, to); + return this.value.substring(from, to); - } + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java index ef3eb43ccb..6a0d20b579 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,23 +9,22 @@ import java.io.Serializable; */ public class MapModel implements Serializable { - private String path; - private Action action; + private String path; + private Action action; + public String getPath() { + return path; + } - public String getPath() { - return path; - } + public void setPath(String path) { + this.path = path; + } - public void setPath(String path) { - this.path = path; - } + public Action getAction() { + return action; + } - public Action getAction() { - return action; - } - - public void setAction(Action action) { - this.action = action; - } + public void setAction(Action action) { + this.action = action; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java index d23634605b..973b00b770 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java +++ 
b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -7,22 +8,22 @@ import java.io.Serializable; * @Date 22/01/24 */ public class Parameters implements Serializable { - private String paramName; - private String paramValue; + private String paramName; + private String paramValue; - public String getParamName() { - return paramName; - } + public String getParamName() { + return paramName; + } - public void setParamName(String paramName) { - this.paramName = paramName; - } + public void setParamName(String paramName) { + this.paramName = paramName; + } - public String getParamValue() { - return paramValue; - } + public String getParamValue() { + return paramValue; + } - public void setParamValue(String paramValue) { - this.paramValue = paramValue; - } + public void setParamValue(String paramValue) { + this.paramValue = paramValue; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java index 15b4d10b74..dc75aec37e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java @@ -1,11 +1,11 @@ package eu.dnetlib.dhp.bulktag.community; -import eu.dnetlib.dhp.bulktag.actions.MapModel; - import java.io.Serializable; import java.util.HashMap; +import eu.dnetlib.dhp.bulktag.actions.MapModel; + public class ProtoMap extends HashMap implements Serializable { public ProtoMap() { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 67422c2adb..2ea229e3e1 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -10,9 +10,6 @@ import java.lang.reflect.Method; import java.util.*; import java.util.stream.Collectors; -import com.jayway.jsonpath.PathNotFoundException; -import eu.dnetlib.dhp.bulktag.actions.MapModel; -import eu.dnetlib.dhp.bulktag.actions.Parameters; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +17,10 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; +import eu.dnetlib.dhp.bulktag.actions.MapModel; +import eu.dnetlib.dhp.bulktag.actions.Parameters; import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -40,7 +40,8 @@ public class ResultTagger implements Serializable { return (tmp != clist.size()); } - private Map> getParamMap(final Result result, Map params) throws NoSuchMethodException, InvocationTargetException { + private Map> getParamMap(final Result result, Map params) + throws NoSuchMethodException, InvocationTargetException { Map> param = new HashMap<>(); String json = new Gson().toJson(result, Result.class); DocumentContext jsonContext = JsonPath.parse(json); @@ -54,30 +55,34 @@ public class ResultTagger implements Serializable { try { String path = mapModel.getPath(); Object obj = 
jsonContext.read(path); - List pathValue ; - if(obj instanceof java.lang.String) - pathValue = Arrays.asList((String)obj); + List pathValue; + if (obj instanceof java.lang.String) + pathValue = Arrays.asList((String) obj); else - pathValue = (List)obj; - if(Optional.ofNullable(mapModel.getAction()).isPresent()){ + pathValue = (List) obj; + if (Optional.ofNullable(mapModel.getAction()).isPresent()) { Class c = Class.forName(mapModel.getAction().getClazz()); Object class_instance = c.newInstance(); Method setField = c.getMethod("setValue", String.class); setField.invoke(class_instance, pathValue.get(0)); - for(Parameters p : mapModel.getAction().getParams()){ + for (Parameters p : mapModel.getAction().getParams()) { setField = c.getMethod("set" + p.getParamName(), String.class); setField.invoke(class_instance, p.getParamValue()); } - param.put(key,Arrays.asList((String)c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance))); + param + .put( + key, Arrays + .asList((String) c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance))); } - else{ + else { param.put(key, pathValue); } - } catch (PathNotFoundException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { + } catch (PathNotFoundException | ClassNotFoundException | InstantiationException + | IllegalAccessException e) { param.put(key, new ArrayList<>()); } } @@ -86,9 +91,8 @@ public class ResultTagger implements Serializable { } public R enrichContextCriteria( - final R result, final CommunityConfiguration conf, final Map criteria) throws InvocationTargetException, NoSuchMethodException { - - + final R result, final CommunityConfiguration conf, final Map criteria) + throws InvocationTargetException, NoSuchMethodException { // Verify if the entity is deletedbyinference. 
In case verify if to clean the context list // from all the zenodo communities diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 05db040903..f252f64634 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -7,20 +7,23 @@ datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926 #allowedtypes=pubsrepository::institutional allowedtypes=Institutional outputPath=/tmp/miriam/graph/11_graph_orcid -pathMap ={"author":"$['author'][*]['fullname']", \ - "title":"$['title'][*]['value']",\ - "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ - "orcid_pending":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" ,\ - "contributor" : "$['contributor'][*]['value']",\ - "description" : "$['description'][*]['value']",\ - "subject" :"$['subject'][*]['value']" , \ - "fos" : "$['subject'][?(@['qualifier']['classid']=='FOS')].value" ,\ - "sdg" : "$['subject'][?(@['qualifier']['classid']=='SDG')].value",\ - "journal":"$['journal'].name",\ - "hostedby":"$['instance'][*]['hostedby']['key']",\ - "collectedfrom":"$['instance'][*]['collectedfrom']['key']",\ - "publisher":"$['publisher'].value",\ - "publicationyear":"$['dateofacceptance'].value"} +pathMap ={"author":{"path":"$['author'][*]['fullname']"}, \ + "title":{"path":"$['title'][*]['value']"},\ + "orcid":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']"} ,\ + "orcid_pending":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']"} ,\ + "contributor" : {"path":"$['contributor'][*]['value']"},\ + "description" : {"path":"$['description'][*]['value']"},\ + "subject" :{"path":"$['subject'][*]['value']"}, \ + "fos" : {"path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value"} ,\ + "sdg" : {"path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value"},\ + "journal":{"path":"$['journal'].name"},\ + "hostedby":{"path":"$['instance'][*]['hostedby']['key']"},\ + "collectedfrom":{"path":"$['instance'][*]['collectedfrom']['key']"},\ + "publisher":{"path":"$['publisher'].value"},\ + "publicationyear":{"path":"$['dateofacceptance'].value", "action":{"class":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction",\ + "method":"execSubstring",\ + "params":[{"param_name":"From","param_value":0},\ + {"param_name":"To","param_value":4}]}}} blacklist=empty allowedpids=orcid;orcid_pending baseURL = https://services.openaire.eu/openaire/community/ diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 882e259cf6..25ed68e039 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -33,25 +33,25 @@ public class BulkTagJobTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + - " \"title\":{\"path\":\"$['title'][*]['value']\"}, "+ - " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + - " 
\"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"+ - "\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"},"+ - " \"description\" : {\"path\":\"$['description'][*]['value']\"},"+ - " \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " + - " \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , "+ - "\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," + - "\"journal\":{\"path\":\"$['journal'].name\"}," + - "\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," + - "\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," + - "\"publisher\":{\"path\":\"$['publisher'].value\"}," + - "\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " + - " \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," + - "\"method\":\"execSubstring\","+ - "\"params\":[" + - "{\"paramName\":\"From\", \"paramValue\":0}, " + - "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; - + " \"title\":{\"path\":\"$['title'][*]['value']\"}, " + + " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + + " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ," + + + "\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"}," + + " \"description\" : {\"path\":\"$['description'][*]['value']\"}," + + " \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " + + " \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , " + + "\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," + + "\"journal\":{\"path\":\"$['journal'].name\"}," + + "\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," + + "\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," + + "\"publisher\":{\"path\":\"$['publisher'].value\"}," + + "\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " + + " \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," + + "\"method\":\"execSubstring\"," + + "\"params\":[" + + "{\"paramName\":\"From\", \"paramValue\":0}, " + + "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; private static SparkSession spark; @@ -325,7 +325,7 @@ public class BulkTagJobTest { "-taggingConf", taggingConf, "-outputPath", workingDir.toString() + "/", - "-baseURL", "https://services.openaire.eu/openaire/community/", + "-baseURL", "https://services.openaire.eu/openaire/community/", "-pathMap", pathMap }); @@ -388,38 +388,38 @@ public class BulkTagJobTest { @Test void datasourceTag() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") - .getPath(); + .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") + .getPath(); SparkBulkTagJob - .main( - new String[] { + .main( + new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", sourcePath, - "-taggingConf", taggingConf, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, - "-outputPath", workingDir.toString() + "/", - "-baseURL", "https://services.openaire.eu/openaire/community/", - "-pathMap", pathMap - }); + "-outputPath", workingDir.toString() + "/", + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-pathMap", pathMap + }); final JavaSparkContext sc = 
JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/datasource") - .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); + .textFile(workingDir.toString() + "/datasource") + .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); Assertions.assertEquals(3, tmp.count()); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(Datasource.class)); + .createDataset(tmp.rdd(), Encoders.bean(Datasource.class)); verificationDataset.createOrReplaceTempView("datasource"); String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " - + "from datasource " - + "lateral view explode(context) c as MyT " - + "lateral view explode(MyT.datainfo) d as MyD " - + "where MyD.inferenceprovenance = 'bulktagging'"; + + "from datasource " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); @@ -427,19 +427,18 @@ public class BulkTagJobTest { Assertions.assertEquals(3, idExplodeCommunity.count()); Assertions - .assertEquals( - 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); Assertions - .assertEquals( - 3, - idExplodeCommunity - .filter("name = 'Bulktagging for Community - Datasource'") - .count()); + .assertEquals( + 3, + idExplodeCommunity + .filter("name = 'Bulktagging for Community - Datasource'") + .count()); Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count()); Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count()); - } @Test @@ -1668,60 +1667,94 @@ public class BulkTagJobTest { Assertions.assertEquals(0, spark.sql(query).count()); } - @Test void pubdateTest() throws Exception { - final String pathMap = BulkTagJobTest.pathMap; SparkBulkTagJob - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", - getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(), - "-taggingConf", - IOUtils - .toString( - BulkTagJobTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")), - "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap - }); + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", + getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(), + "-taggingConf", + IOUtils + .toString( + BulkTagJobTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")), + "-outputPath", workingDir.toString() + "/", + "-pathMap", pathMap + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); Assertions.assertEquals(10, tmp.count()); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); 
verificationDataset.createOrReplaceTempView("dataset"); - String query = "select id, MyT.id community, MyD.provenanceaction.classid " - + "from dataset " - + "lateral view explode(context) c as MyT " - + "lateral view explode(MyT.datainfo) d as MyD " - + "where MyD.inferenceprovenance = 'bulktagging'"; + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; org.apache.spark.sql.Dataset queryResult = spark.sql(query); queryResult.show(false); Assertions.assertEquals(5, queryResult.count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")).count()); - + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")) + .count()); } - - } From 43da7e1191ff4b0a9cd28792ddb8272b3639d757 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 19 Feb 2024 16:12:59 +0100 Subject: [PATCH 4/6] [Tagging Projects and Datasource] changed the way the pathMap parameter is passed: the inline JSON was too long to be passed as a single workflow property and was being truncated, so the mapping is now read from a file on HDFS (see the sketch below).
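In short, the --pathMap argument now carries an HDFS path instead of the JSON itself, and a new --nameNode argument tells the job which filesystem to open. A condensed sketch of the loading step, assuming only what the diff below shows (the ProtoMapLoader wrapper class and its load method are illustrative, not part of the patch; the Hadoop and Gson calls mirror the ones added to SparkBulkTagJob):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.google.gson.Gson;

import eu.dnetlib.dhp.bulktag.community.ProtoMap;

public class ProtoMapLoader {

	// Open the filesystem named by --nameNode, read the pathMap JSON from the
	// location given by --pathMap, and deserialize it into the ProtoMap the tagger consumes.
	public static ProtoMap load(String hdfsNameNode, String protoMappingPath) throws IOException {
		Configuration configuration = new Configuration();
		configuration.set("fs.defaultFS", hdfsNameNode);
		FileSystem fs = FileSystem.get(configuration);
		String json = IOUtils.toString(fs.open(new Path(protoMappingPath)), StandardCharsets.UTF_8);
		return new Gson().fromJson(json, ProtoMap.class);
	}
}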
--- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 39 ++++++++++- .../dhp/bulktag/community/ProtoMap.java | 2 +- .../eu/dnetlib/dhp/wf/main/job.properties | 2 +- .../bulktag/input_bulkTag_parameters.json | 7 +- .../bulktag/oozie_app/config-default.xml | 4 +- .../bulktag/oozie_app/workflow.xml | 1 + .../dnetlib/dhp/bulktag/BulkTagJobTest.java | 70 +++++++++++++++++-- .../eu/dnetlib/dhp/bulktag/pathMap/pathMap | 58 +++++++++++++++ 8 files changed, 171 insertions(+), 12 deletions(-) create mode 100644 dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 10e9f6e0e2..f78a4de550 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -4,10 +4,23 @@ package eu.dnetlib.dhp.bulktag; import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; +import org.apache.avro.TestAnnotation; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; @@ -18,8 +31,10 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; +import com.sun.media.sound.ModelInstrumentComparator; import eu.dnetlib.dhp.api.Utils; import eu.dnetlib.dhp.api.model.CommunityEntityMap; @@ -73,8 +88,20 @@ public class SparkBulkTagJob { log.info("baseURL: {}", baseURL); log.info("pathMap: {}", parser.get("pathMap")); - ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class); - log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); + String protoMappingPath = parser.get("pathMap"); + // log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + Configuration configuration = new Configuration(); + configuration.set("fs.defaultFS", hdfsNameNode); + FileSystem fs = FileSystem.get(configuration); + + String temp = IOUtils.toString(fs.open(new Path(protoMappingPath)), StandardCharsets.UTF_8); + log.info("protoMap: {}", temp); + ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class); + log.info("pathMap: {}", new Gson().toJson(protoMap)); SparkConf conf = new SparkConf(); CommunityConfiguration cc; @@ -96,7 +123,8 @@ public class SparkBulkTagJob { isSparkSessionManaged,
spark -> { extendCommunityConfigurationForEOSC(spark, inputPath, cc); - execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc); + execBulkTag( + spark, inputPath, outputPath, protoMap, cc); execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL)); execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL)); }); @@ -256,6 +284,11 @@ public class SparkBulkTagJob { ProtoMap protoMappingParams, CommunityConfiguration communityConfiguration) { + try { + System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } ModelSupport.entityTypes .keySet() .parallelStream() diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java index dc75aec37e..41edbcec66 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java @@ -7,8 +7,8 @@ import java.util.HashMap; import eu.dnetlib.dhp.bulktag.actions.MapModel; public class ProtoMap extends HashMap implements Serializable { - public ProtoMap() { super(); } + } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index f252f64634..52c2cafcee 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -1,4 +1,4 @@ -sourcePath=/tmp/beta_provision/graph/10_graph_orcid_enriched +sourcePath=/tmp/beta_provision/graph/09_graph_orcid_enriched resumeFrom=ResultProject allowedsemrelsorcidprop=isSupplementedBy;isSupplementTo allowedsemrelsresultproject=isSupplementedBy;isSupplementTo diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json index f8fe91223e..36c9600fec 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json @@ -34,5 +34,10 @@ "paramLongName": "baseURL", "paramDescription": "this parameter is to specify the api to be queried (beta or production)", "paramRequired": true - } + },{ + "paramName": "nn", + "paramLongName": "nameNode", + "paramDescription": "this parameter is to specify the api to be queried (beta or production)", + "paramRequired": true +} ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml index 2695253e62..c1675239c9 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/config-default.xml @@ -53,10 +53,10 @@ memoryOverhead - 3G + 4G partitions - 3284 + 15000 \ No newline at 
end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index c7a9e8a263..c4b4b7d641 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -76,6 +76,7 @@ --outputPath${workingDir}/bulktag/ --pathMap${pathMap} --baseURL${baseURL} + --nameNode${nameNode} diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 25ed68e039..4d563866ba 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -6,14 +6,19 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.ZENODO_COMMUNITY import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -25,14 +30,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import eu.dnetlib.dhp.bulktag.community.ProtoMap; import eu.dnetlib.dhp.schema.oaf.*; public class BulkTagJobTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + + public static final String pathMap = "{\"protoMap\":{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + " \"title\":{\"path\":\"$['title'][*]['value']\"}, " + " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ," @@ -51,7 +58,7 @@ public class BulkTagJobTest { "\"method\":\"execSubstring\"," + "\"params\":[" + "{\"paramName\":\"From\", \"paramValue\":0}, " + - "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; + "{\"paramName\":\"To\",\"paramValue\":4}]}}}}"; private static SparkSession spark; @@ -231,6 +238,14 @@ public class BulkTagJobTest { @Test void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception { + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); + final String sourcePath = getClass() .getResource( "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/") @@ -246,7 +261,8 @@ public class 
BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap", + "-nameNode", "local" }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -316,6 +332,7 @@ public class BulkTagJobTest { final String sourcePath = getClass() .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") .getPath(); + SparkBulkTagJob .main( new String[] { @@ -390,6 +407,13 @@ public class BulkTagJobTest { final String sourcePath = getClass() .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") .getPath(); + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); SparkBulkTagJob .main( new String[] { @@ -400,7 +424,9 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", "-baseURL", "https://services.openaire.eu/openaire/community/", - "-pathMap", pathMap + + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1757,4 +1783,40 @@ public class BulkTagJobTest { } + @Test + public void prova() throws Exception { + LocalFileSystem fs = FileSystem.getLocal(new Configuration()); + fs + .copyFromLocalFile( + false, new org.apache.hadoop.fs.Path(getClass() + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getPath()), + new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); + + final String sourcePath = getClass() + .getResource( + "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/") + .getPath(); + + ProtoMap prova = new Gson() + .fromJson( + "{\"author\":{\"path\":\"$['author'][]['fullname']\"},\"title\":{\"path\":\"$['title'][]['value']\"},\"orcid\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid')]['value']\"},\"orcid_pending\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid_pending')]['value']\"},\"contributor\":{\"path\":\"$['contributor'][]['value']\"},\"description\":{\"path\":\"$['description'][]['value']\"},\"subject\":{\"path\":\"$['subject'][]['value']\"},\"fos\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"},\"sdg\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"},\"journal\":{\"path\":\"$['journal'].name\"},\"hostedby\":{\"path\":\"$['instance'][]['hostedby']['key']\"},\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"},\"publisher\":{\"path\":\"$['publisher'].value\"},\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\",\"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\",\"method\":\"execSubstring\",\"params\":[{\"paramName\":\"From\",\"paramValue\":0},{\"paramName\":\"To\",\"paramValue\":4}]}}}", + ProtoMap.class); + SparkBulkTagJob + .main( + new String[] { + + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, + + "-outputPath", workingDir.toString() + "/", + + "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", + "-baseURL", "none", + "-nameNode", "local" + }); + + } + } diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap 
b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap new file mode 100644 index 0000000000..e7bbfe941c --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/bulktag/pathMap/pathMap @@ -0,0 +1,58 @@ +{ + "author":{ + "path":"$['author'][*]['fullname']" + }, + "title":{ + "path":"$['title'][*]['value']" + }, + "orcid":{ + "path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" + }, + "orcid_pending":{ + "path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" + }, + "contributor":{ + "path":"$['contributor'][*]['value']" + }, + "description":{ + "path":"$['description'][*]['value']" + }, + "subject":{ + "path":"$['subject'][*]['value']" + }, + "fos":{ + "path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value" + }, + "sdg":{ + "path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value" + }, + "journal":{ + "path":"$['journal'].name" + }, + "hostedby":{ + "path":"$['instance'][*]['hostedby']['key']" + }, + "collectedfrom":{ + "path":"$['instance'][*]['collectedfrom']['key']" + }, + "publisher":{ + "path":"$['publisher'].value" + }, + "publicationyear":{ + "path":"$['dateofacceptance'].value", + "action":{ + "clazz":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction", + "method":"execSubstring", + "params":[ + { + "paramName":"From", + "paramValue":0 + }, + { + "paramName":"To", + "paramValue":4 + } + ] + } + } +} \ No newline at end of file From 94b931f7bd2c8f048e188ba0af6fac2f0c01920b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Tue, 26 Mar 2024 14:25:19 +0100 Subject: [PATCH 5/6] [BulkTagging - tag datasource and projects] merging with branch beta --- .../dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java | 2 +- .../src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 8adc889201..040c897829 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable { return projectScores.map((MapFunction) bipProjectScores -> { Project project = new Project(); - // project.setId(bipProjectScores.getProjectId()); + project.setId(bipProjectScores.getProjectId()); project.setMeasures(bipProjectScores.toMeasures()); return project; }, Encoders.bean(Project.class)) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index f78a4de550..68005ea649 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -89,7 +89,7 @@ public class SparkBulkTagJob { log.info("pathMap: {}", parser.get("pathMap")); String protoMappingPath = parser.get("pathMap"); - // log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); + final String hdfsNameNode = parser.get("nameNode"); log.info("nameNode: {}", hdfsNameNode); From
75551ad4ec584886d91ee2959a63d1175ba79d80 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Tue, 26 Mar 2024 14:53:16 +0100 Subject: [PATCH 6/6] code formatting --- .../main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java | 1 - .../eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java | 5 +++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 68005ea649..b09543da17 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -90,7 +90,6 @@ public class SparkBulkTagJob { log.info("pathMap: {}", parser.get("pathMap")); String protoMappingPath = parser.get("pathMap"); - final String hdfsNameNode = parser.get("nameNode"); log.info("nameNode: {}", hdfsNameNode); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java index abae266a13..52ed373379 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -37,7 +38,7 @@ public class ExecSubstringAction implements Serializable { } public String execSubstring() { - return this.value.substring(Integer.valueOf(this.from), Integer.valueOf(this.to)); + return this.value.substring(Integer.valueOf(this.from), Integer.valueOf(this.to)); - } + } }
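As an end-to-end illustration of the action mechanism this series introduces: ResultTagger instantiates the configured clazz by reflection, injects the value extracted by the JSONPath through setValue, applies each paramName/paramValue pair through the matching setter, and finally invokes the configured method. A minimal sketch under those assumptions (the PathMapActionExample wrapper is hypothetical; ExecSubstringAction and its setters are the ones in the patches above):

import eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction;

public class PathMapActionExample {

	public static void main(String[] args) {
		// Mirrors the "publicationyear" pathMap entry: the value comes from the
		// JSONPath $['dateofacceptance'].value, and the From=0 / To=4 parameters
		// keep only the year that the tagging constraints then match against.
		ExecSubstringAction action = new ExecSubstringAction();
		action.setValue("2018-05-23");
		action.setFrom("0");
		action.setTo("4");
		System.out.println(action.execSubstring()); // prints 2018
	}
}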