Merge pull request 'bulkTaggingPathMapExtention' (#381) from bulkTaggingPathMapExtention into beta

Reviewed-on: D-Net/dnet-hadoop#381
This commit is contained in:
Claudio Atzori 2024-03-25 16:02:01 +01:00
commit c3c9bdb59c
16 changed files with 1812 additions and 20 deletions

View File

@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> { return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
Project project = new Project(); Project project = new Project();
project.setId(bipProjectScores.getProjectId()); //project.setId(bipProjectScores.getProjectId());
project.setMeasures(bipProjectScores.toMeasures()); project.setMeasures(bipProjectScores.toMeasures());
return project; return project;
}, Encoders.bean(Project.class)) }, Encoders.bean(Project.class))

View File

@ -0,0 +1,39 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;
import java.util.List;
/**
* @author miriam.baglioni
* @Date 22/01/24
*/
public class Action implements Serializable {
private String clazz;
private String method;
private List<Parameters> params;
public String getClazz() {
return clazz;
}
public void setClazz(String clazz) {
this.clazz = clazz;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public List<Parameters> getParams() {
return params;
}
public void setParams(List<Parameters> params) {
this.params = params;
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 19/01/24
*/
public class ExecSubstringAction implements Serializable {
private String value;
private String from;
private String to;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String execSubstring() {
return this.value.substring(Integer.valueOf(this.from), Integer.valueOf(this.to));
}
}

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 22/01/24
*/
public class MapModel implements Serializable {
private String path;
private Action action;
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public Action getAction() {
return action;
}
public void setAction(Action action) {
this.action = action;
}
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 22/01/24
*/
public class Parameters implements Serializable {
private String paramName;
private String paramValue;
public String getParamName() {
return paramName;
}
public void setParamName(String paramName) {
this.paramName = paramName;
}
public String getParamValue() {
return paramValue;
}
public void setParamValue(String paramValue) {
this.paramValue = paramValue;
}
}

View File

@ -4,7 +4,9 @@ package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
public class ProtoMap extends HashMap<String, String> implements Serializable { import eu.dnetlib.dhp.bulktag.actions.MapModel;
public class ProtoMap extends HashMap<String, MapModel> implements Serializable {
public ProtoMap() { public ProtoMap() {
super(); super();

View File

@ -5,6 +5,8 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*; import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -15,7 +17,10 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import eu.dnetlib.dhp.bulktag.actions.MapModel;
import eu.dnetlib.dhp.bulktag.actions.Parameters;
import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag; import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@ -35,27 +40,59 @@ public class ResultTagger implements Serializable {
return (tmp != clist.size()); return (tmp != clist.size());
} }
private Map<String, List<String>> getParamMap(final Result result, Map<String, String> params) { private Map<String, List<String>> getParamMap(final Result result, Map<String, MapModel> params)
throws NoSuchMethodException, InvocationTargetException {
Map<String, List<String>> param = new HashMap<>(); Map<String, List<String>> param = new HashMap<>();
String json = new Gson().toJson(result, Result.class); String json = new Gson().toJson(result, Result.class);
DocumentContext jsonContext = JsonPath.parse(json); DocumentContext jsonContext = JsonPath.parse(json);
if (params == null) { if (params == null) {
params = new HashMap<>(); params = new HashMap<>();
} }
for (String key : params.keySet()) { for (String key : params.keySet()) {
MapModel mapModel = params.get(key);
try { try {
param.put(key, jsonContext.read(params.get(key))); String path = mapModel.getPath();
} catch (com.jayway.jsonpath.PathNotFoundException e) { Object obj = jsonContext.read(path);
List<String> pathValue;
if (obj instanceof java.lang.String)
pathValue = Arrays.asList((String) obj);
else
pathValue = (List<String>) obj;
if (Optional.ofNullable(mapModel.getAction()).isPresent()) {
Class<?> c = Class.forName(mapModel.getAction().getClazz());
Object class_instance = c.newInstance();
Method setField = c.getMethod("setValue", String.class);
setField.invoke(class_instance, pathValue.get(0));
for (Parameters p : mapModel.getAction().getParams()) {
setField = c.getMethod("set" + p.getParamName(), String.class);
setField.invoke(class_instance, p.getParamValue());
}
param
.put(
key, Arrays
.asList((String) c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance)));
}
else {
param.put(key, pathValue);
}
} catch (PathNotFoundException | ClassNotFoundException | InstantiationException
| IllegalAccessException e) {
param.put(key, new ArrayList<>()); param.put(key, new ArrayList<>());
} }
} }
return param; return param;
} }
public <R extends Result> R enrichContextCriteria( public <R extends Result> R enrichContextCriteria(
final R result, final CommunityConfiguration conf, final Map<String, String> criteria) { final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria)
throws InvocationTargetException, NoSuchMethodException {
final Map<String, List<String>> param = getParamMap(result, criteria);
// Verify if the entity is deletedbyinference. In case verify if to clean the context list // Verify if the entity is deletedbyinference. In case verify if to clean the context list
// from all the zenodo communities // from all the zenodo communities
@ -64,6 +101,8 @@ public class ResultTagger implements Serializable {
return result; return result;
} }
final Map<String, List<String>> param = getParamMap(result, criteria);
// Execute the EOSCTag for the services // Execute the EOSCTag for the services
switch (result.getResulttype().getClassid()) { switch (result.getResulttype().getClassid()) {
case PUBLICATION_RESULTTYPE_CLASSID: case PUBLICATION_RESULTTYPE_CLASSID:

View File

@ -0,0 +1,34 @@
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/11/22
*/
@VerbClass("greater_than")
public class GreatThanVerb implements Selection, Serializable {
private String param;
public GreatThanVerb() {
}
public GreatThanVerb(final String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
return value.compareTo(param) > 0;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}

View File

@ -0,0 +1,34 @@
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;
/**
* @author miriam.baglioni
* @Date 11/11/22
*/
@VerbClass("lesser_than")
public class LessThanVerb implements Selection, Serializable {
private String param;
public LessThanVerb() {
}
public LessThanVerb(final String param) {
this.param = param;
}
@Override
public boolean apply(String value) {
return value.compareTo(param) < 0;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}

View File

@ -13,6 +13,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
@ -31,18 +32,26 @@ public class BulkTagJobTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\"," public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," +
+ " \"title\" : \"$['title'][*]['value']\"," " \"title\":{\"path\":\"$['title'][*]['value']\"}, " +
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\"," " \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " +
+ " \"contributor\" : \"$['contributor'][*]['value']\"," " \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"
+ " \"description\" : \"$['description'][*]['value']\", " +
+ " \"subject\" :\"$['subject'][*]['value']\" , " + "\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"}," +
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"," + " \"description\" : {\"path\":\"$['description'][*]['value']\"}," +
"\"sdg\" : \"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"," + " \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " +
"\"hostedby\" : \"$['instance'][*]['hostedby']['key']\" , " + " \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , " +
"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"," + "\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," +
"\"publisher\":\"$['publisher'].value\"," + "\"journal\":{\"path\":\"$['journal'].name\"}," +
"\"publicationyear\":\"$['dateofacceptance'].value\"} "; "\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," +
"\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," +
"\"publisher\":{\"path\":\"$['publisher'].value\"}," +
"\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " +
" \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," +
"\"method\":\"execSubstring\"," +
"\"params\":[" +
"{\"paramName\":\"From\", \"paramValue\":0}, " +
"{\"paramName\":\"To\",\"paramValue\":4}]}}}";
private static SparkSession spark; private static SparkSession spark;
@ -1600,4 +1609,94 @@ public class BulkTagJobTest {
Assertions.assertEquals(0, spark.sql(query).count()); Assertions.assertEquals(0, spark.sql(query).count());
} }
@Test
void pubdateTest() throws Exception {
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath",
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(),
"-taggingConf",
IOUtils
.toString(
BulkTagJobTest.class
.getResourceAsStream(
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")),
"-outputPath", workingDir.toString() + "/",
"-pathMap", pathMap
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Dataset> tmp = sc
.textFile(workingDir.toString() + "/dataset")
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
Assertions.assertEquals(10, tmp.count());
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> queryResult = spark.sql(query);
queryResult.show(false);
Assertions.assertEquals(5, queryResult.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343"))
.count());
}
} }