affroNewModelonBeta #494
|
@ -34,7 +34,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates action sets for Crossref affiliation relations inferred by BIP!
|
* Creates action sets for affiliation relations inferred by OpenAIRE (Crossref, PubMed, OpenAPC, DataCite, web crawl and publisher sources)
|
||||||
*/
|
*/
|
||||||
public class PrepareAffiliationRelations implements Serializable {
|
public class PrepareAffiliationRelations implements Serializable {
|
||||||
|
|
||||||
|
@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
|
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelationsNewModel(
|
||||||
spark, crossrefInputPath, collectedfromOpenAIRE);
|
spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref");
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||||
spark, pubmedInputPath, collectedfromOpenAIRE);
|
spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed");
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
|
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelationsNewModel(
|
||||||
spark, openapcInputPath, collectedfromOpenAIRE);
|
spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc");
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> dataciteRelations = prepareAffiliationRelationsNewModel(
|
||||||
spark, dataciteInputPath, collectedfromOpenAIRE);
|
spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite");
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> webCrawlRelations = prepareAffiliationRelationsNewModel(
|
||||||
spark, webcrawlInputPath, collectedfromOpenAIRE);
|
spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff");
|
||||||
|
|
||||||
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisher(
|
JavaPairRDD<Text, Text> publisherRelations = prepareAffiliationRelationFromPublisherNewModel(
|
||||||
spark, publisherlInputPath, collectedfromOpenAIRE);
|
spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl");
|
||||||
|
|
||||||
crossrefRelations
|
crossrefRelations
|
||||||
.union(pubmedRelations)
|
.union(pubmedRelations)
|
||||||
|
@ -133,7 +133,8 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
|
|
||||||
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark,
|
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisherNewModel(SparkSession spark,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
List<KeyValue> collectedfrom) {
|
List<KeyValue> collectedfrom,
|
||||||
|
String dataprovenance) {
|
||||||
|
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -142,12 +143,13 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.where("DOI is not null");
|
.where("DOI is not null");
|
||||||
|
|
||||||
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
|
return getTextTextJavaPairRDDNew(
|
||||||
|
collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
private static JavaPairRDD<Text, Text> prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath,
|
||||||
List<KeyValue> collectedfrom) {
|
List<KeyValue> collectedfrom, String dataprovenance) {
|
||||||
|
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -155,13 +157,14 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.where("DOI is not null");
|
.where("DOI is not null");
|
||||||
|
|
||||||
return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"));
|
return getTextTextJavaPairRDD(
|
||||||
|
collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
List<KeyValue> collectedfrom) {
|
List<KeyValue> collectedfrom, String dataprovenance) {
|
||||||
|
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
|
@ -170,12 +173,12 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.where("DOI is not null");
|
.where("DOI is not null");
|
||||||
|
|
||||||
return getTextTextJavaPairRDD(collectedfrom, df);
|
return getTextTextJavaPairRDD(collectedfrom, df, dataprovenance);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelationsNewModel(SparkSession spark,
|
private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelationsNewModel(SparkSession spark,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
List<KeyValue> collectedfrom) {
|
List<KeyValue> collectedfrom, String dataprovenance) {
|
||||||
// load and parse affiliation relations from HDFS
|
// load and parse affiliation relations from HDFS
|
||||||
Dataset<Row> df = spark
|
Dataset<Row> df = spark
|
||||||
.read()
|
.read()
|
||||||
|
@ -184,10 +187,11 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
.json(inputPath)
|
.json(inputPath)
|
||||||
.where("DOI is not null");
|
.where("DOI is not null");
|
||||||
|
|
||||||
return getTextTextJavaPairRDDNew(collectedfrom, df);
|
return getTextTextJavaPairRDDNew(collectedfrom, df, dataprovenance);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df) {
|
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDD(List<KeyValue> collectedfrom, Dataset<Row> df,
|
||||||
|
String dataprovenance) {
|
||||||
// unroll nested arrays
|
// unroll nested arrays
|
||||||
df = df
|
df = df
|
||||||
.withColumn("matching", functions.explode(new Column("Matchings")))
|
.withColumn("matching", functions.explode(new Column("Matchings")))
|
||||||
|
@ -219,7 +223,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
DataInfo dataInfo = OafMapperUtils
|
DataInfo dataInfo = OafMapperUtils
|
||||||
.dataInfo(
|
.dataInfo(
|
||||||
false,
|
false,
|
||||||
BIP_INFERENCE_PROVENANCE,
|
dataprovenance,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
qualifier,
|
qualifier,
|
||||||
|
@ -235,7 +239,8 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df) {
|
private static JavaPairRDD<Text, Text> getTextTextJavaPairRDDNew(List<KeyValue> collectedfrom, Dataset<Row> df,
|
||||||
|
String dataprovenance) {
|
||||||
// unroll nested arrays
|
// unroll nested arrays
|
||||||
df = df
|
df = df
|
||||||
.withColumn("matching", functions.explode(new Column("Matchings")))
|
.withColumn("matching", functions.explode(new Column("Matchings")))
|
||||||
|
@ -276,7 +281,7 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
DataInfo dataInfo = OafMapperUtils
|
DataInfo dataInfo = OafMapperUtils
|
||||||
.dataInfo(
|
.dataInfo(
|
||||||
false,
|
false,
|
||||||
BIP_INFERENCE_PROVENANCE,
|
dataprovenance,
|
||||||
true,
|
true,
|
||||||
false,
|
false,
|
||||||
qualifier,
|
qualifier,
|
||||||
|
|
|
@ -31,9 +31,11 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
||||||
# The following is needed as a property of a workflow
|
# The following is needed as a property of a workflow
|
||||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||||
|
|
||||||
crossrefInputPath=/data/bip-affiliations/crossref-data.json
|
crossrefInputPath=/data/openaire-affiliations/crossref-data.json
|
||||||
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
|
pubmedInputPath=/data/openaire-affiliations/pubmed-data-v4.json
|
||||||
openapcInputPath=/data/bip-affiliations/openapc-data.json
|
openapcInputPath=/data/openaire-affiliations/openapc-data.json
|
||||||
dataciteInputPath=/data/bip-affiliations/datacite-data.json
|
dataciteInputPath=/data/openaire-affiliations/datacite-data.json
|
||||||
|
webCrawlInputPath=/data/openaire-affiliations/webCrawl
|
||||||
|
publisherInputPath=/data/openaire-affiliations/publishers
|
||||||
|
|
||||||
outputPath=/tmp/crossref-affiliations-output-v5
|
outputPath=/tmp/affRoAS
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
<workflow-app name="BipAffiliations" xmlns="uri:oozie:workflow:0.5">
|
<workflow-app name="OpenAIREAffiliations" xmlns="uri:oozie:workflow:0.5">
|
||||||
<parameters>
|
<parameters>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
|
@ -21,6 +21,10 @@
|
||||||
<name>webCrawlInputPath</name>
|
<name>webCrawlInputPath</name>
|
||||||
<description>the path where to find the inferred affiliation relations from webCrawl</description>
|
<description>the path where to find the inferred affiliation relations from webCrawl</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>publisherInputPath</name>
|
||||||
|
<description>the path where to find the inferred affiliation relations from publisher websites</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
<description>the path where to store the actionset</description>
|
<description>the path where to store the actionset</description>
|
||||||
|
@ -99,7 +103,7 @@
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
|
<name>Produces the atomic action with the inferred by OpenAIRE affiliation relations</name>
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
|
@ -117,6 +121,7 @@
|
||||||
<arg>--openapcInputPath</arg><arg>${openapcInputPath}</arg>
|
<arg>--openapcInputPath</arg><arg>${openapcInputPath}</arg>
|
||||||
<arg>--dataciteInputPath</arg><arg>${dataciteInputPath}</arg>
|
<arg>--dataciteInputPath</arg><arg>${dataciteInputPath}</arg>
|
||||||
<arg>--webCrawlInputPath</arg><arg>${webCrawlInputPath}</arg>
|
<arg>--webCrawlInputPath</arg><arg>${webCrawlInputPath}</arg>
|
||||||
|
<arg>--publisherInputPath</arg><arg>${publisherInputPath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -98,9 +98,9 @@ public class PrepareAffiliationRelationsTest {
|
||||||
"-crossrefInputPath", crossrefAffiliationRelationPathNew,
|
"-crossrefInputPath", crossrefAffiliationRelationPathNew,
|
||||||
"-pubmedInputPath", crossrefAffiliationRelationPath,
|
"-pubmedInputPath", crossrefAffiliationRelationPath,
|
||||||
"-openapcInputPath", crossrefAffiliationRelationPathNew,
|
"-openapcInputPath", crossrefAffiliationRelationPathNew,
|
||||||
"-dataciteInputPath", crossrefAffiliationRelationPath,
|
"-dataciteInputPath", crossrefAffiliationRelationPathNew,
|
||||||
"-webCrawlInputPath", crossrefAffiliationRelationPath,
|
"-webCrawlInputPath", crossrefAffiliationRelationPathNew,
|
||||||
"-publisherInputPath", publisherAffiliationRelationOldPath,
|
"-publisherInputPath", publisherAffiliationRelationPath,
|
||||||
"-outputPath", outputPath
|
"-outputPath", outputPath
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
.map(aa -> ((Relation) aa.getPayload()));
|
.map(aa -> ((Relation) aa.getPayload()));
|
||||||
|
|
||||||
// count the number of relations
|
// count the number of relations
|
||||||
assertEquals(150, tmp.count());// 18 + 24 *3 + 30 * 2 =
|
assertEquals(162, tmp.count());// 18 + 24 + 30 * 4 = 162
|
||||||
|
|
||||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||||
dataset.createOrReplaceTempView("result");
|
dataset.createOrReplaceTempView("result");
|
||||||
|
@ -123,7 +123,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// verify that we have equal number of bi-directional relations
|
// verify that we have equal number of bi-directional relations
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
75, execVerification
|
81, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -131,7 +131,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
75, execVerification
|
81, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -158,7 +158,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
2, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count());
|
4, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count());
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
|
@ -173,7 +173,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
3, execVerification
|
1, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"source = '" + ID_PREFIX
|
"source = '" + ID_PREFIX
|
||||||
+ IdentifierFactory
|
+ IdentifierFactory
|
||||||
|
|
|
@ -72,9 +72,9 @@ public class GraphHiveTableImporterJob {
|
||||||
final Encoder<T> clazzEncoder = Encoders.bean(clazz);
|
final Encoder<T> clazzEncoder = Encoders.bean(clazz);
|
||||||
|
|
||||||
Dataset<Row> dataset = spark
|
Dataset<Row> dataset = spark
|
||||||
.read()
|
.read()
|
||||||
.schema(clazzEncoder.schema())
|
.schema(clazzEncoder.schema())
|
||||||
.json(inputPath);
|
.json(inputPath);
|
||||||
|
|
||||||
if (numPartitions > 0) {
|
if (numPartitions > 0) {
|
||||||
log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions);
|
log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions);
|
||||||
|
|
|
@ -31,6 +31,7 @@ class ORCIDAuthorMatchersTest {
|
||||||
assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
|
assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin"))
|
||||||
// assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
|
// assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test def testDocumentationNames(): Unit = {
|
@Test def testDocumentationNames(): Unit = {
|
||||||
assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones"))
|
assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones"))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue