[stats wf] indicators across stats dbs & updates in the org ids #248
|
@ -95,7 +95,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
||||||
|
|
||||||
return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
|
return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
|
||||||
Project project = new Project();
|
Project project = new Project();
|
||||||
project.setId(bipProjectScores.getProjectId());
|
//project.setId(bipProjectScores.getProjectId());
|
||||||
project.setMeasures(bipProjectScores.toMeasures());
|
project.setMeasures(bipProjectScores.toMeasures());
|
||||||
return project;
|
return project;
|
||||||
}, Encoders.bean(Project.class))
|
}, Encoders.bean(Project.class))
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package eu.dnetlib.dhp.bulktag.actions;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 22/01/24
|
||||||
|
*/
|
||||||
|
public class Action implements Serializable {
|
||||||
|
private String clazz;
|
||||||
|
private String method;
|
||||||
|
private List<Parameters> params;
|
||||||
|
|
||||||
|
public String getClazz() {
|
||||||
|
return clazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClazz(String clazz) {
|
||||||
|
this.clazz = clazz;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMethod() {
|
||||||
|
return method;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMethod(String method) {
|
||||||
|
this.method = method;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Parameters> getParams() {
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParams(List<Parameters> params) {
|
||||||
|
this.params = params;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,54 @@
|
||||||
|
package eu.dnetlib.dhp.bulktag.actions;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 19/01/24
|
||||||
|
*/
|
||||||
|
public class ExecSubstringAction implements Serializable {
|
||||||
|
|
||||||
|
private String value;
|
||||||
|
private String from;
|
||||||
|
private String to;
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setValue(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getFrom() {
|
||||||
|
return from;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFrom(String from) {
|
||||||
|
this.from = from;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTo() {
|
||||||
|
return to;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTo(String to) {
|
||||||
|
this.to = to;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String execSubstring(){
|
||||||
|
int to = Integer.valueOf(this.to);
|
||||||
|
int from = Integer.valueOf(this.from);
|
||||||
|
|
||||||
|
if(to < from || from > this.value.length())
|
||||||
|
return "";
|
||||||
|
|
||||||
|
if(from < 0)
|
||||||
|
from = 0;
|
||||||
|
if (to > this.value.length())
|
||||||
|
to = this.value.length();
|
||||||
|
|
||||||
|
return this.value.substring(from, to);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package eu.dnetlib.dhp.bulktag.actions;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 22/01/24
|
||||||
|
*/
|
||||||
|
public class MapModel implements Serializable {
|
||||||
|
|
||||||
|
private String path;
|
||||||
|
private Action action;
|
||||||
|
|
||||||
|
|
||||||
|
public String getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPath(String path) {
|
||||||
|
this.path = path;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Action getAction() {
|
||||||
|
return action;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAction(Action action) {
|
||||||
|
this.action = action;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
package eu.dnetlib.dhp.bulktag.actions;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 22/01/24
|
||||||
|
*/
|
||||||
|
public class Parameters implements Serializable {
|
||||||
|
private String paramName;
|
||||||
|
private String paramValue;
|
||||||
|
|
||||||
|
public String getParamName() {
|
||||||
|
return paramName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParamName(String paramName) {
|
||||||
|
this.paramName = paramName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getParamValue() {
|
||||||
|
return paramValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParamValue(String paramValue) {
|
||||||
|
this.paramValue = paramValue;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,10 +1,12 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.bulktag.community;
|
package eu.dnetlib.dhp.bulktag.community;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.bulktag.actions.MapModel;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
|
||||||
public class ProtoMap extends HashMap<String, String> implements Serializable {
|
public class ProtoMap extends HashMap<String, MapModel> implements Serializable {
|
||||||
|
|
||||||
public ProtoMap() {
|
public ProtoMap() {
|
||||||
super();
|
super();
|
||||||
|
|
|
@ -5,9 +5,14 @@ import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
|
||||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import com.jayway.jsonpath.PathNotFoundException;
|
||||||
|
import eu.dnetlib.dhp.bulktag.actions.MapModel;
|
||||||
|
import eu.dnetlib.dhp.bulktag.actions.Parameters;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -35,27 +40,55 @@ public class ResultTagger implements Serializable {
|
||||||
return (tmp != clist.size());
|
return (tmp != clist.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, List<String>> getParamMap(final Result result, Map<String, String> params) {
|
private Map<String, List<String>> getParamMap(final Result result, Map<String, MapModel> params) throws NoSuchMethodException, InvocationTargetException {
|
||||||
Map<String, List<String>> param = new HashMap<>();
|
Map<String, List<String>> param = new HashMap<>();
|
||||||
String json = new Gson().toJson(result, Result.class);
|
String json = new Gson().toJson(result, Result.class);
|
||||||
DocumentContext jsonContext = JsonPath.parse(json);
|
DocumentContext jsonContext = JsonPath.parse(json);
|
||||||
|
|
||||||
if (params == null) {
|
if (params == null) {
|
||||||
params = new HashMap<>();
|
params = new HashMap<>();
|
||||||
}
|
}
|
||||||
for (String key : params.keySet()) {
|
for (String key : params.keySet()) {
|
||||||
|
MapModel mapModel = params.get(key);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
param.put(key, jsonContext.read(params.get(key)));
|
String path = mapModel.getPath();
|
||||||
} catch (com.jayway.jsonpath.PathNotFoundException e) {
|
Object obj = jsonContext.read(path);
|
||||||
|
List<String> pathValue ;
|
||||||
|
if(obj instanceof java.lang.String)
|
||||||
|
pathValue = Arrays.asList((String)obj);
|
||||||
|
else
|
||||||
|
pathValue = (List<String>)obj;
|
||||||
|
if(Optional.ofNullable(mapModel.getAction()).isPresent()){
|
||||||
|
Class<?> c = Class.forName(mapModel.getAction().getClazz());
|
||||||
|
Object class_instance = c.newInstance();
|
||||||
|
Method setField = c.getMethod("setValue", String.class);
|
||||||
|
setField.invoke(class_instance, pathValue.get(0));
|
||||||
|
for(Parameters p : mapModel.getAction().getParams()){
|
||||||
|
setField = c.getMethod("set" + p.getParamName(), String.class);
|
||||||
|
setField.invoke(class_instance, p.getParamValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
param.put(key,Arrays.asList((String)c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
else{
|
||||||
|
param.put(key, pathValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (PathNotFoundException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
|
||||||
param.put(key, new ArrayList<>());
|
param.put(key, new ArrayList<>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return param;
|
return param;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public <R extends Result> R enrichContextCriteria(
|
public <R extends Result> R enrichContextCriteria(
|
||||||
final R result, final CommunityConfiguration conf, final Map<String, String> criteria) {
|
final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria) throws InvocationTargetException, NoSuchMethodException {
|
||||||
|
|
||||||
|
|
||||||
final Map<String, List<String>> param = getParamMap(result, criteria);
|
|
||||||
|
|
||||||
// Verify if the entity is deletedbyinference. In case verify if to clean the context list
|
// Verify if the entity is deletedbyinference. In case verify if to clean the context list
|
||||||
// from all the zenodo communities
|
// from all the zenodo communities
|
||||||
|
@ -64,6 +97,8 @@ public class ResultTagger implements Serializable {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Map<String, List<String>> param = getParamMap(result, criteria);
|
||||||
|
|
||||||
// Execute the EOSCTag for the services
|
// Execute the EOSCTag for the services
|
||||||
switch (result.getResulttype().getClassid()) {
|
switch (result.getResulttype().getClassid()) {
|
||||||
case PUBLICATION_RESULTTYPE_CLASSID:
|
case PUBLICATION_RESULTTYPE_CLASSID:
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.bulktag.criteria;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 11/11/22
|
||||||
|
*/
|
||||||
|
@VerbClass("greater_than")
|
||||||
|
public class GreatThanVerb implements Selection, Serializable {
|
||||||
|
|
||||||
|
private String param;
|
||||||
|
|
||||||
|
public GreatThanVerb() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public GreatThanVerb(final String param) {
|
||||||
|
this.param = param;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean apply(String value) {
|
||||||
|
return value.compareTo(param) > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getParam() {
|
||||||
|
return param;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParam(String param) {
|
||||||
|
this.param = param;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.bulktag.criteria;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author miriam.baglioni
|
||||||
|
* @Date 11/11/22
|
||||||
|
*/
|
||||||
|
@VerbClass("lesser_than")
|
||||||
|
public class LessThanVerb implements Selection, Serializable {
|
||||||
|
|
||||||
|
private String param;
|
||||||
|
|
||||||
|
public LessThanVerb() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public LessThanVerb(final String param) {
|
||||||
|
this.param = param;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean apply(String value) {
|
||||||
|
return value.compareTo(param) < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getParam() {
|
||||||
|
return param;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParam(String param) {
|
||||||
|
this.param = param;
|
||||||
|
}
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
|
@ -31,18 +32,26 @@ public class BulkTagJobTest {
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||||
|
|
||||||
public static final String pathMap = "{ \"author\" : \"$['author'][*]['fullname']\","
|
public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," +
|
||||||
+ " \"title\" : \"$['title'][*]['value']\","
|
" \"title\":{\"path\":\"$['title'][*]['value']\"}, "+
|
||||||
+ " \"orcid\" : \"$['author'][*]['pid'][*][?(@['key']=='ORCID')]['value']\","
|
" \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " +
|
||||||
+ " \"contributor\" : \"$['contributor'][*]['value']\","
|
" \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"+
|
||||||
+ " \"description\" : \"$['description'][*]['value']\", "
|
"\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"},"+
|
||||||
+ " \"subject\" :\"$['subject'][*]['value']\" , " +
|
" \"description\" : {\"path\":\"$['description'][*]['value']\"},"+
|
||||||
"\"fos\" : \"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"," +
|
" \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " +
|
||||||
"\"sdg\" : \"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"," +
|
" \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , "+
|
||||||
"\"hostedby\" : \"$['instance'][*]['hostedby']['key']\" , " +
|
"\"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"}," +
|
||||||
"\"collectedfrom\" : \"$['instance'][*]['collectedfrom']['key']\"," +
|
"\"journal\":{\"path\":\"$['journal'].name\"}," +
|
||||||
"\"publisher\":\"$['publisher'].value\"," +
|
"\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"}," +
|
||||||
"\"publicationyear\":\"$['dateofacceptance'].value\"} ";
|
"\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"}," +
|
||||||
|
"\"publisher\":{\"path\":\"$['publisher'].value\"}," +
|
||||||
|
"\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\", " +
|
||||||
|
" \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\"," +
|
||||||
|
"\"method\":\"execSubstring\","+
|
||||||
|
"\"params\":[" +
|
||||||
|
"{\"paramName\":\"From\", \"paramValue\":0}, " +
|
||||||
|
"{\"paramName\":\"To\",\"paramValue\":4}]}}}";
|
||||||
|
|
||||||
|
|
||||||
private static SparkSession spark;
|
private static SparkSession spark;
|
||||||
|
|
||||||
|
@ -1600,4 +1609,60 @@ public class BulkTagJobTest {
|
||||||
Assertions.assertEquals(0, spark.sql(query).count());
|
Assertions.assertEquals(0, spark.sql(query).count());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void pubdateTest() throws Exception {
|
||||||
|
|
||||||
|
|
||||||
|
final String pathMap = BulkTagJobTest.pathMap;
|
||||||
|
SparkBulkTagJob
|
||||||
|
.main(
|
||||||
|
new String[] {
|
||||||
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
|
"-sourcePath",
|
||||||
|
getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(),
|
||||||
|
"-taggingConf",
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
BulkTagJobTest.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")),
|
||||||
|
"-outputPath", workingDir.toString() + "/",
|
||||||
|
"-pathMap", pathMap
|
||||||
|
});
|
||||||
|
|
||||||
|
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
|
|
||||||
|
JavaRDD<Dataset> tmp = sc
|
||||||
|
.textFile(workingDir.toString() + "/dataset")
|
||||||
|
.map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
|
||||||
|
|
||||||
|
Assertions.assertEquals(10, tmp.count());
|
||||||
|
org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
|
||||||
|
.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
|
||||||
|
|
||||||
|
verificationDataset.createOrReplaceTempView("dataset");
|
||||||
|
|
||||||
|
|
||||||
|
String query = "select id, MyT.id community, MyD.provenanceaction.classid "
|
||||||
|
+ "from dataset "
|
||||||
|
+ "lateral view explode(context) c as MyT "
|
||||||
|
+ "lateral view explode(MyT.datainfo) d as MyD "
|
||||||
|
+ "where MyD.inferenceprovenance = 'bulktagging'";
|
||||||
|
|
||||||
|
org.apache.spark.sql.Dataset<Row> queryResult = spark.sql(query);
|
||||||
|
queryResult.show(false);
|
||||||
|
Assertions.assertEquals(5, queryResult.count());
|
||||||
|
|
||||||
|
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")).count());
|
||||||
|
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")).count());
|
||||||
|
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")).count());
|
||||||
|
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")).count());
|
||||||
|
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")).count());
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
2
pom.xml
2
pom.xml
|
@ -888,7 +888,7 @@
|
||||||
<mockito-core.version>3.3.3</mockito-core.version>
|
<mockito-core.version>3.3.3</mockito-core.version>
|
||||||
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
<mongodb.driver.version>3.4.2</mongodb.driver.version>
|
||||||
<vtd.version>[2.12,3.0)</vtd.version>
|
<vtd.version>[2.12,3.0)</vtd.version>
|
||||||
<dhp-schemas.version>[4.17.2]</dhp-schemas.version>
|
<dhp-schemas.version>[4.17.3]</dhp-schemas.version>
|
||||||
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
<dnet-actionmanager-api.version>[4.0.3]</dnet-actionmanager-api.version>
|
||||||
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
<dnet-actionmanager-common.version>[6.0.5]</dnet-actionmanager-common.version>
|
||||||
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
<dnet-openaire-broker-common.version>[3.1.6]</dnet-openaire-broker-common.version>
|
||||||
|
|
Loading…
Reference in New Issue