[Annotation]First tentative version toe xtend bulk tagging for annotation. Maybe not the correct way. Application of annotation on deduped records?

This commit is contained in:
Miriam Baglioni 2024-07-03 12:19:47 +02:00
parent c7634c55c7
commit 740cfa77fb
6 changed files with 165 additions and 40 deletions

View File

@ -8,6 +8,7 @@ import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -92,6 +93,10 @@ public class SparkBulkTagJob {
ProtoMap protoMap = new Gson().fromJson(temp, ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMap));
SelectionConstraints taggingConstraints = new Gson()
.fromJson(parser.get("taggingCriteria"), SelectionConstraints.class);
taggingConstraints.setSelection(VerbResolverFactory.newInstance());
SparkConf conf = new SparkConf();
CommunityConfiguration cc;
@ -113,7 +118,7 @@ public class SparkBulkTagJob {
spark -> {
extendCommunityConfigurationForEOSC(spark, inputPath, cc);
execBulkTag(
spark, inputPath, outputPath, protoMap, cc);
spark, inputPath, outputPath, protoMap, cc, taggingConstraints);
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL));
});
@ -271,7 +276,8 @@ public class SparkBulkTagJob {
String inputPath,
String outputPath,
ProtoMap protoMappingParams,
CommunityConfiguration communityConfiguration) {
CommunityConfiguration communityConfiguration,
SelectionConstraints taggingConstraints) {
try {
System.out.println(new ObjectMapper().writeValueAsString(protoMappingParams));
@ -289,21 +295,30 @@ public class SparkBulkTagJob {
readPath(spark, inputPath + e.name(), resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.map((MapFunction<R, Tagging>) value -> resultTagger
.enrichContextCriteria(
value, communityConfiguration, protoMappingParams),
Encoders.bean(resultClazz))
value, communityConfiguration, protoMappingParams, taggingConstraints),
Encoders.bean(Tagging.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + e.name());// writing the tagging in the working dir for entity
readPath(spark, outputPath + e.name(), resultClazz) // copy the tagging in the actual result output path
readPath(spark, outputPath + e.name(), Tagging.class)
.map((MapFunction<Tagging, R>) t -> (R) t.getResult(), Encoders.bean(resultClazz) )// copy the tagging in the actual result output path
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(inputPath + e.name());
readPath(spark, outputPath + e.name(), Tagging.class)
.map((MapFunction<Tagging, String>) t -> t.getTag(), Encoders.STRING() )// copy the tagging in the actual result output path
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json("/user/miriam.baglioni/graphTagging/" + e.name());
});
}

View File

@ -0,0 +1,32 @@
package eu.dnetlib.dhp.bulktag;
import java.io.Serializable;
import eu.dnetlib.dhp.schema.oaf.Result;
public class Tagging <R extends Result> implements Serializable {
private String tag;
private R result;
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public R getResult() {
return result;
}
public void setResult(R result) {
this.result = result;
}
public static <R extends Result> Tagging newInstance(R result, String tag){
Tagging t = new Tagging<>();
t.result = result;
t.tag = tag;
return t;
}
}

View File

