[SDG-FOS] alternative way to get input data to avoid OOM error while getting csv

This commit is contained in:
Miriam Baglioni 2022-01-03 15:23:06 +01:00
parent a706ba0c08
commit 92fd69e25d
10 changed files with 468 additions and 381 deletions

View File

@ -19,7 +19,7 @@ public class Constants {
public static final String DOI = "doi"; public static final String DOI = "doi";
public static final char DEFAULT_DELIMITER = ','; public static final String DEFAULT_DELIMITER = ",";
public static final String UPDATE_DATA_INFO_TYPE = "update"; public static final String UPDATE_DATA_INFO_TYPE = "update";
public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
@ -55,7 +55,8 @@ public class Constants {
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
} }
public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) { public static StructuredProperty getSubject(String sbj, String classid, String classname,
String diqualifierclassid) {
if (sbj.equals(NULL)) if (sbj.equals(NULL))
return null; return null;
StructuredProperty sp = new StructuredProperty(); StructuredProperty sp = new StructuredProperty();
@ -78,7 +79,7 @@ public class Constants {
false, false,
OafMapperUtils OafMapperUtils
.qualifier( .qualifier(
diqualifierclassid, diqualifierclassid,
UPDATE_CLASS_NAME, UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS), ModelConstants.DNET_PROVENANCE_ACTIONS),

View File

@ -0,0 +1,91 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetFOSSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetFOSSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetFOSSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
SparkConf sconf = new SparkConf();
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
getFOS(
spark,
sourcePath,
outputPath,
delimiter);
});
}
private static void getFOS(SparkSession spark, String sourcePath, String outputPath, String delimiter) {
Dataset<Row> fosData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(sourcePath);
fosData.map((MapFunction<Row, FOSDataModel>) r -> {
FOSDataModel fosDataModel = new FOSDataModel();
fosDataModel.setDoi(r.getString(0).toLowerCase());
fosDataModel.setLevel1(r.getString(1));
fosDataModel.setLevel2(r.getString(2));
fosDataModel.setLevel3(r.getString(3));
return fosDataModel;
}, Encoders.bean(FOSDataModel.class))
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@ -1,80 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
public class GetInputData implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetInputData.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
GetInputData.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"))));
parser.parseArgument(args);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String classForName = parser.get("classForName");
log.info("classForName {}", classForName);
final char delimiter = Optional
.ofNullable(parser.get("delimiter"))
.map(s -> s.charAt(0))
.orElse(DEFAULT_DELIMITER);
log.info("delimiter {}", delimiter);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
}
public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
throws IOException, ClassNotFoundException {
// reads the csv and writes it as its json equivalent
try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) {
eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
}
}
}

View File

@ -0,0 +1,91 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetSDGSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetSDGSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetSDGSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
SparkConf sconf = new SparkConf();
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
getSDG(
spark,
sourcePath,
outputPath,
delimiter);
});
}
private static void getSDG(SparkSession spark, String sourcePath, String outputPath, String delimiter) {
Dataset<Row> sdgData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(sourcePath);
sdgData.map((MapFunction<Row, SDGDataModel>) r -> {
SDGDataModel sdgDataModel = new SDGDataModel();
sdgDataModel.setDoi(r.getString(0).toLowerCase());
sdgDataModel.setSbj(r.getString(1));
return sdgDataModel;
}, Encoders.bean(SDGDataModel.class))
.filter((FilterFunction<SDGDataModel>) sdg -> sdg.getSbj() != null)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@ -1,11 +1,13 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities; package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import static eu.dnetlib.dhp.actionmanager.Constants.*;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import java.io.Serializable;
import eu.dnetlib.dhp.utils.DHPUtils; import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -16,71 +18,72 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import java.util.List; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import static eu.dnetlib.dhp.actionmanager.Constants.*; import eu.dnetlib.dhp.schema.oaf.Result;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
public class PrepareSDGSparkJob implements Serializable { public class PrepareSDGSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class); private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils String jsonConfiguration = IOUtils
.toString( .toString(
PrepareSDGSparkJob.class PrepareSDGSparkJob.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json")); "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser); Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String sourcePath = parser.get("sourcePath"); String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath); log.info("sourcePath: {}", sourcePath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
doPrepare( doPrepare(
spark, spark,
sourcePath, sourcePath,
outputPath);
});
}
private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
sdgDataset.groupByKey((MapFunction<SDGDataModel,String>)r -> r.getDoi().toLowerCase(),Encoders.STRING())
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>)(k,it) -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
SDGDataModel first = it.next();
List<StructuredProperty>sbjs = new ArrayList<>();
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
r.setSubject(sbjs);
return r;
},Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/sdg");
}
outputPath);
});
}
private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
sdgDataset
.groupByKey((MapFunction<SDGDataModel, String>) r -> r.getDoi().toLowerCase(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>) (k, it) -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
SDGDataModel first = it.next();
List<StructuredProperty> sbjs = new ArrayList<>();
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
it
.forEachRemaining(
s -> sbjs
.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
r.setSubject(sbjs);
return r;
}, Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/sdg");
}
} }

