From 2a67ee13ecb1f62ca47ccbbc71f4d12651669b22 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 23 Dec 2021 10:37:52 +0100 Subject: [PATCH 1/2] [SDG] added model class --- .../createunresolvedentities/model/SDGDataModel.java | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java new file mode 100644 index 0000000000..9b0300bf2f --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java @@ -0,0 +1,6 @@ +package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model; + +import java.io.Serializable; + +public class SDGDataModel implements Serializable { +} From 7a1b4404132b2b2c8aedd2968a8c9eaa908ccedf Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Thu, 23 Dec 2021 13:24:28 +0100 Subject: [PATCH 2/2] [SDG] logic to create unresolved entities out of SDG input. This changes also some classes related to FOS to reuse the same code. The code under createunresolvedentities create results with the merged update of the the inputs provided (bip at the level of the isntance, fos and sdg for subjects) --- .../dnetlib/dhp/actionmanager/Constants.java | 10 +- .../{GetFOSData.java => GetInputData.java} | 21 ++- .../PrepareFOSSparkJob.java | 10 +- .../PrepareSDGSparkJob.java | 86 +++++++++ .../model/SDGDataModel.java | 44 ++++- ...ameters.json => get_input_parameters.json} | 0 .../oozie_app/workflow.xml | 39 +++- .../createunresolvedentities/PrepareTest.java | 136 +++++++++++--- .../createunresolvedentities/ProduceTest.java | 172 ++++++++++++++++++ .../createunresolvedentities/sdg/sdg.json | 37 ++++ .../createunresolvedentities/sdg/sdg_sbs.csv | 37 ++++ .../ExtractRelationFromEntityTest.java | 56 +++--- 12 files changed, 581 insertions(+), 67 deletions(-) rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/{GetFOSData.java => GetInputData.java} (75%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/{get_fos_parameters.json => get_input_parameters.json} (100%) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index 153a98d3ff..ef4ef67563 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -19,14 +19,20 @@ public class Constants { public static final String DOI = "doi"; + public static final char DEFAULT_DELIMITER = ','; + public static final String UPDATE_DATA_INFO_TYPE = "update"; public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; + public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg"; public static final String FOS_CLASS_ID = "FOS"; public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; + public static final String SDG_CLASS_ID = "SDG"; + public static final String SDG_CLASS_NAME = "Sustainable Development Goals"; + public static final String NULL = "NULL"; public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -49,7 +55,7 @@ public class Constants { .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - public static StructuredProperty getSubject(String sbj, String classid, String classname) { + public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) { if (sbj.equals(NULL)) return null; StructuredProperty sp = new StructuredProperty(); @@ -72,7 +78,7 @@ public class Constants { false, OafMapperUtils .qualifier( - UPDATE_SUBJECT_FOS_CLASS_ID, + diqualifierclassid, UPDATE_CLASS_NAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java similarity index 75% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java index bd430d940f..499e42a8e2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java @@ -6,6 +6,7 @@ import java.io.InputStreamReader; import java.io.Serializable; import java.util.Objects; import java.util.Optional; +import java.util.zip.GZIPInputStream; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -15,13 +16,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.collection.GetCSV; -public class GetFOSData implements Serializable { +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; + +public class GetInputData implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(GetInputData.class); - private static final Logger log = LoggerFactory.getLogger(GetFOSData.class); - public static final char DEFAULT_DELIMITER = ','; public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -29,9 +31,9 @@ public class GetFOSData implements Serializable { .toString( Objects .requireNonNull( - GetFOSData.class + GetInputData.class .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json")))); + "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json")))); parser.parseArgument(args); @@ -60,16 +62,17 @@ public class GetFOSData implements Serializable { FileSystem fileSystem = FileSystem.get(conf); - new GetFOSData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem); + new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem); } public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs) throws IOException, ClassNotFoundException { + // reads the csv and writes it as its json equivalent - try (InputStreamReader reader = new InputStreamReader(fs.open(new Path(inputPath)))) { - GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter); + try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) { + eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java index c8f472db7c..fef7965156 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java @@ -66,20 +66,20 @@ public class PrepareFOSSparkJob implements Serializable { Dataset fosDataset = readPath(spark, sourcePath, FOSDataModel.class); fosDataset - .groupByKey((MapFunction) v -> v.getDoi(), Encoders.STRING()) + .groupByKey((MapFunction) v -> v.getDoi().toLowerCase(), Encoders.STRING()) .mapGroups((MapGroupsFunction) (k, it) -> { Result r = new Result(); FOSDataModel first = it.next(); - r.setId(DHPUtils.generateUnresolvedIdentifier(first.getDoi(), DOI)); + r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); HashSet level1 = new HashSet<>(); HashSet level2 = new HashSet<>(); HashSet level3 = new HashSet<>(); addLevels(level1, level2, level3, first); it.forEachRemaining(v -> addLevels(level1, level2, level3, v)); List sbjs = new ArrayList<>(); - level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); - level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); - level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); + level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID))); + level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID))); + level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID))); r.setSubject(sbjs); return r; }, Encoders.bean(Result.class)) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java new file mode 100644 index 0000000000..b0607ead53 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java @@ -0,0 +1,86 @@ + +package eu.dnetlib.dhp.actionmanager.createunresolvedentities; + +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.utils.DHPUtils; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class PrepareSDGSparkJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + PrepareSDGSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String sourcePath = parser.get("sourcePath"); + log.info("sourcePath: {}", sourcePath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + doPrepare( + spark, + sourcePath, + + outputPath); + }); + } + + private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { + Dataset sdgDataset = readPath(spark, sourcePath, SDGDataModel.class); + + + sdgDataset.groupByKey((MapFunction)r -> r.getDoi().toLowerCase(),Encoders.STRING()) + .mapGroups((MapGroupsFunction)(k,it) -> { + Result r = new Result(); + r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); + SDGDataModel first = it.next(); + Listsbjs = new ArrayList<>(); + sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); + it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); + r.setSubject(sbjs); + return r; + },Encoders.bean(Result.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/sdg"); + } + + + + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java index 9b0300bf2f..7b1584e3f7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java @@ -1,6 +1,48 @@ package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model; +import com.opencsv.bean.CsvBindByPosition; + import java.io.Serializable; -public class SDGDataModel implements Serializable { +public class SDGDataModel implements Serializable{ + + @CsvBindByPosition(position = 0) +// @CsvBindByName(column = "doi") + private String doi; + + @CsvBindByPosition(position = 1) +// @CsvBindByName(column = "sdg") + private String sbj; + + + public SDGDataModel() { + + } + + public SDGDataModel(String doi, String sbj) { + this.doi = doi; + this.sbj = sbj; + + } + + public static SDGDataModel newInstance(String d, String sbj) { + return new SDGDataModel(d, sbj); + } + + public String getDoi() { + return doi; + } + + public void setDoi(String doi) { + this.doi = doi; + } + + + public String getSbj() { + return sbj; + } + + public void setSbj(String sbj) { + this.sbj = sbj; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml index d53504fe63..9451af572b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml @@ -79,6 +79,7 @@ + @@ -107,7 +108,7 @@ - eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSData + eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData --hdfsNameNode${nameNode} --sourcePath${fosPath} --outputPath${workingDir}/input/fos @@ -142,6 +143,42 @@ + + + eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData + --hdfsNameNode${nameNode} + --sourcePath${sdgPath} + --outputPath${workingDir}/input/sdg + --classForNameeu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel + + + + + + + + yarn + cluster + Produces the unresolved from FOS! + eu.dnetlib.dhp.actionmanager.createunresolvedentities.PrepareSDGSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + + --sourcePath${workingDir}/input/sdg + --outputPath${workingDir}/prepared + + + + + diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index 67a0f34654..17137c39a2 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -10,6 +10,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -18,17 +19,13 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; -import eu.dnetlib.dhp.common.collection.CollectorException; import eu.dnetlib.dhp.schema.oaf.Result; public class PrepareTest { @@ -159,7 +156,7 @@ public class PrepareTest { .getPath(); final String outputPath = workingDir.toString() + "/fos.json"; - new GetFOSData() + new GetInputData() .doRewrite( sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel", ',', fs); @@ -180,6 +177,8 @@ public class PrepareTest { } + + @Test void fosPrepareTest() throws Exception { final String sourcePath = getClass() @@ -204,15 +203,9 @@ public class PrepareTest { String doi1 = "unresolved::10.3390/s18072310::doi"; - assertEquals(50, tmp.count()); + assertEquals(20, tmp.count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); - assertTrue( - tmp - .filter(r -> r.getId().equals(doi1)) - .flatMap(r -> r.getSubject().iterator()) - .map(sbj -> sbj.getValue()) - .collect() - .contains("engineering and technology")); + assertTrue( tmp @@ -220,16 +213,16 @@ public class PrepareTest { .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("nano-technology")); + .contains("04 agricultural and veterinary sciences")); assertTrue( tmp .filter(r -> r.getId().equals(doi1)) .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("nanoscience & nanotechnology")); + .contains("0404 agricultural biotechnology")); - String doi = "unresolved::10.1111/1365-2656.12831::doi"; + String doi = "unresolved::10.1007/s11164-020-04383-6::doi"; assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); assertTrue( tmp @@ -237,7 +230,7 @@ public class PrepareTest { .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("psychology and cognitive sciences")); + .contains("01 natural sciences")); assertTrue( tmp @@ -245,15 +238,116 @@ public class PrepareTest { .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("social sciences")); - assertFalse( + .contains("0104 chemical sciences")); + assertTrue( tmp .filter(r -> r.getId().equals(doi)) .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("NULL")); + .contains("010402 general chemistry")); } + @Test + void getSDGFileTest() throws IOException, ClassNotFoundException { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv") + .getPath(); + final String outputPath = workingDir.toString() + "/sdg.json"; + + new GetInputData() + .doRewrite( + sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", + ',', fs); + + BufferedReader in = new BufferedReader( + new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); + + String line; + int count = 0; + while ((line = in.readLine()) != null) { + SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); + + System.out.println(new ObjectMapper().writeValueAsString(sdg)); + count += 1; + } + + assertEquals(37, count); + + } + + @Test + void sdgPrepareTest() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); + + PrepareSDGSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + + "-outputPath", workingDir.toString() + "/work" + + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/work/sdg") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + + String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi"; + + assertEquals(32, tmp.count()); + assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); + + + assertTrue( + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("3. Good health")); + assertTrue( + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("8. Economic growth")); + + + } + @Disabled + @Test + void test2() throws Exception { + final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; + + + final String outputPath = workingDir.toString() + "/sdg.json"; + + new GetInputData() + .doRewrite( + sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", + ',', fs); + + BufferedReader in = new BufferedReader( + new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); + + String line; + int count = 0; + while ((line = in.readLine()) != null) { + SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); + + System.out.println(new ObjectMapper().writeValueAsString(sdg)); + count += 1; + } + + + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index 32fb256400..28fce4adb2 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -7,6 +7,7 @@ import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; +import eu.dnetlib.dhp.actionmanager.Constants; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -348,4 +349,175 @@ public class ProduceTest { } + + private JavaRDD getResultJavaRDDPlusSDG() throws Exception { + final String bipPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") + .getPath(); + + PrepareBipFinder + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", bipPath, + "--outputPath", workingDir.toString() + "/work" + + }); + final String fosPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") + .getPath(); + + PrepareFOSSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", fosPath, + "-outputPath", workingDir.toString() + "/work" + }); + + final String sdgPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); + + PrepareSDGSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sdgPath, + "-outputPath", workingDir.toString() + "/work" + }); + + SparkSaveUnresolved.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", workingDir.toString() + "/work", + + "-outputPath", workingDir.toString() + "/unresolved" + + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + return sc + .textFile(workingDir.toString() + "/unresolved") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + } + + + @Test + void produceTestSomeNumbersWithSDG() throws Exception { + + final String doi = "unresolved::10.3390/s18072310::doi"; + JavaRDD tmp = getResultJavaRDDPlusSDG(); + + Assertions.assertEquals(136, tmp.count()); + + Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); + + Assertions + .assertEquals( + 50, tmp + .filter(row -> !row.getId().equals(doi)) + .filter(row -> row.getSubject() != null) + .count()); + + Assertions + .assertEquals( + 85, + tmp + .filter(row -> !row.getId().equals(doi)) + .filter(r -> r.getInstance() != null && r.getInstance().size() > 0) + .count()); + + } + + @Test + void produceTest7Subjects() throws Exception { + final String doi = "unresolved::10.3390/s18072310::doi"; + + JavaRDD tmp = getResultJavaRDDPlusSDG(); + + Assertions + .assertEquals( + 7, tmp + .filter(row -> row.getId().equals(doi)) + .collect() + .get(0) + .getSubject() + .size()); + + List sbjs = tmp + .filter(row -> row.getId().equals(doi)) + .flatMap(row -> row.getSubject().iterator()) + .collect(); + + Assertions + .assertEquals( + true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences"))); + + Assertions + .assertEquals( + true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology"))); + Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science"))); + + Assertions + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences"))); + Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences"))); + Assertions + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics"))); + Assertions + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty"))); + + } + + @Test + void produceTestSubjectsWithSDG() throws Exception { + + JavaRDD tmp = getResultJavaRDDPlusSDG(); + + List sbjs_sdg = tmp + .filter(row -> row.getSubject() != null && row.getSubject().size() > 0) + .flatMap(row -> row.getSubject().iterator()) + .filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID)) + .collect(); + + sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals( + "Sustainable Development Goals", sbj.getQualifier().getClassname())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename())); + + sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference())); + sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred())); + sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible())); + sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust())); + sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance())); + sbjs_sdg + .forEach( + sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals( + ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid())); + sbjs_sdg + .forEach( + sbj -> Assertions + .assertEquals( + ModelConstants.DNET_PROVENANCE_ACTIONS, + sbj.getDataInfo().getProvenanceaction().getSchemename())); + } + } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json new file mode 100644 index 0000000000..59d7071771 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json @@ -0,0 +1,37 @@ +{"doi":"10.1001/amaguidesnewsletters.2019.mayjun02","sbj":"10. No inequality"} +{"doi":"10.1001/amaguidesnewsletters.2019.novdec01","sbj":"10. No inequality"} +{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"3. Good health"} +{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"8. Economic growth"} +{"doi":"10.1001/amaguidesnewsletters.2020.janfeb01","sbj":"8. Economic growth"} +{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"3. Good health"} +{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"8. Economic growth"} +{"doi":"10.1001/amaguidesnewsletters.2020.julaug01","sbj":"3. Good health"} +{"doi":"10.1001/amaguidesnewsletters.2020.marapr01","sbj":"3. Good health"} +{"doi":"10.1001/amaguidesnewsletters.2020.mayjun01","sbj":"3. Good health"} +{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"16. Peace & justice"} +{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"10. No inequality"} +{"doi":"10.1001/amaguidesnewsletters.2021.julaug01","sbj":"1. No poverty"} +{"doi":"10.1001/amaguidesnewsletters.2021.mayjune01","sbj":"10. No inequality"} +{"doi":"10.1001/amaguidesnewsletters.2021.mayjune02","sbj":"10. No inequality"} +{"doi":"10.4336/2021.pfb.41e201902078","sbj":"15. Life on land"} +{"doi":"10.4337/ejeep.2019.00045","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2019.00050","sbj":"1. No poverty"} +{"doi":"10.4337/ejeep.2019.0045","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2019.0050","sbj":"1. No poverty"} +{"doi":"10.4337/ejeep.2019.0051","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2019.0052","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2020.0058","sbj":"1. No poverty"} +{"doi":"10.4337/ejeep.2020.0058","sbj":"10. No inequality"} +{"doi":"10.4337/ejeep.2020.0060","sbj":"10. No inequality"} +{"doi":"10.4337/ejeep.2020.0065","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2020.02.03","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2020.02.05","sbj":"8. Economic growth"} +{"doi":"10.4337/ejeep.2020.02.06","sbj":"16. Peace & justice"} +{"doi":"10.4337/ejeep.2020.02.09","sbj":"16. Peace & justice"} +{"doi":"10.4337/roke.2020.01.01","sbj":"16. Peace & justice"} +{"doi":"10.4337/roke.2020.01.03","sbj":"16. Peace & justice"} +{"doi":"10.4337/roke.2020.01.05","sbj":"1. No poverty"} +{"doi":"10.4337/roke.2020.01.05","sbj":"8. Economic growth"} +{"doi":"10.4337/roke.2020.01.07","sbj":"8. Economic growth"} +{"doi":"10.4337/roke.2020.02.03","sbj":"8. Economic growth"} +{"doi":"10.3390/s18072310","sbj":"1. No poverty"} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv new file mode 100644 index 0000000000..30524467ed --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv @@ -0,0 +1,37 @@ +10.1001/amaguidesnewsletters.2019.mayjun02,10. No inequality +10.1001/amaguidesnewsletters.2019.novdec01,10. No inequality +10.1001/amaguidesnewsletters.2019.sepoct02,3. Good health +10.1001/amaguidesnewsletters.2019.sepoct02,8. Economic growth +10.1001/amaguidesnewsletters.2020.janfeb01,8. Economic growth +10.1001/amaguidesnewsletters.2020.janfeb02,3. Good health +10.1001/amaguidesnewsletters.2020.janfeb02,8. Economic growth +10.1001/amaguidesnewsletters.2020.julaug01,3. Good health +10.1001/amaguidesnewsletters.2020.marapr01,3. Good health +10.1001/amaguidesnewsletters.2020.mayjun01,3. Good health +10.1001/amaguidesnewsletters.2020.mayjun02,16. Peace & justice +10.1001/amaguidesnewsletters.2020.mayjun02,10. No inequality +10.1001/amaguidesnewsletters.2021.julaug01,1. No poverty +10.1001/amaguidesnewsletters.2021.mayjune01,10. No inequality +10.1001/amaguidesnewsletters.2021.mayjune02,10. No inequality +10.4336/2021.pfb.41e201902078,15. Life on land +10.4337/ejeep.2019.00045,16. Peace & justice +10.4337/ejeep.2019.00050,1. No poverty +10.4337/ejeep.2019.0045,16. Peace & justice +10.4337/ejeep.2019.0050,1. No poverty +10.4337/ejeep.2019.0051,16. Peace & justice +10.4337/ejeep.2019.0052,16. Peace & justice +10.4337/ejeep.2020.0058,1. No poverty +10.4337/ejeep.2020.0058,10. No inequality +10.4337/ejeep.2020.0060,10. No inequality +10.4337/ejeep.2020.0065,16. Peace & justice +10.4337/ejeep.2020.02.03,16. Peace & justice +10.4337/ejeep.2020.02.05,8. Economic growth +10.4337/ejeep.2020.02.06,16. Peace & justice +10.4337/ejeep.2020.02.09,16. Peace & justice +10.4337/roke.2020.01.01,16. Peace & justice +10.4337/roke.2020.01.03,16. Peace & justice +10.4337/roke.2020.01.05,1. No poverty +10.4337/roke.2020.01.05,8. Economic growth +10.4337/roke.2020.01.07,8. Economic growth +10.4337/roke.2020.02.03,8. Economic growth +10.4337/roke.2020.02.04,1. No poverty \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java index 93d520eb4e..27573bb329 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java @@ -116,22 +116,22 @@ public class ExtractRelationFromEntityTest { .getType()); Assertions - .assertEquals( - "context", verificationDataset - .filter((FilterFunction) row -> row.getSource().getId().startsWith("00")) - .collectAsList() - .get(0) - .getSource() - .getType()); + .assertEquals( + "context", verificationDataset + .filter((FilterFunction) row -> row.getSource().getId().startsWith("00")) + .collectAsList() + .get(0) + .getSource() + .getType()); Assertions - .assertEquals( - "result", verificationDataset - .filter((FilterFunction) row -> row.getSource().getId().startsWith("00")) - .collectAsList() - .get(0) - .getTarget() - .getType()); + .assertEquals( + "result", verificationDataset + .filter((FilterFunction) row -> row.getSource().getId().startsWith("00")) + .collectAsList() + .get(0) + .getTarget() + .getType()); Assertions .assertEquals( "IsRelatedTo", verificationDataset @@ -151,22 +151,22 @@ public class ExtractRelationFromEntityTest { .getType()); Assertions - .assertEquals( - "context", verificationDataset - .filter((FilterFunction) row -> row.getTarget().getId().startsWith("00")) - .collectAsList() - .get(0) - .getTarget() - .getType()); + .assertEquals( + "context", verificationDataset + .filter((FilterFunction) row -> row.getTarget().getId().startsWith("00")) + .collectAsList() + .get(0) + .getTarget() + .getType()); Assertions - .assertEquals( - "result", verificationDataset - .filter((FilterFunction) row -> row.getTarget().getId().startsWith("00")) - .collectAsList() - .get(0) - .getSource() - .getType()); + .assertEquals( + "result", verificationDataset + .filter((FilterFunction) row -> row.getTarget().getId().startsWith("00")) + .collectAsList() + .get(0) + .getSource() + .getType()); } }