Import affiliation relations from Crossref #320

Merged
miriam.baglioni merged 12 commits from 8876 into beta 2023-08-07 10:45:31 +02:00
9 changed files with 510 additions and 1 deletions

View File

@ -11,6 +11,7 @@ import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.Subject;
@ -93,4 +94,9 @@ public class Constants {
return s;
}
public static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -0,0 +1,160 @@
package eu.dnetlib.dhp.actionmanager.bipaffiliations;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
/**
* Creates action sets for Crossref affiliation relations inferred by BIP!
*/
public class PrepareAffiliationRelations implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String ID_PREFIX = "50|doi_________::";
public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
public static <I extends Result> void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareAffiliationRelations.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Constants.removeOutputDir(spark, outputPath);
prepareAffiliationRelations(spark, inputPath, outputPath);
});
}
private static <I extends Result> void prepareAffiliationRelations(SparkSession spark, String inputPath,
String outputPath) {
// load and parse affiliation relations from HDFS
Dataset<Row> df = spark
.read()
.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
.json(inputPath);
// unroll nested arrays
df = df
.withColumn("matching", functions.explode(new Column("Matchings")))
.withColumn("rorid", functions.explode(new Column("matching.RORid")))
.select(
new Column("DOI").as("doi"),
new Column("rorid"),
new Column("matching.Confidence").as("confidence"));
// prepare action sets for affiliation relations
df
.toJavaRDD()
.flatMap((FlatMapFunction<Row, Relation>) row -> {
// DOI to OpenAIRE id
final String paperId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
// ROR id to OpenAIRE id
final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
Qualifier qualifier = OafMapperUtils
.qualifier(
BIP_AFFILIATIONS_CLASSID,
BIP_AFFILIATIONS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS);
// format data info; setting `confidence` into relation's `trust`
DataInfo dataInfo = OafMapperUtils
.dataInfo(
false,
BIP_INFERENCE_PROVENANCE,
true,
false,
qualifier,
Double.toString(row.getAs("confidence")));
// return bi-directional relations
return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
})
.map(p -> new AtomicAction(Relation.class, p))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
return Arrays
.asList(
OafMapperUtils
.getRelation(
paperId,
affId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION,
null,
dataInfo,
null),
OafMapperUtils
.getRelation(
affId,
schatz marked this conversation as resolved

It is advisable to compress output file here (using /data/bip-affiliations/data.json as the input the total disk size for output file is reduced from 50Gb to 1.5Gb)
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);

It is advisable to compress output file here (using /data/bip-affiliations/data.json as the input the total disk size for output file is reduced from 50Gb to 1.5Gb) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
Review

You are right, thanks for pointing that.

You are right, thanks for pointing that.
paperId,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
null,
dataInfo,
null));
}
}

View File

@ -168,7 +168,7 @@ public class GenerateRorActionSetJob {
}
private static String calculateOpenaireId(final String rorId) {
public static String calculateOpenaireId(final String rorId) {
return String.format("20|%s::%s", Constants.ROR_NS_PREFIX, DHPUtils.md5(rorId));
}

View File

@ -0,0 +1,20 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "ip",
"paramLongName": "inputPath",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -0,0 +1,35 @@
# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
# dhp.hadoop.frontend.user.name=ilias.kanellos
# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
# dhp.hadoop.frontend.port.ssh=22
# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
# jobTracker=yarnRM
# nameNode=hdfs://nameservice1
# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
# maven.executable=mvn
# Some memory and driver settings for more demanding tasks
sparkDriverMemory=10G
sparkExecutorMemory=10G
sparkExecutorCores=4
sparkShufflePartitions=7680
# The above is given differently in an example I found online
oozie.action.sharelib.for.spark=spark2
oozieActionShareLibForSpark2=spark2
spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
spark2EventLogDir=/user/spark/spark2ApplicationHistory
sparkSqlWarehouseDir=/user/hive/warehouse
hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
# This MAY avoid the no library used error
oozie.use.system.libpath=true
# Some stuff copied from openaire's jobs
spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
# The following is needed as a property of a workflow
oozie.wf.application.path=${oozieTopWfApplicationPath}
inputPath=/user/schatz/affiliations/data-v3.1.json
outputPath=/tmp/crossref-affiliations-output-v3.1

View File

@ -0,0 +1,30 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>hiveMetastoreUris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>hiveJdbcUrl</name>
<value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
</property>
<property>
<name>hiveDbName</name>
<value>openaire</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,107 @@
<workflow-app name="BipAffiliations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>inputPath</name>
<description>the path where to find the inferred affiliation relations</description>
</property>
<property>
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="deleteoutputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="deleteoutputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="atomicactions"/>
<error to="Kill"/>
</action>
<action name="atomicactions">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
<class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,145 @@
package eu.dnetlib.dhp.actionmanager.bipaffiliations;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
public class PrepareAffiliationRelationsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
private static final String ID_PREFIX = "50|doi_________::";
private static final Logger log = LoggerFactory
.getLogger(PrepareAffiliationRelationsTest.class);
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName());
log.info("Using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PrepareAffiliationRelationsTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testMatch() throws Exception {
String affiliationRelationsPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
.getPath();
String outputPath = workingDir.toString() + "/actionSet";
PrepareAffiliationRelations
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath", affiliationRelationsPath,
"-outputPath", outputPath
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.sequenceFile(outputPath, Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Relation) aa.getPayload()));
// for (Relation r : tmp.collect()) {
// System.out.println(
// r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred()
// );
// }
// count the number of relations
assertEquals(16, tmp.count());
Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
dataset.createOrReplaceTempView("result");
Dataset<Row> execVerification = spark
.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r");
// verify that we have equal number of bi-directional relations
Assertions
.assertEquals(
8, execVerification
.filter(
"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
.collectAsList()
.size());
Assertions
.assertEquals(
8, execVerification
.filter(
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
.collectAsList()
.size());
// check confidence value of a specific relation
String sourceDOI = "10.1105/tpc.8.3.343";
final String sourceOpenaireId = ID_PREFIX
+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
Assertions
.assertEquals(
"0.7071067812", execVerification
.filter(
"source='" + sourceOpenaireId + "'")
.collectAsList()
.get(0)
.getString(4));
}
}

View File

@ -0,0 +1,6 @@
{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]}
{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]}
{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]}
{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]}
{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]}
{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]}