View File

@ -1,48 +1,47 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import com.opencsv.bean.CsvBindByPosition; package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import java.io.Serializable; import java.io.Serializable;
public class SDGDataModel implements Serializable{ import com.opencsv.bean.CsvBindByPosition;
@CsvBindByPosition(position = 0) public class SDGDataModel implements Serializable {
@CsvBindByPosition(position = 0)
// @CsvBindByName(column = "doi") // @CsvBindByName(column = "doi")
private String doi; private String doi;
@CsvBindByPosition(position = 1) @CsvBindByPosition(position = 1)
// @CsvBindByName(column = "sdg") // @CsvBindByName(column = "sdg")
private String sbj; private String sbj;
public SDGDataModel() {
public SDGDataModel() { }
} public SDGDataModel(String doi, String sbj) {
this.doi = doi;
this.sbj = sbj;
public SDGDataModel(String doi, String sbj) { }
this.doi = doi;
this.sbj = sbj;
} public static SDGDataModel newInstance(String d, String sbj) {
return new SDGDataModel(d, sbj);
}
public static SDGDataModel newInstance(String d, String sbj) { public String getDoi() {
return new SDGDataModel(d, sbj); return doi;
} }
public String getDoi() { public void setDoi(String doi) {
return doi; this.doi = doi;
} }
public void setDoi(String doi) { public String getSbj() {
this.doi = doi; return sbj;
} }
public void setSbj(String sbj) {
public String getSbj() { this.sbj = sbj;
return sbj; }
}
public void setSbj(String sbj) {
this.sbj = sbj;
}
} }

View File

@ -19,15 +19,9 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "hnn", "paramName": "d",
"paramLongName": "hdfsNameNode", "paramLongName": "delimiter",
"paramDescription": "the path used to store the HostedByMap", "paramDescription": "the delimiter if different from the default one (,)",
"paramRequired": true "paramRequired": false
},
{
"paramName": "cfn",
"paramLongName": "classForName",
"paramDescription": "the path used to store the HostedByMap",
"paramRequired": true
} }
] ]

View File

