From 347a889b20bcfa9f7f9f4bd67bf71086ac442d58 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 6 Jul 2023 00:51:01 +0300 Subject: [PATCH] Read affiliation relations --- .../dnetlib/dhp/actionmanager/Constants.java | 5 + .../PrepareAffiliationRelations.java | 168 ++++++++++++++++++ .../AffiliationRelationDeserializer.java | 26 +++ .../model/AffiliationRelationModel.java | 15 ++ .../input_actionset_parameter.json | 20 +++ .../PrepareAffiliationRelationsTest.java | 139 +++++++++++++++ .../bipaffiliations/doi_to_ror.json | 6 + 7 files changed, 379 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 1f2145d66..8b0dab3a7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.actionmanager; import java.util.Optional; +import eu.dnetlib.dhp.common.HdfsSupport; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -93,4 +94,8 @@ public class Constants { return s; } + public static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java new file mode 100644 index 000000000..dce9082fd --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -0,0 +1,168 @@ + +package eu.dnetlib.dhp.actionmanager.bipaffiliations; + +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import eu.dnetlib.dhp.actionmanager.Constants; +import eu.dnetlib.dhp.actionmanager.bipaffiliations.model.*; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.spark.sql.Dataset; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; + +/** + * created the Atomic Action for each tipe of results + */ +public class PrepareAffiliationRelations implements Serializable { + + private static final String DOI = "doi"; + private static final Logger log = LoggerFactory.getLogger(PrepareAffiliationRelations.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + PrepareAffiliationRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Constants.removeOutputDir(spark, outputPath); + prepareAffiliationRelations(spark, inputPath, outputPath); + }); + } + + private static void prepareAffiliationRelations(SparkSession spark, String inputPath, String outputPath) { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD bipDeserializeJavaRDD = sc + .textFile(inputPath) + .map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class)); + +// for(AffiliationRelationDeserializer rel: bipDeserializeJavaRDD.collect()){ +// System.out.println(rel); +// } + JavaRDD affiliationRelations = + bipDeserializeJavaRDD.flatMap(entry -> + entry.getMatchings().stream().flatMap(matching -> + matching.getRorId().stream().map( rorId -> new AffiliationRelationModel( + entry.getDoi(), + rorId, + matching.getConfidence() + ))).collect(Collectors.toList()).iterator()); + + for(AffiliationRelationModel rel: affiliationRelations.collect()){ + System.out.println(rel); + } +// Dataset relations = spark +// .createDataset(bipDeserializeJavaRDD.flatMap(entry -> { +//// entry.keySet().stream().map(key -> { +// AffiliationRelationModel rel = new AffiliationRelationModel(entry.getDoi()) +// System.out.println(entry); +// return entry; +//// BipScore bs = new BipScore(); +//// bs.setId(key); +//// bs.setScoreList(entry.get(key)); +//// return bs; +// }).collect(Collectors.toList()).iterator()).rdd(), Encoßders.bean(AffiliationRelationModel.class)); + +// bipScores +// +// .map((MapFunction) bs -> { +// Result ret = new Result(); +// +// ret.setId(bs.getId()); +// +// ret.setMeasures(getMeasure(bs)); +// +// return ret; +// }, Encoders.bean(Result.class)) +// .toJavaRDD() +// .map(p -> new AtomicAction(Result.class, p)) +// .mapToPair( +// aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), +// new Text(OBJECT_MAPPER.writeValueAsString(aa)))) +// .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + +// } +// +// private static List getMeasure(BipScore value) { +// return value +// .getScoreList() +// .stream() +// .map(score -> { +// Measure m = new Measure(); +// m.setId(score.getId()); +// m +// .setUnit( +// score +// .getUnit() +// .stream() +// .map(unit -> { +// KeyValue kv = new KeyValue(); +// kv.setValue(unit.getValue()); +// kv.setKey(unit.getKey()); +// kv +// .setDataInfo( +// OafMapperUtils +// .dataInfo( +// false, +// UPDATE_DATA_INFO_TYPE, +// true, +// false, +// OafMapperUtils +// .qualifier( +// UPDATE_MEASURE_BIP_CLASS_ID, +// UPDATE_CLASS_NAME, +// ModelConstants.DNET_PROVENANCE_ACTIONS, +// ModelConstants.DNET_PROVENANCE_ACTIONS), +// "")); +// return kv; +// }) +// .collect(Collectors.toList())); +// return m; +// }) +// .collect(Collectors.toList()); + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java new file mode 100644 index 000000000..450a8c175 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationDeserializer.java @@ -0,0 +1,26 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; + +@Data +public class AffiliationRelationDeserializer implements Serializable { + @JsonProperty("DOI") + private String doi; + @JsonProperty("Matchings") + private List matchings; + + @Data + public static class Matching implements Serializable { + @JsonProperty("RORid") + private List rorId; + @JsonProperty("Confidence") + private double confidence; + + } + +} + diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java new file mode 100644 index 000000000..8689914ee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/model/AffiliationRelationModel.java @@ -0,0 +1,15 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +@Data +@AllArgsConstructor +public class AffiliationRelationModel implements Serializable { + private String doi; + private String rorId; + private double confidence; +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json new file mode 100644 index 000000000..7663a454b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json @@ -0,0 +1,20 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false + }, + { + "paramName": "ip", + "paramLongName": "inputPath", + "paramDescription": "the URL from where to get the programme file", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "the path of the new ActionSet", + "paramRequired": true + } +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java new file mode 100644 index 000000000..eba53ccdb --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -0,0 +1,139 @@ +package eu.dnetlib.dhp.actionmanager.bipaffiliations; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class PrepareAffiliationRelationsTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + private static final Logger log = LoggerFactory + .getLogger(PrepareAffiliationRelationsTest.class); + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PrepareAffiliationRelationsTest.class.getSimpleName()); + + log.info("Using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PrepareAffiliationRelationsTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PrepareAffiliationRelationsTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testMatch() throws Exception { + String affiliationRelationsPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") + .getPath(); + + PrepareAffiliationRelations + .main( + new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + affiliationRelationsPath, + "-outputPath", + workingDir.toString() + "/actionSet" + }); + + final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + +// JavaRDD tmp = sc +// .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) +// .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) +// .map(aa -> ((Result) aa.getPayload())); +// +// assertEquals(4, tmp.count()); +// +// Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class)); +// verificationDataset.createOrReplaceTempView("result"); +// +// Dataset execVerification = spark +// .sql( +// "Select p.id oaid, mes.id, mUnit.value from result p " + +// "lateral view explode(measures) m as mes " + +// "lateral view explode(mes.unit) u as mUnit "); +// +// Assertions.assertEquals(12, execVerification.count()); +// Assertions +// .assertEquals( +// "6.63451994567e-09", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'influence'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// Assertions +// .assertEquals( +// "0.348694533145", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'popularity_alt'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// Assertions +// .assertEquals( +// "2.16094680115e-09", execVerification +// .filter( +// "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + +// "and id = 'popularity'") +// .select("value") +// .collectAsList() +// .get(0) +// .getString(0)); +// + } +} + diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json new file mode 100644 index 000000000..3b067dcc8 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json @@ -0,0 +1,6 @@ +{"DOI":"10.1061\/(asce)0733-9399(2002)128:7(759)","Matchings":[{"RORid":["https:\/\/ror.org\/01teme464"],"Confidence":0.73},{"RORid":["https:\/\/ror.org\/03yxnpp24"],"Confidence":0.7071067812}]} +{"DOI":"10.1105\/tpc.8.3.343","Matchings":[{"RORid":["https:\/\/ror.org\/02k40bc56"],"Confidence":0.7071067812}]} +{"DOI":"10.1161\/01.cir.0000013305.01850.37","Matchings":[{"RORid":["https:\/\/ror.org\/00qjgza05"],"Confidence":1}]} +{"DOI":"10.1142\/s021821650200186x","Matchings":[{"RORid":["https:\/\/ror.org\/05apxxy63"],"Confidence":1},{"RORid":["https:\/\/ror.org\/035xkbk20"],"Confidence":1}]} +{"DOI":"10.1061\/(asce)0733-9372(2002)128:7(575)","Matchings":[{"RORid":["https:\/\/ror.org\/04j198w64"],"Confidence":0.58}]} +{"DOI":"10.1161\/hy0202.103001","Matchings":[{"RORid":["https:\/\/ror.org\/057xtrt18"],"Confidence":0.7071067812}]} \ No newline at end of file