@ -10,6 +10,8 @@ import java.lang.reflect.Method;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.bulktag.Tagging;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,6 +26,7 @@ import eu.dnetlib.dhp.bulktag.actions.Parameters;
import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/** Created by miriam on 02/08/2018. */
public class ResultTagger implements Serializable {
@ -90,17 +93,18 @@ public class ResultTagger implements Serializable {
}
public <R extends Result> R enrichContextCriteria(
final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria)
public <R extends Result> Tagging enrichContextCriteria(
final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria, SelectionConstraints taggingConstraints)
throws InvocationTargetException, NoSuchMethodException {
// Verify if the entity is deletedbyinference. In case verify if to clean the context list
// from all the zenodo communities
if (result.getDataInfo().getDeletedbyinference()) {
clearContext(result);
return result;
return Tagging.newInstance(result, null);
}
String retString = null;
final Map<String, List<String>> param = getParamMap(result, criteria);
// Execute the EOSCTag for the services
@ -118,6 +122,10 @@ public class ResultTagger implements Serializable {
break;
}
//adding code for tagging of results searching supplementaryMaterial
if(taggingConstraints.getCriteria().stream().anyMatch(crit -> crit.verifyCriteria(param)))
retString = "supplementary";
// communities contains all the communities to be not added to the context
final Set<String> removeCommunities = new HashSet<>();
@ -246,7 +254,7 @@ public class ResultTagger implements Serializable {
/* Verify if there is something to bulktag */
if (communities.isEmpty()) {
return result;
return Tagging.newInstance(result, retString);
}
result.getContext().forEach(c -> {
@ -313,7 +321,7 @@ public class ResultTagger implements Serializable {
result.getContext().stream().map(Context::getId).collect(Collectors.toSet()));
if (communities.isEmpty())
return result;
return Tagging.newInstance(result, retString);
List<Context> toaddcontext = communities
.stream()
@ -373,7 +381,7 @@ public class ResultTagger implements Serializable {
.collect(Collectors.toList());
result.getContext().addAll(toaddcontext);
return result;
return Tagging.newInstance(result, retString);
}
}

View File

@ -39,5 +39,10 @@
"paramLongName": "nameNode",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
},{
"paramName": "tc",
"paramLongName": "taggingCriteria",
"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
"paramRequired": true
}
]

View File

@ -77,6 +77,7 @@
<arg>--pathMap</arg><arg>${pathMap}</arg>
<arg>--baseURL</arg><arg>${baseURL}</arg>
<arg>--nameNode</arg><arg>${nameNode}</arg>
<arg>--taggingCriteria</arg><arg>${taggingCriteria}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -68,6 +68,8 @@ public class BulkTagJobTest {
private static String taggingConf = "";
private static String taggingCriteria = "{\"criteria\":[{\"constraint\":[{\"verb\":\"starts_with_caseinsensitive\",\"field\":\"title\",\"value\":\"supplementary material\"}]}]}";
static {
try {
taggingConf = IOUtils
@ -119,7 +121,10 @@ public class BulkTagJobTest {
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
"-taggingConf", taggingConf,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -156,7 +161,10 @@ public class BulkTagJobTest {
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -242,15 +250,15 @@ public class BulkTagJobTest {
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
.getResource("/eu/dnetlib/dhp/bulktag/pathMap/pathMap")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
final String pathMap = workingDir.toString() + "/data/bulktagging/protoMap";
final String sourcePath = getClass()
.getResource(
"/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
.getPath();
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
@ -262,7 +270,9 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap",
"-nameNode", "local"
"-nameNode", "local",
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -342,8 +352,11 @@ public class BulkTagJobTest {
"-taggingConf", taggingConf,
"-outputPath", workingDir.toString() + "/",
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -424,7 +437,8 @@ public class BulkTagJobTest {
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
"-nameNode", "local"
"-nameNode", "local",
"-taggingCriteria", taggingCriteria
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -481,7 +495,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -602,7 +619,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -730,7 +750,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -830,7 +853,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -873,7 +899,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -927,7 +956,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -976,7 +1008,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1194,7 +1229,11 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1312,7 +1351,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1432,7 +1474,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1472,7 +1517,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1517,7 +1565,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1554,7 +1605,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1629,7 +1683,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1667,7 +1724,10 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
// "-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", pathMap,
"-taggingConf", taggingConf
"-taggingConf", taggingConf,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1708,7 +1768,10 @@ public class BulkTagJobTest {
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")),
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
"-pathMap", pathMap,
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -1811,8 +1874,9 @@ public class BulkTagJobTest {
"-outputPath", workingDir.toString() + "/",
"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
"-baseURL", "none",
"-nameNode", "local"
"-taggingCriteria", taggingCriteria,
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-nameNode", "local"
});
}