@ -107,17 +107,30 @@
</action> </action>
<action name="getFOS"> <action name="getFOS">
<java> <spark xmlns="uri:oozie:spark-action:0.2">
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class> <master>yarn</master>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <mode>cluster</mode>
<name>Gets Data from FOS csv file</name>
<class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSSparkJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${fosPath}</arg> <arg>--sourcePath</arg><arg>${fosPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/fos</arg> <arg>--outputPath</arg><arg>${workingDir}/input/fos</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel</arg> </spark>
</java>
<ok to="prepareFos"/> <ok to="prepareFos"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepareFos"> <action name="prepareFos">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -144,17 +157,30 @@
</action> </action>
<action name="getSDG"> <action name="getSDG">
<java> <spark xmlns="uri:oozie:spark-action:0.2">
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class> <master>yarn</master>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <mode>cluster</mode>
<name>Gets Data from SDG csv file</name>
<class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetSDGSparkJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sdgPath}</arg> <arg>--sourcePath</arg><arg>${sdgPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg> <arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel</arg> </spark>
</java>
<ok to="prepareSDG"/> <ok to="prepareSDG"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepareSDG"> <action name="prepareSDG">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -10,7 +10,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareTest { public class PrepareTest {
@ -148,37 +148,6 @@ public class PrepareTest {
} }
@Test
void getFOSFileTest() throws IOException, ClassNotFoundException {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos_sbs.csv")
.getPath();
final String outputPath = workingDir.toString() + "/fos.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
FOSDataModel fos = new ObjectMapper().readValue(line, FOSDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(fos));
count += 1;
}
assertEquals(39, count);
}
@Test @Test
void fosPrepareTest() throws Exception { void fosPrepareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
@ -206,7 +175,6 @@ public class PrepareTest {
assertEquals(20, tmp.count()); assertEquals(20, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
@ -249,105 +217,101 @@ public class PrepareTest {
} }
@Test
void getSDGFileTest() throws IOException, ClassNotFoundException {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv")
.getPath();
final String outputPath = workingDir.toString() + "/sdg.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg));
count += 1;
}
assertEquals(37, count);
}
@Test @Test
void sdgPrepareTest() throws Exception { void sdgPrepareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
.getPath(); .getPath();
PrepareSDGSparkJob PrepareSDGSparkJob
.main( .main(
new String[] { new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(), "--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath, "--sourcePath", sourcePath,
"-outputPath", workingDir.toString() + "/work" "-outputPath", workingDir.toString() + "/work"
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Result> tmp = sc JavaRDD<Result> tmp = sc
.textFile(workingDir.toString() + "/work/sdg") .textFile(workingDir.toString() + "/work/sdg")
.map(item -> OBJECT_MAPPER.readValue(item, Result.class)); .map(item -> OBJECT_MAPPER.readValue(item, Result.class));
String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi"; String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi";
assertEquals(32, tmp.count()); assertEquals(32, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("3. Good health")); .contains("3. Good health"));
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("8. Economic growth")); .contains("8. Economic growth"));
} }
@Disabled
@Test @Test
void test2() throws Exception { void test3() throws Exception {
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
final String outputPath = workingDir.toString() + "/fos.json";
GetFOSSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
"-outputPath", outputPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<FOSDataModel> tmp = sc
.textFile(outputPath)
.map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
}
@Test
void test4() throws Exception {
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
final String outputPath = workingDir.toString() + "/sdg.json"; final String outputPath = workingDir.toString() + "/sdg.json";
GetSDGSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
new GetInputData() "-outputPath", outputPath
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader( });
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line; final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg)); JavaRDD<SDGDataModel> tmp = sc
count += 1; .textFile(outputPath)
} .map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
} }
} }

View File

