Merge pull request '[BulkTag]added tagging for the organization relevant for the community.' (#461) from tagOrganization into beta

Reviewed-on: #461
This commit is contained in:
Claudio Atzori 2024-07-17 11:11:52 +02:00
commit e94ae771ff
5 changed files with 171 additions and 20 deletions

View File

@ -33,10 +33,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.*; import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2; import scala.Tuple2;
@ -114,27 +111,35 @@ public class SparkBulkTagJob {
extendCommunityConfigurationForEOSC(spark, inputPath, cc); extendCommunityConfigurationForEOSC(spark, inputPath, cc);
execBulkTag( execBulkTag(
spark, inputPath, outputPath, protoMap, cc); spark, inputPath, outputPath, protoMap, cc);
execEntityTag(
spark, inputPath + "organization", outputPath + "organization",
Utils.getCommunityOrganization(baseURL), Organization.class, TaggingConstants.CLASS_ID_ORGANIZATION,
TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
execEntityTag(
spark, inputPath + "project", outputPath + "project", Utils.getCommunityProjects(baseURL),
Project.class, TaggingConstants.CLASS_ID_PROJECT, TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL)); execDatasourceTag(spark, inputPath, outputPath, Utils.getDatasourceCommunities(baseURL));
execProjectTag(spark, inputPath, outputPath, Utils.getCommunityProjects(baseURL));
}); });
} }
private static void execProjectTag(SparkSession spark, String inputPath, String outputPath, private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath, String outputPath,
CommunityEntityMap communityProjects) { CommunityEntityMap communityEntity, Class<E> entityClass,
Dataset<Project> projects = readPath(spark, inputPath + "project", Project.class); String classID, String calssName) {
Dataset<E> entity = readPath(spark, inputPath, entityClass);
Dataset<EntityCommunities> pc = spark Dataset<EntityCommunities> pc = spark
.createDataset( .createDataset(
communityProjects communityEntity
.keySet() .keySet()
.stream() .stream()
.map(k -> EntityCommunities.newInstance(k, communityProjects.get(k))) .map(k -> EntityCommunities.newInstance(k, communityEntity.get(k)))
.collect(Collectors.toList()), .collect(Collectors.toList()),
Encoders.bean(EntityCommunities.class)); Encoders.bean(EntityCommunities.class));
projects entity
.joinWith(pc, projects.col("id").equalTo(pc.col("entityId")), "left") .joinWith(pc, entity.col("id").equalTo(pc.col("entityId")), "left")
.map((MapFunction<Tuple2<Project, EntityCommunities>, Project>) t2 -> { .map((MapFunction<Tuple2<E, EntityCommunities>, E>) t2 -> {
Project ds = t2._1(); E ds = t2._1();
if (t2._2() != null) { if (t2._2() != null) {
List<String> context = Optional List<String> context = Optional
.ofNullable(ds.getContext()) .ofNullable(ds.getContext())
@ -156,8 +161,8 @@ public class SparkBulkTagJob {
false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false,
OafMapperUtils OafMapperUtils
.qualifier( .qualifier(
TaggingConstants.CLASS_ID_DATASOURCE, classID,
TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE, calssName,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS), ModelConstants.DNET_PROVENANCE_ACTIONS),
"1"))); "1")));
@ -166,17 +171,17 @@ public class SparkBulkTagJob {
}); });
} }
return ds; return ds;
}, Encoders.bean(Project.class)) }, Encoders.bean(entityClass))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "project"); .json(outputPath);
readPath(spark, outputPath + "project", Project.class) readPath(spark, outputPath, entityClass)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(inputPath + "project"); .json(inputPath);
} }
private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath, private static void execDatasourceTag(SparkSession spark, String inputPath, String outputPath,

View File

@ -13,6 +13,9 @@ public class TaggingConstants {
public static final String CLASS_ID_CZENODO = "community:zenodocommunity"; public static final String CLASS_ID_CZENODO = "community:zenodocommunity";
public static final String CLASS_ID_ADVANCED_CONSTRAINT = "community:advconstraint"; public static final String CLASS_ID_ADVANCED_CONSTRAINT = "community:advconstraint";
public static final String CLASS_ID_PROJECT = "community:project";
public static final String CLASS_ID_ORGANIZATION = "community:organization";
public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/"; public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject"; public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";
@ -20,5 +23,8 @@ public class TaggingConstants {
public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo"; public static final String CLASS_NAME_BULKTAG_ZENODO = "Bulktagging for Community - Zenodo";
public static final String CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT = "Bulktagging for Community - Advanced Constraints"; public static final String CLASS_NAME_BULKTAG_ADVANCED_CONSTRAINT = "Bulktagging for Community - Advanced Constraints";
public static final String CLASS_NAME_BULKTAG_PROJECT = "Bulktagging for Community - Project";
public static final String CLASS_NAME_BULKTAG_ORGANIZATION = "Bulktagging for Community - Organization";
public static final String TAGGING_TRUST = "0.8"; public static final String TAGGING_TRUST = "0.8";
} }

View File

@ -465,6 +465,138 @@ public class BulkTagJobTest {
} }
@Test
void organizationTag() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
.getPath();
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
SparkBulkTagJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-outputPath", workingDir.toString() + "/",
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Organization> tmp = sc
.textFile(workingDir.toString() + "/organization")
.map(item -> OBJECT_MAPPER.readValue(item, Organization.class));
Assertions.assertEquals(4, tmp.count());
org.apache.spark.sql.Dataset<Organization> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Organization.class));
verificationDataset.createOrReplaceTempView("organization");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from organization "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
idExplodeCommunity.show(false);
Assertions.assertEquals(3, idExplodeCommunity.count());
Assertions
.assertEquals(
3, idExplodeCommunity.filter("provenance = 'community:organization'").count());
Assertions
.assertEquals(
3,
idExplodeCommunity
.filter("name = 'Bulktagging for Community - Organization'")
.count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'netherlands'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'beopen'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'mes'").count());
}
@Test
void projectTag() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/")
.getPath();
LocalFileSystem fs = FileSystem.getLocal(new Configuration());
fs
.copyFromLocalFile(
false, new org.apache.hadoop.fs.Path(getClass()
.getResource("/eu/dnetlib/dhp/bulktag/pathMap/")
.getPath()),
new org.apache.hadoop.fs.Path(workingDir.toString() + "/data/bulktagging/protoMap"));
SparkBulkTagJob
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-sourcePath", sourcePath,
"-taggingConf", taggingConf,
"-outputPath", workingDir.toString() + "/",
"-baseURL", "https://services.openaire.eu/openaire/community/",
"-pathMap", workingDir.toString() + "/data/bulktagging/protoMap/pathMap",
"-nameNode", "local"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Project> tmp = sc
.textFile(workingDir.toString() + "/project")
.map(item -> OBJECT_MAPPER.readValue(item, Project.class));
Assertions.assertEquals(4, tmp.count());
org.apache.spark.sql.Dataset<Project> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Project.class));
verificationDataset.createOrReplaceTempView("project");
String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name "
+ "from project "
+ "lateral view explode(context) c as MyT "
+ "lateral view explode(MyT.datainfo) d as MyD "
+ "where MyD.inferenceprovenance = 'bulktagging'";
org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
idExplodeCommunity.show(false);
Assertions.assertEquals(4, idExplodeCommunity.count());
Assertions
.assertEquals(
4, idExplodeCommunity.filter("provenance = 'community:project'").count());
Assertions
.assertEquals(
4,
idExplodeCommunity
.filter("name = 'Bulktagging for Community - Project'")
.count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'enermaps'").count());
Assertions.assertEquals(1, idExplodeCommunity.filter("community = 'clarin'").count());
Assertions.assertEquals(2, idExplodeCommunity.filter("community = 'dh-ch'").count());
}
@Test @Test
void bulktagByZenodoCommunityTest() throws Exception { void bulktagByZenodoCommunityTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()