pulled from beta

This commit is contained in:
Claudio Atzori 2022-01-11 16:59:41 +01:00
commit dcd282977c
14 changed files with 562 additions and 638 deletions

View File

@ -19,7 +19,7 @@ public class Constants {
public static final String DOI = "doi"; public static final String DOI = "doi";
public static final char DEFAULT_DELIMITER = ','; public static final String DEFAULT_DELIMITER = ",";
public static final String UPDATE_DATA_INFO_TYPE = "update"; public static final String UPDATE_DATA_INFO_TYPE = "update";
public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
@ -55,7 +55,8 @@ public class Constants {
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
} }
public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) { public static StructuredProperty getSubject(String sbj, String classid, String classname,
String diqualifierclassid) {
if (sbj.equals(NULL)) if (sbj.equals(NULL))
return null; return null;
StructuredProperty sp = new StructuredProperty(); StructuredProperty sp = new StructuredProperty();

View File

@ -0,0 +1,91 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetFOSSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetFOSSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetFOSSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
SparkConf sconf = new SparkConf();
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
getFOS(
spark,
sourcePath,
outputPath,
delimiter);
});
}
private static void getFOS(SparkSession spark, String sourcePath, String outputPath, String delimiter) {
Dataset<Row> fosData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(sourcePath);
fosData.map((MapFunction<Row, FOSDataModel>) r -> {
FOSDataModel fosDataModel = new FOSDataModel();
fosDataModel.setDoi(r.getString(0).toLowerCase());
fosDataModel.setLevel1(r.getString(1));
fosDataModel.setLevel2(r.getString(2));
fosDataModel.setLevel3(r.getString(3));
return fosDataModel;
}, Encoders.bean(FOSDataModel.class))
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@ -1,80 +0,0 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
public class GetInputData implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetInputData.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
Objects
.requireNonNull(
GetInputData.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"))));
parser.parseArgument(args);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String hdfsNameNode = parser.get("hdfsNameNode");
log.info("hdfsNameNode {}", hdfsNameNode);
final String classForName = parser.get("classForName");
log.info("classForName {}", classForName);
final char delimiter = Optional
.ofNullable(parser.get("delimiter"))
.map(s -> s.charAt(0))
.orElse(DEFAULT_DELIMITER);
log.info("delimiter {}", delimiter);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
}
public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
throws IOException, ClassNotFoundException {
// reads the csv and writes it as its json equivalent
try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) {
eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
}
}
}

View File

@ -0,0 +1,91 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
import static eu.dnetlib.dhp.actionmanager.Constants.isSparkSessionManaged;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
public class GetSDGSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetSDGSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GetSDGSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
// the path where the original fos csv file is stored
final String sourcePath = parser.get("sourcePath");
log.info("sourcePath {}", sourcePath);
// the path where to put the file as json
final String outputPath = parser.get("outputPath");
log.info("outputPath {}", outputPath);
final String delimiter = Optional
.ofNullable(parser.get("delimiter"))
.orElse(DEFAULT_DELIMITER);
SparkConf sconf = new SparkConf();
runWithSparkSession(
sconf,
isSparkSessionManaged,
spark -> {
getSDG(
spark,
sourcePath,
outputPath,
delimiter);
});
}
private static void getSDG(SparkSession spark, String sourcePath, String outputPath, String delimiter) {
Dataset<Row> sdgData = spark
.read()
.format("csv")
.option("sep", delimiter)
.option("inferSchema", "true")
.option("header", "true")
.option("quotes", "\"")
.load(sourcePath);
sdgData.map((MapFunction<Row, SDGDataModel>) r -> {
SDGDataModel sdgDataModel = new SDGDataModel();
sdgDataModel.setDoi(r.getString(0).toLowerCase());
sdgDataModel.setSbj(r.getString(1));
return sdgDataModel;
}, Encoders.bean(SDGDataModel.class))
.filter((FilterFunction<SDGDataModel>) sdg -> sdg.getSbj() != null)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath);
}
}

View File