@ -7,7 +7,6 @@ import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
@ -349,60 +349,58 @@ public class ProduceTest {
} }
private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception { private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception {
final String bipPath = getClass() final String bipPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json")
.getPath(); .getPath();
PrepareBipFinder PrepareBipFinder
.main( .main(
new String[] { new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(), "--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", bipPath, "--sourcePath", bipPath,
"--outputPath", workingDir.toString() + "/work" "--outputPath", workingDir.toString() + "/work"
}); });
final String fosPath = getClass() final String fosPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json") .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json")
.getPath(); .getPath();
PrepareFOSSparkJob PrepareFOSSparkJob
.main( .main(
new String[] { new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(), "--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", fosPath, "--sourcePath", fosPath,
"-outputPath", workingDir.toString() + "/work" "-outputPath", workingDir.toString() + "/work"
}); });
final String sdgPath = getClass() final String sdgPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json") .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
.getPath(); .getPath();
PrepareSDGSparkJob PrepareSDGSparkJob
.main( .main(
new String[] { new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(), "--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sdgPath, "--sourcePath", sdgPath,
"-outputPath", workingDir.toString() + "/work" "-outputPath", workingDir.toString() + "/work"
}); });
SparkSaveUnresolved.main(new String[] { SparkSaveUnresolved.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(), "--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", workingDir.toString() + "/work", "--sourcePath", workingDir.toString() + "/work",
"-outputPath", workingDir.toString() + "/unresolved" "-outputPath", workingDir.toString() + "/unresolved"
}); });
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return sc return sc
.textFile(workingDir.toString() + "/unresolved") .textFile(workingDir.toString() + "/unresolved")
.map(item -> OBJECT_MAPPER.readValue(item, Result.class)); .map(item -> OBJECT_MAPPER.readValue(item, Result.class));
} }
@Test @Test
void produceTestSomeNumbersWithSDG() throws Exception { void produceTestSomeNumbersWithSDG() throws Exception {
@ -414,19 +412,19 @@ public class ProduceTest {
Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count());
Assertions Assertions
.assertEquals( .assertEquals(
50, tmp 50, tmp
.filter(row -> !row.getId().equals(doi)) .filter(row -> !row.getId().equals(doi))
.filter(row -> row.getSubject() != null) .filter(row -> row.getSubject() != null)
.count()); .count());
Assertions Assertions
.assertEquals( .assertEquals(
85, 85,
tmp tmp
.filter(row -> !row.getId().equals(doi)) .filter(row -> !row.getId().equals(doi))
.filter(r -> r.getInstance() != null && r.getInstance().size() > 0) .filter(r -> r.getInstance() != null && r.getInstance().size() > 0)
.count()); .count());
} }
@ -437,35 +435,35 @@ public class ProduceTest {
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG(); JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
Assertions Assertions
.assertEquals( .assertEquals(
7, tmp 7, tmp
.filter(row -> row.getId().equals(doi)) .filter(row -> row.getId().equals(doi))
.collect() .collect()
.get(0) .get(0)
.getSubject() .getSubject()
.size()); .size());
List<StructuredProperty> sbjs = tmp List<StructuredProperty> sbjs = tmp
.filter(row -> row.getId().equals(doi)) .filter(row -> row.getId().equals(doi))
.flatMap(row -> row.getSubject().iterator()) .flatMap(row -> row.getSubject().iterator())
.collect(); .collect();
Assertions Assertions
.assertEquals( .assertEquals(
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences"))); true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences")));
Assertions Assertions
.assertEquals( .assertEquals(
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology"))); true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology")));
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science"))); Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science")));
Assertions Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences"))); .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences")));
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences"))); Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences")));
Assertions Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics"))); .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics")));
Assertions Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty"))); .assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty")));
} }
@ -475,25 +473,25 @@ public class ProduceTest {
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG(); JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
List<StructuredProperty> sbjs_sdg = tmp List<StructuredProperty> sbjs_sdg = tmp
.filter(row -> row.getSubject() != null && row.getSubject().size() > 0) .filter(row -> row.getSubject() != null && row.getSubject().size() > 0)
.flatMap(row -> row.getSubject().iterator()) .flatMap(row -> row.getSubject().iterator())
.filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID)) .filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID))
.collect(); .collect();
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals( .assertEquals(
"Sustainable Development Goals", sbj.getQualifier().getClassname())); "Sustainable Development Goals", sbj.getQualifier().getClassname()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid())); .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename())); .assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
@ -501,23 +499,23 @@ public class ProduceTest {
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance())); sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid())); sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname())); .assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals( .assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid())); ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
sbjs_sdg sbjs_sdg
.forEach( .forEach(
sbj -> Assertions sbj -> Assertions
.assertEquals( .assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
sbj.getDataInfo().getProvenanceaction().getSchemename())); sbj.getDataInfo().getProvenanceaction().getSchemename()));
} }
} }