diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
index 1f2145d66..62556b16b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
@@ -11,6 +11,7 @@ import org.apache.spark.sql.SparkSession;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.dhp.schema.oaf.Subject;
@@ -93,4 +94,9 @@ public class Constants {
 
 		return s;
 	}
+
+	public static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	}
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
new file mode 100644
index 000000000..a9c610de7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
@@ -0,0 +1,160 @@
+
+package eu.dnetlib.dhp.actionmanager.bipaffiliations;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.Constants;
+import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import scala.Tuple2;
+
+/**
+ * Creates action sets for Crossref affiliation relations inferred by BIP!
+ */
+public class PrepareAffiliationRelations implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class);
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+	private static final String ID_PREFIX = "50|doi_________::";
+	public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference";
+	public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!";
+	public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref";
+
+	public static void main(String[] args) throws Exception {
+
+		String jsonConfiguration = IOUtils
+			.toString(
+				PrepareAffiliationRelations.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("inputPath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				Constants.removeOutputDir(spark, outputPath);
+				prepareAffiliationRelations(spark, inputPath, outputPath);
+			});
+	}
+
+	private static void prepareAffiliationRelations(SparkSession spark, String inputPath,
+		String outputPath) {
+
+		// load and parse affiliation relations from HDFS
+		Dataset<Row> df = spark
+			.read()
+			.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
+			.json(inputPath);
+
+		// unroll nested arrays: one row per (DOI, ROR id, confidence) triple
+		df = df
+			.withColumn("matching", functions.explode(new Column("Matchings")))
+			.withColumn("rorid", functions.explode(new Column("matching.RORid")))
+			.select(
+				new Column("DOI").as("doi"),
+				new Column("rorid"),
+				new Column("matching.Confidence").as("confidence"));
+
+		// prepare action sets for affiliation relations
+		df
+			.toJavaRDD()
+			.flatMap((FlatMapFunction<Row, Relation>) row -> {
+
+				// DOI to OpenAIRE id
+				final String paperId = ID_PREFIX
+					+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
+
+				// ROR id to OpenAIRE id
+				final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
+
+				Qualifier qualifier = OafMapperUtils
+					.qualifier(
+						BIP_AFFILIATIONS_CLASSID,
+						BIP_AFFILIATIONS_CLASSNAME,
+						ModelConstants.DNET_PROVENANCE_ACTIONS,
+						ModelConstants.DNET_PROVENANCE_ACTIONS);
+
+				// format data info; setting `confidence` into relation's `trust`
+				DataInfo dataInfo = OafMapperUtils
+					.dataInfo(
+						false,
+						BIP_INFERENCE_PROVENANCE,
+						true,
+						false,
+						qualifier,
+						Double.toString(row.getAs("confidence")));
+
+				// return bi-directional relations
+				return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
+
+			})
+			.map(p -> new AtomicAction<>(Relation.class, p))
+			.mapToPair(
+				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+
+	}
+
+	private static List<Relation> getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) {
+		return Arrays
+			.asList(
+				OafMapperUtils
+					.getRelation(
+						paperId,
+						affId,
+						ModelConstants.RESULT_ORGANIZATION,
+						ModelConstants.AFFILIATION,
+						ModelConstants.HAS_AUTHOR_INSTITUTION,
+						null,
+						dataInfo,
+						null),
+				OafMapperUtils
+					.getRelation(
+						affId,
+						paperId,
+						ModelConstants.RESULT_ORGANIZATION,
+						ModelConstants.AFFILIATION,
+						ModelConstants.IS_AUTHOR_INSTITUTION_OF,
+						null,
+						dataInfo,
+						null));
+	}
+}
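For reference, this is how the two endpoints of each relation above are composed — a minimal sketch reusing only utilities that this patch imports or makes public. The DOI and ROR id are taken from the test fixture further below; that ROR_NS_PREFIX resolves to "ror_________" is an assumption, not stated in this patch.

// Illustration only, not part of the patch.
import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob;
import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

public class OpenaireIdSketch {
	public static void main(String[] args) {
		// result id: "50|doi_________::" + md5 of the normalized DOI
		String paperId = "50|doi_________::"
			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1105/tpc.8.3.343"));

		// organization id: delegated to the ROR action-set job, which formats
		// "20|" + ROR namespace prefix + "::" + md5 of the full ROR URL
		String affId = GenerateRorActionSetJob.calculateOpenaireId("https://ror.org/02k40bc56");

		System.out.println(paperId); // 50|doi_________::<32-hex-char md5>
		System.out.println(affId); // 20|ror_________::<32-hex-char md5> (assumed prefix)
	}
}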
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
index 1be2a96fd..5f3493d56 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java
@@ -168,7 +168,7 @@ public class GenerateRorActionSetJob {
 
 	}
 
-	private static String calculateOpenaireId(final String rorId) {
+	public static String calculateOpenaireId(final String rorId) {
 		return String.format("20|%s::%s", Constants.ROR_NS_PREFIX, DHPUtils.md5(rorId));
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json
new file mode 100644
index 000000000..7663a454b
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json
@@ -0,0 +1,20 @@
+[
+  {
+    "paramName": "issm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "when true will stop SparkSession after job execution",
+    "paramRequired": false
+  },
+  {
+    "paramName": "ip",
+    "paramLongName": "inputPath",
+    "paramDescription": "the path where to find the inferred affiliation relations",
+    "paramRequired": true
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path of the new ActionSet",
+    "paramRequired": true
+  }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties
new file mode 100644
index 000000000..43d86ee09
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties
@@ -0,0 +1,35 @@
+# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties ---
+# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos
+# dhp.hadoop.frontend.user.name=ilias.kanellos
+# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl
+# dhp.hadoop.frontend.port.ssh=22
+# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie
+# jobTracker=yarnRM
+# nameNode=hdfs://nameservice1
+# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log
+# maven.executable=mvn
+
+# Memory and driver settings for more demanding tasks
+sparkDriverMemory=10G
+sparkExecutorMemory=10G
+sparkExecutorCores=4
+sparkShufflePartitions=7680
+
+# Spark 2 sharelib and history-server settings
+oozie.action.sharelib.for.spark=spark2
+oozieActionShareLibForSpark2=spark2
+spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+spark2EventLogDir=/user/spark/spark2ApplicationHistory
+sparkSqlWarehouseDir=/user/hive/warehouse
+hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+# Required to avoid the "no library used" error
+oozie.use.system.libpath=true
+# Spark listeners used by the other OpenAIRE workflows
+spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener
+spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
+
+# The workflow application path (required by Oozie)
+oozie.wf.application.path=${oozieTopWfApplicationPath}
+
+inputPath=/user/schatz/affiliations/data-v3.1.json
+outputPath=/tmp/crossref-affiliations-output-v3.1
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml
new file mode 100644
index 000000000..d262cb6e0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml
@@ -0,0 +1,30 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
+    </property>
+    <property>
+        <name>hiveDbName</name>
+        <value>openaire</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml
new file mode 100644
index 000000000..9930cfe17
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml
@@ -0,0 +1,107 @@
+<workflow-app name="PrepareAffiliationRelations" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>inputPath</name>
+            <description>the path where to find the inferred affiliation relations</description>
+        </property>
+        <property>
+            <name>outputPath</name>
+            <description>the path where to store the actionset</description>
+        </property>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="prepareAffiliationRelations"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="prepareAffiliationRelations">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Produces the atomic action with the affiliation relations from Crossref inferred by BIP!</name>
+            <class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${inputPath}</arg>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java
new file mode 100644
index 000000000..72aabde7f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java
@@ -0,0 +1,145 @@
+
+package eu.dnetlib.dhp.actionmanager.bipaffiliations;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions;
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
+
+public class PrepareAffiliationRelationsTest {
+
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private static SparkSession spark;
+
+	private static Path workingDir;
+	private static final String ID_PREFIX = "50|doi_________::";
+	private static final Logger log = LoggerFactory
+		.getLogger(PrepareAffiliationRelationsTest.class);
+
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName());
+
+		log.info("Using work dir {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+		conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName());
+
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+		spark = SparkSession
+			.builder()
+			.appName(PrepareAffiliationRelationsTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
+
+	@Test
+	void testMatch() throws Exception {
+
+		String affiliationRelationsPath = getClass()
+			.getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
+			.getPath();
+
+		String outputPath = workingDir.toString() + "/actionSet";
+
+		PrepareAffiliationRelations
+			.main(
+				new String[] {
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-inputPath", affiliationRelationsPath,
+					"-outputPath", outputPath
+				});
+
+		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<Relation> tmp = sc
+			.sequenceFile(outputPath, Text.class, Text.class)
+			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+			.map(aa -> ((Relation) aa.getPayload()));
+
+//		for (Relation r : tmp.collect()) {
+//			System.out
+//				.println(
+//					r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t"
+//						+ r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t"
+//						+ r.getDataInfo().getInferred());
+//		}
+
+		// the fixture holds 8 DOI-ROR matchings; each yields a pair of
+		// bi-directional relations, hence 16 relations in total
+		assertEquals(16, tmp.count());
+
+		Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+		dataset.createOrReplaceTempView("result");
+
+		Dataset<Row> execVerification = spark
+			.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r");
+
+		// verify that we have an equal number of bi-directional relations
+		Assertions
+			.assertEquals(
+				8, execVerification
+					.filter(
+						"relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
+					.collectAsList()
+					.size());
+
+		Assertions
+			.assertEquals(
+				8, execVerification
+					.filter(
+						"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
+					.collectAsList()
+					.size());
+
+		// check confidence value of a specific relation
+		String sourceDOI = "10.1105/tpc.8.3.343";
+
+		final String sourceOpenaireId = ID_PREFIX
+			+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI));
+
+		Assertions
+			.assertEquals(
+				"0.7071067812", execVerification
					.filter(
+						"source='" + sourceOpenaireId + "'")
+					.collectAsList()
+					.get(0)
+					.getString(4));
+
+	}
+}
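Why 16: the fixture that follows has six JSON lines containing eight RORid entries in total (2 + 1 + 1 + 2 + 1 + 1), and every DOI-ROR pair is emitted in both directions. A self-contained recount using only Jackson — the fixture path is illustrative:

// Illustration only, not part of the patch.
import java.nio.file.Files;
import java.nio.file.Paths;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FixtureCountSketch {
	public static void main(String[] args) throws Exception {
		ObjectMapper mapper = new ObjectMapper();
		long pairs = 0;
		// one JSON object per line, as in doi_to_ror.json below
		for (String line : Files.readAllLines(Paths.get("doi_to_ror.json"))) {
			for (JsonNode matching : mapper.readTree(line).get("Matchings")) {
				pairs += matching.get("RORid").size(); // every ROR id is one DOI-ROR pair
			}
		}
		System.out.println(pairs); // 8 pairs
		System.out.println(pairs * 2); // 16 relations, one per direction
	}
}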
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json
new file mode 100644
index 000000000..3b067dcc8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json
@@ -0,0 +1,6 @@
+{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]}
+{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]}
+{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]}
+{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]}
+{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]}
+{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]}
\ No newline at end of file
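For completeness, this is how the declared read schema unrolls one of the fixture lines above — a minimal local sketch of the same double explode performed in PrepareAffiliationRelations, assuming only Spark SQL on the classpath:

// Illustration only, not part of the patch.
import java.util.Collections;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class ExplodeSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate();

		// first line of doi_to_ror.json: one DOI with two matchings
		String line = "{\"DOI\":\"10.1061/(asce)0733-9399(2002)128:7(759)\",\"Matchings\":["
			+ "{\"RORid\":[\"https://ror.org/01teme464\"],\"Confidence\":0.73},"
			+ "{\"RORid\":[\"https://ror.org/03yxnpp24\"],\"Confidence\":0.7071067812}]}";

		Dataset<Row> df = spark
			.read()
			.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
			.json(spark.createDataset(Collections.singletonList(line), Encoders.STRING()));

		df
			.withColumn("matching", functions.explode(new Column("Matchings"))) // one row per matching
			.withColumn("rorid", functions.explode(new Column("matching.RORid"))) // one row per ROR id
			.select(new Column("DOI"), new Column("rorid"), new Column("matching.Confidence"))
			.show(false); // two rows, each carrying its own confidence
		spark.stop();
	}
}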