forked from D-Net/dnet-hadoop

[Tagging Projects and Datasource] added test to check datasource tagging. Fixed issue

parent: 6e1f383e4a
commit: 83bb97be83
@@ -10,6 +10,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Organization;
 import eu.dnetlib.dhp.schema.oaf.Project;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,6 +178,8 @@ public class Utils implements Serializable {
 	public static List<EntityCommunities> getDatasourceCommunities(String baseURL)throws IOException{
 		List<CommunityModel> validCommunities = getValidCommunities(baseURL);
 		HashMap<String, Set<String>> map = new HashMap<>();
+		String entityPrefix = ModelSupport.getIdPrefix(Datasource.class) + "|" ;
+
 		validCommunities.forEach(c -> {
 			try {
 				new ObjectMapper().readValue(QueryCommunityAPI.communityDatasource(c.getId(), baseURL), DatasourceList.class)
@@ -191,10 +194,18 @@ public class Utils implements Serializable {
 			}
 		});
 
-		return map.keySet().stream().map(k -> EntityCommunities.newInstance(k, map.get(k).stream().collect(Collectors.toList()))).collect(Collectors.toList());
+		List<EntityCommunities> temp = map.keySet().stream()
+				.map(k -> EntityCommunities.newInstance(entityPrefix + k, getCollect(k, map))).collect(Collectors.toList());
+
+		return temp;
 
 	}
 
+	@NotNull
+	private static List<String> getCollect(String k, HashMap<String, Set<String>> map) {
+		List<String> temp = map.get(k).stream().collect(Collectors.toList());
+		return temp;
+	}
+
 
 }
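
The substantive fix in Utils.getDatasourceCommunities is the id shape: the keys collected from the community API are now prefixed with the datasource entity prefix before the EntityCommunities instances are built, so they can match the id column of the Datasource dataset in the join performed by SparkBulkTagJob below. A minimal sketch of the before/after, assuming ModelSupport.getIdPrefix(Datasource.class) returns "10" and using a hypothetical datasource id:

    // Hypothetical values, for illustration only.
    String apiKey = "doajarticles::abc123";   // key as collected from the community API
    String entityPrefix = "10" + "|";         // assumed value of ModelSupport.getIdPrefix(Datasource.class) + "|"

    String before = apiKey;                   // "doajarticles::abc123"    -> never equals datasource.col("id")
    String after = entityPrefix + apiKey;     // "10|doajarticles::abc123" -> matches datasource.col("id"), left join hits
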
@@ -9,7 +9,6 @@ import java.util.stream.Collectors;
 
 import eu.dnetlib.dhp.api.model.CommunityEntityMap;
 import eu.dnetlib.dhp.api.model.EntityCommunities;
-import eu.dnetlib.dhp.api.model.DatasourceCommunitiesList;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.Project;
@@ -109,14 +108,20 @@ public class SparkBulkTagJob {
 			.map((MapFunction<Tuple2<Project, EntityCommunities>, Project>) t2 -> {
 				Project ds = t2._1();
 				if (t2._2() != null){
-					List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
+					List<String> context =
+							Optional.ofNullable(ds.getContext())
+									.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
+									.orElse(new ArrayList<>());
+
+					if(!Optional.ofNullable(ds.getContext()).isPresent())
+						ds.setContext(new ArrayList<>());
 					t2._2().getCommunitiesId().forEach(c -> {
 						if(!context.contains(c)){
 							Context con = new Context();
 							con.setId(c);
 							con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
 									OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
-							ds.getContext().add(con)
+							ds.getContext().add(con);
 						}
 					});
 				}
@@ -139,19 +144,27 @@ public class SparkBulkTagJob {
 		Dataset<Datasource> datasource = readPath(spark, inputPath + "datasource", Datasource.class);
 
 		Dataset<EntityCommunities> dc = spark.createDataset(datasourceCommunities, Encoders.bean(EntityCommunities.class));
 
 		datasource.joinWith(dc, datasource.col("id").equalTo(dc.col("entityId")), "left")
 			.map((MapFunction<Tuple2<Datasource, EntityCommunities>, Datasource>) t2 -> {
 				Datasource ds = t2._1();
 				if (t2._2() != null){
-					List<String> context = ds.getContext().stream().map(c -> c.getId()).collect(Collectors.toList());
+					List<String> context =
+							Optional.ofNullable(ds.getContext())
+									.map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
+									.orElse(new ArrayList<>());
+
+					if(!Optional.ofNullable(ds.getContext()).isPresent())
+						ds.setContext(new ArrayList<>());
+
 					t2._2().getCommunitiesId().forEach(c -> {
 						if(!context.contains(c)){
 							Context con = new Context();
 							con.setId(c);
 							con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false,TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
 									OafMapperUtils.qualifier(TaggingConstants.CLASS_ID_DATASOURCE, TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "1")));
-							ds.getContext().add(con)
+							ds.getContext().add(con);
 						}
 					});
 				}
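
Both tagging branches above (projects and datasources) apply the same two-part guard: read the existing context ids through Optional so an entity serialized without a context list no longer throws a NullPointerException inside the map function, and initialize the list before appending to it. A condensed sketch of the pattern in isolation, not the exact job code, assuming getContext()/setContext(List<Context>) accessors as on Datasource/Project:

    // Sketch of the null-safe tagging pattern.
    List<String> existing = Optional.ofNullable(ds.getContext())
            .map(v -> v.stream().map(c -> c.getId()).collect(Collectors.toList()))
            .orElse(new ArrayList<>());       // safe read: empty list when context is null

    if (ds.getContext() == null)
        ds.setContext(new ArrayList<>());     // safe write: ensure a target list exists

    communities.forEach(c -> {
        if (!existing.contains(c)) {          // only add communities not already present
            Context con = new Context();
            con.setId(c);
            ds.getContext().add(con);         // guaranteed non-null now
        }
    });
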
@@ -33,6 +33,6 @@
 		"paramName": "bu",
 		"paramLongName": "baseURL",
 		"paramDescription": "this parameter is to specify the api to be queried (beta or production)",
-		"paramRequired": false
+		"paramRequired": true
 	}
 ]
@@ -325,7 +325,7 @@ public class BulkTagJobTest {
 					"-taggingConf", taggingConf,
 
 					"-outputPath", workingDir.toString() + "/",
-
+					"-baseURL", "https://services.openaire.eu/openaire/community/",
 					"-pathMap", pathMap
 				});
 
@@ -349,6 +349,8 @@ public class BulkTagJobTest {
 
 		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
 
+		idExplodeCommunity.show(false);
+
 		Assertions.assertEquals(5, idExplodeCommunity.count());
 		Assertions
 			.assertEquals(
@@ -383,6 +385,63 @@ public class BulkTagJobTest {
 				.count());
 	}
 
+	@Test
+	void datasourceTag() throws Exception {
+		final String sourcePath = getClass()
+			.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
+			.getPath();
+		SparkBulkTagJob
+			.main(
+				new String[] {
+
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-sourcePath", sourcePath,
+					"-taggingConf", taggingConf,
+
+					"-outputPath", workingDir.toString() + "/",
+					"-baseURL", "https://services.openaire.eu/openaire/community/",
+					"-pathMap", pathMap
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Datasource> tmp = sc
+			.textFile(workingDir.toString() + "/datasource")
+			.map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));
+
+		Assertions.assertEquals(3, tmp.count());
+		org.apache.spark.sql.Dataset<Datasource> verificationDataset = spark
+			.createDataset(tmp.rdd(), Encoders.bean(Datasource.class));
+
+		verificationDataset.createOrReplaceTempView("datasource");
+
+		String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+			+ "from datasource "
+			+ "lateral view explode(context) c as MyT "
+			+ "lateral view explode(MyT.datainfo) d as MyD "
+			+ "where MyD.inferenceprovenance = 'bulktagging'";
+
+		org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
+
+		idExplodeCommunity.show(false);
+
+		Assertions.assertEquals(3, idExplodeCommunity.count());
+		Assertions
+			.assertEquals(
+				3, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
+		Assertions
+			.assertEquals(
+				3,
+				idExplodeCommunity
+					.filter("name = 'Bulktagging for Community - Datasource'")
+					.count());
+
+		Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count());
+		Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count());
+
+	}
+
 	@Test
 	void bulktagByZenodoCommunityTest() throws Exception {
 		final String sourcePath = getClass()
 
File diff suppressed because one or more lines are too long
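
The new datasourceTag test pins down the expected outcome: three datasources each receive exactly one bulktag context, all with provenance classid community:datasource, split two to dh-ch and one to clarin. The idExplodeCommunity dataset the assertions describe would look roughly like this (the ids are hypothetical placeholders; only the community, provenance, and name columns and the row counts are fixed by the test):

    id (hypothetical)   community  provenance            name
    10|example::ds1     dh-ch      community:datasource  Bulktagging for Community - Datasource
    10|example::ds2     dh-ch      community:datasource  Bulktagging for Community - Datasource
    10|example::ds3     clarin     community:datasource  Bulktagging for Community - Datasource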