From 347a889b20bcfa9f7f9f4bd67bf71086ac442d58 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 00:51:01 +0300 Subject: [PATCH 01/10] Read affiliation relations --- .../dnetlib/dhp/actionmanager/Constants.java | 5 + .../PrepareAffiliationRelations.java | 168 ++++++++++++++++++ .../AffiliationRelationDeserializer.java | 26 +++ .../model/AffiliationRelationModel.java | 15 ++ .../input_actionset_parameter.json | 20 +++ .../PrepareAffiliationRelationsTest.java | 139 +++++++++++++++ .../bipaffiliations/doi_to_ror.json | 6 + 7 files changed, 379 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 1f2145d66..8b0dab3a7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager; import java.util.Optional; +import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -93,4 +94,8 @@ public class Constants { return s; } + public static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java new file mode 100644 index 000000000..dce9082fd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -0,0 +1,168 @@ + +package eu.dnetlib.dhp.actionmanager.bipaffiliations; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; +import org.apache.spark.sql.Dataset; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +/** + * created the Atomic Action for each tipe of results + */ +public class PrepareAffiliationRelations implements Serializable { + + private static final String DOI = "doi"; + private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Constants.removeOutputDir(spark, outputPath); + prepareAffiliationRelations(spark, inputPath, outputPath); + }); + } + + private static void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD bipDeserializeJavaRDD = sc + .textFile(inputPath) + .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); + +// for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ +// System.out.println(rel); +// } + JavaRDD affiliationRelations = + bipDeserializeJavaRDD.flatMap(entry -> + entry.getMatchings().stream().flatMap(matching -> + matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence() + ))).collect(Collectors.toList()).iterator()); + + for(AffiliationRelationModel rel: affiliationRelations.collect()){ + System.out.println(rel); + } +// Dataset relations = spark +// .createDataset(bipDeserializeJavaRDD.flatMap(entry -> { +//// entry.keySet().stream().map(key -> { +// AffiliationRelationModel rel = new AffiliationRelationModel(entry.getDoi()) +// System.out.println(entry); +// return entry; +//// BipScore bs = new BipScore(); +//// bs.setId(key); +//// bs.setScoreList(entry.get(key)); +//// return bs; +// }).collect(Collectors.toList()).iterator()).rdd(), Encoßders.bean(AffiliationRelationModel.class)); + +// bipScores +// +// .map((MapFunction) bs -> { +// Result ret = new Result(); +// +// ret.setId(bs.getId()); +// +// ret.setMeasures(getMeasure(bs)); +// +// return ret; +// }, Encoders.bean(Result.class)) +// .toJavaRDD() +// .map(p -> new AtomicAction(Result.class, p)) +// .mapToPair( +// aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), +// new Text(OBJECT_MAPPER.writeValueAsString(aa)))) +// .saveAsHadoopFile(outputPath, Text.class, Text.class, 
SequenceFileOutputFormat.class); + +// } +// +// private static List getMeasure(BipScore value) { +// return value +// .getScoreList() +// .stream() +// .map(score -> { +// Measure m = new Measure(); +// m.setId(score.getId()); +// m +// .setUnit( +// score +// .getUnit() +// .stream() +// .map(unit -> { +// KeyValue kv = new KeyValue(); +// kv.setValue(unit.getValue()); +// kv.setKey(unit.getKey()); +// kv +// .setDataInfo( +// OafMapperUtils +// .dataInfo( +// false, +// UPDATE_DATA_INFO_TYPE, +// true, +// false, +// OafMapperUtils +// .qualifier( +// UPDATE_MEASURE_BIP_CLASS_ID, +// UPDATE_CLASS_NAME, +// ModelConstants.DNET_PROVENANCE_ACTIONS, +// ModelConstants.DNET_PROVENANCE_ACTIONS), +// "")); +// return kv; +// }) +// .collect(Collectors.toList())); +// return m; +// }) +// .collect(Collectors.toList()); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java new file mode 100644 index 000000000..450a8c175 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java @@ -0,0 +1,26 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class AffiliationRelationDeserializer implements Serializable { + @JsonProperty("DOI") + private String doi; + @JsonProperty("Matchings") + private List matchings; + + @Data + public static class Matching implements Serializable { + @JsonProperty("RORid") + private List rorId; + @JsonProperty("Confidence") + private double confidence; + + } + +} + diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java new file mode 100644 index 000000000..8689914ee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class AffiliationRelationModel implements Serializable { + private String doi; + private String rorId; + private double confidence; +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json new file mode 100644 index 000000000..7663a454b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the URL from where to get the programme file", + "paramRequired": true + }, + { + 
"paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java new file mode 100644 index 000000000..eba53ccdb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -0,0 +1,139 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class PrepareAffiliationRelationsTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(PrepareAffiliationRelationsTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); + + log.info("Using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testMatch() throws Exception { + String affiliationRelationsPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") + .getPath(); + + PrepareAffiliationRelations + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + affiliationRelationsPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + +// JavaRDD tmp = sc +// .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) +// .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) +// .map(aa -> ((Result) aa.getPayload())); +// +// 
assertEquals(4, tmp.count()); +// +// Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class)); +// verificationDataset.createOrReplaceTempView("result"); +// +// Dataset execVerification = spark +// .sql( +// "Select p.id oaid, mes.id, mUnit.value from result p " + +// "lateral view explode(measures) m as mes " + +// "lateral view explode(mes.unit) u as mUnit "); +// +// Assertions.assertEquals(12, execVerification.count()); +// Assertions +// .assertEquals( +// "6.63451994567e-09", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'influence'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// Assertions +// .assertEquals( +// "0.348694533145", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'popularity_alt'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// Assertions +// .assertEquals( +// "2.16094680115e-09", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'popularity'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// + } +} + diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json new file mode 100644 index 000000000..3b067dcc8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json @@ -0,0 +1,6 @@ +{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]} +{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]} +{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]} +{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]} +{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]} +{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]} \ No newline at end of file From bbc245696e2b17e0123c1343023d53d1dbb4930b Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 15:56:12 +0300 Subject: [PATCH 02/10] Prepare actionsets for BIP affiliations --- .../PrepareAffiliationRelations.java | 170 ++++++++++-------- 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index dce9082fd..adb198fb3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -5,6 +5,8 @@ import static eu.dnetlib.dhp.actionmanager.Constants.*; import static 
eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -12,10 +14,16 @@ import java.util.stream.Stream; import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -31,6 +39,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; /** * created the Atomic Action for each tipe of results @@ -40,6 +49,12 @@ public class PrepareAffiliationRelations implements Serializable { private static final String DOI = "doi"; private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String ID_PREFIX = "50|doi_________::"; + + private static final String TRUST = "0.91"; + + public static final String BIP_AFFILIATIONS_CLASSID = "sysimport:crosswalk:bipaffiliations"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Imported from BIP! 
Affiliations"; public static void main(String[] args) throws Exception { @@ -76,93 +91,90 @@ public class PrepareAffiliationRelations implements Serializable { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - JavaRDD bipDeserializeJavaRDD = sc + JavaRDD affiliationRelationsDeserializeRDD = sc .textFile(inputPath) .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); // for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ // System.out.println(rel); // } - JavaRDD affiliationRelations = - bipDeserializeJavaRDD.flatMap(entry -> - entry.getMatchings().stream().flatMap(matching -> - matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( - entry.getDoi(), - rorId, - matching.getConfidence() - ))).collect(Collectors.toList()).iterator()); - for(AffiliationRelationModel rel: affiliationRelations.collect()){ - System.out.println(rel); - } -// Dataset relations = spark -// .createDataset(bipDeserializeJavaRDD.flatMap(entry -> { -//// entry.keySet().stream().map(key -> { -// AffiliationRelationModel rel = new AffiliationRelationModel(entry.getDoi()) -// System.out.println(entry); -// return entry; -//// BipScore bs = new BipScore(); -//// bs.setId(key); -//// bs.setScoreList(entry.get(key)); -//// return bs; -// }).collect(Collectors.toList()).iterator()).rdd(), Encoßders.bean(AffiliationRelationModel.class)); + Dataset affiliationRelations = + spark.createDataset( + affiliationRelationsDeserializeRDD.flatMap(entry -> + entry.getMatchings().stream().flatMap(matching -> + matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence() + ))).collect(Collectors.toList()) + .iterator()) + .rdd(), + Encoders.bean(AffiliationRelationModel.class)); -// bipScores -// -// .map((MapFunction) bs -> { -// Result ret = new Result(); -// -// ret.setId(bs.getId()); -// -// ret.setMeasures(getMeasure(bs)); -// -// return ret; -// }, Encoders.bean(Result.class)) -// .toJavaRDD() -// .map(p -> new AtomicAction(Result.class, p)) -// .mapToPair( -// aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), -// new Text(OBJECT_MAPPER.writeValueAsString(aa)))) -// .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + affiliationRelations + .map((MapFunction) affRel -> { + + String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); + final String affId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("ror", affRel.getRorId())); + + return getRelation(paperId, affId, ModelConstants.HAS_AUTHOR_INSTITUTION); + + }, Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); -// } -// -// private static List getMeasure(BipScore value) { -// return value -// .getScoreList() -// .stream() -// .map(score -> { -// Measure m = new Measure(); -// m.setId(score.getId()); -// m -// .setUnit( -// score -// .getUnit() -// .stream() -// .map(unit -> { -// KeyValue kv = new KeyValue(); -// kv.setValue(unit.getValue()); -// kv.setKey(unit.getKey()); -// kv -// .setDataInfo( -// OafMapperUtils -// .dataInfo( -// false, -// UPDATE_DATA_INFO_TYPE, -// true, -// false, -// OafMapperUtils -// .qualifier( 
-// UPDATE_MEASURE_BIP_CLASS_ID, -// UPDATE_CLASS_NAME, -// ModelConstants.DNET_PROVENANCE_ACTIONS, -// ModelConstants.DNET_PROVENANCE_ACTIONS), -// "")); -// return kv; -// }) -// .collect(Collectors.toList())); -// return m; -// }) -// .collect(Collectors.toList()); } + + public static Relation getRelation(String source, String target, String relclass) { + Relation r = new Relation(); + + r.setCollectedfrom(getCollectedFrom()); + r.setSource(source); + r.setTarget(target); + r.setRelClass(relclass); + r.setRelType(ModelConstants.RESULT_ORGANIZATION); + r.setSubRelType(ModelConstants.AFFILIATION); + r.setDataInfo(getDataInfo()); + return r; + } + + public static List getCollectedFrom() { + KeyValue kv = new KeyValue(); + kv.setKey(ModelConstants.DNET_PROVENANCE_ACTIONS); + kv.setValue(ModelConstants.DNET_PROVENANCE_ACTIONS); + + return Collections.singletonList(kv); + } + + public static DataInfo getDataInfo() { + DataInfo di = new DataInfo(); + di.setInferred(false); + di.setDeletedbyinference(false); + di.setTrust(TRUST); + di.setProvenanceaction( + getQualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS + )); + return di; + } + + public static Qualifier getQualifier(String class_id, String class_name, + String qualifierSchema) { + Qualifier pa = new Qualifier(); + pa.setClassid(class_id); + pa.setClassname(class_name); + pa.setSchemeid(qualifierSchema); + pa.setSchemename(qualifierSchema); + return pa; + } + } From 12528ed2efc9cc20bfbf1336c78fb16a69c7553e Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 18:08:33 +0300 Subject: [PATCH 03/10] Refactor PrepareAffiliationRelations.java to use OafMapperUtils common functions --- .../PrepareAffiliationRelations.java | 115 +++++++----------- .../ror/GenerateRorActionSetJob.java | 2 +- 2 files changed, 42 insertions(+), 75 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index adb198fb3..20c649a74 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -1,19 +1,14 @@ package eu.dnetlib.dhp.actionmanager.bipaffiliations; -import static eu.dnetlib.dhp.actionmanager.Constants.*; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; import java.util.stream.Collectors; -import java.util.stream.Stream; import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; @@ -25,7 +20,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,16 +27,14 @@ 
import org.apache.spark.sql.Dataset; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; /** - * created the Atomic Action for each tipe of results + * Creates action sets for Crossref affiliation relations inferred by BIP! */ public class PrepareAffiliationRelations implements Serializable { @@ -50,19 +42,17 @@ public class PrepareAffiliationRelations implements Serializable { private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String ID_PREFIX = "50|doi_________::"; - - private static final String TRUST = "0.91"; - - public static final String BIP_AFFILIATIONS_CLASSID = "sysimport:crosswalk:bipaffiliations"; - public static final String BIP_AFFILIATIONS_CLASSNAME = "Imported from BIP! Affiliations"; + public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; + public static final String BIP_INFERENCE_PROVENANCE = "bip_affiliation"; public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils - .toString( - PrepareAffiliationRelations.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -91,14 +81,12 @@ public class PrepareAffiliationRelations implements Serializable { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + // load and parse affiliation relations from HDFS JavaRDD affiliationRelationsDeserializeRDD = sc .textFile(inputPath) .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); -// for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ -// System.out.println(rel); -// } - + // convert affiliation to an internal representation Dataset affiliationRelations = spark.createDataset( affiliationRelationsDeserializeRDD.flatMap(entry -> @@ -112,15 +100,40 @@ public class PrepareAffiliationRelations implements Serializable { .rdd(), Encoders.bean(AffiliationRelationModel.class)); + // prepare action sets for affiliation relations affiliationRelations .map((MapFunction) affRel -> { - String paperId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - final String affId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("ror", affRel.getRorId())); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - return getRelation(paperId, affId, ModelConstants.HAS_AUTHOR_INSTITUTION); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); + + Qualifier qualifier = OafMapperUtils.qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + 
ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); + + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils.dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(affRel.getConfidence())); + + return OafMapperUtils.getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null); }, Encoders.bean(Relation.class)) .toJavaRDD() @@ -131,50 +144,4 @@ public class PrepareAffiliationRelations implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } - - public static Relation getRelation(String source, String target, String relclass) { - Relation r = new Relation(); - - r.setCollectedfrom(getCollectedFrom()); - r.setSource(source); - r.setTarget(target); - r.setRelClass(relclass); - r.setRelType(ModelConstants.RESULT_ORGANIZATION); - r.setSubRelType(ModelConstants.AFFILIATION); - r.setDataInfo(getDataInfo()); - return r; - } - - public static List getCollectedFrom() { - KeyValue kv = new KeyValue(); - kv.setKey(ModelConstants.DNET_PROVENANCE_ACTIONS); - kv.setValue(ModelConstants.DNET_PROVENANCE_ACTIONS); - - return Collections.singletonList(kv); - } - - public static DataInfo getDataInfo() { - DataInfo di = new DataInfo(); - di.setInferred(false); - di.setDeletedbyinference(false); - di.setTrust(TRUST); - di.setProvenanceaction( - getQualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS - )); - return di; - } - - public static Qualifier getQualifier(String class_id, String class_name, - String qualifierSchema) { - Qualifier pa = new Qualifier(); - pa.setClassid(class_id); - pa.setClassname(class_name); - pa.setSchemeid(qualifierSchema); - pa.setSchemename(qualifierSchema); - return pa; - } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java index 1be2a96fd..5f3493d56 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/ror/GenerateRorActionSetJob.java @@ -168,7 +168,7 @@ public class GenerateRorActionSetJob { } - private static String calculateOpenaireId(final String rorId) { + public static String calculateOpenaireId(final String rorId) { return String.format("20|%s::%s", Constants.ROR_NS_PREFIX, DHPUtils.md5(rorId)); } From bc7b00bcd170140f6db127dbb62623825d4f47f7 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 18:29:15 +0300 Subject: [PATCH 04/10] Add bi-directional affiliation relations --- .../PrepareAffiliationRelations.java | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 20c649a74..45e712c7e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -4,6 +4,8 @@ package eu.dnetlib.dhp.actionmanager.bipaffiliations; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; +import java.util.Arrays; +import java.util.List; import java.util.stream.Collectors; import eu.dnetlib.dhp.actionmanager.Constants; @@ -18,6 +20,7 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; @@ -38,7 +41,6 @@ import scala.Tuple2; */ public class PrepareAffiliationRelations implements Serializable { - private static final String DOI = "doi"; private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String ID_PREFIX = "50|doi_________::"; @@ -102,7 +104,7 @@ public class PrepareAffiliationRelations implements Serializable { // prepare action sets for affiliation relations affiliationRelations - .map((MapFunction) affRel -> { + .flatMap((FlatMapFunction) affRel -> { // DOI to OpenAIRE id final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); @@ -125,15 +127,8 @@ public class PrepareAffiliationRelations implements Serializable { qualifier, Double.toString(affRel.getConfidence())); - return OafMapperUtils.getRelation( - paperId, - affId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.HAS_AUTHOR_INSTITUTION, - null, - dataInfo, - null); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); }, Encoders.bean(Relation.class)) .toJavaRDD() @@ -144,4 +139,27 @@ public class PrepareAffiliationRelations implements Serializable { .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } + + private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { + return Arrays.asList( + OafMapperUtils.getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null), + OafMapperUtils.getRelation( + affId, + paperId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.IS_AUTHOR_INSTITUTION_OF, + null, + dataInfo, + null) + ); + } } From c2998a14e8e998cc496a8e8db803e48bafdb43e4 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 20:28:16 +0300 Subject: [PATCH 05/10] Add basic tests for affiliation relations --- .../PrepareAffiliationRelationsTest.java | 108 +++++++++--------- 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index eba53ccdb..c76fcf6a9 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ 
b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -5,8 +5,11 @@ import static org.junit.jupiter.api.Assertions.*; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -26,7 +29,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.oaf.Result; public class PrepareAffiliationRelationsTest { @@ -35,6 +37,7 @@ public class PrepareAffiliationRelationsTest { private static SparkSession spark; private static Path workingDir; + private static final String ID_PREFIX = "50|doi_________::"; private static final Logger log = LoggerFactory .getLogger(PrepareAffiliationRelationsTest.class); @@ -69,71 +72,64 @@ public class PrepareAffiliationRelationsTest { @Test void testMatch() throws Exception { + String affiliationRelationsPath = getClass() .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") .getPath(); + String outputPath = workingDir.toString() + "/actionSet"; + PrepareAffiliationRelations .main( new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - affiliationRelationsPath, - "-outputPath", - workingDir.toString() + "/actionSet" + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputPath", affiliationRelationsPath, + "-outputPath", outputPath }); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); -// JavaRDD tmp = sc -// .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) -// .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) -// .map(aa -> ((Result) aa.getPayload())); -// -// assertEquals(4, tmp.count()); -// -// Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class)); -// verificationDataset.createOrReplaceTempView("result"); -// -// Dataset execVerification = spark -// .sql( -// "Select p.id oaid, mes.id, mUnit.value from result p " + -// "lateral view explode(measures) m as mes " + -// "lateral view explode(mes.unit) u as mUnit "); -// -// Assertions.assertEquals(12, execVerification.count()); -// Assertions -// .assertEquals( -// "6.63451994567e-09", execVerification -// .filter( -// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + -// "and id = 'influence'") -// .select("value") -// .collectAsList() -// .get(0) -// .getString(0)); -// Assertions -// .assertEquals( -// "0.348694533145", execVerification -// .filter( -// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + -// "and id = 'popularity_alt'") -// .select("value") -// .collectAsList() -// .get(0) -// .getString(0)); -// Assertions -// .assertEquals( -// "2.16094680115e-09", execVerification -// .filter( -// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + -// "and id = 'popularity'") -// .select("value") -// .collectAsList() -// .get(0) -// .getString(0)); -// + JavaRDD tmp = sc + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); 
+ + for (Relation r : tmp.collect()) { + System.out.println( + r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred() + ); + } + // count the number of relations + assertEquals(16, tmp.count()); + + Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + dataset.createOrReplaceTempView("result"); + + Dataset execVerification = spark.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); + + // verify that we have equal number of bi-directional relations + Assertions.assertEquals(8, execVerification + .filter( + "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION +"'") + .collectAsList() + .size()); + + Assertions.assertEquals(8, execVerification + .filter( + "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF +"'") + .collectAsList() + .size()); + + // check confidence value of a specific relation + String sourceDOI = "10.1105/tpc.8.3.343"; + + final String sourceOpenaireId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); + + Assertions.assertEquals("0.7071067812", execVerification + .filter( + "source='" + sourceOpenaireId +"'") + .collectAsList().get(0).getString(4)); + } } From 4eba14a80e25d17cd476cc1f0ffe420654b50726 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 21:07:50 +0300 Subject: [PATCH 06/10] Add oozie workflow --- .../oozie_app/config-default.xml | 30 +++++ .../bipaffiliations/oozie_app/workflow.xml | 108 ++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml new file mode 100644 index 000000000..d262cb6e0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/config-default.xml @@ -0,0 +1,30 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + hiveMetastoreUris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + hiveJdbcUrl + jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000 + + + hiveDbName + openaire + + + oozie.launcher.mapreduce.user.classpath.first + true + + diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml new file mode 100644 index 000000000..7c44bb7eb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -0,0 +1,108 @@ + + + + + inputPath + the path where to find the inferred affiliation relations + + + outputPath + the path where to store the actionset + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + 
oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + + yarn + cluster + Produces the atomic action with the inferred by BIP! affiliation relations from Crossref + eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations/class> + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --inputPath${inputPath} + --outputPath${outputPath} + + + + + + + \ No newline at end of file From bc1a4611aacfa1358baf49d24955c42612ae0050 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Mon, 17 Jul 2023 11:17:53 +0300 Subject: [PATCH 07/10] Minor changes --- .../PrepareAffiliationRelations.java | 2 +- .../bipaffiliations/job.properties | 42 +++++++++++++++++++ .../bipaffiliations/oozie_app/workflow.xml | 1 - .../PrepareAffiliationRelationsTest.java | 10 ++--- 4 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 45e712c7e..44870c0f8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -46,7 +46,7 @@ public class PrepareAffiliationRelations implements Serializable { private static final String ID_PREFIX = "50|doi_________::"; public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; - public static final String BIP_INFERENCE_PROVENANCE = "bip_affiliation"; + public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; public static void main(String[] args) throws Exception { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties new file mode 100644 index 000000000..dce59a31f --- /dev/null +++ 
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -0,0 +1,42 @@ +# --- You can override the following properties (if needed) coming from your ~/.dhp/application.properties --- +# dhp.hadoop.frontend.temp.dir=/home/ilias.kanellos +# dhp.hadoop.frontend.user.name=ilias.kanellos +# dhp.hadoop.frontend.host.name=iis-cdh5-test-gw.ocean.icm.edu.pl +# dhp.hadoop.frontend.port.ssh=22 +# oozieServiceLoc=http://iis-cdh5-test-m3:11000/oozie +# jobTracker=yarnRM +# nameNode=hdfs://nameservice1 +# oozie.execution.log.file.location = target/extract-and-run-on-remote-host.log +# maven.executable=mvn + +# Some memory and driver settings for more demanding tasks +sparkDriverMemory=10G +sparkExecutorMemory=10G +sparkExecutorCores=4 +sparkShufflePartitions=7680 + +# The above is given differently in an example I found online +oozie.action.sharelib.for.spark=spark2 +oozieActionShareLibForSpark2=spark2 +spark2YarnHistoryServerAddress=http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 +spark2EventLogDir=/user/spark/spark2ApplicationHistory +sparkSqlWarehouseDir=/user/hive/warehouse +hiveMetastoreUris=thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 +# This MAY avoid the no library used error +oozie.use.system.libpath=true +# Some stuff copied from openaire's jobs +spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener +spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener + +# I think this should be the oozie workflow directory +# oozieWorkflowPath=/user/ilias.kanellos/workflow_example/ + + +# The workflow application path +wfAppPath=${oozieTopWfApplicationPath} + +# The following is needed as a property of a workflow +oozie.wf.application.path=${oozieTopWfApplicationPath} + +inputPath=/user/schatz/affiliations/data-v3.json +outputPath=/tmp/crossref-affiliations-output-v3 diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 7c44bb7eb..31f35adfd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -79,7 +79,6 @@ - yarn diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index c76fcf6a9..7e2fc5a39 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -94,11 +94,11 @@ public class PrepareAffiliationRelationsTest { .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(aa -> ((Relation) aa.getPayload())); - for (Relation r : tmp.collect()) { - System.out.println( - r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred() - ); - } +// for (Relation r : tmp.collect()) { +// System.out.println( +// 
r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred() +// ); +// } // count the number of relations assertEquals(16, tmp.count()); From be320ba3c1c013d11da957acbce843b00bcae484 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Mon, 17 Jul 2023 16:04:21 +0300 Subject: [PATCH 08/10] Indentation fixes --- .../dnetlib/dhp/actionmanager/Constants.java | 3 +- .../PrepareAffiliationRelations.java | 228 ++++++++++-------- .../AffiliationRelationDeserializer.java | 31 +-- .../model/AffiliationRelationModel.java | 11 +- .../bipaffiliations/job.properties | 11 +- .../bipaffiliations/oozie_app/workflow.xml | 4 +- .../PrepareAffiliationRelationsTest.java | 162 +++++++------ 7 files changed, 235 insertions(+), 215 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 8b0dab3a7..62556b16b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.actionmanager; import java.util.Optional; -import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -12,6 +11,7 @@ import org.apache.spark.sql.SparkSession; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.Subject; @@ -94,6 +94,7 @@ public class Constants { return s; } + public static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 44870c0f8..9b5d4a2ca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -8,12 +8,6 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.Constants; -import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; -import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; -import eu.dnetlib.dhp.schema.action.AtomicAction; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -22,17 +16,23 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; 
import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.sql.Dataset; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; @@ -41,125 +41,139 @@ import scala.Tuple2; */ public class PrepareAffiliationRelations implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ID_PREFIX = "50|doi_________::"; - public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; - public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; - public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; + private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String ID_PREFIX = "50|doi_________::"; + public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:bipinference"; + public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by BIP!"; + public static final String BIP_INFERENCE_PROVENANCE = "bip:affiliation:crossref"; - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareAffiliationRelations.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + String jsonConfiguration = IOUtils + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); - SparkConf conf = new SparkConf(); + SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Constants.removeOutputDir(spark, outputPath); - prepareAffiliationRelations(spark, inputPath, outputPath); - }); - 
} + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Constants.removeOutputDir(spark, outputPath); + prepareAffiliationRelations(spark, inputPath, outputPath); + }); + } - private static void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) { + private static void prepareAffiliationRelations(SparkSession spark, String inputPath, + String outputPath) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - // load and parse affiliation relations from HDFS - JavaRDD affiliationRelationsDeserializeRDD = sc - .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); + // load and parse affiliation relations from HDFS + JavaRDD affiliationRelationsDeserializeRDD = sc + .textFile(inputPath) + .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); - // convert affiliation to an internal representation - Dataset affiliationRelations = - spark.createDataset( - affiliationRelationsDeserializeRDD.flatMap(entry -> - entry.getMatchings().stream().flatMap(matching -> - matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( - entry.getDoi(), - rorId, - matching.getConfidence() - ))).collect(Collectors.toList()) - .iterator()) - .rdd(), - Encoders.bean(AffiliationRelationModel.class)); + // convert affiliation to an internal representation + Dataset affiliationRelations = spark + .createDataset( + affiliationRelationsDeserializeRDD + .flatMap( + entry -> entry + .getMatchings() + .stream() + .flatMap( + matching -> matching + .getRorId() + .stream() + .map( + rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence()))) + .collect(Collectors.toList()) + .iterator()) + .rdd(), + Encoders.bean(AffiliationRelationModel.class)); - // prepare action sets for affiliation relations - affiliationRelations - .flatMap((FlatMapFunction) affRel -> { + // prepare action sets for affiliation relations + affiliationRelations + .flatMap((FlatMapFunction) affRel -> { - // DOI to OpenAIRE id - final String paperId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); - // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); - Qualifier qualifier = OafMapperUtils.qualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS); + Qualifier qualifier = OafMapperUtils + .qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); - // format data info; setting `confidence` into relation's `trust` - DataInfo dataInfo = OafMapperUtils.dataInfo( - false, - BIP_INFERENCE_PROVENANCE, - true, - false, - qualifier, - Double.toString(affRel.getConfidence())); + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(affRel.getConfidence())); - // return 
bi-directional relations - return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); - }, Encoders.bean(Relation.class)) - .toJavaRDD() - .map(p -> new AtomicAction(Relation.class, p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + }, Encoders.bean(Relation.class)) + .toJavaRDD() + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); - } + } - private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { - return Arrays.asList( - OafMapperUtils.getRelation( - paperId, - affId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.HAS_AUTHOR_INSTITUTION, - null, - dataInfo, - null), - OafMapperUtils.getRelation( - affId, - paperId, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - ModelConstants.IS_AUTHOR_INSTITUTION_OF, - null, - dataInfo, - null) - ); - } + private static List getAffiliationRelationPair(String paperId, String affId, DataInfo dataInfo) { + return Arrays + .asList( + OafMapperUtils + .getRelation( + paperId, + affId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.HAS_AUTHOR_INSTITUTION, + null, + dataInfo, + null), + OafMapperUtils + .getRelation( + affId, + paperId, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + ModelConstants.IS_AUTHOR_INSTITUTION_OF, + null, + dataInfo, + null)); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java index 450a8c175..ef4f200d0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java @@ -1,26 +1,27 @@ -package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Data; +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; import java.io.Serializable; import java.util.List; +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.Data; + @Data public class AffiliationRelationDeserializer implements Serializable { - @JsonProperty("DOI") - private String doi; - @JsonProperty("Matchings") - private List matchings; + @JsonProperty("DOI") + private String doi; + @JsonProperty("Matchings") + private List matchings; - @Data - public static class Matching implements Serializable { - @JsonProperty("RORid") - private List rorId; - @JsonProperty("Confidence") - private double confidence; + @Data + public static class Matching implements Serializable { + @JsonProperty("RORid") + private List rorId; + @JsonProperty("Confidence") + private double confidence; - } + } } - diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java index 8689914ee..6509f56ac 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java @@ -1,15 +1,16 @@ + package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; +import java.io.Serializable; + import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import java.io.Serializable; - @Data @AllArgsConstructor public class AffiliationRelationModel implements Serializable { - private String doi; - private String rorId; - private double confidence; + private String doi; + private String rorId; + private double confidence; } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index dce59a31f..43d86ee09 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -28,15 +28,8 @@ oozie.use.system.libpath=true spark2ExtraListeners=com.cloudera.spark.lineage.NavigatorAppListener spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener -# I think this should be the oozie workflow directory -# oozieWorkflowPath=/user/ilias.kanellos/workflow_example/ - - -# The workflow application path -wfAppPath=${oozieTopWfApplicationPath} - # The following is needed as a property of a workflow oozie.wf.application.path=${oozieTopWfApplicationPath} -inputPath=/user/schatz/affiliations/data-v3.json -outputPath=/tmp/crossref-affiliations-output-v3 +inputPath=/user/schatz/affiliations/data-v3.1.json +outputPath=/tmp/crossref-affiliations-output-v3.1 diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 31f35adfd..9930cfe17 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + @@ -84,7 +84,7 @@ yarn cluster Produces the atomic action with the inferred by BIP! 
affiliation relations from Crossref - eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations/class> + eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations dhp-aggregation-${projectVersion}.jar --executor-memory=${sparkExecutorMemory} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index 7e2fc5a39..72aabde7f 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.actionmanager.bipaffiliations; import static org.junit.jupiter.api.Assertions.*; @@ -6,10 +7,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.Relation; -import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; -import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -29,107 +26,120 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.utils.CleaningFunctions; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; public class PrepareAffiliationRelationsTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static SparkSession spark; + private static SparkSession spark; - private static Path workingDir; - private static final String ID_PREFIX = "50|doi_________::"; - private static final Logger log = LoggerFactory - .getLogger(PrepareAffiliationRelationsTest.class); + private static Path workingDir; + private static final String ID_PREFIX = "50|doi_________::"; + private static final Logger log = LoggerFactory + .getLogger(PrepareAffiliationRelationsTest.class); - @BeforeAll - public static void beforeAll() throws IOException { - workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); - log.info("Using work dir {}", workingDir); + log.info("Using work dir {}", workingDir); - SparkConf conf = new SparkConf(); - conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); + SparkConf conf = new SparkConf(); + conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); - conf.setMaster("local[*]"); - conf.set("spark.driver.host", "localhost"); - conf.set("hive.metastore.local", "true"); - conf.set("spark.ui.enabled", "false"); - conf.set("spark.sql.warehouse.dir", workingDir.toString()); - conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + 
conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); - spark = SparkSession - .builder() - .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) - .config(conf) - .getOrCreate(); - } + spark = SparkSession + .builder() + .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } - @AfterAll - public static void afterAll() throws IOException { - FileUtils.deleteDirectory(workingDir.toFile()); - spark.stop(); - } + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } - @Test - void testMatch() throws Exception { + @Test + void testMatch() throws Exception { - String affiliationRelationsPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") - .getPath(); + String affiliationRelationsPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") + .getPath(); - String outputPath = workingDir.toString() + "/actionSet"; + String outputPath = workingDir.toString() + "/actionSet"; - PrepareAffiliationRelations - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-inputPath", affiliationRelationsPath, - "-outputPath", outputPath - }); + PrepareAffiliationRelations + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputPath", affiliationRelationsPath, + "-outputPath", outputPath + }); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD tmp = sc - .sequenceFile(outputPath, Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Relation) aa.getPayload())); + JavaRDD tmp = sc + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Relation) aa.getPayload())); // for (Relation r : tmp.collect()) { // System.out.println( // r.getSource() + "\t" + r.getTarget() + "\t" + r.getRelType() + "\t" + r.getRelClass() + "\t" + r.getSubRelType() + "\t" + r.getValidationDate() + "\t" + r.getDataInfo().getTrust() + "\t" + r.getDataInfo().getInferred() // ); // } - // count the number of relations - assertEquals(16, tmp.count()); + // count the number of relations + assertEquals(16, tmp.count()); - Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); - dataset.createOrReplaceTempView("result"); + Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + dataset.createOrReplaceTempView("result"); - Dataset execVerification = spark.sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); + Dataset execVerification = spark + .sql("select r.relType, r.relClass, r.source, r.target, r.dataInfo.trust from result r"); - // verify that we have equal number of bi-directional relations - Assertions.assertEquals(8, execVerification - .filter( - "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION +"'") - .collectAsList() - .size()); + // verify that we have equal number of bi-directional relations + Assertions + .assertEquals( + 8, execVerification + .filter( + "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") + .collectAsList() + .size()); - Assertions.assertEquals(8, execVerification - .filter( - 
"relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF +"'") - .collectAsList() - .size()); + Assertions + .assertEquals( + 8, execVerification + .filter( + "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") + .collectAsList() + .size()); - // check confidence value of a specific relation - String sourceDOI = "10.1105/tpc.8.3.343"; + // check confidence value of a specific relation + String sourceDOI = "10.1105/tpc.8.3.343"; - final String sourceOpenaireId = ID_PREFIX + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); + final String sourceOpenaireId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", sourceDOI)); - Assertions.assertEquals("0.7071067812", execVerification - .filter( - "source='" + sourceOpenaireId +"'") - .collectAsList().get(0).getString(4)); + Assertions + .assertEquals( + "0.7071067812", execVerification + .filter( + "source='" + sourceOpenaireId + "'") + .collectAsList() + .get(0) + .getString(4)); - } + } } - From ebfba38ab604ef6dd6a8ba38a280e9ca04b2ff70 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Fri, 28 Jul 2023 19:03:47 +0300 Subject: [PATCH 09/10] Add changes from code review --- .../PrepareAffiliationRelations.java | 58 +++++++------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 9b5d4a2ca..381558aae 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -10,15 +10,15 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,45 +82,32 @@ public class PrepareAffiliationRelations implements Serializable { private static void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - // load and parse affiliation relations from HDFS - JavaRDD affiliationRelationsDeserializeRDD = sc - .textFile(inputPath) - .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); + Dataset df = spark + .read() + .schema("`DOI` STRING, `Matchings` ARRAY,`Confidence`:DOUBLE>>") + .json(inputPath); - // convert affiliation to an internal representation - Dataset affiliationRelations = spark - .createDataset( - affiliationRelationsDeserializeRDD - .flatMap( - entry -> entry - .getMatchings() - .stream() - .flatMap( - matching -> matching - .getRorId() - .stream() - .map( - rorId -> new AffiliationRelationModel( - entry.getDoi(), - rorId, - matching.getConfidence()))) - 
.collect(Collectors.toList()) - .iterator()) - .rdd(), - Encoders.bean(AffiliationRelationModel.class)); + // unroll nested arrays + df = df + .withColumn("matching", functions.explode(new Column("Matchings"))) + .withColumn("rorid", functions.explode(new Column("matching.RORid"))) + .select( + new Column("DOI").as("doi"), + new Column("rorid"), + new Column("matching.Confidence").as("confidence")); // prepare action sets for affiliation relations - affiliationRelations - .flatMap((FlatMapFunction) affRel -> { + df + .toJavaRDD() + .flatMap((FlatMapFunction) row -> { // DOI to OpenAIRE id final String paperId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi())); + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId()); + final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); Qualifier qualifier = OafMapperUtils .qualifier( @@ -137,18 +124,17 @@ public class PrepareAffiliationRelations implements Serializable { true, false, qualifier, - Double.toString(affRel.getConfidence())); + Double.toString(row.getAs("confidence"))); // return bi-directional relations return getAffiliationRelationPair(paperId, affId, dataInfo).iterator(); - }, Encoders.bean(Relation.class)) - .toJavaRDD() + }) .map(p -> new AtomicAction(Relation.class, p)) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); } From 7cefe2665bbd57472d9ea6e08a593f03a05cbc25 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Fri, 28 Jul 2023 19:14:39 +0300 Subject: [PATCH 10/10] Remove unnecessary classes --- .../PrepareAffiliationRelations.java | 5 ---- .../AffiliationRelationDeserializer.java | 27 ------------------- .../model/AffiliationRelationModel.java | 16 ----------- 3 files changed, 48 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 381558aae..a9c610de7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -6,17 +6,13 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.Serializable; import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import 
org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; import org.slf4j.Logger; @@ -25,7 +21,6 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.Constants; -import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; import eu.dnetlib.dhp.actionmanager.ror.GenerateRorActionSetJob; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.action.AtomicAction; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java deleted file mode 100644 index ef4f200d0..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java +++ /dev/null @@ -1,27 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; - -import java.io.Serializable; -import java.util.List; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import lombok.Data; - -@Data -public class AffiliationRelationDeserializer implements Serializable { - @JsonProperty("DOI") - private String doi; - @JsonProperty("Matchings") - private List matchings; - - @Data - public static class Matching implements Serializable { - @JsonProperty("RORid") - private List rorId; - @JsonProperty("Confidence") - private double confidence; - - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java deleted file mode 100644 index 6509f56ac..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java +++ /dev/null @@ -1,16 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; - -import java.io.Serializable; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@AllArgsConstructor -public class AffiliationRelationModel implements Serializable { - private String doi; - private String rorId; - private double confidence; -}
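
For reference, the sketch below illustrates (outside the patch itself) how the schema-based read introduced in PATCH 09 flattens the nested BIP! affiliation JSON into one (doi, rorid, confidence) row per DOI-ROR pair, before those rows are turned into bi-directional Relation actions. It is a minimal, self-contained example under stated assumptions, not project code: the class name, the local[*] master and the inline sample record are made up for illustration; only the schema string and the explode/select chain mirror what PrepareAffiliationRelations does after the patch.

// Hypothetical standalone sketch; not part of the dnet-hadoop sources.
import java.util.Collections;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

public class AffiliationExplodeSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("AffiliationExplodeSketch")
			.master("local[*]") // assumption: local run, not the Oozie/yarn setup of the workflow
			.getOrCreate();

		// one hypothetical input record, in the shape the job expects to read from HDFS
		String json = "{\"DOI\":\"10.1234/example\","
			+ "\"Matchings\":[{\"RORid\":[\"https://ror.org/01abcd\"],\"Confidence\":0.7071067812}]}";

		// same explicit schema as the patched PrepareAffiliationRelations
		Dataset<Row> df = spark
			.read()
			.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
			.json(spark.createDataset(Collections.singletonList(json), Encoders.STRING()));

		// unroll the nested arrays: one row per (DOI, RORid) pair, keeping the confidence
		Dataset<Row> flat = df
			.withColumn("matching", functions.explode(new Column("Matchings")))
			.withColumn("rorid", functions.explode(new Column("matching.RORid")))
			.select(
				new Column("DOI").as("doi"),
				new Column("rorid"),
				new Column("matching.Confidence").as("confidence"));

		// expected single row: 10.1234/example | https://ror.org/01abcd | 0.7071067812
		flat.show(false);

		spark.stop();
	}
}

Each flattened row is what the flatMap over Row then turns into the HAS_AUTHOR_INSTITUTION / IS_AUTHOR_INSTITUTION_OF relation pair, with the confidence carried into the relation's trust.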