1
0
Fork 0
This commit is contained in:
Miriam Baglioni 2024-02-14 14:57:08 +01:00
parent 83bb97be83
commit 8dae10b442
13 changed files with 388 additions and 308 deletions

View File

@ -78,5 +78,4 @@ public class QueryCommunityAPI {
return body;
}
}

View File

@ -6,10 +6,6 @@ import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -24,6 +20,10 @@ import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.Provider;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
/**
* @author miriam.baglioni
@ -182,7 +182,8 @@ public class Utils implements Serializable {
validCommunities.forEach(c -> {
try {
new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
new ObjectMapper()
.readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
.forEach(d -> {
if (!map.keySet().contains(d.getOpenaireId()))
map.put(d.getOpenaireId(), new HashSet<>());
@ -194,8 +195,11 @@ public class Utils implements Serializable {
}
});
List<EntityCommunities> temp = map.keySet().stream()
.map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))).collect(Collectors.toList());
List<EntityCommunities> temp = map
.keySet()
.stream()
.map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map)))
.collect(Collectors.toList());
return temp;
@ -207,5 +211,4 @@ public class Utils implements Serializable {
return temp;
}
}

View File

@ -1,11 +1,12 @@
package eu.dnetlib.dhp.api.model;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
package eu.dnetlib.dhp.api.model;
import java.io.Serializable;
import java.util.List;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
/**
* @author miriam.baglioni
* @Date 13/02/24

View File

@ -7,11 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import java.util.stream.Collectors;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.EntityCommunities;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Project;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
@ -27,10 +22,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.EntityCommunities;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
@ -53,6 +53,7 @@ public class SparkBulkTagJob {
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json"));
log.info(args.toString());
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
@ -71,7 +72,8 @@ public class SparkBulkTagJob {
final String baseURL = parser.get("baseURL");
log.info("baseURL: {}", baseURL);
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
log.info("pathMap: {}", parser.get("pathMap"));
ProtoMap protoMappingParams = new Gson().fromJson(parser.get("pathMap") + "}}", ProtoMap.class);
log.info("pathMap: {}", new Gson().toJson(protoMappingParams));
SparkConf conf = new SparkConf();
@ -100,16 +102,25 @@ public class SparkBulkTagJob {
});
}
private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, CommunityEntityMap communityProjects) {
private static void execProjectTag(SparkSession spark, String inputPath, String outputPath,
CommunityEntityMap communityProjects) {
Dataset<Project> projects = readPath(spark, inputPath + "project", Project.class);
Dataset<EntityCommunities> pc = spark.createDataset(communityProjects.keySet().stream().map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))).collect(Collectors.toList()), Encoders.bean(EntityCommunities.class));
Dataset<EntityCommunities> pc = spark
.createDataset(
communityProjects
.keySet()
.stream()
.map(k -> EntityCommunities.newInstance(k, communityProjects.get(k)))
.collect(Collectors.toList()),
Encoders.bean(EntityCommunities.class));
projects.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left")
projects
.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left")
.map((MapFunction<Tuple2<Project, EntityCommunities>, Project>) t2 -> {
Project ds = t2._1();
if (t2._2() != null) {
List<String> context =
Optional.ofNullable(ds.getContext())
List<String> context = Optional
.ofNullable(ds.getContext())
.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
.orElse(new ArrayList<>());
@ -119,8 +130,20 @@ public class SparkBulkTagJob {
if (!context.contains(c)) {
Context con = new Context();
con.setId(c);
con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
con
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"1")));
ds.getContext().add(con);
}
});
@ -139,19 +162,21 @@ public class SparkBulkTagJob {
.json(inputPath + "project");
}
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, List<EntityCommunities> datasourceCommunities) {
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,
List<EntityCommunities> datasourceCommunities) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
Dataset<EntityCommunities> dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
Dataset<EntityCommunities> dc = spark
.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
datasource
.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
Datasource ds = t2._1();
if (t2._2() != null) {
List<String> context =
Optional.ofNullable(ds.getContext())
List<String> context = Optional
.ofNullable(ds.getContext())
.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
.orElse(new ArrayList<>());
@ -162,8 +187,20 @@ public class SparkBulkTagJob {
if (!context.contains(c)) {
Context con = new Context();
con.setId(c);
con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
con
.setDataInfo(
Arrays
.asList(
OafMapperUtils
.dataInfo(
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils
.qualifier(
TaggingConstants.CLASS_ID_DATASOURCE,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"1")));
ds.getContext().add(con);
}
});
@ -182,7 +219,6 @@ public class SparkBulkTagJob {
.json(inputPath + "datasource");
}
private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
CommunityConfiguration cc) {
@ -273,6 +309,4 @@ public class SparkBulkTagJob {
};
}
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;
@ -11,7 +12,6 @@ public class MapModel implements Serializable {
private String path;
private Action action;
public String getPath() {
return path;
}

View File

@ -1,3 +1,4 @@
package eu.dnetlib.dhp.bulktag.actions;
import java.io.Serializable;

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.bulktag.community;
import eu.dnetlib.dhp.bulktag.actions.MapModel;
import java.io.Serializable;
import java.util.HashMap;
import eu.dnetlib.dhp.bulktag.actions.MapModel;
public class ProtoMap extends HashMap<String, MapModel> implements Serializable {
public ProtoMap() {

View File

@ -10,9 +10,6 @@ import java.lang.reflect.Method;
import java.util.*;
import java.util.stream.Collectors;
import com.jayway.jsonpath.PathNotFoundException;
import eu.dnetlib.dhp.bulktag.actions.MapModel;
import eu.dnetlib.dhp.bulktag.actions.Parameters;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,7 +17,10 @@ import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import eu.dnetlib.dhp.bulktag.actions.MapModel;
import eu.dnetlib.dhp.bulktag.actions.Parameters;
import eu.dnetlib.dhp.bulktag.eosc.EoscIFTag;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
@ -40,7 +40,8 @@ public class ResultTagger implements Serializable {
return (tmp != clist.size());
}
private Map<String, List<String>> getParamMap(final Result result, Map<String, MapModel> params) throws NoSuchMethodException, InvocationTargetException {
private Map<String, List<String>> getParamMap(final Result result, Map<String, MapModel> params)
throws NoSuchMethodException, InvocationTargetException {
Map<String, List<String>> param = new HashMap<>();
String json = new Gson().toJson(result, Result.class);
DocumentContext jsonContext = JsonPath.parse(json);
@ -69,7 +70,10 @@ public class ResultTagger implements Serializable {
setField.invoke(class_instance, p.getParamValue());
}
param.put(key,Arrays.asList((String)c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance)));
param
.put(
key, Arrays
.asList((String) c.getMethod(mapModel.getAction().getMethod()).invoke(class_instance)));
}
@ -77,7 +81,8 @@ public class ResultTagger implements Serializable {
param.put(key, pathValue);
}
} catch (PathNotFoundException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
} catch (PathNotFoundException | ClassNotFoundException | InstantiationException
| IllegalAccessException e) {
param.put(key, new ArrayList<>());
}
}
@ -86,9 +91,8 @@ public class ResultTagger implements Serializable {
}
public <R extends Result> R enrichContextCriteria(
final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria) throws InvocationTargetException, NoSuchMethodException {
final R result, final CommunityConfiguration conf, final Map<String, MapModel> criteria)
throws InvocationTargetException, NoSuchMethodException {
// Verify if the entity is deletedbyinference. In case verify if to clean the context list
// from all the zenodo communities

View File

@ -7,20 +7,23 @@ datasourceWhitelistForCountryPropagation=10|opendoar____::16e6a3326dd7d868cbc926
#allowedtypes=pubsrepository::institutional
allowedtypes=Institutional
outputPath=/tmp/miriam/graph/11_graph_orcid
pathMap ={"author":"$['author'][*]['fullname']", \
"title":"$['title'][*]['value']",\
"orcid":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']" ,\
"orcid_pending":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']" ,\
"contributor" : "$['contributor'][*]['value']",\
"description" : "$['description'][*]['value']",\
"subject" :"$['subject'][*]['value']" , \
"fos" : "$['subject'][?(@['qualifier']['classid']=='FOS')].value" ,\
"sdg" : "$['subject'][?(@['qualifier']['classid']=='SDG')].value",\
"journal":"$['journal'].name",\
"hostedby":"$['instance'][*]['hostedby']['key']",\
"collectedfrom":"$['instance'][*]['collectedfrom']['key']",\
"publisher":"$['publisher'].value",\
"publicationyear":"$['dateofacceptance'].value"}
pathMap ={"author":{"path":"$['author'][*]['fullname']"}, \
"title":{"path":"$['title'][*]['value']"},\
"orcid":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']"} ,\
"orcid_pending":{"path":"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']"} ,\
"contributor" : {"path":"$['contributor'][*]['value']"},\
"description" : {"path":"$['description'][*]['value']"},\
"subject" :{"path":"$['subject'][*]['value']"}, \
"fos" : {"path":"$['subject'][?(@['qualifier']['classid']=='FOS')].value"} ,\
"sdg" : {"path":"$['subject'][?(@['qualifier']['classid']=='SDG')].value"},\
"journal":{"path":"$['journal'].name"},\
"hostedby":{"path":"$['instance'][*]['hostedby']['key']"},\
"collectedfrom":{"path":"$['instance'][*]['collectedfrom']['key']"},\
"publisher":{"path":"$['publisher'].value"},\
"publicationyear":{"path":"$['dateofacceptance'].value", "action":{"class":"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction",\
"method":"execSubstring",\
"params":[{"param_name":"From","param_value":0},\
{"param_name":"To","param_value":4}]}}}
blacklist=empty
allowedpids=orcid;orcid_pending
baseURL = https://services.openaire.eu/openaire/community/

View File

@ -35,7 +35,8 @@ public class BulkTagJobTest {
public static final String pathMap = "{\"author\":{\"path\":\"$['author'][*]['fullname']\"}," +
" \"title\":{\"path\":\"$['title'][*]['value']\"}, " +
" \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} , " +
" \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"+
" \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,"
+
"\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"}," +
" \"description\" : {\"path\":\"$['description'][*]['value']\"}," +
" \"subject\" :{\"path\":\"$['subject'][*]['value']\"}, " +
@ -52,7 +53,6 @@ public class BulkTagJobTest {
"{\"paramName\":\"From\", \"paramValue\":0}, " +
"{\"paramName\":\"To\",\"paramValue\":4}]}}}";
private static SparkSession spark;
private static Path workingDir;
@ -439,7 +439,6 @@ public class BulkTagJobTest {
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count());
}
@Test
@ -1668,11 +1667,9 @@ public class BulkTagJobTest {
Assertions.assertEquals(0, spark.sql(query).count());
}
@Test
void pubdateTest() throws Exception {
final String pathMap = BulkTagJobTest.pathMap;
SparkBulkTagJob
.main(
@ -1702,7 +1699,6 @@ public class BulkTagJobTest {
verificationDataset.createOrReplaceTempView("dataset");
String query = "select id, MyT.id community, MyD.provenanceaction.classid "
+ "from dataset "
+ "lateral view explode(context) c as MyT "
@ -1713,15 +1709,52 @@ public class BulkTagJobTest {
queryResult.show(false);
Assertions.assertEquals(5, queryResult.count());
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529")).count());
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf")).count());
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177")).count());
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961")).count());
Assertions.assertEquals(1, queryResult.filter((FilterFunction<Row>) r -> r.getAs("id").equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343")).count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::7fcbe3a03280663cddebfd3cb9203177"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::d791339867bec6d3eb2104deeb4e4961"))
.count());
Assertions
.assertEquals(
1,
queryResult
.filter(
(FilterFunction<Row>) r -> r
.getAs("id")
.equals("50|od______3989::d90d3a1f64ad264b5ebed8a35b280343"))
.count());
}
}