1
0
Fork 0

Merge pull request 'SDG Integration' (#178) from SDG into beta

Reviewed-on: D-Net/dnet-hadoop#178
This commit is contained in:
Miriam Baglioni 2021-12-23 14:50:00 +01:00
commit a706ba0c08
12 changed files with 586 additions and 66 deletions

View File

@ -19,14 +19,20 @@ public class Constants {
public static final String DOI = "doi"; public static final String DOI = "doi";
public static final char DEFAULT_DELIMITER = ',';
public static final String UPDATE_DATA_INFO_TYPE = "update"; public static final String UPDATE_DATA_INFO_TYPE = "update";
public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos"; public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE"; public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip"; public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
public static final String FOS_CLASS_ID = "FOS"; public static final String FOS_CLASS_ID = "FOS";
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification"; public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
public static final String SDG_CLASS_ID = "SDG";
public static final String SDG_CLASS_NAME = "Sustainable Development Goals";
public static final String NULL = "NULL"; public static final String NULL = "NULL";
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -49,7 +55,7 @@ public class Constants {
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)); .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
} }
public static StructuredProperty getSubject(String sbj, String classid, String classname) { public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) {
if (sbj.equals(NULL)) if (sbj.equals(NULL))
return null; return null;
StructuredProperty sp = new StructuredProperty(); StructuredProperty sp = new StructuredProperty();
@ -72,7 +78,7 @@ public class Constants {
false, false,
OafMapperUtils OafMapperUtils
.qualifier( .qualifier(
UPDATE_SUBJECT_FOS_CLASS_ID, diqualifierclassid,
UPDATE_CLASS_NAME, UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS), ModelConstants.DNET_PROVENANCE_ACTIONS),

View File

@ -6,6 +6,7 @@ import java.io.InputStreamReader;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -15,13 +16,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.GetCSV;
public class GetFOSData implements Serializable { import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
public class GetInputData implements Serializable {
private static final Logger log = LoggerFactory.getLogger(GetInputData.class);
private static final Logger log = LoggerFactory.getLogger(GetFOSData.class);
public static final char DEFAULT_DELIMITER = ',';
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -29,9 +31,9 @@ public class GetFOSData implements Serializable {
.toString( .toString(
Objects Objects
.requireNonNull( .requireNonNull(
GetFOSData.class GetInputData.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json")))); "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"))));
parser.parseArgument(args); parser.parseArgument(args);
@ -60,16 +62,17 @@ public class GetFOSData implements Serializable {
FileSystem fileSystem = FileSystem.get(conf); FileSystem fileSystem = FileSystem.get(conf);
new GetFOSData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem); new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
} }
public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs) public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
throws IOException, ClassNotFoundException { throws IOException, ClassNotFoundException {
// reads the csv and writes it as its json equivalent // reads the csv and writes it as its json equivalent
try (InputStreamReader reader = new InputStreamReader(fs.open(new Path(inputPath)))) { try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) {
GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter); eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
} }
} }

View File

@ -66,20 +66,20 @@ public class PrepareFOSSparkJob implements Serializable {
Dataset<FOSDataModel> fosDataset = readPath(spark, sourcePath, FOSDataModel.class); Dataset<FOSDataModel> fosDataset = readPath(spark, sourcePath, FOSDataModel.class);
fosDataset fosDataset
.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi(), Encoders.STRING()) .groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi().toLowerCase(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> { .mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> {
Result r = new Result(); Result r = new Result();
FOSDataModel first = it.next(); FOSDataModel first = it.next();
r.setId(DHPUtils.generateUnresolvedIdentifier(first.getDoi(), DOI)); r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
HashSet<String> level1 = new HashSet<>(); HashSet<String> level1 = new HashSet<>();
HashSet<String> level2 = new HashSet<>(); HashSet<String> level2 = new HashSet<>();
HashSet<String> level3 = new HashSet<>(); HashSet<String> level3 = new HashSet<>();
addLevels(level1, level2, level3, first); addLevels(level1, level2, level3, first);
it.forEachRemaining(v -> addLevels(level1, level2, level3, v)); it.forEachRemaining(v -> addLevels(level1, level2, level3, v));
List<StructuredProperty> sbjs = new ArrayList<>(); List<StructuredProperty> sbjs = new ArrayList<>();
level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME))); level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
r.setSubject(sbjs); r.setSubject(sbjs);
return r; return r;
}, Encoders.bean(Result.class)) }, Encoders.bean(Result.class))

View File

