[createASfromAffRo] adding the provenance datasource used to get the relation (no datasource can be webcrawl = publisher, rawaff means oalex)

This commit is contained in:
Miriam Baglioni 2024-10-18 16:22:21 +02:00
parent 56b05cde0b
commit 0e5dd14538
2 changed files with 34 additions and 29 deletions

View File

@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable {
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
spark, crossrefInputPath, collectedfromOpenAIRE); spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref");
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
spark, pubmedInputPath, collectedfromOpenAIRE); spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed");
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel( JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
spark, openapcInputPath, collectedfromOpenAIRE); spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc");
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel(
spark, dataciteInputPath, collectedfromOpenAIRE); spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite");
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel(
spark, webcrawlInputPath, collectedfromOpenAIRE); spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff");
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher( JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel(
spark, publisherlInputPath, collectedfromOpenAIRE); spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl");
crossrefRelations crossrefRelations
.union(pubmedRelations) .union(pubmedRelations)
@ -133,7 +133,8 @@ public class PrepareAffiliationRelations implements Serializable {
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark, private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark,
String inputPath, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom,
String dataprovenance) {
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
@ -142,12 +143,13 @@ public class PrepareAffiliationRelations implements Serializable {
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); return getTextTextJavaPairRDDNew(
collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance);
} }
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom, String dataprovenance) {
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
@ -155,13 +157,14 @@ public class PrepareAffiliationRelations implements Serializable {
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); return getTextTextJavaPairRDD(
collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance);
} }
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
String inputPath, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom, String dataprovenance) {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
@ -170,12 +173,12 @@ public class PrepareAffiliationRelations implements Serializable {
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df); return getTextTextJavaPairRDD(collectedfrom, df, dataprovenance);
} }
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelationsNewModel(SparkSession spark, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelationsNewModel(SparkSession spark,
String inputPath, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom, String dataprovenance) {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
.read() .read()
@ -184,10 +187,11 @@ public class PrepareAffiliationRelations implements Serializable {
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDDNew(collectedfrom, df); return getTextTextJavaPairRDDNew(collectedfrom, df, dataprovenance);
} }
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) { private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df,
String dataprovenance) {
// unroll nested arrays // unroll nested arrays
df = df df = df
.withColumn("matching", functions.explode(new Column("Matchings"))) .withColumn("matching", functions.explode(new Column("Matchings")))
@ -219,7 +223,7 @@ public class PrepareAffiliationRelations implements Serializable {
DataInfo dataInfo = OafMapperUtils DataInfo dataInfo = OafMapperUtils
.dataInfo( .dataInfo(
false, false,
BIP_INFERENCE_PROVENANCE, dataprovenance,
true, true,
false, false,
qualifier, qualifier,
@ -235,7 +239,8 @@ public class PrepareAffiliationRelations implements Serializable {
new Text(OBJECT_MAPPER.writeValueAsString(aa)))); new Text(OBJECT_MAPPER.writeValueAsString(aa))));
} }
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df) { private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df,
String dataprovenance) {
// unroll nested arrays // unroll nested arrays
df = df df = df
.withColumn("matching", functions.explode(new Column("Matchings"))) .withColumn("matching", functions.explode(new Column("Matchings")))
@ -276,7 +281,7 @@ public class PrepareAffiliationRelations implements Serializable {
DataInfo dataInfo = OafMapperUtils DataInfo dataInfo = OafMapperUtils
.dataInfo( .dataInfo(
false, false,
BIP_INFERENCE_PROVENANCE, dataprovenance,
true, true,
false, false,
qualifier, qualifier,

View File

@ -98,9 +98,9 @@ public class PrepareAffiliationRelationsTest {
"-crossrefInputPath", crossrefAffiliationRelationPathNew, "-crossrefInputPath", crossrefAffiliationRelationPathNew,
"-pubmedInputPath", crossrefAffiliationRelationPath, "-pubmedInputPath", crossrefAffiliationRelationPath,
"-openapcInputPath", crossrefAffiliationRelationPathNew, "-openapcInputPath", crossrefAffiliationRelationPathNew,
"-dataciteInputPath", crossrefAffiliationRelationPath, "-dataciteInputPath", crossrefAffiliationRelationPathNew,
"-webCrawlInputPath", crossrefAffiliationRelationPath, "-webCrawlInputPath", crossrefAffiliationRelationPathNew,
"-publisherInputPath", publisherAffiliationRelationOldPath, "-publisherInputPath", publisherAffiliationRelationPath,
"-outputPath", outputPath "-outputPath", outputPath
}); });
@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest {
.map(aa -> ((Relation) aa.getPayload())); .map(aa -> ((Relation) aa.getPayload()));
// count the number of relations // count the number of relations
assertEquals(150, tmp.count());// 18 + 24 *3 + 30 * 2 = assertEquals(162, tmp.count());// 18 + 24 + 30 * 4 =
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result"); dataset.createOrReplaceTempView("result");
@ -123,7 +123,7 @@ public class PrepareAffiliationRelationsTest {
// verify that we have equal number of bi-directional relations // verify that we have equal number of bi-directional relations
Assertions Assertions
.assertEquals( .assertEquals(
75, execVerification 81, execVerification
.filter( .filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList() .collectAsList()
@ -131,7 +131,7 @@ public class PrepareAffiliationRelationsTest {
Assertions Assertions
.assertEquals( .assertEquals(
75, execVerification 81, execVerification
.filter( .filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList() .collectAsList()
@ -158,7 +158,7 @@ public class PrepareAffiliationRelationsTest {
Assertions Assertions
.assertEquals( .assertEquals(
2, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count()); 4, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count());
Assertions Assertions
.assertEquals( .assertEquals(
@ -173,7 +173,7 @@ public class PrepareAffiliationRelationsTest {
Assertions Assertions
.assertEquals( .assertEquals(
3, execVerification 1, execVerification
.filter( .filter(
"source = '" + ID_PREFIX "source = '" + ID_PREFIX
+ IdentifierFactory + IdentifierFactory