@ -1,11 +1,13 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities; package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel; import static eu.dnetlib.dhp.actionmanager.Constants.*;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import java.io.Serializable;
import eu.dnetlib.dhp.utils.DHPUtils; import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -16,11 +18,12 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import java.util.List; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import static eu.dnetlib.dhp.actionmanager.Constants.*; import eu.dnetlib.dhp.schema.oaf.Result;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
public class PrepareSDGSparkJob implements Serializable { public class PrepareSDGSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class); private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);
@ -62,15 +65,18 @@ public class PrepareSDGSparkJob implements Serializable {
private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) { private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class); Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
sdgDataset
sdgDataset.groupByKey((MapFunction<SDGDataModel,String>)r -> r.getDoi().toLowerCase(),Encoders.STRING()) .groupByKey((MapFunction<SDGDataModel, String>) r -> r.getDoi().toLowerCase(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>) (k, it) -> { .mapGroups((MapGroupsFunction<String, SDGDataModel, Result>) (k, it) -> {
Result r = new Result(); Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI)); r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
SDGDataModel first = it.next(); SDGDataModel first = it.next();
List<StructuredProperty> sbjs = new ArrayList<>(); List<StructuredProperty> sbjs = new ArrayList<>();
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)); sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID))); it
.forEachRemaining(
s -> sbjs
.add(getSubject(s.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
r.setSubject(sbjs); r.setSubject(sbjs);
return r; return r;
}, Encoders.bean(Result.class)) }, Encoders.bean(Result.class))
@ -80,7 +86,4 @@ public class PrepareSDGSparkJob implements Serializable {
.json(outputPath + "/sdg"); .json(outputPath + "/sdg");
} }
} }

View File

@ -1,9 +1,10 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model; package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import com.opencsv.bean.CsvBindByPosition;
import java.io.Serializable; import java.io.Serializable;
import com.opencsv.bean.CsvBindByPosition;
public class SDGDataModel implements Serializable { public class SDGDataModel implements Serializable {
@CsvBindByPosition(position = 0) @CsvBindByPosition(position = 0)
@ -14,7 +15,6 @@ public class SDGDataModel implements Serializable{
// @CsvBindByName(column = "sdg") // @CsvBindByName(column = "sdg")
private String sbj; private String sbj;
public SDGDataModel() { public SDGDataModel() {
} }
@ -37,7 +37,6 @@ public class SDGDataModel implements Serializable{
this.doi = doi; this.doi = doi;
} }
public String getSbj() { public String getSbj() {
return sbj; return sbj;
} }

View File

@ -19,15 +19,9 @@
"paramRequired": false "paramRequired": false
}, },
{ {
"paramName": "hnn", "paramName": "d",
"paramLongName": "hdfsNameNode", "paramLongName": "delimiter",
"paramDescription": "the path used to store the HostedByMap", "paramDescription": "the delimiter if different from the default one (,)",
"paramRequired": true "paramRequired": false
},
{
"paramName": "cfn",
"paramLongName": "classForName",
"paramDescription": "the path used to store the HostedByMap",
"paramRequired": true
} }
] ]

View File