@ -0,0 +1,86 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import static eu.dnetlib.dhp.actionmanager.Constants.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class PrepareSDGSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
PrepareSDGSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
doPrepare(
spark,
sourcePath,
outputPath);
});
}
private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
sdgDataset.groupByKey((MapFunction<SDGDataModel,String>)r -> r.getDoi().toLowerCase(),Encoders.STRING())
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>)(k,it) -> {
Result r = new Result();
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
SDGDataModel first = it.next();
List<StructuredProperty>sbjs = new ArrayList<>();
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
r.setSubject(sbjs);
return r;
},Encoders.bean(Result.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/sdg");
}
}

View File

@ -0,0 +1,48 @@
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
import com.opencsv.bean.CsvBindByPosition;
import java.io.Serializable;
public class SDGDataModel implements Serializable{
@CsvBindByPosition(position = 0)
// @CsvBindByName(column = "doi")
private String doi;
@CsvBindByPosition(position = 1)
// @CsvBindByName(column = "sdg")
private String sbj;
public SDGDataModel() {
}
public SDGDataModel(String doi, String sbj) {
this.doi = doi;
this.sbj = sbj;
}
public static SDGDataModel newInstance(String d, String sbj) {
return new SDGDataModel(d, sbj);
}
public String getDoi() {
return doi;
}
public void setDoi(String doi) {
this.doi = doi;
}
public String getSbj() {
return sbj;
}
public void setSbj(String sbj) {
this.sbj = sbj;
}
}

View File

@ -79,6 +79,7 @@
<fork name="prepareInfo"> <fork name="prepareInfo">
<path start="prepareBip"/> <path start="prepareBip"/>
<path start="getFOS"/> <path start="getFOS"/>
<path start="getSDG"/>
</fork> </fork>
<action name="prepareBip"> <action name="prepareBip">
@ -107,7 +108,7 @@
<action name="getFOS"> <action name="getFOS">
<java> <java>
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSData</main-class> <main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${fosPath}</arg> <arg>--sourcePath</arg><arg>${fosPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/fos</arg> <arg>--outputPath</arg><arg>${workingDir}/input/fos</arg>
@ -142,6 +143,42 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="getSDG">
<java>
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--sourcePath</arg><arg>${sdgPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg>
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel</arg>
</java>
<ok to="prepareSDG"/>
<error to="Kill"/>
</action>
<action name="prepareSDG">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Produces the unresolved from FOS!</name>
<class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.PrepareSDGSparkJob</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingDir}/input/sdg</arg>
<arg>--outputPath</arg><arg>${workingDir}/prepared</arg>
</spark>
<ok to="join"/>
<error to="Kill"/>
</action>
<join name="join" to="produceUnresolved"/> <join name="join" to="produceUnresolved"/>

View File

@ -10,6 +10,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -18,17 +19,13 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel; import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
public class PrepareTest { public class PrepareTest {
@ -159,7 +156,7 @@ public class PrepareTest {
.getPath(); .getPath();
final String outputPath = workingDir.toString() + "/fos.json"; final String outputPath = workingDir.toString() + "/fos.json";
new GetFOSData() new GetInputData()
.doRewrite( .doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel", sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel",
',', fs); ',', fs);
@ -180,6 +177,8 @@ public class PrepareTest {
} }
@Test @Test
void fosPrepareTest() throws Exception { void fosPrepareTest() throws Exception {
final String sourcePath = getClass() final String sourcePath = getClass()
@ -204,15 +203,9 @@ public class PrepareTest {
String doi1 = "unresolved::10.3390/s18072310::doi"; String doi1 = "unresolved::10.3390/s18072310::doi";
assertEquals(50, tmp.count()); assertEquals(20, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue(
tmp
.filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue())
.collect()
.contains("engineering and technology"));
assertTrue( assertTrue(
tmp tmp
@ -220,16 +213,16 @@ public class PrepareTest {
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("nano-technology")); .contains("04 agricultural and veterinary sciences"));
assertTrue( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi1)) .filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("nanoscience & nanotechnology")); .contains("0404 agricultural biotechnology"));
String doi = "unresolved::10.1111/1365-2656.12831::doi"; String doi = "unresolved::10.1007/s11164-020-04383-6::doi";
assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count()); assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count());
assertTrue( assertTrue(
tmp tmp
@ -237,7 +230,7 @@ public class PrepareTest {
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("psychology and cognitive sciences")); .contains("01 natural sciences"));
assertTrue( assertTrue(
tmp tmp
@ -245,15 +238,116 @@ public class PrepareTest {
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("social sciences")); .contains("0104 chemical sciences"));
assertFalse( assertTrue(
tmp tmp
.filter(r -> r.getId().equals(doi)) .filter(r -> r.getId().equals(doi))
.flatMap(r -> r.getSubject().iterator()) .flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue()) .map(sbj -> sbj.getValue())
.collect() .collect()
.contains("NULL")); .contains("010402 general chemistry"));
} }
@Test
void getSDGFileTest() throws IOException, ClassNotFoundException {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv")
.getPath();
final String outputPath = workingDir.toString() + "/sdg.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg));
count += 1;
}
assertEquals(37, count);
}
@Test
void sdgPrepareTest() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
.getPath();
PrepareSDGSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
"-outputPath", workingDir.toString() + "/work"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Result> tmp = sc
.textFile(workingDir.toString() + "/work/sdg")
.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi";
assertEquals(32, tmp.count());
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
assertTrue(
tmp
.filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue())
.collect()
.contains("3. Good health"));
assertTrue(
tmp
.filter(r -> r.getId().equals(doi1))
.flatMap(r -> r.getSubject().iterator())
.map(sbj -> sbj.getValue())
.collect()
.contains("8. Economic growth"));
}
@Disabled
@Test
void test2() throws Exception {
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
final String outputPath = workingDir.toString() + "/sdg.json";
new GetInputData()
.doRewrite(
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
',', fs);
BufferedReader in = new BufferedReader(
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
String line;
int count = 0;
while ((line = in.readLine()) != null) {
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
System.out.println(new ObjectMapper().writeValueAsString(sdg));
count += 1;
}
}
} }

