diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java index 354741690..0a7a4913e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/SparkBulkTagJob.java @@ -8,6 +8,7 @@ import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; +import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -92,6 +93,10 @@ public class SparkBulkTagJob { ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class); log.info("pathMap: {}", new Gson().toJson(protoMap)); + SelectionConstraints taggingConstraints = new Gson() + .fromJson(parser.get("taggingCriteria"), SelectionConstraints.class); + taggingConstraints.setSelection(VerbResolverFactory.newInstance()); + SparkConf conf = new SparkConf(); CommunityConfiguration cc; @@ -113,7 +118,7 @@ public class SparkBulkTagJob { spark -> { extendCommunityConfigurationForEOSC(spark, inputPath, cc); execBulkTag( - spark, inputPath, outputPath, protoMap, cc); + spark, inputPath, outputPath, protoMap, cc, taggingConstraints); execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL)); execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL)); }); @@ -271,7 +276,8 @@ public class SparkBulkTagJob { String inputPath, String outputPath, ProtoMap protoMappingParams, - CommunityConfiguration communityConfiguration) { + CommunityConfiguration communityConfiguration, + SelectionConstraints taggingConstraints) { try { System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams)); @@ -289,21 +295,30 @@ public class SparkBulkTagJob { readPath(spark, inputPath + e.name(), 
resultClazz) .map(patchResult(), Encoders.bean(resultClazz)) .filter(Objects::nonNull) - .map( - (MapFunction) value -> resultTagger + .map((MapFunction) value -> resultTagger .enrichContextCriteria( - value, communityConfiguration, protoMappingParams), - Encoders.bean(resultClazz)) + value, communityConfiguration, protoMappingParams, taggingConstraints), + Encoders.bean(Tagging.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath + e.name());// writing the tagging in the working dir for entity - readPath(spark, outputPath + e.name(), resultClazz) // copy the tagging in the actual result output path + readPath(spark, outputPath + e.name(), Tagging.class) + .map((MapFunction) t -> (R) t.getResult(), Encoders.bean(resultClazz) )// copy the tagging in the actual result output path .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(inputPath + e.name()); + + readPath(spark, outputPath + e.name(), Tagging.class) + .map((MapFunction) t -> t.getTag(), Encoders.STRING() )// copy the tagging in the actual result output path + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json("/user/miriam.baglioni/graphTagging/" + e.name()); + }); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/Tagging.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/Tagging.java new file mode 100644 index 000000000..d01d4b805 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/Tagging.java @@ -0,0 +1,32 @@ +package eu.dnetlib.dhp.bulktag; + +import java.io.Serializable; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class Tagging implements Serializable { + private String tag; + private R result; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public R getResult() { + return result; + } + + public void setResult(R result) { + this.result 
= result; + } + + public static Tagging newInstance(R result, String tag){ + Tagging t = new Tagging<>(); + t.result = result; + t.tag = tag; + return t; + } +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java index 2ea229e3e..75189e7de 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/bulktag/community/ResultTagger.java @@ -10,6 +10,8 @@ import java.lang.reflect.Method; import java.util.*; import java.util.stream.Collectors; + +import eu.dnetlib.dhp.bulktag.Tagging; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,6 +26,7 @@ import eu.dnetlib.dhp.bulktag.actions.Parameters; import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; /** Created by miriam on 02/08/2018. */ public class ResultTagger implements Serializable { @@ -90,17 +93,18 @@ public class ResultTagger implements Serializable { } - public R enrichContextCriteria( - final R result, final CommunityConfiguration conf, final Map criteria) + public Tagging enrichContextCriteria( + final R result, final CommunityConfiguration conf, final Map criteria, SelectionConstraints taggingConstraints) throws InvocationTargetException, NoSuchMethodException { // Verify if the entity is deletedbyinference. 
In case verify if to clean the context list // from all the zenodo communities if (result.getDataInfo().getDeletedbyinference()) { clearContext(result); - return result; + return Tagging.newInstance(result, null); } + String retString = null; final Map> param = getParamMap(result, criteria); // Execute the EOSCTag for the services @@ -118,6 +122,10 @@ public class ResultTagger implements Serializable { break; } +//adding code for tagging of results searching supplementaryMaterial + if(taggingConstraints.getCriteria().stream().anyMatch(crit -> crit.verifyCriteria(param))) + retString = "supplementary"; + // communities contains all the communities to be not added to the context final Set removeCommunities = new HashSet<>(); @@ -246,7 +254,7 @@ public class ResultTagger implements Serializable { /* Verify if there is something to bulktag */ if (communities.isEmpty()) { - return result; + return Tagging.newInstance(result, retString); } result.getContext().forEach(c -> { @@ -313,7 +321,7 @@ public class ResultTagger implements Serializable { result.getContext().stream().map(Context::getId).collect(Collectors.toSet())); if (communities.isEmpty()) - return result; + return Tagging.newInstance(result, retString); List toaddcontext = communities .stream() @@ -373,7 +381,7 @@ public class ResultTagger implements Serializable { .collect(Collectors.toList()); result.getContext().addAll(toaddcontext); - return result; + return Tagging.newInstance(result, retString); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json index 36c9600fe..478e3640c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json +++ 
b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json @@ -39,5 +39,10 @@ "paramLongName": "nameNode", "paramDescription": "this parameter is to specify the api to be queried (beta or production)", "paramRequired": true +},{ + "paramName": "tc", + "paramLongName": "taggingCriteria", + "paramDescription": "the JSON serialization of the selection criteria used to tag results (e.g. titles starting with 'supplementary material')", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml index c4b4b7d64..4a6dd88e7 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/bulktag/oozie_app/workflow.xml @@ -77,6 +77,7 @@ --pathMap${pathMap} --baseURL${baseURL} --nameNode${nameNode} + --taggingCriteria${taggingCriteria} diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java index a5280a3b3..5e51b687f 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/bulktag/BulkTagJobTest.java @@ -68,6 +68,8 @@ public class BulkTagJobTest { private static String taggingConf = ""; + private static String taggingCriteria = "{\"criteria\":[{\"constraint\":[{\"verb\":\"starts_with_caseinsensitive\",\"field\":\"title\",\"value\":\"supplementary material\"}]}]}"; + static { try { taggingConf = IOUtils @@ -119,7 +121,10 @@ public class BulkTagJobTest { getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(), "-taggingConf", 
taggingConf, "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -156,7 +161,10 @@ public class BulkTagJobTest { "-sourcePath", sourcePath, "-taggingConf", taggingConf, "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -242,15 +250,15 @@ public class BulkTagJobTest { fs .copyFromLocalFile( false, new org.apache.hadoop.fs.Path(getClass() - .getResource("/eu/dnetlib/dhp/bulktag/pathMap/") + .getResource("/eu/dnetlib/dhp/bulktag/pathMap/pathMap") .getPath()), new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap")); - + final String pathMap = workingDir.toString() + "/data/bulktagging/protoMap"; final String sourcePath = getClass() .getResource( "/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/") .getPath(); - final String pathMap = BulkTagJobTest.pathMap; + SparkBulkTagJob .main( new String[] { @@ -262,7 +270,9 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap", - "-nameNode", "local" + "-nameNode", "local", + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/" }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); @@ -342,8 +352,11 @@ public class BulkTagJobTest { "-taggingConf", taggingConf, "-outputPath", workingDir.toString() + "/", - "-baseURL", "https://services.openaire.eu/openaire/community/", - "-pathMap", pathMap + + "-pathMap", pathMap, + 
"-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -424,7 +437,8 @@ public class BulkTagJobTest { "-baseURL", "https://services.openaire.eu/openaire/community/", "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", - "-nameNode", "local" + "-nameNode", "local", + "-taggingCriteria", taggingCriteria }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -481,7 +495,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -602,7 +619,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -730,7 +750,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -830,7 +853,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = 
JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -873,7 +899,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -927,7 +956,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -976,7 +1008,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1194,7 +1229,11 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1312,7 +1351,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1432,7 +1474,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + 
"-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1472,7 +1517,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1517,7 +1565,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1554,7 +1605,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1629,7 +1683,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1667,7 +1724,10 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", // "-baseURL", "https://services.openaire.eu/openaire/community/", "-pathMap", pathMap, - "-taggingConf", taggingConf + "-taggingConf", taggingConf, + "-taggingCriteria", taggingCriteria, + "-baseURL", 
"https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1708,7 +1768,10 @@ public class BulkTagJobTest { .getResourceAsStream( "/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")), "-outputPath", workingDir.toString() + "/", - "-pathMap", pathMap + "-pathMap", pathMap, + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -1811,8 +1874,9 @@ public class BulkTagJobTest { "-outputPath", workingDir.toString() + "/", "-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap", - "-baseURL", "none", - "-nameNode", "local" + "-taggingCriteria", taggingCriteria, + "-baseURL", "https://services.openaire.eu/openaire/community/", + "-nameNode", "local" }); }