|
|
|
@ -89,64 +89,66 @@ public class PrepareAffiliationRelations implements Serializable {
|
|
|
|
|
isSparkSessionManaged,
|
|
|
|
|
spark -> {
|
|
|
|
|
Constants.removeOutputDir(spark, outputPath);
|
|
|
|
|
createActionSet(spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, publisherInputPath, outputPath);
|
|
|
|
|
createActionSet(
|
|
|
|
|
spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath,
|
|
|
|
|
publisherInputPath, outputPath);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath, String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath, String outputPath) {
|
|
|
|
|
private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath,
|
|
|
|
|
String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath,
|
|
|
|
|
String outputPath) {
|
|
|
|
|
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
|
|
|
|
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
|
|
|
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
|
|
|
|
spark, crossrefInputPath, collectedFromCrossref);
|
|
|
|
|
spark, crossrefInputPath, collectedFromCrossref);
|
|
|
|
|
|
|
|
|
|
List<KeyValue> collectedFromPubmed = OafMapperUtils
|
|
|
|
|
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
|
|
|
|
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
|
|
|
|
spark, pubmedInputPath, collectedFromPubmed);
|
|
|
|
|
spark, pubmedInputPath, collectedFromPubmed);
|
|
|
|
|
|
|
|
|
|
List<KeyValue> collectedFromOpenAPC = OafMapperUtils
|
|
|
|
|
.listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC");
|
|
|
|
|
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
|
|
|
|
|
spark, openapcInputPath, collectedFromOpenAPC);
|
|
|
|
|
spark, openapcInputPath, collectedFromOpenAPC);
|
|
|
|
|
|
|
|
|
|
List<KeyValue> collectedFromDatacite = OafMapperUtils
|
|
|
|
|
.listKeyValues(ModelConstants.DATACITE_ID, "Datacite");
|
|
|
|
|
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
|
|
|
|
|
spark, dataciteInputPath, collectedFromDatacite);
|
|
|
|
|
spark, dataciteInputPath, collectedFromDatacite);
|
|
|
|
|
|
|
|
|
|
List<KeyValue> collectedFromWebCrawl = OafMapperUtils
|
|
|
|
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
|
|
|
|
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
|
|
|
|
spark, webcrawlInputPath, collectedFromWebCrawl);
|
|
|
|
|
spark, webcrawlInputPath, collectedFromWebCrawl);
|
|
|
|
|
|
|
|
|
|
List<KeyValue> collectedfromPublisher = OafMapperUtils
|
|
|
|
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
|
|
|
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
|
|
|
|
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
|
|
|
|
|
spark, publisherlInputPath, collectedfromPublisher);
|
|
|
|
|
spark, publisherlInputPath, collectedfromPublisher);
|
|
|
|
|
|
|
|
|
|
crossrefRelations
|
|
|
|
|
.union(pubmedRelations)
|
|
|
|
|
.union(openAPCRelations)
|
|
|
|
|
.union(dataciteRelations)
|
|
|
|
|
.union(webCrawlRelations)
|
|
|
|
|
.union(publisherRelations)
|
|
|
|
|
.union(publisherRelations)
|
|
|
|
|
.saveAsHadoopFile(
|
|
|
|
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
|
|
|
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static JavaPairRDD<Text,Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
|
|
|
|
List<KeyValue> collectedfrom){
|
|
|
|
|
|
|
|
|
|
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
|
|
|
|
List<KeyValue> collectedfrom) {
|
|
|
|
|
|
|
|
|
|
Dataset<Row> df = spark
|
|
|
|
|
.read()
|
|
|
|
|
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
|
|
|
|
|
.json(inputPath)
|
|
|
|
|
.where("DOI is not null");
|
|
|
|
|
.read()
|
|
|
|
|
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
|
|
|
|
|
.json(inputPath)
|
|
|
|
|
.where("DOI is not null");
|
|
|
|
|
|
|
|
|
|
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
|
|
|
@ -174,41 +176,41 @@ public class PrepareAffiliationRelations implements Serializable {
|
|
|
|
|
|
|
|
|
|
// prepare action sets for affiliation relations
|
|
|
|
|
return df
|
|
|
|
|
.toJavaRDD()
|
|
|
|
|
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
|
|
|
|
.toJavaRDD()
|
|
|
|
|
.flatMap((FlatMapFunction<Row, Relation>) row -> {
|
|
|
|
|
|
|
|
|
|
// DOI to OpenAIRE id
|
|
|
|
|
final String paperId = ID_PREFIX
|
|
|
|
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
|
|
|
|
|
// DOI to OpenAIRE id
|
|
|
|
|
final String paperId = ID_PREFIX
|
|
|
|
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
|
|
|
|
|
|
|
|
|
|
// ROR id to OpenAIRE id
|
|
|
|
|
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
|
|
|
|
|
// ROR id to OpenAIRE id
|
|
|
|
|
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
|
|
|
|
|
|
|
|
|
|
Qualifier qualifier = OafMapperUtils
|
|
|
|
|
.qualifier(
|
|
|
|
|
BIP_AFFILIATIONS_CLASSID,
|
|
|
|
|
BIP_AFFILIATIONS_CLASSNAME,
|
|
|
|
|
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
|
|
|
|
ModelConstants.DNET_PROVENANCE_ACTIONS);
|
|
|
|
|
Qualifier qualifier = OafMapperUtils
|
|
|
|
|
.qualifier(
|
|
|
|
|
BIP_AFFILIATIONS_CLASSID,
|
|
|
|
|
BIP_AFFILIATIONS_CLASSNAME,
|
|
|
|
|
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
|
|
|
|
ModelConstants.DNET_PROVENANCE_ACTIONS);
|
|
|
|
|
|
|
|
|
|
// format data info; setting `confidence` into relation's `trust`
|
|
|
|
|
DataInfo dataInfo = OafMapperUtils
|
|
|
|
|
.dataInfo(
|
|
|
|
|
false,
|
|
|
|
|
BIP_INFERENCE_PROVENANCE,
|
|
|
|
|
true,
|
|
|
|
|
false,
|
|
|
|
|
qualifier,
|
|
|
|
|
Double.toString(row.getAs("confidence")));
|
|
|
|
|
// format data info; setting `confidence` into relation's `trust`
|
|
|
|
|
DataInfo dataInfo = OafMapperUtils
|
|
|
|
|
.dataInfo(
|
|
|
|
|
false,
|
|
|
|
|
BIP_INFERENCE_PROVENANCE,
|
|
|
|
|
true,
|
|
|
|
|
false,
|
|
|
|
|
qualifier,
|
|
|
|
|
Double.toString(row.getAs("confidence")));
|
|
|
|
|
|
|
|
|
|
// return bi-directional relations
|
|
|
|
|
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
|
|
|
|
// return bi-directional relations
|
|
|
|
|
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
|
|
|
|
|
|
|
|
|
})
|
|
|
|
|
.map(p -> new AtomicAction(Relation.class, p))
|
|
|
|
|
.mapToPair(
|
|
|
|
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
|
|
|
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
|
|
|
|
})
|
|
|
|
|
.map(p -> new AtomicAction(Relation.class, p))
|
|
|
|
|
.mapToPair(
|
|
|
|
|
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
|
|
|
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
|
|
|
|