View File

@ -7,6 +7,7 @@ import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.Constants;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -348,4 +349,175 @@ public class ProduceTest {
} }
private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception {
final String bipPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json")
.getPath();
PrepareBipFinder
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", bipPath,
"--outputPath", workingDir.toString() + "/work"
});
final String fosPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json")
.getPath();
PrepareFOSSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", fosPath,
"-outputPath", workingDir.toString() + "/work"
});
final String sdgPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
.getPath();
PrepareSDGSparkJob
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sdgPath,
"-outputPath", workingDir.toString() + "/work"
});
SparkSaveUnresolved.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", workingDir.toString() + "/work",
"-outputPath", workingDir.toString() + "/unresolved"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return sc
.textFile(workingDir.toString() + "/unresolved")
.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
}
@Test
void produceTestSomeNumbersWithSDG() throws Exception {
final String doi = "unresolved::10.3390/s18072310::doi";
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
Assertions.assertEquals(136, tmp.count());
Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count());
Assertions
.assertEquals(
50, tmp
.filter(row -> !row.getId().equals(doi))
.filter(row -> row.getSubject() != null)
.count());
Assertions
.assertEquals(
85,
tmp
.filter(row -> !row.getId().equals(doi))
.filter(r -> r.getInstance() != null && r.getInstance().size() > 0)
.count());
}
@Test
void produceTest7Subjects() throws Exception {
final String doi = "unresolved::10.3390/s18072310::doi";
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
Assertions
.assertEquals(
7, tmp
.filter(row -> row.getId().equals(doi))
.collect()
.get(0)
.getSubject()
.size());
List<StructuredProperty> sbjs = tmp
.filter(row -> row.getId().equals(doi))
.flatMap(row -> row.getSubject().iterator())
.collect();
Assertions
.assertEquals(
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences")));
Assertions
.assertEquals(
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology")));
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science")));
Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences")));
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences")));
Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics")));
Assertions
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty")));
}
@Test
void produceTestSubjectsWithSDG() throws Exception {
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
List<StructuredProperty> sbjs_sdg = tmp
.filter(row -> row.getSubject() != null && row.getSubject().size() > 0)
.flatMap(row -> row.getSubject().iterator())
.filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID))
.collect();
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(
"Sustainable Development Goals", sbj.getQualifier().getClassname()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
sbjs_sdg
.forEach(
sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
sbjs_sdg
.forEach(
sbj -> Assertions
.assertEquals(
ModelConstants.DNET_PROVENANCE_ACTIONS,
sbj.getDataInfo().getProvenanceaction().getSchemename()));
}
} }

View File

@ -0,0 +1,37 @@
{"doi":"10.1001/amaguidesnewsletters.2019.mayjun02","sbj":"10. No inequality"}
{"doi":"10.1001/amaguidesnewsletters.2019.novdec01","sbj":"10. No inequality"}
{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"3. Good health"}
{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"8. Economic growth"}
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb01","sbj":"8. Economic growth"}
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"3. Good health"}
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"8. Economic growth"}
{"doi":"10.1001/amaguidesnewsletters.2020.julaug01","sbj":"3. Good health"}
{"doi":"10.1001/amaguidesnewsletters.2020.marapr01","sbj":"3. Good health"}
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun01","sbj":"3. Good health"}
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"16. Peace & justice"}
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"10. No inequality"}
{"doi":"10.1001/amaguidesnewsletters.2021.julaug01","sbj":"1. No poverty"}
{"doi":"10.1001/amaguidesnewsletters.2021.mayjune01","sbj":"10. No inequality"}
{"doi":"10.1001/amaguidesnewsletters.2021.mayjune02","sbj":"10. No inequality"}
{"doi":"10.4336/2021.pfb.41e201902078","sbj":"15. Life on land"}
{"doi":"10.4337/ejeep.2019.00045","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2019.00050","sbj":"1. No poverty"}
{"doi":"10.4337/ejeep.2019.0045","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2019.0050","sbj":"1. No poverty"}
{"doi":"10.4337/ejeep.2019.0051","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2019.0052","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2020.0058","sbj":"1. No poverty"}
{"doi":"10.4337/ejeep.2020.0058","sbj":"10. No inequality"}
{"doi":"10.4337/ejeep.2020.0060","sbj":"10. No inequality"}
{"doi":"10.4337/ejeep.2020.0065","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2020.02.03","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2020.02.05","sbj":"8. Economic growth"}
{"doi":"10.4337/ejeep.2020.02.06","sbj":"16. Peace & justice"}
{"doi":"10.4337/ejeep.2020.02.09","sbj":"16. Peace & justice"}
{"doi":"10.4337/roke.2020.01.01","sbj":"16. Peace & justice"}
{"doi":"10.4337/roke.2020.01.03","sbj":"16. Peace & justice"}
{"doi":"10.4337/roke.2020.01.05","sbj":"1. No poverty"}
{"doi":"10.4337/roke.2020.01.05","sbj":"8. Economic growth"}
{"doi":"10.4337/roke.2020.01.07","sbj":"8. Economic growth"}
{"doi":"10.4337/roke.2020.02.03","sbj":"8. Economic growth"}
{"doi":"10.3390/s18072310","sbj":"1. No poverty"}

View File

@ -0,0 +1,37 @@
10.1001/amaguidesnewsletters.2019.mayjun02,10. No inequality
10.1001/amaguidesnewsletters.2019.novdec01,10. No inequality
10.1001/amaguidesnewsletters.2019.sepoct02,3. Good health
10.1001/amaguidesnewsletters.2019.sepoct02,8. Economic growth
10.1001/amaguidesnewsletters.2020.janfeb01,8. Economic growth
10.1001/amaguidesnewsletters.2020.janfeb02,3. Good health
10.1001/amaguidesnewsletters.2020.janfeb02,8. Economic growth
10.1001/amaguidesnewsletters.2020.julaug01,3. Good health
10.1001/amaguidesnewsletters.2020.marapr01,3. Good health
10.1001/amaguidesnewsletters.2020.mayjun01,3. Good health
10.1001/amaguidesnewsletters.2020.mayjun02,16. Peace & justice
10.1001/amaguidesnewsletters.2020.mayjun02,10. No inequality
10.1001/amaguidesnewsletters.2021.julaug01,1. No poverty
10.1001/amaguidesnewsletters.2021.mayjune01,10. No inequality
10.1001/amaguidesnewsletters.2021.mayjune02,10. No inequality
10.4336/2021.pfb.41e201902078,15. Life on land
10.4337/ejeep.2019.00045,16. Peace & justice
10.4337/ejeep.2019.00050,1. No poverty
10.4337/ejeep.2019.0045,16. Peace & justice
10.4337/ejeep.2019.0050,1. No poverty
10.4337/ejeep.2019.0051,16. Peace & justice
10.4337/ejeep.2019.0052,16. Peace & justice
10.4337/ejeep.2020.0058,1. No poverty
10.4337/ejeep.2020.0058,10. No inequality
10.4337/ejeep.2020.0060,10. No inequality
10.4337/ejeep.2020.0065,16. Peace & justice
10.4337/ejeep.2020.02.03,16. Peace & justice
10.4337/ejeep.2020.02.05,8. Economic growth
10.4337/ejeep.2020.02.06,16. Peace & justice
10.4337/ejeep.2020.02.09,16. Peace & justice
10.4337/roke.2020.01.01,16. Peace & justice
10.4337/roke.2020.01.03,16. Peace & justice
10.4337/roke.2020.01.05,1. No poverty
10.4337/roke.2020.01.05,8. Economic growth
10.4337/roke.2020.01.07,8. Economic growth
10.4337/roke.2020.02.03,8. Economic growth
10.4337/roke.2020.02.04,1. No poverty
1 10.1001/amaguidesnewsletters.2019.mayjun02 10. No inequality
2 10.1001/amaguidesnewsletters.2019.novdec01 10. No inequality
3 10.1001/amaguidesnewsletters.2019.sepoct02 3. Good health
4 10.1001/amaguidesnewsletters.2019.sepoct02 8. Economic growth
5 10.1001/amaguidesnewsletters.2020.janfeb01 8. Economic growth
6 10.1001/amaguidesnewsletters.2020.janfeb02 3. Good health
7 10.1001/amaguidesnewsletters.2020.janfeb02 8. Economic growth
8 10.1001/amaguidesnewsletters.2020.julaug01 3. Good health
9 10.1001/amaguidesnewsletters.2020.marapr01 3. Good health
10 10.1001/amaguidesnewsletters.2020.mayjun01 3. Good health
11 10.1001/amaguidesnewsletters.2020.mayjun02 16. Peace & justice
12 10.1001/amaguidesnewsletters.2020.mayjun02 10. No inequality
13 10.1001/amaguidesnewsletters.2021.julaug01 1. No poverty
14 10.1001/amaguidesnewsletters.2021.mayjune01 10. No inequality
15 10.1001/amaguidesnewsletters.2021.mayjune02 10. No inequality
16 10.4336/2021.pfb.41e201902078 15. Life on land
17 10.4337/ejeep.2019.00045 16. Peace & justice
18 10.4337/ejeep.2019.00050 1. No poverty
19 10.4337/ejeep.2019.0045 16. Peace & justice
20 10.4337/ejeep.2019.0050 1. No poverty
21 10.4337/ejeep.2019.0051 16. Peace & justice
22 10.4337/ejeep.2019.0052 16. Peace & justice
23 10.4337/ejeep.2020.0058 1. No poverty
24 10.4337/ejeep.2020.0058 10. No inequality
25 10.4337/ejeep.2020.0060 10. No inequality
26 10.4337/ejeep.2020.0065 16. Peace & justice
27 10.4337/ejeep.2020.02.03 16. Peace & justice
28 10.4337/ejeep.2020.02.05 8. Economic growth
29 10.4337/ejeep.2020.02.06 16. Peace & justice
30 10.4337/ejeep.2020.02.09 16. Peace & justice
31 10.4337/roke.2020.01.01 16. Peace & justice
32 10.4337/roke.2020.01.03 16. Peace & justice
33 10.4337/roke.2020.01.05 1. No poverty
34 10.4337/roke.2020.01.05 8. Economic growth
35 10.4337/roke.2020.01.07 8. Economic growth
36 10.4337/roke.2020.02.03 8. Economic growth
37 10.4337/roke.2020.02.04 1. No poverty

View File

@ -116,22 +116,22 @@ public class ExtractRelationFromEntityTest {
.getType()); .getType());
Assertions Assertions
.assertEquals( .assertEquals(
"context", verificationDataset "context", verificationDataset
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00")) .filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
.collectAsList() .collectAsList()
.get(0) .get(0)
.getSource() .getSource()
.getType()); .getType());
Assertions Assertions
.assertEquals( .assertEquals(
"result", verificationDataset "result", verificationDataset
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00")) .filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
.collectAsList() .collectAsList()
.get(0) .get(0)
.getTarget() .getTarget()
.getType()); .getType());
Assertions Assertions
.assertEquals( .assertEquals(
"IsRelatedTo", verificationDataset "IsRelatedTo", verificationDataset
@ -151,22 +151,22 @@ public class ExtractRelationFromEntityTest {
.getType()); .getType());
Assertions Assertions
.assertEquals( .assertEquals(
"context", verificationDataset "context", verificationDataset
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00")) .filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
.collectAsList() .collectAsList()
.get(0) .get(0)
.getTarget() .getTarget()
.getType()); .getType());
Assertions Assertions
.assertEquals( .assertEquals(
"result", verificationDataset "result", verificationDataset
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00")) .filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
.collectAsList() .collectAsList()
.get(0) .get(0)
.getSource() .getSource()
.getType()); .getType());
} }
} }