@ -107,17 +107,30 @@
</action> </action>
<action name="getFOS"> <action name="getFOS">
<java> <spark xmlns="uri:oozie:spark-action:0.2">
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class> <master>yarn</master>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <mode>cluster</mode>
<name>Gets Data from FOS csv file</name>
<class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSSparkJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${fosPath}</arg> <arg>--sourcePath</arg><arg>${fosPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/fos</arg> <arg>--outputPath</arg><arg>${workingDir}/input/fos</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel</arg> </spark>
</java>
<ok to="prepareFos"/> <ok to="prepareFos"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepareFos"> <action name="prepareFos">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
@ -144,17 +157,30 @@
</action> </action>
<action name="getSDG"> <action name="getSDG">
<java> <spark xmlns="uri:oozie:spark-action:0.2">
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class> <master>yarn</master>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <mode>cluster</mode>
<name>Gets Data from SDG csv file</name>
<class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetSDGSparkJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sdgPath}</arg> <arg>--sourcePath</arg><arg>${sdgPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg> <arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel</arg> </spark>
</java>
<ok to="prepareSDG"/> <ok to="prepareSDG"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="prepareSDG"> <action name="prepareSDG">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>

View File

@ -10,7 +10,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareTest { public class PrepareTest {
@ -148,37 +148,6 @@ public class PrepareTest {
} }
@Test
void getFOSFileTest() throws IOException, ClassNotFoundException {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos_sbs.csv")
.getPath();
final String outputPath = workingDir.toString() + "/fos.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
FOSDataModel fos = new ObjectMapper().readValue(line, FOSDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(fos));
count += 1;
}
assertEquals(39, count);
}
@Test @Test
void fosPrepareTest() throws Exception { void fosPrepareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
@ -206,7 +175,6 @@ public class PrepareTest {
assertEquals(20, tmp.count()); assertEquals(20, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
@ -249,35 +217,6 @@ public class PrepareTest {
} }
@Test
void getSDGFileTest() throws IOException, ClassNotFoundException {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv")
.getPath();
final String outputPath = workingDir.toString() + "/sdg.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg));
count += 1;
}
assertEquals(37, count);
}
@Test @Test
void sdgPrepareTest() throws Exception { void sdgPrepareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
@ -305,7 +244,6 @@ public class PrepareTest {
assertEquals(32, tmp.count()); assertEquals(32, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
@ -321,33 +259,59 @@ public class PrepareTest {
.collect() .collect()
.contains("8. Economic growth")); .contains("8. Economic growth"));
}
@Test
void test3() throws Exception {
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_fos_results_20_12_2021.csv.gz";
final String outputPath = workingDir.toString() + "/fos.json";
GetFOSSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
"-outputPath", outputPath
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<FOSDataModel> tmp = sc
.textFile(outputPath)
.map(item -> OBJECT_MAPPER.readValue(item, FOSDataModel.class));
tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel1() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel2() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getLevel3() != null));
} }
@Disabled
@Test @Test
void test2() throws Exception { void test4() throws Exception {
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz"; final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
final String outputPath = workingDir.toString() + "/sdg.json"; final String outputPath = workingDir.toString() + "/sdg.json";
GetSDGSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
new GetInputData() "-outputPath", outputPath
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader( });
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line; final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg)); JavaRDD<SDGDataModel> tmp = sc
count += 1; .textFile(outputPath)
} .map(item -> OBJECT_MAPPER.readValue(item, SDGDataModel.class));
tmp.foreach(t -> Assertions.assertTrue(t.getDoi() != null));
tmp.foreach(t -> Assertions.assertTrue(t.getSbj() != null));
} }
} }

View File

