[AffiliationFromPublisher]Extend the action set to include also the affiliation extracted from publishers. Rename to openaire datasource the collected from of the relations generated by affro on the raw aff string got from OA. The new relations taken from publishers have the same collected from (OpenAIRE)

This commit is contained in:
Miriam Baglioni 2024-08-06 15:34:37 +02:00
parent 8c185a7b1a
commit 675de07138
1 changed files with 58 additions and 30 deletions

View File

@ -44,6 +44,8 @@ public class PrepareAffiliationRelations implements Serializable {
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference"; public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE"; public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE";
public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation"; public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation";
public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556";
public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";
public static <I extends Result> void main(String[] args) throws Exception { public static <I extends Result> void main(String[] args) throws Exception {
@ -74,6 +76,9 @@ public class PrepareAffiliationRelations implements Serializable {
final String webcrawlInputPath = parser.get("webCrawlInputPath"); final String webcrawlInputPath = parser.get("webCrawlInputPath");
log.info("webcrawlInputPath: {}", webcrawlInputPath); log.info("webcrawlInputPath: {}", webcrawlInputPath);
final String publisherlInputPath = parser.get("publisherlInputPath");
log.info("publisherlInputPath: {}", publisherlInputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
@ -106,21 +111,40 @@ public class PrepareAffiliationRelations implements Serializable {
spark, dataciteInputPath, collectedFromDatacite); spark, dataciteInputPath, collectedFromDatacite);
List<KeyValue> collectedFromWebCrawl = OafMapperUtils List<KeyValue> collectedFromWebCrawl = OafMapperUtils
.listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME); .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations( JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
spark, webcrawlInputPath, collectedFromWebCrawl); spark, webcrawlInputPath, collectedFromWebCrawl);
List<KeyValue> collectedfromPublisher = OafMapperUtils
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
spark, publisherlInputPath, collectedfromPublisher);
crossrefRelations crossrefRelations
.union(pubmedRelations) .union(pubmedRelations)
.union(openAPCRelations) .union(openAPCRelations)
.union(dataciteRelations) .union(dataciteRelations)
.union(webCrawlRelations) .union(webCrawlRelations)
.union(publisherRelations)
.saveAsHadoopFile( .saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
}); });
} }
private static JavaPairRDD<Text,Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
List<KeyValue> collectedfrom){
Dataset<Row> df = spark
.read()
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<RORid`:STRING,`Confidence`:DOUBLE>>")
.json(inputPath)
.where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
}
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
String inputPath, String inputPath,
List<KeyValue> collectedfrom) { List<KeyValue> collectedfrom) {
@ -132,6 +156,10 @@ public class PrepareAffiliationRelations implements Serializable {
.json(inputPath) .json(inputPath)
.where("DOI is not null"); .where("DOI is not null");
return getTextTextJavaPairRDD(collectedfrom, df);
}
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) {
// unroll nested arrays // unroll nested arrays
df = df df = df
.withColumn("matching", functions.explode(new Column("Matchings"))) .withColumn("matching", functions.explode(new Column("Matchings")))
@ -142,41 +170,41 @@ public class PrepareAffiliationRelations implements Serializable {
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
return df return df
.toJavaRDD() .toJavaRDD()
.flatMap((FlatMapFunction<Row, Relation>) row -> { .flatMap((FlatMapFunction<Row, Relation>) row -> {
// DOI to OpenAIRE id // DOI to OpenAIRE id
final String paperId = ID_PREFIX final String paperId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
// ROR id to OpenAIRE id // ROR id to OpenAIRE id
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
Qualifier qualifier = OafMapperUtils Qualifier qualifier = OafMapperUtils
.qualifier( .qualifier(
BIP_AFFILIATIONS_CLASSID, BIP_AFFILIATIONS_CLASSID,
BIP_AFFILIATIONS_CLASSNAME, BIP_AFFILIATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS); ModelConstants.DNET_PROVENANCE_ACTIONS);
// format data info; setting `confidence` into relation's `trust` // format data info; setting `confidence` into relation's `trust`
DataInfo dataInfo = OafMapperUtils DataInfo dataInfo = OafMapperUtils
.dataInfo( .dataInfo(
false, false,
BIP_INFERENCE_PROVENANCE, BIP_INFERENCE_PROVENANCE,
true, true,
false, false,
qualifier, qualifier,
Double.toString(row.getAs("confidence"))); Double.toString(row.getAs("confidence")));
// return bi-directional relations // return bi-directional relations
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
}) })
.map(p -> new AtomicAction(Relation.class, p)) .map(p -> new AtomicAction(Relation.class, p))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))); new Text(OBJECT_MAPPER.writeValueAsString(aa))));
} }
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom, private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,