Add Pubmed affiliations (inferred by BIP) as actionsets #353

Merged
claudio.atzori merged 5 commits from 9117_pubmed_affiliations into beta 2023-11-22 13:53:07 +01:00
5 changed files with 58 additions and 25 deletions

View File

@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
@ -57,11 +58,14 @@ public class PrepareAffiliationRelations implements Serializable {
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath"); final String crossrefInputPath = parser.get("crossrefInputPath");
log.info("inputPath {}: ", inputPath); log.info("crossrefInputPath: {}", crossrefInputPath);
schatz marked this conversation as resolved Outdated

As the workflow now assumes now two input paths, I suggest to align the parameter naming convention renaming this one to crossrefInputPath.

As the workflow now assumes now two input paths, I suggest to align the parameter naming convention renaming this one to `crossrefInputPath`.
final String pubmedInputPath = parser.get("pubmedInputPath");
log.info("pubmedInputPath: {}", pubmedInputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath); log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -70,12 +74,28 @@ public class PrepareAffiliationRelations implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Constants.removeOutputDir(spark, outputPath); Constants.removeOutputDir(spark, outputPath);
prepareAffiliationRelations(spark, inputPath, outputPath);
List<KeyValue> collectedFromCrossref = OafMapperUtils
.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
spark, crossrefInputPath, collectedFromCrossref);
List<KeyValue> collectedFromPubmed = OafMapperUtils
.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
spark, pubmedInputPath, collectedFromPubmed);
crossrefRelations
.union(pubmedRelations)
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}); });
} }
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath, private static <I extends Result> JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
String outputPath) { String inputPath,
List<KeyValue> collectedfrom) {
// load and parse affiliation relations from HDFS // load and parse affiliation relations from HDFS
Dataset<Row> df = spark Dataset<Row> df = spark
@ -92,7 +112,7 @@ public class PrepareAffiliationRelations implements Serializable {
new Column("matching.Confidence").as("confidence")); new Column("matching.Confidence").as("confidence"));
// prepare action sets for affiliation relations // prepare action sets for affiliation relations
df return df
.toJavaRDD() .toJavaRDD()
.flatMap((FlatMapFunction<Row, Relation>) row -> { .flatMap((FlatMapFunction<Row, Relation>) row -> {
@ -120,8 +140,6 @@ public class PrepareAffiliationRelations implements Serializable {
qualifier, qualifier,
Double.toString(row.getAs("confidence"))); Double.toString(row.getAs("confidence")));
List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
// return bi-directional relations // return bi-directional relations
return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
@ -129,9 +147,7 @@ public class PrepareAffiliationRelations implements Serializable {
.map(p -> new AtomicAction(Relation.class, p)) .map(p -> new AtomicAction(Relation.class, p))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))));
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
} }
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom, private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,

View File

@ -6,9 +6,15 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "ip", "paramName": "cip",
"paramLongName": "inputPath", "paramLongName": "crossrefInputPath",
schatz marked this conversation as resolved Outdated

As the workflow now assumes two input paths, I suggest to align the parameter naming convention renaming this one to crossrefInputPath.

As the workflow now assumes two input paths, I suggest to align the parameter naming convention renaming this one to `crossrefInputPath`.
"paramDescription": "the URL from where to get the programme file", "paramDescription": "the path to get the input data from Crossref",
"paramRequired": true
},
{
"paramName": "pip",
"paramLongName": "pubmedInputPath",
"paramDescription": "the path to get the input data from Pubmed",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -31,5 +31,6 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen
# The following is needed as a property of a workflow # The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath} oozie.wf.application.path=${oozieTopWfApplicationPath}
inputPath=/data/bip-affiliations/data.json crossrefInputPath=/data/bip-affiliations/data.json
schatz marked this conversation as resolved Outdated

parameter renaming to be reflected here as well

parameter renaming to be reflected here as well
pubmedInputPath=/data/bip-affiliations/pubmed-data.json
outputPath=/tmp/crossref-affiliations-output-v5 outputPath=/tmp/crossref-affiliations-output-v5

View File

@ -2,8 +2,12 @@
<parameters> <parameters>
<property> <property>
<name>inputPath</name> <name>crossrefInputPath</name>
schatz marked this conversation as resolved Outdated

again, parameter renaming to be reflected here as well

again, parameter renaming to be reflected here as well
<description>the path where to find the inferred affiliation relations</description> <description>the path where to find the inferred affiliation relations from Crossref</description>
</property>
<property>
<name>pubmedInputPath</name>
<description>the path where to find the inferred affiliation relations from Pubmed</description>
</property> </property>
<property> <property>
<name>outputPath</name> <name>outputPath</name>
@ -83,7 +87,7 @@
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name> <name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class> <class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar> <jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
@ -96,7 +100,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts> </spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg> <arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
schatz marked this conversation as resolved Outdated

again, parameter renaming to be reflected here as well

again, parameter renaming to be reflected here as well
<arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg> <arg>--outputPath</arg><arg>${outputPath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -74,7 +74,11 @@ public class PrepareAffiliationRelationsTest {
@Test @Test
void testMatch() throws Exception { void testMatch() throws Exception {
String affiliationRelationsPath = getClass() String crossrefAffiliationRelationPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
.getPath();
String pubmedAffiliationRelationsPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
.getPath(); .getPath();
@ -84,7 +88,8 @@ public class PrepareAffiliationRelationsTest {
.main( .main(
new String[] { new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(), "-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath", affiliationRelationsPath, "-crossrefInputPath", crossrefAffiliationRelationPath,
"-pubmedInputPath", pubmedAffiliationRelationsPath,
"-outputPath", outputPath "-outputPath", outputPath
}); });
@ -101,7 +106,7 @@ public class PrepareAffiliationRelationsTest {
// ); // );
// } // }
// count the number of relations // count the number of relations
assertEquals(20, tmp.count()); assertEquals(40, tmp.count());
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result"); dataset.createOrReplaceTempView("result");
@ -112,7 +117,7 @@ public class PrepareAffiliationRelationsTest {
// verify that we have equal number of bi-directional relations // verify that we have equal number of bi-directional relations
Assertions Assertions
.assertEquals( .assertEquals(
10, execVerification 20, execVerification
.filter( .filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList() .collectAsList()
@ -120,7 +125,7 @@ public class PrepareAffiliationRelationsTest {
Assertions Assertions
.assertEquals( .assertEquals(
10, execVerification 20, execVerification
.filter( .filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList() .collectAsList()