forked from D-Net/dnet-hadoop
Merge branch 'beta' into invisible_relations
This commit is contained in:
commit
adec6692ca
|
@ -80,16 +80,15 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
.read()
|
.read()
|
||||||
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
|
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:STRING,`Confidence`:DOUBLE>>")
|
||||||
.json(inputPath);
|
.json(inputPath);
|
||||||
|
|
||||||
// unroll nested arrays
|
// unroll nested arrays
|
||||||
df = df
|
df = df
|
||||||
.withColumn("matching", functions.explode(new Column("Matchings")))
|
.withColumn("matching", functions.explode(new Column("Matchings")))
|
||||||
.withColumn("rorid", functions.explode(new Column("matching.RORid")))
|
|
||||||
.select(
|
.select(
|
||||||
new Column("DOI").as("doi"),
|
new Column("DOI").as("doi"),
|
||||||
new Column("rorid"),
|
new Column("matching.RORid").as("rorid"),
|
||||||
new Column("matching.Confidence").as("confidence"));
|
new Column("matching.Confidence").as("confidence"));
|
||||||
|
|
||||||
// prepare action sets for affiliation relations
|
// prepare action sets for affiliation relations
|
||||||
|
@ -121,8 +120,10 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
qualifier,
|
qualifier,
|
||||||
Double.toString(row.getAs("confidence")));
|
Double.toString(row.getAs("confidence")));
|
||||||
|
|
||||||
|
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
|
||||||
|
|
||||||
// return bi-directional relations
|
// return bi-directional relations
|
||||||
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
|
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
|
||||||
|
|
||||||
})
|
})
|
||||||
.map(p -> new AtomicAction(Relation.class, p))
|
.map(p -> new AtomicAction(Relation.class, p))
|
||||||
|
@ -133,7 +134,8 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
|
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
|
||||||
|
DataInfo dataInfo) {
|
||||||
return Arrays
|
return Arrays
|
||||||
.asList(
|
.asList(
|
||||||
OafMapperUtils
|
OafMapperUtils
|
||||||
|
@ -143,7 +145,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
ModelConstants.RESULT_ORGANIZATION,
|
ModelConstants.RESULT_ORGANIZATION,
|
||||||
ModelConstants.AFFILIATION,
|
ModelConstants.AFFILIATION,
|
||||||
ModelConstants.HAS_AUTHOR_INSTITUTION,
|
ModelConstants.HAS_AUTHOR_INSTITUTION,
|
||||||
null,
|
collectedfrom,
|
||||||
dataInfo,
|
dataInfo,
|
||||||
null),
|
null),
|
||||||
OafMapperUtils
|
OafMapperUtils
|
||||||
|
@ -153,7 +155,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
ModelConstants.RESULT_ORGANIZATION,
|
ModelConstants.RESULT_ORGANIZATION,
|
||||||
ModelConstants.AFFILIATION,
|
ModelConstants.AFFILIATION,
|
||||||
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
|
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
|
||||||
null,
|
collectedfrom,
|
||||||
dataInfo,
|
dataInfo,
|
||||||
null));
|
null));
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,5 +31,5 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
||||||
# The following is needed as a property of a workflow
|
# The following is needed as a property of a workflow
|
||||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||||
|
|
||||||
inputPath=/user/schatz/affiliations/data-v3.1.json
|
inputPath=/data/bip-affiliations/data.json
|
||||||
outputPath=/tmp/crossref-affiliations-output-v3.1
|
outputPath=/tmp/crossref-affiliations-output-v5
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
// count the number of relations
|
// count the number of relations
|
||||||
assertEquals(16, tmp.count());
|
assertEquals(20, tmp.count());
|
||||||
|
|
||||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||||
dataset.createOrReplaceTempView("result");
|
dataset.createOrReplaceTempView("result");
|
||||||
|
@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// verify that we have equal number of bi-directional relations
|
// verify that we have equal number of bi-directional relations
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
8, execVerification
|
10, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -120,14 +120,14 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
8, execVerification
|
10, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
.size());
|
.size());
|
||||||
|
|
||||||
// check confidence value of a specific relation
|
// check confidence value of a specific relation
|
||||||
String sourceDOI = "10.1105/tpc.8.3.343";
|
String sourceDOI = "10.1061/(asce)0733-9399(2002)128:7(759)";
|
||||||
|
|
||||||
final String sourceOpenaireId = ID_PREFIX
|
final String sourceOpenaireId = ID_PREFIX
|
||||||
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
|
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]}
|
{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":"https:\/\/ror.org\/03yxnpp24","Confidence":0.7071067812},{"RORid":"https:\/\/ror.org\/01teme464","Confidence":0.89}]}
|
||||||
{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]}
|
{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":"https:\/\/ror.org\/02k40bc56","Confidence":0.7071067812}]}
|
||||||
{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]}
|
{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":"https:\/\/ror.org\/00qjgza05","Confidence":1}]}
|
||||||
{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]}
|
{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":"https:\/\/ror.org\/035xkbk20","Confidence":1},{"RORid":"https:\/\/ror.org\/05apxxy63","Confidence":1}]}
|
||||||
{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]}
|
{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":"https:\/\/ror.org\/04j198w64","Confidence":0.82}]}
|
||||||
{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]}
|
{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(588)","Matchings":[{"RORid":"https:\/\/ror.org\/03m8km719","Confidence":0.8660254038},{"RORid":"https:\/\/ror.org\/02aze4h65","Confidence":0.87}]}
|
||||||
|
{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":"https:\/\/ror.org\/057xtrt18","Confidence":0.7071067812}]}
|
Loading…
Reference in New Issue