forked from D-Net/dnet-hadoop
Add Action Set creation for affiliations inferred from the OpenAPC data
This commit is contained in:
parent
eca021f4d6
commit
f0dc12634b
|
@ -64,6 +64,9 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
final String pubmedInputPath = parser.get("pubmedInputPath");
|
final String pubmedInputPath = parser.get("pubmedInputPath");
|
||||||
log.info("pubmedInputPath: {}", pubmedInputPath);
|
log.info("pubmedInputPath: {}", pubmedInputPath);
|
||||||
|
|
||||||
|
final String openapcInputPath = parser.get("openapcInputPath");
|
||||||
|
log.info("openapcInputPath: {}", openapcInputPath);
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
log.info("outputPath: {}", outputPath);
|
log.info("outputPath: {}", outputPath);
|
||||||
|
|
||||||
|
@ -85,8 +88,14 @@ public class PrepareAffiliationRelations implements Serializable {
|
||||||
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
|
||||||
spark, pubmedInputPath, collectedFromPubmed);
|
spark, pubmedInputPath, collectedFromPubmed);
|
||||||
|
|
||||||
|
List<KeyValue> collectedFromOpenAPC = OafMapperUtils
|
||||||
|
.listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC");
|
||||||
|
JavaPairRDD<Text, Text> openAPCRelations = prepareAffiliationRelations(
|
||||||
|
spark, openapcInputPath, collectedFromOpenAPC);
|
||||||
|
|
||||||
crossrefRelations
|
crossrefRelations
|
||||||
.union(pubmedRelations)
|
.union(pubmedRelations)
|
||||||
|
.union(openAPCRelations)
|
||||||
.saveAsHadoopFile(
|
.saveAsHadoopFile(
|
||||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,12 @@
|
||||||
"paramDescription": "the path to get the input data from Pubmed",
|
"paramDescription": "the path to get the input data from Pubmed",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"paramName": "oip",
|
||||||
|
"paramLongName": "openapcInputPath",
|
||||||
|
"paramDescription": "the path to get the input data from OpenAPC",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"paramName": "o",
|
"paramName": "o",
|
||||||
"paramLongName": "outputPath",
|
"paramLongName": "outputPath",
|
||||||
|
|
|
@ -31,6 +31,7 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
|
||||||
# The following is needed as a property of a workflow
|
# The following is needed as a property of a workflow
|
||||||
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
oozie.wf.application.path=${oozieTopWfApplicationPath}
|
||||||
|
|
||||||
crossrefInputPath=/data/bip-affiliations/data.json
|
crossrefInputPath=/data/bip-affiliations/crossref-data.json
|
||||||
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
|
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
|
||||||
|
openapcInputPath=/data/bip-affiliations/openapc-data.json
|
||||||
outputPath=/tmp/crossref-affiliations-output-v5
|
outputPath=/tmp/crossref-affiliations-output-v5
|
||||||
|
|
|
@ -9,6 +9,10 @@
|
||||||
<name>pubmedInputPath</name>
|
<name>pubmedInputPath</name>
|
||||||
<description>the path where to find the inferred affiliation relations from Pubmed</description>
|
<description>the path where to find the inferred affiliation relations from Pubmed</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>openapcInputPath</name>
|
||||||
|
<description>the path where to find the inferred affiliation relations from OpenAPC</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>outputPath</name>
|
<name>outputPath</name>
|
||||||
<description>the path where to store the actionset</description>
|
<description>the path where to store the actionset</description>
|
||||||
|
@ -102,6 +106,7 @@
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
|
<arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
|
||||||
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
|
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
|
||||||
|
<arg>--openapcInputPath</arg><arg>${openapcInputPath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
|
|
|
@ -78,10 +78,6 @@ public class PrepareAffiliationRelationsTest {
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
||||||
.getPath();
|
.getPath();
|
||||||
|
|
||||||
String pubmedAffiliationRelationsPath = getClass()
|
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
|
|
||||||
.getPath();
|
|
||||||
|
|
||||||
String outputPath = workingDir.toString() + "/actionSet";
|
String outputPath = workingDir.toString() + "/actionSet";
|
||||||
|
|
||||||
PrepareAffiliationRelations
|
PrepareAffiliationRelations
|
||||||
|
@ -89,7 +85,8 @@ public class PrepareAffiliationRelationsTest {
|
||||||
new String[] {
|
new String[] {
|
||||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||||
"-crossrefInputPath", crossrefAffiliationRelationPath,
|
"-crossrefInputPath", crossrefAffiliationRelationPath,
|
||||||
"-pubmedInputPath", pubmedAffiliationRelationsPath,
|
"-pubmedInputPath", crossrefAffiliationRelationPath,
|
||||||
|
"-openapcInputPath", crossrefAffiliationRelationPath,
|
||||||
"-outputPath", outputPath
|
"-outputPath", outputPath
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -106,7 +103,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
// count the number of relations
|
// count the number of relations
|
||||||
assertEquals(40, tmp.count());
|
assertEquals(60, tmp.count());
|
||||||
|
|
||||||
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
|
||||||
dataset.createOrReplaceTempView("result");
|
dataset.createOrReplaceTempView("result");
|
||||||
|
@ -117,7 +114,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
// verify that we have equal number of bi-directional relations
|
// verify that we have equal number of bi-directional relations
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
20, execVerification
|
30, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
@ -125,7 +122,7 @@ public class PrepareAffiliationRelationsTest {
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
20, execVerification
|
30, execVerification
|
||||||
.filter(
|
.filter(
|
||||||
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
|
||||||
.collectAsList()
|
.collectAsList()
|
||||||
|
|
Loading…
Reference in New Issue