forked from D-Net/dnet-hadoop
[AffiliationAffRo]align beta with new affiliation from publisher webpage introduced in production. AffRo collectedfrom OpenAIRE to discriminate against WebCrawl
This commit is contained in:
parent
89fcf4086c
commit
468f2aa5a5
|
@ -44,6 +44,8 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference";
|
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference";
|
||||||
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE";
|
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE";
|
||||||
public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation";
|
public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation";
|
||||||
|
public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556";
|
||||||
|
public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";
|
||||||
|
|
||||||
public static <I extends Result> void main(String[] args) throws Exception {
|
public static <I extends Result> void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
@ -74,6 +76,9 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
final String webcrawlInputPath = parser.get("webCrawlInputPath");
|
final String webcrawlInputPath = parser.get("webCrawlInputPath");
|
||||||
log.info("webcrawlInputPath: {}", webcrawlInputPath);
|
log.info("webcrawlInputPath: {}", webcrawlInputPath);
|
||||||
|
|
||||||
|
final String publisherInputPath = parser.get("publisherInputPath");
|
||||||
|
log.info("publisherInputPath: {}", publisherInputPath);
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
@ -84,7 +89,15 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
Constants.removeOutputDir(spark, outputPath);
|
Constants.removeOutputDir(spark, outputPath);
|
||||||
|
createActionSet(
|
||||||
|
spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath,
|
||||||
|
publisherInputPath, outputPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath,
|
||||||
|
String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath,
|
||||||
|
String outputPath) {
|
||||||
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
List<KeyValue> collectedFromCrossref = OafMapperUtils
|
||||||
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
|
||||||
|
@ -106,19 +119,36 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
spark, dataciteInputPath, collectedFromDatacite);
|
spark, dataciteInputPath, collectedFromDatacite);
|
||||||
|
|
||||||
List<KeyValue> collectedFromWebCrawl = OafMapperUtils
|
List<KeyValue> collectedFromWebCrawl = OafMapperUtils
|
||||||
.listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME);
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||||
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
||||||
spark, webcrawlInputPath, collectedFromWebCrawl);
|
spark, webcrawlInputPath, collectedFromWebCrawl);
|
||||||
|
|
||||||
|
List<KeyValue> collectedfromPublisher = OafMapperUtils
|
||||||
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||||
|
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
|
||||||
|
spark, publisherlInputPath, collectedfromPublisher);
|
||||||
|
|
||||||
crossrefRelations
|
crossrefRelations
|
||||||
.union(pubmedRelations)
|
.union(pubmedRelations)
|
||||||
.union(openAPCRelations)
|
.union(openAPCRelations)
|
||||||
.union(dataciteRelations)
|
.union(dataciteRelations)
|
||||||
.union(webCrawlRelations)
|
.union(webCrawlRelations)
|
||||||
|
.union(publisherRelations)
|
||||||
.saveAsHadoopFile(
|
.saveAsHadoopFile(
|
||||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
||||||
|
List<KeyValue> collectedfrom) {
|
||||||
|
|
||||||
|
Dataset<Row> df = spark
|
||||||
|
.read()
|
||||||
|
.schema("`DOI` STRING, `Organizations` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
|
||||||
|
.json(inputPath)
|
||||||
|
.where("DOI is not null");
|
||||||
|
|
||||||
|
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
|
||||||
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
||||||
|
@ -132,6 +162,10 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.where("DOI is not null");
|
.where("DOI is not null");
|
||||||
|
|
||||||
|
return getTextTextJavaPairRDD(collectedfrom, df);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) {
|
||||||
// unroll nested arrays
|
// unroll nested arrays
|
||||||
df = df
|
df = df
|
||||||
.withColumn("matching", functions.explode(new Column("Matchings")))
|
.withColumn("matching", functions.explode(new Column("Matchings")))
|
||||||
|
|
|
@ -78,6 +78,10 @@ public class PrepareAffiliationRelationsTest {
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
|
String publisherAffiliationRelationPath = getClass()
|
||||||
|
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publishers")
|
||||||
|
.getPath();
|
||||||
|
|
||||||
String outputPath = workingDir.toString() + "/actionSet";
|
String outputPath = workingDir.toString() + "/actionSet";
|
||||||
|
|
||||||
PrepareAffiliationRelations
|
PrepareAffiliationRelations
|
||||||
|
@ -89,6 +93,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
"-openapcInputPath", crossrefAffiliationRelationPath,
|
"-openapcInputPath", crossrefAffiliationRelationPath,
|
||||||
"-dataciteInputPath", crossrefAffiliationRelationPath,
|
"-dataciteInputPath", crossrefAffiliationRelationPath,
|
||||||
"-webCrawlInputPath", crossrefAffiliationRelationPath,
|
"-webCrawlInputPath", crossrefAffiliationRelationPath,
|
||||||
|
"-publisherInputPath", publisherAffiliationRelationPath,
|
||||||
"-outputPath", outputPath
|
"-outputPath", outputPath
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -105,7 +110,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
// count the number of relations
|
// count the number of relations
|
||||||
assertEquals(120, tmp.count());
|
assertEquals(138, tmp.count());
|
||||||
|
|
||||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||||
dataset.createOrReplaceTempView("result");
|
dataset.createOrReplaceTempView("result");
|
||||||
|
@ -116,7 +121,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// verify that we have equal number of bi-directional relations
|
// verify that we have equal number of bi-directional relations
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
60, execVerification
|
69, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -124,7 +129,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
60, execVerification
|
69, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -145,5 +150,12 @@ public class PrepareAffiliationRelationsTest {
|
||||||
.get(0)
|
.get(0)
|
||||||
.getString(4));
|
.getString(4));
|
||||||
|
|
||||||
|
final String publisherid = ID_PREFIX
|
||||||
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s00217-010-1268-9"));
|
||||||
|
final String rorId = "20|ror_________::" + IdentifierFactory.md5("https://ror.org/03265fv13");
|
||||||
|
|
||||||
|
Assertions
|
||||||
|
.assertEquals(
|
||||||
|
1, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue