diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
index 153a98d3f..ef4ef6756 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java
@@ -19,14 +19,20 @@ public class Constants {
 
 	public static final String DOI = "doi";
 
+	public static final char DEFAULT_DELIMITER = ',';
+
 	public static final String UPDATE_DATA_INFO_TYPE = "update";
 	public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
 	public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
 	public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
+	public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
 
 	public static final String FOS_CLASS_ID = "FOS";
 	public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
 
+	public static final String SDG_CLASS_ID = "SDG";
+	public static final String SDG_CLASS_NAME = "Sustainable Development Goals";
+
 	public static final String NULL = "NULL";
 
 	public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -49,7 +55,7 @@ public class Constants {
 			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
 	}
 
-	public static StructuredProperty getSubject(String sbj, String classid, String classname) {
+	public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) {
 		if (sbj.equals(NULL))
 			return null;
 		StructuredProperty sp = new StructuredProperty();
@@ -72,7 +78,7 @@ public class Constants {
 						false,
 						OafMapperUtils
 							.qualifier(
-								UPDATE_SUBJECT_FOS_CLASS_ID,
+								diqualifierclassid,
 								UPDATE_CLASS_NAME,
 								ModelConstants.DNET_PROVENANCE_ACTIONS,
 								ModelConstants.DNET_PROVENANCE_ACTIONS),
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java
similarity index 75%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java
index bd430d940..499e42a8e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSData.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java
@@ -6,6 +6,7 @@ import java.io.InputStreamReader;
 import java.io.Serializable;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.zip.GZIPInputStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -15,13 +16,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.collection.GetCSV;
 
-public class GetFOSData implements Serializable {
+import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
+
+public class GetInputData implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(GetInputData.class);
 
-	private static final Logger log = LoggerFactory.getLogger(GetFOSData.class);
-	public static final char DEFAULT_DELIMITER = ',';
 
 	public static void main(final String[] args) throws Exception {
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -29,9 +31,9 @@ public class GetFOSData implements Serializable {
 				.toString(
 					Objects
 						.requireNonNull(
-							GetFOSData.class
+							GetInputData.class
 								.getResourceAsStream(
-									"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json"))));
+									"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"))));
 
 		parser.parseArgument(args);
 
@@ -60,16 +62,17 @@ public class GetFOSData implements Serializable {
 
 		FileSystem fileSystem = FileSystem.get(conf);
 
-		new GetFOSData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
+		new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
 
 	}
 
 	public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
 		throws IOException, ClassNotFoundException {
 
+		// reads the csv and writes it as its json equivalent
-		try (InputStreamReader reader = new InputStreamReader(fs.open(new Path(inputPath)))) {
-			GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
+		try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) {
+			eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
 		}
 
 	}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
index c8f472db7..fef796515 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareFOSSparkJob.java
@@ -66,20 +66,20 @@ public class PrepareFOSSparkJob implements Serializable {
 		Dataset<FOSDataModel> fosDataset = readPath(spark, sourcePath, FOSDataModel.class);
 
 		fosDataset
-			.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi(), Encoders.STRING())
+			.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi().toLowerCase(), Encoders.STRING())
 			.mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> {
 				Result r = new Result();
 				FOSDataModel first = it.next();
-				r.setId(DHPUtils.generateUnresolvedIdentifier(first.getDoi(), DOI));
+				r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
 				HashSet<String> level1 = new HashSet<>();
 				HashSet<String> level2 = new HashSet<>();
 				HashSet<String> level3 = new HashSet<>();
 				addLevels(level1, level2, level3, first);
 				it.forEachRemaining(v -> addLevels(level1, level2, level3, v));
 				List<StructuredProperty> sbjs = new ArrayList<>();
-				level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
-				level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
-				level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
+				level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
+				level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
+				level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
 				r.setSubject(sbjs);
 				return r;
 			}, Encoders.bean(Result.class))
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
new file mode 100644
index 000000000..b0607ead5
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
@@ -0,0 +1,86 @@
+
+package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
+
+import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static eu.dnetlib.dhp.actionmanager.Constants.*;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class PrepareSDGSparkJob implements Serializable {
+
+	private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);
+
+	public static void main(String[] args) throws Exception {
+
+		String jsonConfiguration = IOUtils
+			.toString(
+				PrepareSDGSparkJob.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		String sourcePath = parser.get("sourcePath");
+		log.info("sourcePath: {}", sourcePath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> doPrepare(
+				spark,
+				sourcePath,
+				outputPath));
+	}
+
+	private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
+		Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
+
+		// groups the SDG labels by DOI (lowercased) and emits one unresolved Result per DOI,
+		// carrying all of its SDG subjects
+		sdgDataset
+			.groupByKey((MapFunction<SDGDataModel, String>) r -> r.getDoi().toLowerCase(), Encoders.STRING())
+			.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>) (k, it) -> {
+				Result r = new Result();
+				r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
+				SDGDataModel first = it.next();
+				List<StructuredProperty> sbjs = new ArrayList<>();
+				sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
+				it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
+				r.setSubject(sbjs);
+				return r;
+			}, Encoders.bean(Result.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.option("compression", "gzip")
+			.json(outputPath + "/sdg");
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java
index 9b0300bf2..7b1584e3f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java
@@ -1,6 +1,48 @@
 
 package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
 
+import com.opencsv.bean.CsvBindByPosition;
+
 import java.io.Serializable;
 
-public class SDGDataModel implements Serializable {
+public class SDGDataModel implements Serializable {
+
+	@CsvBindByPosition(position = 0)
+	// @CsvBindByName(column = "doi")
+	private String doi;
+
+	@CsvBindByPosition(position = 1)
+	// @CsvBindByName(column = "sdg")
+	private String sbj;
+
+	public SDGDataModel() {
+
+	}
+
+	public SDGDataModel(String doi, String sbj) {
+		this.doi = doi;
+		this.sbj = sbj;
+	}
+
+	public static SDGDataModel newInstance(String d, String sbj) {
+		return new SDGDataModel(d, sbj);
+	}
+
+	public String getDoi() {
+		return doi;
+	}
+
+	public void setDoi(String doi) {
+		this.doi = doi;
+	}
+
+	public String getSbj() {
+		return sbj;
+	}
+
+	public void setSbj(String sbj) {
+		this.sbj = sbj;
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json
rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml
index d53504fe6..9451af572 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml
@@ -79,6 +79,7 @@
+
@@ -107,7 +108,7 @@
 
-            <main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSData</main-class>
+            <main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
             <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
             <arg>--sourcePath</arg><arg>${fosPath}</arg>
             <arg>--outputPath</arg><arg>${workingDir}/input/fos</arg>
@@ -142,6 +143,42 @@
 
+
+            <main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
+            <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+            <arg>--sourcePath</arg><arg>${sdgPath}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg>
+            <arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel</arg>
+
+
+
+
+
+
+
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Produces the unresolved entities from SDG</name>
+            <class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.PrepareSDGSparkJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${workingDir}/input/sdg</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/prepared</arg>
+
+
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
index 67a0f3465..17137c39a 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java
@@ -10,6 +10,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -18,17 +19,13 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
-import eu.dnetlib.dhp.common.collection.CollectorException;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class PrepareTest {
@@ -159,7 +156,7 @@ public class PrepareTest {
 			.getPath();
 		final String outputPath = workingDir.toString() + "/fos.json";
 
-		new GetFOSData()
+		new GetInputData()
 			.doRewrite(
 				sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel",
 				',', fs);
@@ -180,6 +177,8 @@ public class PrepareTest {
 
 	}
 
+
+
 	@Test
 	void fosPrepareTest() throws Exception {
 		final String sourcePath = getClass()
@@ -204,15 +203,9 @@ public class PrepareTest {
 
 		String doi1 = "unresolved::10.3390/s18072310::doi";
 
-		assertEquals(50, tmp.count());
+		assertEquals(20, tmp.count());
 		assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
-		assertTrue(
-			tmp
-				.filter(r -> r.getId().equals(doi1))
-				.flatMap(r -> r.getSubject().iterator())
-				.map(sbj -> sbj.getValue())
-				.collect()
-				.contains("engineering and technology"));
+
 		assertTrue(
 			tmp
@@ -220,16 +213,16 @@ public class PrepareTest {
 				.filter(r -> r.getId().equals(doi1))
 				.flatMap(r -> r.getSubject().iterator())
 				.map(sbj -> sbj.getValue())
 				.collect()
-				.contains("nano-technology"));
+				.contains("04 agricultural and veterinary sciences"));
 		assertTrue(
 			tmp
 				.filter(r -> r.getId().equals(doi1))
 				.flatMap(r -> r.getSubject().iterator())
 				.map(sbj -> sbj.getValue())
 				.collect()
-				.contains("nanoscience & nanotechnology"));
+				.contains("0404 agricultural biotechnology"));
 
"unresolved::10.1111/1365-2656.12831::doi"; + String doi = "unresolved::10.1007/s11164-020-04383-6::doi"; assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); assertTrue( tmp @@ -237,7 +230,7 @@ public class PrepareTest { .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("psychology and cognitive sciences")); + .contains("01 natural sciences")); assertTrue( tmp @@ -245,15 +238,116 @@ public class PrepareTest { .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("social sciences")); - assertFalse( + .contains("0104 chemical sciences")); + assertTrue( tmp .filter(r -> r.getId().equals(doi)) .flatMap(r -> r.getSubject().iterator()) .map(sbj -> sbj.getValue()) .collect() - .contains("NULL")); + .contains("010402 general chemistry")); } + @Test + void getSDGFileTest() throws IOException, ClassNotFoundException { + + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv") + .getPath(); + final String outputPath = workingDir.toString() + "/sdg.json"; + + new GetInputData() + .doRewrite( + sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", + ',', fs); + + BufferedReader in = new BufferedReader( + new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); + + String line; + int count = 0; + while ((line = in.readLine()) != null) { + SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); + + System.out.println(new ObjectMapper().writeValueAsString(sdg)); + count += 1; + } + + assertEquals(37, count); + + } + + @Test + void sdgPrepareTest() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); + + PrepareSDGSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + + "-outputPath", workingDir.toString() + "/work" + + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/work/sdg") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + + String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi"; + + assertEquals(32, tmp.count()); + assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); + + + assertTrue( + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("3. Good health")); + assertTrue( + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("8. 
Economic growth")); + + + } + @Disabled + @Test + void test2() throws Exception { + final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; + + + final String outputPath = workingDir.toString() + "/sdg.json"; + + new GetInputData() + .doRewrite( + sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", + ',', fs); + + BufferedReader in = new BufferedReader( + new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); + + String line; + int count = 0; + while ((line = in.readLine()) != null) { + SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); + + System.out.println(new ObjectMapper().writeValueAsString(sdg)); + count += 1; + } + + + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index 32fb25640..28fce4adb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -7,6 +7,7 @@ import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; +import eu.dnetlib.dhp.actionmanager.Constants; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -348,4 +349,175 @@ public class ProduceTest { } + + private JavaRDD getResultJavaRDDPlusSDG() throws Exception { + final String bipPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") + .getPath(); + + PrepareBipFinder + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", bipPath, + "--outputPath", workingDir.toString() + "/work" + + }); + final String fosPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") + .getPath(); + + PrepareFOSSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", fosPath, + "-outputPath", workingDir.toString() + "/work" + }); + + final String sdgPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); + + PrepareSDGSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sdgPath, + "-outputPath", workingDir.toString() + "/work" + }); + + SparkSaveUnresolved.main(new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", workingDir.toString() + "/work", + + "-outputPath", workingDir.toString() + "/unresolved" + + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + return sc + .textFile(workingDir.toString() + "/unresolved") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + } + + + @Test + void produceTestSomeNumbersWithSDG() throws Exception { + + final String doi = "unresolved::10.3390/s18072310::doi"; + JavaRDD tmp = getResultJavaRDDPlusSDG(); + + Assertions.assertEquals(136, tmp.count()); + + Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); + + Assertions + .assertEquals( + 50, tmp + .filter(row -> !row.getId().equals(doi)) + .filter(row -> row.getSubject() != null) + .count()); + + Assertions + 
+			.assertEquals(
+				85,
+				tmp
+					.filter(row -> !row.getId().equals(doi))
+					.filter(r -> r.getInstance() != null && r.getInstance().size() > 0)
+					.count());
+
+	}
+
+	@Test
+	void produceTest7Subjects() throws Exception {
+		final String doi = "unresolved::10.3390/s18072310::doi";
+
+		JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
+
+		Assertions
+			.assertEquals(
+				7, tmp
+					.filter(row -> row.getId().equals(doi))
+					.collect()
+					.get(0)
+					.getSubject()
+					.size());
+
+		List<StructuredProperty> sbjs = tmp
+			.filter(row -> row.getId().equals(doi))
+			.flatMap(row -> row.getSubject().iterator())
+			.collect();
+
+		Assertions
+			.assertEquals(
+				true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences")));
+
+		Assertions
+			.assertEquals(
+				true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology")));
+		Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science")));
+
+		Assertions
+			.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences")));
+		Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences")));
+		Assertions
+			.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics")));
+		Assertions
+			.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty")));
+
+	}
+
+	@Test
+	void produceTestSubjectsWithSDG() throws Exception {
+
+		JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
+
+		List<StructuredProperty> sbjs_sdg = tmp
+			.filter(row -> row.getSubject() != null && row.getSubject().size() > 0)
+			.flatMap(row -> row.getSubject().iterator())
+			.filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID))
+			.collect();
+
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals(
+						"Sustainable Development Goals", sbj.getQualifier().getClassname()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
+
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
+		sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals(
+						ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
+		sbjs_sdg
+			.forEach(
+				sbj -> Assertions
+					.assertEquals(
+						ModelConstants.DNET_PROVENANCE_ACTIONS,
+						sbj.getDataInfo().getProvenanceaction().getSchemename()));
+	}
+
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json
new file mode 100644
index 000000000..59d707177
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json
@@ -0,0 +1,37 @@
+{"doi":"10.1001/amaguidesnewsletters.2019.mayjun02","sbj":"10. No inequality"}
+{"doi":"10.1001/amaguidesnewsletters.2019.novdec01","sbj":"10. No inequality"}
+{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"3. Good health"}
+{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"8. Economic growth"}
+{"doi":"10.1001/amaguidesnewsletters.2020.janfeb01","sbj":"8. Economic growth"}
+{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"3. Good health"}
+{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"8. Economic growth"}
+{"doi":"10.1001/amaguidesnewsletters.2020.julaug01","sbj":"3. Good health"}
+{"doi":"10.1001/amaguidesnewsletters.2020.marapr01","sbj":"3. Good health"}
+{"doi":"10.1001/amaguidesnewsletters.2020.mayjun01","sbj":"3. Good health"}
+{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"16. Peace & justice"}
+{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"10. No inequality"}
+{"doi":"10.1001/amaguidesnewsletters.2021.julaug01","sbj":"1. No poverty"}
+{"doi":"10.1001/amaguidesnewsletters.2021.mayjune01","sbj":"10. No inequality"}
+{"doi":"10.1001/amaguidesnewsletters.2021.mayjune02","sbj":"10. No inequality"}
+{"doi":"10.4336/2021.pfb.41e201902078","sbj":"15. Life on land"}
+{"doi":"10.4337/ejeep.2019.00045","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2019.00050","sbj":"1. No poverty"}
+{"doi":"10.4337/ejeep.2019.0045","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2019.0050","sbj":"1. No poverty"}
+{"doi":"10.4337/ejeep.2019.0051","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2019.0052","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2020.0058","sbj":"1. No poverty"}
+{"doi":"10.4337/ejeep.2020.0058","sbj":"10. No inequality"}
+{"doi":"10.4337/ejeep.2020.0060","sbj":"10. No inequality"}
+{"doi":"10.4337/ejeep.2020.0065","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2020.02.03","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2020.02.05","sbj":"8. Economic growth"}
+{"doi":"10.4337/ejeep.2020.02.06","sbj":"16. Peace & justice"}
+{"doi":"10.4337/ejeep.2020.02.09","sbj":"16. Peace & justice"}
+{"doi":"10.4337/roke.2020.01.01","sbj":"16. Peace & justice"}
+{"doi":"10.4337/roke.2020.01.03","sbj":"16. Peace & justice"}
+{"doi":"10.4337/roke.2020.01.05","sbj":"1. No poverty"}
+{"doi":"10.4337/roke.2020.01.05","sbj":"8. Economic growth"}
+{"doi":"10.4337/roke.2020.01.07","sbj":"8. Economic growth"}
+{"doi":"10.4337/roke.2020.02.03","sbj":"8. Economic growth"}
+{"doi":"10.3390/s18072310","sbj":"1. No poverty"}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv
new file mode 100644
index 000000000..30524467e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv
@@ -0,0 +1,37 @@
+10.1001/amaguidesnewsletters.2019.mayjun02,10. No inequality
+10.1001/amaguidesnewsletters.2019.novdec01,10. No inequality
+10.1001/amaguidesnewsletters.2019.sepoct02,3. Good health
+10.1001/amaguidesnewsletters.2019.sepoct02,8. Economic growth
+10.1001/amaguidesnewsletters.2020.janfeb01,8. Economic growth
+10.1001/amaguidesnewsletters.2020.janfeb02,3. Good health
+10.1001/amaguidesnewsletters.2020.janfeb02,8. Economic growth
+10.1001/amaguidesnewsletters.2020.julaug01,3. Good health
+10.1001/amaguidesnewsletters.2020.marapr01,3. Good health
+10.1001/amaguidesnewsletters.2020.mayjun01,3. Good health
+10.1001/amaguidesnewsletters.2020.mayjun02,16. Peace & justice
+10.1001/amaguidesnewsletters.2020.mayjun02,10. No inequality
+10.1001/amaguidesnewsletters.2021.julaug01,1. No poverty
+10.1001/amaguidesnewsletters.2021.mayjune01,10. No inequality
+10.1001/amaguidesnewsletters.2021.mayjune02,10. No inequality
+10.4336/2021.pfb.41e201902078,15. Life on land
+10.4337/ejeep.2019.00045,16. Peace & justice
+10.4337/ejeep.2019.00050,1. No poverty
+10.4337/ejeep.2019.0045,16. Peace & justice
+10.4337/ejeep.2019.0050,1. No poverty
+10.4337/ejeep.2019.0051,16. Peace & justice
+10.4337/ejeep.2019.0052,16. Peace & justice
+10.4337/ejeep.2020.0058,1. No poverty
+10.4337/ejeep.2020.0058,10. No inequality
+10.4337/ejeep.2020.0060,10. No inequality
+10.4337/ejeep.2020.0065,16. Peace & justice
+10.4337/ejeep.2020.02.03,16. Peace & justice
+10.4337/ejeep.2020.02.05,8. Economic growth
+10.4337/ejeep.2020.02.06,16. Peace & justice
+10.4337/ejeep.2020.02.09,16. Peace & justice
+10.4337/roke.2020.01.01,16. Peace & justice
+10.4337/roke.2020.01.03,16. Peace & justice
+10.4337/roke.2020.01.05,1. No poverty
+10.4337/roke.2020.01.05,8. Economic growth
+10.4337/roke.2020.01.07,8. Economic growth
+10.4337/roke.2020.02.03,8. Economic growth
+10.4337/roke.2020.02.04,1. No poverty
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java
index 93d520eb4..27573bb32 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/ExtractRelationFromEntityTest.java
@@ -116,22 +116,22 @@ public class ExtractRelationFromEntityTest {
 					.getType());
 
 		Assertions
-			.assertEquals(
-				"context", verificationDataset
-					.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
-					.collectAsList()
-					.get(0)
-					.getSource()
-					.getType());
+			.assertEquals(
+				"context", verificationDataset
+					.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
+					.collectAsList()
+					.get(0)
+					.getSource()
+					.getType());
 
 		Assertions
-			.assertEquals(
-				"result", verificationDataset
-					.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
-					.collectAsList()
-					.get(0)
-					.getTarget()
-					.getType());
+			.assertEquals(
+				"result", verificationDataset
+					.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
+					.collectAsList()
+					.get(0)
+					.getTarget()
+					.getType());
 
 		Assertions
 			.assertEquals(
 				"IsRelatedTo", verificationDataset
@@ -151,22 +151,22 @@ public class ExtractRelationFromEntityTest {
 					.getType());
 
 		Assertions
-			.assertEquals(
-				"context", verificationDataset
-					.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
-					.collectAsList()
-					.get(0)
-					.getTarget()
-					.getType());
+			.assertEquals(
+				"context", verificationDataset
+					.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
+					.collectAsList()
+					.get(0)
+					.getTarget()
+					.getType());
 
 		Assertions
-			.assertEquals(
-				"result", verificationDataset
-					.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
-					.collectAsList()
-					.get(0)
-					.getSource()
-					.getType());
+			.assertEquals(
+				"result", verificationDataset
+					.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
+					.collectAsList()
+					.get(0)
+					.getSource()
+					.getType());
 	}
 
 }