From 8dae10b442ac0d6a1207868615b49e95e21b18a3 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 14 Feb 2024 14:57:08 +0100 Subject: [PATCH] - --- .../main/java/eu/dnetlib/dhp/MoveResult.java | 2 +- .../eu/dnetlib/dhp/api/QueryCommunityAPI.java | 1 - .../main/java/eu/dnetlib/dhp/api/Utils.java | 35 ++-- .../dhp/api/model/EntityCommunities.java | 47 ++--- .../dnetlib/dhp/bulktag/SparkBulkTagJob.java | 184 ++++++++++------- .../dnetlib/dhp/bulktag/actions/Action.java | 43 ++-- .../bulktag/actions/ExecSubstringAction.java | 65 +++--- .../dnetlib/dhp/bulktag/actions/MapModel.java | 30 +-- .../dhp/bulktag/actions/Parameters.java | 29 +-- .../dhp/bulktag/community/ProtoMap.java | 4 +- .../dhp/bulktag/community/ResultTagger.java | 36 ++-- .../eu/dnetlib/dhp/wf/main/job.properties | 31 +-- .../dnetlib/dhp/bulktag/BulkTagJobTest.java | 189 ++++++++++-------- 13 files changed, 388 insertions(+), 308 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java index c71ccb439..6731f2332 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/MoveResult.java @@ -30,7 +30,7 @@ public class MoveResult implements Serializable { public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( - MoveResult.class + MoveResult.class .getResourceAsStream( "/eu/dnetlib/dhp/wf/subworkflows/input_moveresult_parameters.json")); diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java index 28110549c..e56cdab72 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/QueryCommunityAPI.java @@ -78,5 +78,4 @@ public class QueryCommunityAPI { return body; } - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java index c990d6ebe..27fb37e5b 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/Utils.java @@ -6,10 +6,6 @@ import java.io.Serializable; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Datasource; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +20,10 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration; import eu.dnetlib.dhp.bulktag.community.Provider; import eu.dnetlib.dhp.bulktag.criteria.VerbResolver; import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Project; /** * @author miriam.baglioni @@ -57,7 +57,7 @@ public class Utils implements Serializable { if (d.getEnabled() == null || Boolean.FALSE.equals(d.getEnabled())) return null; Provider p = new Provider(); - p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class)+"|" + d.getOpenaireId()); + p.setOpenaireId(ModelSupport.getIdPrefix(Datasource.class) + "|" + d.getOpenaireId()); p.setSelectionConstraints(d.getSelectioncriteria()); if (p.getSelectionConstraints() != null) p.getSelectionConstraints().setSelection(resolver); @@ -175,27 +175,31 @@ public class Utils implements Serializable { .collect(Collectors.toList()); } - public static List getDatasourceCommunities(String baseURL)throws IOException{ + public static List getDatasourceCommunities(String baseURL) throws IOException { List validCommunities = getValidCommunities(baseURL); HashMap> map = new HashMap<>(); - String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|" ; + String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|"; validCommunities.forEach(c -> { try { - new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) - .forEach(d -> { - if (!map.keySet().contains(d.getOpenaireId())) - map.put(d.getOpenaireId(), new HashSet<>()); + new ObjectMapper() + .readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class) + .forEach(d -> { + if (!map.keySet().contains(d.getOpenaireId())) + map.put(d.getOpenaireId(), new HashSet<>()); - map.get(d.getOpenaireId()).add(c.getId()); - }); + map.get(d.getOpenaireId()).add(c.getId()); + }); } catch (IOException e) { throw new RuntimeException(e); } }); - List temp = map.keySet().stream() - .map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))).collect(Collectors.toList()); + List temp = map + .keySet() + .stream() + .map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))) + .collect(Collectors.toList()); return temp; @@ -207,5 +211,4 @@ public class Utils implements Serializable { return temp; } - } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java index 3af396a0a..cac02c072 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/api/model/EntityCommunities.java @@ -1,39 +1,40 @@ -package eu.dnetlib.dhp.api.model; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Datasource; +package eu.dnetlib.dhp.api.model; import java.io.Serializable; import java.util.List; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Datasource; + /** * @author miriam.baglioni * @Date 13/02/24 */ public class EntityCommunities implements Serializable { - private String entityId; - private List communitiesId; + private String entityId; + private List communitiesId; - public String getEntityId() { - return entityId; - } + public String getEntityId() { + return entityId; + } - public void setEntityId(String entityId) { - this.entityId = entityId; - } + public void setEntityId(String entityId) { + this.entityId = entityId; + } - public List getCommunitiesId() { - return communitiesId; - } + public List getCommunitiesId() { + return communitiesId; + } - public void setCommunitiesId(List communitiesId) { - this.communitiesId = communitiesId; - } + public void setCommunitiesId(List communitiesId) { + this.communitiesId = communitiesId; + } - public static EntityCommunities newInstance(String dsid, List csid){ - EntityCommunities dsc = new EntityCommunities(); - dsc.entityId = dsid; - dsc.communitiesId = csid; - return dsc; - } + public static EntityCommunities newInstance(String dsid, List csid) { + EntityCommunities dsc = new EntityCommunities(); + dsc.entityId = dsid; + dsc.communitiesId = csid; + return dsc; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 33c416b0d..10e9f6e0e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -7,11 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.api.model.CommunityEntityMap; -import eu.dnetlib.dhp.api.model.EntityCommunities; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Context; -import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -27,10 +22,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import eu.dnetlib.dhp.api.Utils; +import eu.dnetlib.dhp.api.model.CommunityEntityMap; +import eu.dnetlib.dhp.api.model.EntityCommunities; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.bulktag.community.*; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.Datasource; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -53,6 +53,7 @@ public class SparkBulkTagJob { .getResourceAsStream( "/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json")); + log.info(args.toString()); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -71,7 +72,8 @@ public class SparkBulkTagJob { final String baseURL = parser.get("baseURL"); log.info("baseURL: {}", baseURL); - ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class); + log.info("pathMap: {}", parser.get("pathMap")); + ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class); log.info("pathMap: {}", new Gson().toJson(protoMappingParams)); SparkConf conf = new SparkConf(); @@ -100,89 +102,123 @@ public class SparkBulkTagJob { }); } - private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityProjects) { + private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, + CommunityEntityMap communityProjects) { Dataset projects = readPath(spark, inputPath + "project", Project.class); - Dataset pc = spark.createDataset(communityProjects.keySet().stream().map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))).collect(Collectors.toList()), Encoders.bean(EntityCommunities.class)); + Dataset pc = spark + .createDataset( + communityProjects + .keySet() + .stream() + .map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))) + .collect(Collectors.toList()), + Encoders.bean(EntityCommunities.class)); - projects.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") - .map((MapFunction, Project>) t2 -> { - Project ds = t2._1(); - if (t2._2() != null){ - List context = - Optional.ofNullable(ds.getContext()) - .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) - .orElse(new ArrayList<>()); + projects + .joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") + .map((MapFunction, Project>) t2 -> { + Project ds = t2._1(); + if (t2._2() != null) { + List context = Optional + .ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); - if(!Optional.ofNullable(ds.getContext()).isPresent()) - ds.setContext(new ArrayList<>()); - t2._2().getCommunitiesId().forEach(c -> { - if(!context.contains(c)){ - Context con = new Context(); - con.setId(c); - con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, - OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con); - } - }); - } - return ds; - } ,Encoders.bean(Project.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "project"); + if (!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); + t2._2().getCommunitiesId().forEach(c -> { + if (!context.contains(c)) { + Context con = new Context(); + con.setId(c); + con + .setDataInfo( + Arrays + .asList( + OafMapperUtils + .dataInfo( + false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + TaggingConstants.CLASS_ID_DATASOURCE, + TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "1"))); + ds.getContext().add(con); + } + }); + } + return ds; + }, Encoders.bean(Project.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "project"); readPath(spark, outputPath + "project", Datasource.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + "project"); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + "project"); } - - private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, List datasourceCommunities) { + private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, + List datasourceCommunities) { Dataset datasource = readPath(spark, inputPath + "datasource", Datasource.class); - Dataset dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); + Dataset dc = spark + .createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class)); - datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") - .map((MapFunction, Datasource>) t2 -> { - Datasource ds = t2._1(); - if (t2._2() != null){ + datasource + .joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left") + .map((MapFunction, Datasource>) t2 -> { + Datasource ds = t2._1(); + if (t2._2() != null) { - List context = - Optional.ofNullable(ds.getContext()) - .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) - .orElse(new ArrayList<>()); + List context = Optional + .ofNullable(ds.getContext()) + .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList())) + .orElse(new ArrayList<>()); - if(!Optional.ofNullable(ds.getContext()).isPresent()) - ds.setContext(new ArrayList<>()); + if (!Optional.ofNullable(ds.getContext()).isPresent()) + ds.setContext(new ArrayList<>()); + + t2._2().getCommunitiesId().forEach(c -> { + if (!context.contains(c)) { + Context con = new Context(); + con.setId(c); + con + .setDataInfo( + Arrays + .asList( + OafMapperUtils + .dataInfo( + false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, + OafMapperUtils + .qualifier( + TaggingConstants.CLASS_ID_DATASOURCE, + TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "1"))); + ds.getContext().add(con); + } + }); + } + return ds; + }, Encoders.bean(Datasource.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "datasource"); - t2._2().getCommunitiesId().forEach(c -> { - if(!context.contains(c)){ - Context con = new Context(); - con.setId(c); - con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, - OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1"))); - ds.getContext().add(con); - } - }); - } - return ds; - } ,Encoders.bean(Datasource.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath + "datasource"); - readPath(spark, outputPath + "datasource", Datasource.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(inputPath + "datasource"); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(inputPath + "datasource"); } - private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) { @@ -273,6 +309,4 @@ public class SparkBulkTagJob { }; } - } - diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java index 6c9e7694f..987e7afef 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Action.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,31 +9,31 @@ import java.util.List; * @Date 22/01/24 */ public class Action implements Serializable { - private String clazz; - private String method; - private List params; + private String clazz; + private String method; + private List params; - public String getClazz() { - return clazz; - } + public String getClazz() { + return clazz; + } - public void setClazz(String clazz) { - this.clazz = clazz; - } + public void setClazz(String clazz) { + this.clazz = clazz; + } - public String getMethod() { - return method; - } + public String getMethod() { + return method; + } - public void setMethod(String method) { - this.method = method; - } + public void setMethod(String method) { + this.method = method; + } - public List getParams() { - return params; - } + public List getParams() { + return params; + } - public void setParams(List params) { - this.params = params; - } + public void setParams(List params) { + this.params = params; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java index fd3091fd0..0ada4ebfb 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/ExecSubstringAction.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,47 +9,47 @@ import java.io.Serializable; */ public class ExecSubstringAction implements Serializable { - private String value; - private String from; - private String to; + private String value; + private String from; + private String to; - public String getValue() { - return value; - } + public String getValue() { + return value; + } - public void setValue(String value) { - this.value = value; - } + public void setValue(String value) { + this.value = value; + } - public String getFrom() { - return from; - } + public String getFrom() { + return from; + } - public void setFrom(String from) { - this.from = from; - } + public void setFrom(String from) { + this.from = from; + } - public String getTo() { - return to; - } + public String getTo() { + return to; + } - public void setTo(String to) { - this.to = to; - } + public void setTo(String to) { + this.to = to; + } - public String execSubstring(){ - int to = Integer.valueOf(this.to); - int from = Integer.valueOf(this.from); + public String execSubstring() { + int to = Integer.valueOf(this.to); + int from = Integer.valueOf(this.from); - if(to < from || from > this.value.length()) - return ""; + if (to < from || from > this.value.length()) + return ""; - if(from < 0) - from = 0; - if (to > this.value.length()) - to = this.value.length(); + if (from < 0) + from = 0; + if (to > this.value.length()) + to = this.value.length(); - return this.value.substring(from, to); + return this.value.substring(from, to); - } + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java index ef3eb43cc..6a0d20b57 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/MapModel.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -8,23 +9,22 @@ import java.io.Serializable; */ public class MapModel implements Serializable { - private String path; - private Action action; + private String path; + private Action action; + public String getPath() { + return path; + } - public String getPath() { - return path; - } + public void setPath(String path) { + this.path = path; + } - public void setPath(String path) { - this.path = path; - } + public Action getAction() { + return action; + } - public Action getAction() { - return action; - } - - public void setAction(Action action) { - this.action = action; - } + public void setAction(Action action) { + this.action = action; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java index d23634605..973b00b77 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/actions/Parameters.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.bulktag.actions; import java.io.Serializable; @@ -7,22 +8,22 @@ import java.io.Serializable; * @Date 22/01/24 */ public class Parameters implements Serializable { - private String paramName; - private String paramValue; + private String paramName; + private String paramValue; - public String getParamName() { - return paramName; - } + public String getParamName() { + return paramName; + } - public void setParamName(String paramName) { - this.paramName = paramName; - } + public void setParamName(String paramName) { + this.paramName = paramName; + } - public String getParamValue() { - return paramValue; - } + public String getParamValue() { + return paramValue; + } - public void setParamValue(String paramValue) { - this.paramValue = paramValue; - } + public void setParamValue(String paramValue) { + this.paramValue = paramValue; + } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java index 15b4d10b7..dc75aec37 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ProtoMap.java @@ -1,11 +1,11 @@ package eu.dnetlib.dhp.bulktag.community; -import eu.dnetlib.dhp.bulktag.actions.MapModel; - import java.io.Serializable; import java.util.HashMap; +import eu.dnetlib.dhp.bulktag.actions.MapModel; + public class ProtoMap extends HashMap implements Serializable { public ProtoMap() { diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 67422c2ad..2ea229e3e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -10,9 +10,6 @@ import java.lang.reflect.Method; import java.util.*; import java.util.stream.Collectors; -import com.jayway.jsonpath.PathNotFoundException; -import eu.dnetlib.dhp.bulktag.actions.MapModel; -import eu.dnetlib.dhp.bulktag.actions.Parameters; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +17,10 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; +import eu.dnetlib.dhp.bulktag.actions.MapModel; +import eu.dnetlib.dhp.bulktag.actions.Parameters; import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -40,7 +40,8 @@ public class ResultTagger implements Serializable { return (tmp != clist.size()); } - private Map> getParamMap(final Result result, Map params) throws NoSuchMethodException, InvocationTargetException { + private Map> getParamMap(final Result result, Map params) + throws NoSuchMethodException, InvocationTargetException { Map> param = new HashMap<>(); String json = new Gson().toJson(result, Result.class); DocumentContext jsonContext = JsonPath.parse(json); @@ -54,30 +55,34 @@ public class ResultTagger implements Serializable { try { String path = mapModel.getPath(); Object obj = jsonContext.read(path); - List pathValue ; - if(obj instanceof java.lang.String) - pathValue = Arrays.asList((String)obj); + List pathValue; + if (obj instanceof java.lang.String) + pathValue = Arrays.asList((String) obj); else - pathValue = (List)obj; - if(Optional.ofNullable(mapModel.getAction()).isPresent()){ + pathValue = (List) obj; + if (Optional.ofNullable(mapModel.getAction()).isPresent()) { Class c = Class.forName(mapModel.getAction().getClazz()); Object class_instance = c.newInstance(); Method setField = c.getMethod("setValue", String.class); setField.invoke(class_instance, pathValue.get(0)); - for(Parameters p : mapModel.getAction().getParams()){ + for (Parameters p : mapModel.getAction().getParams()) { setField = c.getMethod("set" + p.getParamName(), String.class); setField.invoke(class_instance, p.getParamValue()); } - param.put(key,Arrays.asList((String)c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance))); + param + .put( + key, Arrays + .asList((String) c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance))); } - else{ + else { param.put(key, pathValue); } - } catch (PathNotFoundException | ClassNotFoundException | InstantiationException | IllegalAccessException e) { + } catch (PathNotFoundException | ClassNotFoundException | InstantiationException + | IllegalAccessException e) { param.put(key, new ArrayList<>()); } } @@ -86,9 +91,8 @@ public class ResultTagger implements Serializable { } public R enrichContextCriteria( - final R result, final CommunityConfiguration conf, final Map criteria) throws InvocationTargetException, NoSuchMethodException { - - + final R result, final CommunityConfiguration conf, final Map criteria) + throws InvocationTargetException, NoSuchMethodException { // Verify if the entity is deletedbyinference. In case verify if to clean the context list // from all the zenodo communities diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties index 05db04090..f252f6463 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/job.properties @@ -7,20 +7,23 @@ datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926 #allowedtypes=pubsrepository::institutional allowedtypes=Institutional outputPath=/tmp/miriam/graph/11_graph_orcid -pathMap ={"author":"$['author'][*]['fullname']", \ - "title":"$['title'][*]['value']",\ - "orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\ - "orcid_pending":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" ,\ - "contributor" : "$['contributor'][*]['value']",\ - "description" : "$['description'][*]['value']",\ - "subject" :"$['subject'][*]['value']" , \ - "fos" : "$['subject'][?(@['qualifier']['classid']=='FOS')].value" ,\ - "sdg" : "$['subject'][?(@['qualifier']['classid']=='SDG')].value",\ - "journal":"$['journal'].name",\ - "hostedby":"$['instance'][*]['hostedby']['key']",\ - "collectedfrom":"$['instance'][*]['collectedfrom']['key']",\ - "publisher":"$['publisher'].value",\ - "publicationyear":"$['dateofacceptance'].value"} +pathMap ={"author":{"path":"$['author'][*]['fullname']"}, \ + "title":{"path":"$['title'][*]['value']"},\ + "orcid":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']"} ,\ + "orcid_pending":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']"} ,\ + "contributor" : {"path":"$['contributor'][*]['value']"},\ + "description" : {"path":"$['description'][*]['value']"},\ + "subject" :{"path":"$['subject'][*]['value']"}, \ + "fos" : {"path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value"} ,\ + "sdg" : {"path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value"},\ + "journal":{"path":"$['journal'].name"},\ + "hostedby":{"path":"$['instance'][*]['hostedby']['key']"},\ + "collectedfrom":{"path":"$['instance'][*]['collectedfrom']['key']"},\ + "publisher":{"path":"$['publisher'].value"},\ + "publicationyear":{"path":"$['dateofacceptance'].value", "action":{"class":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction",\ + "method":"execSubstring",\ + "params":[{"param_name":"From","param_value":0},\ + {"param_name":"To","param_value":4}]}}} blacklist=empty allowedpids=orcid;orcid_pending baseURL = https://services.openaire.eu/openaire/community/ diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index 882e259cf..25ed68e03 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -33,25 +33,25 @@ public class BulkTagJobTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," + - " \"title\":{\"path\":\"$['title'][*]['value']\"}, "+ - " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + - " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"+ - "\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"},"+ - " \"description\" : {\"path\":\"$['description'][*]['value']\"},"+ - " \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " + - " \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , "+ - "\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," + - "\"journal\":{\"path\":\"$['journal'].name\"}," + - "\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," + - "\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," + - "\"publisher\":{\"path\":\"$['publisher'].value\"}," + - "\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " + - " \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," + - "\"method\":\"execSubstring\","+ - "\"params\":[" + - "{\"paramName\":\"From\", \"paramValue\":0}, " + - "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; - + " \"title\":{\"path\":\"$['title'][*]['value']\"}, " + + " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " + + " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ," + + + "\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"}," + + " \"description\" : {\"path\":\"$['description'][*]['value']\"}," + + " \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " + + " \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , " + + "\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," + + "\"journal\":{\"path\":\"$['journal'].name\"}," + + "\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," + + "\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," + + "\"publisher\":{\"path\":\"$['publisher'].value\"}," + + "\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " + + " \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," + + "\"method\":\"execSubstring\"," + + "\"params\":[" + + "{\"paramName\":\"From\", \"paramValue\":0}, " + + "{\"paramName\":\"To\",\"paramValue\":4}]}}}"; private static SparkSession spark; @@ -325,7 +325,7 @@ public class BulkTagJobTest { "-taggingConf", taggingConf, "-outputPath", workingDir.toString() + "/", - "-baseURL", "https://services.openaire.eu/openaire/community/", + "-baseURL", "https://services.openaire.eu/openaire/community/", "-pathMap", pathMap }); @@ -388,38 +388,38 @@ public class BulkTagJobTest { @Test void datasourceTag() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") - .getPath(); + .getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/") + .getPath(); SparkBulkTagJob - .main( - new String[] { + .main( + new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", sourcePath, - "-taggingConf", taggingConf, + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", sourcePath, + "-taggingConf", taggingConf, - "-outputPath", workingDir.toString() + "/", - "-baseURL", "https://services.openaire.eu/openaire/community/", - "-pathMap", pathMap - }); + "-outputPath", workingDir.toString() + "/", + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-pathMap", pathMap + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/datasource") - .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); + .textFile(workingDir.toString() + "/datasource") + .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class)); Assertions.assertEquals(3, tmp.count()); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(Datasource.class)); + .createDataset(tmp.rdd(), Encoders.bean(Datasource.class)); verificationDataset.createOrReplaceTempView("datasource"); String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name " - + "from datasource " - + "lateral view explode(context) c as MyT " - + "lateral view explode(MyT.datainfo) d as MyD " - + "where MyD.inferenceprovenance = 'bulktagging'"; + + "from datasource " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; org.apache.spark.sql.Dataset idExplodeCommunity = spark.sql(query); @@ -427,19 +427,18 @@ public class BulkTagJobTest { Assertions.assertEquals(3, idExplodeCommunity.count()); Assertions - .assertEquals( - 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); + .assertEquals( + 3, idExplodeCommunity.filter("provenance = 'community:datasource'").count()); Assertions - .assertEquals( - 3, - idExplodeCommunity - .filter("name = 'Bulktagging for Community - Datasource'") - .count()); + .assertEquals( + 3, + idExplodeCommunity + .filter("name = 'Bulktagging for Community - Datasource'") + .count()); Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count()); Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count()); - } @Test @@ -1668,60 +1667,94 @@ public class BulkTagJobTest { Assertions.assertEquals(0, spark.sql(query).count()); } - @Test void pubdateTest() throws Exception { - final String pathMap = BulkTagJobTest.pathMap; SparkBulkTagJob - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-sourcePath", - getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(), - "-taggingConf", - IOUtils - .toString( - BulkTagJobTest.class - .getResourceAsStream( - "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")), - "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap - }); + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-sourcePath", + getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(), + "-taggingConf", + IOUtils + .toString( + BulkTagJobTest.class + .getResourceAsStream( + "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")), + "-outputPath", workingDir.toString() + "/", + "-pathMap", pathMap + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/dataset") - .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); + .textFile(workingDir.toString() + "/dataset") + .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class)); Assertions.assertEquals(10, tmp.count()); org.apache.spark.sql.Dataset verificationDataset = spark - .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); + .createDataset(tmp.rdd(), Encoders.bean(Dataset.class)); verificationDataset.createOrReplaceTempView("dataset"); - String query = "select id, MyT.id community, MyD.provenanceaction.classid " - + "from dataset " - + "lateral view explode(context) c as MyT " - + "lateral view explode(MyT.datainfo) d as MyD " - + "where MyD.inferenceprovenance = 'bulktagging'"; + + "from dataset " + + "lateral view explode(context) c as MyT " + + "lateral view explode(MyT.datainfo) d as MyD " + + "where MyD.inferenceprovenance = 'bulktagging'"; org.apache.spark.sql.Dataset queryResult = spark.sql(query); queryResult.show(false); Assertions.assertEquals(5, queryResult.count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")).count()); - Assertions.assertEquals(1, queryResult.filter((FilterFunction) r -> r.getAs("id").equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")).count()); - + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")) + .count()); + Assertions + .assertEquals( + 1, + queryResult + .filter( + (FilterFunction) r -> r + .getAs("id") + .equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")) + .count()); } - - }