From 92fd69e25da548827d857b08371cb99ffd7f6aaa Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 3 Jan 2022 15:23:06 +0100 Subject: [PATCH 1/5] [SDG-FOS] alternative way to get input data to avoid OOM error while getting csv --- .../dnetlib/dhp/actionmanager/Constants.java | 7 +- .../GetFOSSparkJob.java | 91 +++++++++ .../GetInputData.java | 80 -------- .../GetSDGSparkJob.java | 91 +++++++++ .../PrepareSDGSparkJob.java | 119 ++++++------ .../model/SDGDataModel.java | 59 +++--- .../get_input_parameters.json | 14 +- .../oozie_app/workflow.xml | 46 ++++- .../createunresolvedentities/PrepareTest.java | 168 +++++++---------- .../createunresolvedentities/ProduceTest.java | 174 +++++++++--------- 10 files changed, 468 insertions(+), 381 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java index ef4ef6756..3a46228d8 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/Constants.java @@ -19,7 +19,7 @@ public class Constants { public static final String DOI = "doi"; - public static final char DEFAULT_DELIMITER = ','; + public static final String DEFAULT_DELIMITER = ","; public static final String UPDATE_DATA_INFO_TYPE = "update"; public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; @@ -55,7 +55,8 @@ public class Constants { .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); } - public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) { + public static StructuredProperty getSubject(String sbj, String classid, String classname, + String diqualifierclassid) { if (sbj.equals(NULL)) return null; StructuredProperty sp = new StructuredProperty(); @@ -78,7 +79,7 @@ public class Constants { false, OafMapperUtils .qualifier( - diqualifierclassid, + diqualifierclassid, UPDATE_CLASS_NAME, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java new file mode 100644 index 000000000..75fe42e90 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetFOSSparkJob.java @@ -0,0 +1,91 @@ + +package eu.dnetlib.dhp.actionmanager.createunresolvedentities; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; +import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class GetFOSSparkJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(GetFOSSparkJob.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GetFOSSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + // the path where the original fos csv file is stored + final String sourcePath = parser.get("sourcePath"); + log.info("sourcePath {}", sourcePath); + + // the path where to put the file as json + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + final String delimiter = Optional + .ofNullable(parser.get("delimiter")) + .orElse(DEFAULT_DELIMITER); + + SparkConf sconf = new SparkConf(); + runWithSparkSession( + sconf, + isSparkSessionManaged, + spark -> { + getFOS( + spark, + sourcePath, + outputPath, + delimiter); + }); + } + + private static void getFOS(SparkSession spark, String sourcePath, String outputPath, String delimiter) { + Dataset fosData = spark + .read() + .format("csv") + .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(sourcePath); + + fosData.map((MapFunction) r -> { + FOSDataModel fosDataModel = new FOSDataModel(); + fosDataModel.setDoi(r.getString(0).toLowerCase()); + fosDataModel.setLevel1(r.getString(1)); + fosDataModel.setLevel2(r.getString(2)); + fosDataModel.setLevel3(r.getString(3)); + return fosDataModel; + }, Encoders.bean(FOSDataModel.class)) + .write() + .mode(SaveMode.Overwrite) + .json(outputPath); + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java deleted file mode 100644 index 499e42a8e..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetInputData.java +++ /dev/null @@ -1,80 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.createunresolvedentities; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Serializable; -import java.util.Objects; -import java.util.Optional; -import java.util.zip.GZIPInputStream; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; - -public class GetInputData implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(GetInputData.class); - - - - public static void main(final String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - Objects - .requireNonNull( - GetInputData.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json")))); - - parser.parseArgument(args); - - // the path where the original fos csv file is stored - final String sourcePath = parser.get("sourcePath"); - log.info("sourcePath {}", sourcePath); - - // the path where to put the file as json - final String outputPath = parser.get("outputPath"); - log.info("outputPath {}", outputPath); - - final String hdfsNameNode = parser.get("hdfsNameNode"); - log.info("hdfsNameNode {}", hdfsNameNode); - - final String classForName = parser.get("classForName"); - log.info("classForName {}", classForName); - - final char delimiter = Optional - .ofNullable(parser.get("delimiter")) - .map(s -> s.charAt(0)) - .orElse(DEFAULT_DELIMITER); - log.info("delimiter {}", delimiter); - - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", hdfsNameNode); - - FileSystem fileSystem = FileSystem.get(conf); - - new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem); - - } - - public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs) - throws IOException, ClassNotFoundException { - - - // reads the csv and writes it as its json equivalent - try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) { - eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter); - } - - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java new file mode 100644 index 000000000..328075389 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java @@ -0,0 +1,91 @@ + +package eu.dnetlib.dhp.actionmanager.createunresolvedentities; + +import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER; +import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; + +public class GetSDGSparkJob implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(GetSDGSparkJob.class); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + GetSDGSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + // the path where the original fos csv file is stored + final String sourcePath = parser.get("sourcePath"); + log.info("sourcePath {}", sourcePath); + + // the path where to put the file as json + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}", outputPath); + + final String delimiter = Optional + .ofNullable(parser.get("delimiter")) + .orElse(DEFAULT_DELIMITER); + + SparkConf sconf = new SparkConf(); + runWithSparkSession( + sconf, + isSparkSessionManaged, + spark -> { + getSDG( + spark, + sourcePath, + outputPath, + delimiter); + }); + } + + private static void getSDG(SparkSession spark, String sourcePath, String outputPath, String delimiter) { + Dataset sdgData = spark + .read() + .format("csv") + .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") + .option("quotes", "\"") + .load(sourcePath); + + sdgData.map((MapFunction) r -> { + SDGDataModel sdgDataModel = new SDGDataModel(); + sdgDataModel.setDoi(r.getString(0).toLowerCase()); + sdgDataModel.setSbj(r.getString(1)); + + return sdgDataModel; + }, Encoders.bean(SDGDataModel.class)) + .filter((FilterFunction) sdg -> sdg.getSbj() != null) + .write() + .mode(SaveMode.Overwrite) + .json(outputPath); + + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java index b0607ead5..27da77c0c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java @@ -1,11 +1,13 @@ package eu.dnetlib.dhp.actionmanager.createunresolvedentities; -import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; -import eu.dnetlib.dhp.utils.DHPUtils; +import static eu.dnetlib.dhp.actionmanager.Constants.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; @@ -16,71 +18,72 @@ import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import static eu.dnetlib.dhp.actionmanager.Constants.*; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; +import eu.dnetlib.dhp.utils.DHPUtils; public class PrepareSDGSparkJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class); + private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class); - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareSDGSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json")); + String jsonConfiguration = IOUtils + .toString( + PrepareSDGSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + parser.parseArgument(args); - Boolean isSparkSessionManaged = isSparkSessionManaged(parser); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String sourcePath = parser.get("sourcePath"); - log.info("sourcePath: {}", sourcePath); + String sourcePath = parser.get("sourcePath"); + log.info("sourcePath: {}", sourcePath); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - doPrepare( - spark, - sourcePath, - - outputPath); - }); - } - - private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { - Dataset sdgDataset = readPath(spark, sourcePath, SDGDataModel.class); - - - sdgDataset.groupByKey((MapFunction)r -> r.getDoi().toLowerCase(),Encoders.STRING()) - .mapGroups((MapGroupsFunction)(k,it) -> { - Result r = new Result(); - r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); - SDGDataModel first = it.next(); - Listsbjs = new ArrayList<>(); - sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); - it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); - r.setSubject(sbjs); - return r; - },Encoders.bean(Result.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/sdg"); - } + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + doPrepare( + spark, + sourcePath, + outputPath); + }); + } + private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { + Dataset sdgDataset = readPath(spark, sourcePath, SDGDataModel.class); + sdgDataset + .groupByKey((MapFunction) r -> r.getDoi().toLowerCase(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { + Result r = new Result(); + r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); + SDGDataModel first = it.next(); + List sbjs = new ArrayList<>(); + sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); + it + .forEachRemaining( + s -> sbjs + .add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); + r.setSubject(sbjs); + return r; + }, Encoders.bean(Result.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/sdg"); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java index 7b1584e3f..98ba5045c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/model/SDGDataModel.java @@ -1,48 +1,47 @@ -package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model; -import com.opencsv.bean.CsvBindByPosition; +package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model; import java.io.Serializable; -public class SDGDataModel implements Serializable{ +import com.opencsv.bean.CsvBindByPosition; - @CsvBindByPosition(position = 0) +public class SDGDataModel implements Serializable { + + @CsvBindByPosition(position = 0) // @CsvBindByName(column = "doi") - private String doi; + private String doi; - @CsvBindByPosition(position = 1) + @CsvBindByPosition(position = 1) // @CsvBindByName(column = "sdg") - private String sbj; + private String sbj; + public SDGDataModel() { - public SDGDataModel() { + } - } + public SDGDataModel(String doi, String sbj) { + this.doi = doi; + this.sbj = sbj; - public SDGDataModel(String doi, String sbj) { - this.doi = doi; - this.sbj = sbj; + } - } + public static SDGDataModel newInstance(String d, String sbj) { + return new SDGDataModel(d, sbj); + } - public static SDGDataModel newInstance(String d, String sbj) { - return new SDGDataModel(d, sbj); - } + public String getDoi() { + return doi; + } - public String getDoi() { - return doi; - } + public void setDoi(String doi) { + this.doi = doi; + } - public void setDoi(String doi) { - this.doi = doi; - } + public String getSbj() { + return sbj; + } - - public String getSbj() { - return sbj; - } - - public void setSbj(String sbj) { - this.sbj = sbj; - } + public void setSbj(String sbj) { + this.sbj = sbj; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json index 050a25677..5a6a63774 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json @@ -19,15 +19,9 @@ "paramRequired": false }, { - "paramName": "hnn", - "paramLongName": "hdfsNameNode", - "paramDescription": "the path used to store the HostedByMap", - "paramRequired": true - }, - { - "paramName": "cfn", - "paramLongName": "classForName", - "paramDescription": "the path used to store the HostedByMap", - "paramRequired": true + "paramName": "d", + "paramLongName": "delimiter", + "paramDescription": "the delimiter if different from the default one (,)", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml index 9451af572..a80bf4fbd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/createunresolvedentities/oozie_app/workflow.xml @@ -107,17 +107,30 @@ - - eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData - --hdfsNameNode${nameNode} + + yarn + cluster + Gets Data from FOS csv file + eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --sourcePath${fosPath} --outputPath${workingDir}/input/fos - --classForNameeu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel - + + yarn @@ -144,17 +157,30 @@ - - eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData - --hdfsNameNode${nameNode} + + yarn + cluster + Gets Data from SDG csv file + eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetSDGSparkJob + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --sourcePath${sdgPath} --outputPath${workingDir}/input/sdg - --classForNameeu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel - + + yarn diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java index 17137c39a..166430c2f 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareTest.java @@ -10,7 +10,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; +import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import eu.dnetlib.dhp.schema.oaf.Result; public class PrepareTest { @@ -148,37 +148,6 @@ public class PrepareTest { } - @Test - void getFOSFileTest() throws IOException, ClassNotFoundException { - - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos_sbs.csv") - .getPath(); - final String outputPath = workingDir.toString() + "/fos.json"; - - new GetInputData() - .doRewrite( - sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel", - ',', fs); - - BufferedReader in = new BufferedReader( - new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); - - String line; - int count = 0; - while ((line = in.readLine()) != null) { - FOSDataModel fos = new ObjectMapper().readValue(line, FOSDataModel.class); - - System.out.println(new ObjectMapper().writeValueAsString(fos)); - count += 1; - } - - assertEquals(39, count); - - } - - - @Test void fosPrepareTest() throws Exception { final String sourcePath = getClass() @@ -206,7 +175,6 @@ public class PrepareTest { assertEquals(20, tmp.count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); - assertTrue( tmp .filter(r -> r.getId().equals(doi1)) @@ -249,105 +217,101 @@ public class PrepareTest { } - @Test - void getSDGFileTest() throws IOException, ClassNotFoundException { - - final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv") - .getPath(); - final String outputPath = workingDir.toString() + "/sdg.json"; - - new GetInputData() - .doRewrite( - sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", - ',', fs); - - BufferedReader in = new BufferedReader( - new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); - - String line; - int count = 0; - while ((line = in.readLine()) != null) { - SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); - - System.out.println(new ObjectMapper().writeValueAsString(sdg)); - count += 1; - } - - assertEquals(37, count); - - } - @Test void sdgPrepareTest() throws Exception { final String sourcePath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); PrepareSDGSparkJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", sourcePath, + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, - "-outputPath", workingDir.toString() + "/work" + "-outputPath", workingDir.toString() + "/work" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/work/sdg") - .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + .textFile(workingDir.toString() + "/work/sdg") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi"; assertEquals(32, tmp.count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); - assertTrue( - tmp - .filter(r -> r.getId().equals(doi1)) - .flatMap(r -> r.getSubject().iterator()) - .map(sbj -> sbj.getValue()) - .collect() - .contains("3. Good health")); + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("3. Good health")); assertTrue( - tmp - .filter(r -> r.getId().equals(doi1)) - .flatMap(r -> r.getSubject().iterator()) - .map(sbj -> sbj.getValue()) - .collect() - .contains("8. Economic growth")); - + tmp + .filter(r -> r.getId().equals(doi1)) + .flatMap(r -> r.getSubject().iterator()) + .map(sbj -> sbj.getValue()) + .collect() + .contains("8. Economic growth")); } - @Disabled + @Test - void test2() throws Exception { + void test3() throws Exception { + final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz"; + + final String outputPath = workingDir.toString() + "/fos.json"; + GetFOSSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + + "-outputPath", outputPath + + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(outputPath) + .map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class)); + + tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null)); + tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null)); + tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null)); + tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null)); + + } + + @Test + void test4() throws Exception { final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; - final String outputPath = workingDir.toString() + "/sdg.json"; + GetSDGSparkJob + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, - new GetInputData() - .doRewrite( - sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel", - ',', fs); + "-outputPath", outputPath - BufferedReader in = new BufferedReader( - new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath)))); + }); - String line; - int count = 0; - while ((line = in.readLine()) != null) { - SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - System.out.println(new ObjectMapper().writeValueAsString(sdg)); - count += 1; - } + JavaRDD tmp = sc + .textFile(outputPath) + .map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class)); + tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null)); + tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null)); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java index 28fce4adb..02c6582f1 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/ProduceTest.java @@ -7,7 +7,6 @@ import java.nio.file.Path; import java.util.List; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.Constants; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.actionmanager.Constants; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; @@ -349,60 +349,58 @@ public class ProduceTest { } - private JavaRDD getResultJavaRDDPlusSDG() throws Exception { final String bipPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") + .getPath(); PrepareBipFinder - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", bipPath, - "--outputPath", workingDir.toString() + "/work" + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", bipPath, + "--outputPath", workingDir.toString() + "/work" - }); + }); final String fosPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") + .getPath(); PrepareFOSSparkJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", fosPath, - "-outputPath", workingDir.toString() + "/work" - }); + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", fosPath, + "-outputPath", workingDir.toString() + "/work" + }); final String sdgPath = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") + .getPath(); PrepareSDGSparkJob - .main( - new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", sdgPath, - "-outputPath", workingDir.toString() + "/work" - }); + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sdgPath, + "-outputPath", workingDir.toString() + "/work" + }); SparkSaveUnresolved.main(new String[] { - "--isSparkSessionManaged", Boolean.FALSE.toString(), - "--sourcePath", workingDir.toString() + "/work", + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", workingDir.toString() + "/work", - "-outputPath", workingDir.toString() + "/unresolved" + "-outputPath", workingDir.toString() + "/unresolved" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); return sc - .textFile(workingDir.toString() + "/unresolved") - .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); + .textFile(workingDir.toString() + "/unresolved") + .map(item -> OBJECT_MAPPER.readValue(item, Result.class)); } - @Test void produceTestSomeNumbersWithSDG() throws Exception { @@ -414,19 +412,19 @@ public class ProduceTest { Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); Assertions - .assertEquals( - 50, tmp - .filter(row -> !row.getId().equals(doi)) - .filter(row -> row.getSubject() != null) - .count()); + .assertEquals( + 50, tmp + .filter(row -> !row.getId().equals(doi)) + .filter(row -> row.getSubject() != null) + .count()); Assertions - .assertEquals( - 85, - tmp - .filter(row -> !row.getId().equals(doi)) - .filter(r -> r.getInstance() != null && r.getInstance().size() > 0) - .count()); + .assertEquals( + 85, + tmp + .filter(row -> !row.getId().equals(doi)) + .filter(r -> r.getInstance() != null && r.getInstance().size() > 0) + .count()); } @@ -437,35 +435,35 @@ public class ProduceTest { JavaRDD tmp = getResultJavaRDDPlusSDG(); Assertions - .assertEquals( - 7, tmp - .filter(row -> row.getId().equals(doi)) - .collect() - .get(0) - .getSubject() - .size()); + .assertEquals( + 7, tmp + .filter(row -> row.getId().equals(doi)) + .collect() + .get(0) + .getSubject() + .size()); List sbjs = tmp - .filter(row -> row.getId().equals(doi)) - .flatMap(row -> row.getSubject().iterator()) - .collect(); + .filter(row -> row.getId().equals(doi)) + .flatMap(row -> row.getSubject().iterator()) + .collect(); Assertions - .assertEquals( - true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences"))); + .assertEquals( + true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences"))); Assertions - .assertEquals( - true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology"))); + .assertEquals( + true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology"))); Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science"))); Assertions - .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences"))); + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences"))); Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences"))); Assertions - .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics"))); + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics"))); Assertions - .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty"))); + .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty"))); } @@ -475,25 +473,25 @@ public class ProduceTest { JavaRDD tmp = getResultJavaRDDPlusSDG(); List sbjs_sdg = tmp - .filter(row -> row.getSubject() != null && row.getSubject().size() > 0) - .flatMap(row -> row.getSubject().iterator()) - .filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID)) - .collect(); + .filter(row -> row.getSubject() != null && row.getSubject().size() > 0) + .flatMap(row -> row.getSubject().iterator()) + .filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID)) + .collect(); sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals( - "Sustainable Development Goals", sbj.getQualifier().getClassname())); + .forEach( + sbj -> Assertions + .assertEquals( + "Sustainable Development Goals", sbj.getQualifier().getClassname())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid())); + .forEach( + sbj -> Assertions + .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename())); + .forEach( + sbj -> Assertions + .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred())); @@ -501,23 +499,23 @@ public class ProduceTest { sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance())); sbjs_sdg - .forEach( - sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid())); + .forEach( + sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname())); + .forEach( + sbj -> Assertions + .assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals( - ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid())); + .forEach( + sbj -> Assertions + .assertEquals( + ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid())); sbjs_sdg - .forEach( - sbj -> Assertions - .assertEquals( - ModelConstants.DNET_PROVENANCE_ACTIONS, - sbj.getDataInfo().getProvenanceaction().getSchemename())); + .forEach( + sbj -> Assertions + .assertEquals( + ModelConstants.DNET_PROVENANCE_ACTIONS, + sbj.getDataInfo().getProvenanceaction().getSchemename())); } } From adccc2346a46283aedd9ce0a33e37b3236002be6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 7 Jan 2022 11:28:50 +0100 Subject: [PATCH 2/5] [SDG-FOS] to lower case for the doi --- .../actionmanager/createunresolvedentities/GetSDGSparkJob.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java index 328075389..e2c28e64e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java @@ -69,8 +69,6 @@ public class GetSDGSparkJob implements Serializable { .read() .format("csv") .option("sep", delimiter) - .option("inferSchema", "true") - .option("header", "true") .option("quotes", "\"") .load(sourcePath); From b7e450070b048dde950c68f54a23be66880297b6 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 7 Jan 2022 12:13:26 +0100 Subject: [PATCH 3/5] [SDG-FOS] to import SDG file not considering the header --- .../actionmanager/createunresolvedentities/GetSDGSparkJob.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java index e2c28e64e..328075389 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/GetSDGSparkJob.java @@ -69,6 +69,8 @@ public class GetSDGSparkJob implements Serializable { .read() .format("csv") .option("sep", delimiter) + .option("inferSchema", "true") + .option("header", "true") .option("quotes", "\"") .load(sourcePath); From 064f9bbd87f71434c070c8a98589eaa6afe5f050 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 7 Jan 2022 18:58:51 +0100 Subject: [PATCH 4/5] [AFFPropSR] added new paprameter for the number of iterations and new code for just one iteration --- .../SparkResultToOrganizationFromSemRel.java | 69 ++++++++++++++----- .../input_propagation_parameter.json | 6 ++ .../oozie_app/workflow.xml | 3 +- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java index 9ceebf222..cfc69a8f0 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; import java.util.Arrays; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -64,6 +65,18 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { final String workingPath = parser.get("workingDir"); log.info("workingPath: {}", workingPath); + final int iterations = Optional + .ofNullable(parser.get("iterations")) + .map(v -> { + if (Integer.valueOf(v) < MAX_ITERATION) { + return Integer.valueOf(v); + } else + return MAX_ITERATION; + }) + .orElse(MAX_ITERATION); + + log.info("iterations: {}", iterations); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); @@ -77,7 +90,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { resultOrganizationPath, relationPath, workingPath, - outputPath)); + outputPath, + iterations)); } public static void execPropagation(SparkSession spark, @@ -86,26 +100,45 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { String resultOrganizationPath, String graphPath, String workingPath, + String outputPath, + int iterations) { + if (iterations == 1) { + doPropagateOnce( + spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, + workingPath, outputPath); + } else { + + final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE); + final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO); + final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE); + final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR); + final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE); + final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT); + + final PropagationCounter propagationCounter = new PropagationCounter(iterationOne, + iterationTwo, + iterationThree, + iterationFour, + iterationFive, + notReachedFirstParent); + + doPropagate( + spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, + workingPath, outputPath, propagationCounter); + } + + } + + private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath, + String resultOrganizationPath, String graphPath, String workingPath, String outputPath) { - final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE); - final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO); - final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE); - final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR); - final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE); - final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT); - - final PropagationCounter propagationCounter = new PropagationCounter(iterationOne, - iterationTwo, - iterationThree, - iterationFour, - iterationFive, - notReachedFirstParent); - - doPropagate( - spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, - workingPath, outputPath, propagationCounter); + StepActions + .execStep( + spark, graphPath, workingPath + NEW_RELATION_PATH, + leavesPath, childParentPath, resultOrganizationPath); + addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath); } private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json index f73cc221e..e09cd62fa 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json @@ -46,5 +46,11 @@ "paramLongName": "outputPath", "paramDescription": "the path used to store temporary output files", "paramRequired": true + }, + { + "paramName": "it", + "paramLongName": "iterations", + "paramDescription": "the number of iterations to be computed", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml index 17502abea..3f0530aaf 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - + sourcePath @@ -181,6 +181,7 @@ --resultOrgPath${workingDir}/preparedInfo/resultOrgPath --hive_metastore_uris${hive_metastore_uris} --workingDir${workingDir}/working + --iterations${iterations} From 0163dadb7f9abbe1c801b55d30728c4e3768627f Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 11 Jan 2022 11:05:44 +0100 Subject: [PATCH 5/5] [doiboost] - update MAG schema, new filed added on version dec-2021 --- .../eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala index d25a4893f..039c935f3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala +++ b/dhp-workflows/dhp-doiboost/src/main/scala/eu/dnetlib/doiboost/mag/SparkImportMagIntoDataset.scala @@ -41,7 +41,7 @@ object SparkImportMagIntoDataset { "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")), "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")), "PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")), - "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?", "CreatedDate:DateTime")), + "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?", "DocSubTypes:string", "CreatedDate:DateTime")), "RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float")) )