@ -7,7 +7,6 @@ import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -25,6 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
@ -349,7 +349,6 @@ public class ProduceTest {
} }
private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception { private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception {
final String bipPath = getClass() final String bipPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json") .getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json")
@ -402,7 +401,6 @@ public class ProduceTest {
.map(item -> OBJECT_MAPPER.readValue(item, Result.class)); .map(item -> OBJECT_MAPPER.readValue(item, Result.class));
} }
@Test @Test
void produceTestSomeNumbersWithSDG() throws Exception { void produceTestSomeNumbersWithSDG() throws Exception {

View File

@ -8,7 +8,6 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkImportMagIntoDataset { object SparkImportMagIntoDataset {
val datatypedict = Map( val datatypedict = Map(
"bool" -> BooleanType, "bool" -> BooleanType,
"int" -> IntegerType, "int" -> IntegerType,
@ -20,232 +19,33 @@ object SparkImportMagIntoDataset {
"DateTime" -> DateType "DateTime" -> DateType
) )
val stream = Map( val stream = Map(
"Affiliations" -> Tuple2( "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"mag/Affiliations.txt", "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
Seq( "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"AffiliationId:long", "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"Rank:uint", "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"NormalizedName:string", "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"DisplayName:string", "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"GridId:string", "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
"OfficialPage:string", "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"WikiPage:string", "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"PaperCount:long", "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
"PaperFamilyCount:long", "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")),
"CitationCount:long", "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
"Iso3166Code:string", "PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")),
"Latitude:float?", "PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")),
"Longitude:float?", "PaperMeSH" -> Tuple2("advanced/PaperMeSH.txt", Seq("PaperId:long", "DescriptorUI:string", "DescriptorName:string", "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool")),
"CreatedDate:DateTime" "PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")),
) "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
), "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),
"AuthorExtendedAttributes" -> Tuple2( "PaperUrls" -> Tuple2("mag/PaperUrls.txt", Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")),
"mag/AuthorExtendedAttributes.txt", "Papers" -> Tuple2("mag/Papers.txt", Seq("PaperId:long", "Rank:uint", "Doi:string", "DocType:string", "PaperTitle:string", "OriginalTitle:string", "BookTitle:string", "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string", "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?", "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string", "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long", "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?", "DocSubTypes:string", "CreatedDate:DateTime")),
Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string") "RelatedFieldOfStudy" -> Tuple2("advanced/RelatedFieldOfStudy.txt", Seq("FieldOfStudyId1:long", "Type1:string", "FieldOfStudyId2:long", "Type2:string", "Rank:float"))
),
"Authors" -> Tuple2(
"mag/Authors.txt",
Seq(
"AuthorId:long",
"Rank:uint",
"NormalizedName:string",
"DisplayName:string",
"LastKnownAffiliationId:long?",
"PaperCount:long",
"PaperFamilyCount:long",
"CitationCount:long",
"CreatedDate:DateTime"
)
),
"ConferenceInstances" -> Tuple2(
"mag/ConferenceInstances.txt",
Seq(
"ConferenceInstanceId:long",
"NormalizedName:string",
"DisplayName:string",
"ConferenceSeriesId:long",
"Location:string",
"OfficialUrl:string",
"StartDate:DateTime?",
"EndDate:DateTime?",
"AbstractRegistrationDate:DateTime?",
"SubmissionDeadlineDate:DateTime?",
"NotificationDueDate:DateTime?",
"FinalVersionDueDate:DateTime?",
"PaperCount:long",
"PaperFamilyCount:long",
"CitationCount:long",
"Latitude:float?",
"Longitude:float?",
"CreatedDate:DateTime"
)
),
"ConferenceSeries" -> Tuple2(
"mag/ConferenceSeries.txt",
Seq(
"ConferenceSeriesId:long",
"Rank:uint",
"NormalizedName:string",
"DisplayName:string",
"PaperCount:long",
"PaperFamilyCount:long",
"CitationCount:long",
"CreatedDate:DateTime"
)
),
"EntityRelatedEntities" -> Tuple2(
"advanced/EntityRelatedEntities.txt",
Seq(
"EntityId:long",
"EntityType:string",
"RelatedEntityId:long",
"RelatedEntityType:string",
"RelatedType:int",
"Score:float"
)
),
"FieldOfStudyChildren" -> Tuple2(
"advanced/FieldOfStudyChildren.txt",
Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")
),
"FieldOfStudyExtendedAttributes" -> Tuple2(
"advanced/FieldOfStudyExtendedAttributes.txt",
Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")
),
"FieldsOfStudy" -> Tuple2(
"advanced/FieldsOfStudy.txt",
Seq(
"FieldOfStudyId:long",
"Rank:uint",
"NormalizedName:string",
"DisplayName:string",
"MainType:string",
"Level:int",
"PaperCount:long",
"PaperFamilyCount:long",
"CitationCount:long",
"CreatedDate:DateTime"
)
),
"Journals" -> Tuple2(
"mag/Journals.txt",
Seq(
"JournalId:long",
"Rank:uint",
"NormalizedName:string",
"DisplayName:string",
"Issn:string",
"Publisher:string",
"Webpage:string",
"PaperCount:long",
"PaperFamilyCount:long",
"CitationCount:long",
"CreatedDate:DateTime"
)
),
"PaperAbstractsInvertedIndex" -> Tuple2(
"nlp/PaperAbstractsInvertedIndex.txt.*",
Seq("PaperId:long", "IndexedAbstract:string")
),
"PaperAuthorAffiliations" -> Tuple2(
"mag/PaperAuthorAffiliations.txt",
Seq(
"PaperId:long",
"AuthorId:long",
"AffiliationId:long?",
"AuthorSequenceNumber:uint",
"OriginalAuthor:string",
"OriginalAffiliation:string"
)
),
"PaperCitationContexts" -> Tuple2(
"nlp/PaperCitationContexts.txt",
Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")
),
"PaperExtendedAttributes" -> Tuple2(
"mag/PaperExtendedAttributes.txt",
Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")
),
"PaperFieldsOfStudy" -> Tuple2(
"advanced/PaperFieldsOfStudy.txt",
Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")
),
"PaperMeSH" -> Tuple2(
"advanced/PaperMeSH.txt",
Seq(
"PaperId:long",
"DescriptorUI:string",
"DescriptorName:string",
"QualifierUI:string",
"QualifierName:string",
"IsMajorTopic:bool"
)
),
"PaperRecommendations" -> Tuple2(
"advanced/PaperRecommendations.txt",
Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")
),
"PaperReferences" -> Tuple2(
"mag/PaperReferences.txt",
Seq("PaperId:long", "PaperReferenceId:long")
),
"PaperResources" -> Tuple2(
"mag/PaperResources.txt",
Seq(
"PaperId:long",
"ResourceType:int",
"ResourceUrl:string",
"SourceUrl:string",
"RelationshipType:int"
)
),
"PaperUrls" -> Tuple2(
"mag/PaperUrls.txt",
Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")
),
"Papers" -> Tuple2(
"mag/Papers.txt",
Seq(
"PaperId:long",
"Rank:uint",
"Doi:string",
"DocType:string",
"PaperTitle:string",
"OriginalTitle:string",
"BookTitle:string",
"Year:int?",
"Date:DateTime?",
"OnlineDate:DateTime?",
"Publisher:string",
"JournalId:long?",
"ConferenceSeriesId:long?",
"ConferenceInstanceId:long?",
"Volume:string",
"Issue:string",
"FirstPage:string",
"LastPage:string",
"ReferenceCount:long",
"CitationCount:long",
"EstimatedCitation:long",
"OriginalVenue:string",
"FamilyId:long?",
"FamilyRank:uint?",
"CreatedDate:DateTime"
)
),
"RelatedFieldOfStudy" -> Tuple2(
"advanced/RelatedFieldOfStudy.txt",
Seq(
"FieldOfStudyId1:long",
"Type1:string",
"FieldOfStudyId2:long",
"Type2:string",
"Rank:float"
)
)
) )
def getSchema(streamName: String): StructType = { def getSchema(streamName: String): StructType = {
var schema = new StructType() var schema = new StructType()
val d: Seq[String] = stream(streamName)._2 val d: Seq[String] = stream(streamName)._2
@ -261,22 +61,19 @@ object SparkImportMagIntoDataset {
schema schema
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser( val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json")))
IOUtils.toString(
getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/convert_mag_to_oaf_params.json")
)
)
parser.parseArgument(args) parser.parseArgument(args)
val spark: SparkSession = val spark: SparkSession =
SparkSession SparkSession
.builder() .builder()
.config(conf) .config(conf)
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.master(parser.get("master")) .master(parser.get("master")).getOrCreate()
.getOrCreate()
stream.foreach { case (k, v) => stream.foreach { case (k, v) =>
val s: StructType = getSchema(k) val s: StructType = getSchema(k)

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -64,6 +65,18 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
final String workingPath = parser.get("workingDir"); final String workingPath = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final int iterations = Optional
.ofNullable(parser.get("iterations"))
.map(v -> {
if (Integer.valueOf(v) < MAX_ITERATION) {
return Integer.valueOf(v);
} else
return MAX_ITERATION;
})
.orElse(MAX_ITERATION);
log.info("iterations: {}", iterations);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@ -77,7 +90,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
resultOrganizationPath, resultOrganizationPath,
relationPath, relationPath,
workingPath, workingPath,
outputPath)); outputPath,
iterations));
} }
public static void execPropagation(SparkSession spark, public static void execPropagation(SparkSession spark,
@ -86,7 +100,13 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
String resultOrganizationPath, String resultOrganizationPath,
String graphPath, String graphPath,
String workingPath, String workingPath,
String outputPath) { String outputPath,
int iterations) {
if (iterations == 1) {
doPropagateOnce(
spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
workingPath, outputPath);
} else {
final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE); final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO); final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
@ -105,9 +125,22 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
doPropagate( doPropagate(
spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
workingPath, outputPath, propagationCounter); workingPath, outputPath, propagationCounter);
}
} }
private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
String resultOrganizationPath, String graphPath, String workingPath,
String outputPath) {
StepActions
.execStep(
spark, graphPath, workingPath + NEW_RELATION_PATH,
leavesPath, childParentPath, resultOrganizationPath);
addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
}
private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
String resultOrganizationPath, String graphPath, String workingPath, String outputPath, String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
PropagationCounter propagationCounter) { PropagationCounter propagationCounter) {

View File

@ -46,5 +46,11 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files", "paramDescription": "the path used to store temporary output files",
"paramRequired": true "paramRequired": true
},
{
"paramName": "it",
"paramLongName": "iterations",
"paramDescription": "the number of iterations to be computed",
"paramRequired": false
} }
] ]

View File

@ -1,4 +1,4 @@
<workflow-app name="affiliation_from_instrepo_propagation" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="affiliation_from_semrel_propagation" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>sourcePath</name> <name>sourcePath</name>
@ -181,6 +181,7 @@
<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg> <arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--workingDir</arg><arg>${workingDir}/working</arg> <arg>--workingDir</arg><arg>${workingDir}/working</arg>
<arg>--iterations</arg><arg>${iterations}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>