forked from antonis.lempesis/dnet-hadoop
[SDG] Logic to create unresolved entities out of SDG input. This also changes some classes related to FOS to reuse the same code. The code under createunresolvedentities creates results with the merged update of the inputs provided (bip at the level of the instance, fos and sdg for subjects).
This commit is contained in:
parent
2a67ee13ec
commit
7a1b440413
|
@ -19,14 +19,20 @@ public class Constants {
|
|||
|
||||
public static final String DOI = "doi";
|
||||
|
||||
public static final char DEFAULT_DELIMITER = ',';
|
||||
|
||||
public static final String UPDATE_DATA_INFO_TYPE = "update";
|
||||
public static final String UPDATE_SUBJECT_FOS_CLASS_ID = "subject:fos";
|
||||
public static final String UPDATE_CLASS_NAME = "Inferred by OpenAIRE";
|
||||
public static final String UPDATE_MEASURE_BIP_CLASS_ID = "measure:bip";
|
||||
public static final String UPDATE_SUBJECT_SDG_CLASS_ID = "subject:sdg";
|
||||
|
||||
public static final String FOS_CLASS_ID = "FOS";
|
||||
public static final String FOS_CLASS_NAME = "Fields of Science and Technology classification";
|
||||
|
||||
public static final String SDG_CLASS_ID = "SDG";
|
||||
public static final String SDG_CLASS_NAME = "Sustainable Development Goals";
|
||||
|
||||
public static final String NULL = "NULL";
|
||||
|
||||
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
@ -49,7 +55,7 @@ public class Constants {
|
|||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||
}
|
||||
|
||||
public static StructuredProperty getSubject(String sbj, String classid, String classname) {
|
||||
public static StructuredProperty getSubject(String sbj, String classid, String classname, String diqualifierclassid) {
|
||||
if (sbj.equals(NULL))
|
||||
return null;
|
||||
StructuredProperty sp = new StructuredProperty();
|
||||
|
@ -72,7 +78,7 @@ public class Constants {
|
|||
false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
UPDATE_SUBJECT_FOS_CLASS_ID,
|
||||
diqualifierclassid,
|
||||
UPDATE_CLASS_NAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
|
|
|
@ -6,6 +6,7 @@ import java.io.InputStreamReader;
|
|||
import java.io.Serializable;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -15,13 +16,14 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.collection.GetCSV;
|
||||
|
||||
public class GetFOSData implements Serializable {
|
||||
import static eu.dnetlib.dhp.actionmanager.Constants.DEFAULT_DELIMITER;
|
||||
|
||||
public class GetInputData implements Serializable {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GetInputData.class);
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GetFOSData.class);
|
||||
|
||||
public static final char DEFAULT_DELIMITER = ',';
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
|
@ -29,9 +31,9 @@ public class GetFOSData implements Serializable {
|
|||
.toString(
|
||||
Objects
|
||||
.requireNonNull(
|
||||
GetFOSData.class
|
||||
GetInputData.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_fos_parameters.json"))));
|
||||
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/get_input_parameters.json"))));
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
|
@ -60,16 +62,17 @@ public class GetFOSData implements Serializable {
|
|||
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
|
||||
new GetFOSData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
|
||||
new GetInputData().doRewrite(sourcePath, outputPath, classForName, delimiter, fileSystem);
|
||||
|
||||
}
|
||||
|
||||
public void doRewrite(String inputPath, String outputFile, String classForName, char delimiter, FileSystem fs)
|
||||
throws IOException, ClassNotFoundException {
|
||||
|
||||
|
||||
// reads the csv and writes it as its json equivalent
|
||||
try (InputStreamReader reader = new InputStreamReader(fs.open(new Path(inputPath)))) {
|
||||
GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
|
||||
try (InputStreamReader reader = new InputStreamReader(new GZIPInputStream(fs.open(new Path(inputPath))))) {
|
||||
eu.dnetlib.dhp.common.collection.GetCSV.getCsv(fs, reader, outputFile, classForName, delimiter);
|
||||
}
|
||||
|
||||
}
|
|
@ -66,20 +66,20 @@ public class PrepareFOSSparkJob implements Serializable {
|
|||
Dataset<FOSDataModel> fosDataset = readPath(spark, sourcePath, FOSDataModel.class);
|
||||
|
||||
fosDataset
|
||||
.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi(), Encoders.STRING())
|
||||
.groupByKey((MapFunction<FOSDataModel, String>) v -> v.getDoi().toLowerCase(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, FOSDataModel, Result>) (k, it) -> {
|
||||
Result r = new Result();
|
||||
FOSDataModel first = it.next();
|
||||
r.setId(DHPUtils.generateUnresolvedIdentifier(first.getDoi(), DOI));
|
||||
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
|
||||
HashSet<String> level1 = new HashSet<>();
|
||||
HashSet<String> level2 = new HashSet<>();
|
||||
HashSet<String> level3 = new HashSet<>();
|
||||
addLevels(level1, level2, level3, first);
|
||||
it.forEachRemaining(v -> addLevels(level1, level2, level3, v));
|
||||
List<StructuredProperty> sbjs = new ArrayList<>();
|
||||
level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
|
||||
level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
|
||||
level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME)));
|
||||
level1.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
|
||||
level2.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
|
||||
level3.forEach(l -> sbjs.add(getSubject(l, FOS_CLASS_ID, FOS_CLASS_NAME, UPDATE_SUBJECT_FOS_CLASS_ID)));
|
||||
r.setSubject(sbjs);
|
||||
return r;
|
||||
}, Encoders.bean(Result.class))
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import static eu.dnetlib.dhp.actionmanager.Constants.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
public class PrepareSDGSparkJob implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
PrepareSDGSparkJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String sourcePath = parser.get("sourcePath");
|
||||
log.info("sourcePath: {}", sourcePath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
doPrepare(
|
||||
spark,
|
||||
sourcePath,
|
||||
|
||||
outputPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
|
||||
Dataset<SDGDataModel> sdgDataset = readPath(spark, sourcePath, SDGDataModel.class);
|
||||
|
||||
|
||||
sdgDataset.groupByKey((MapFunction<SDGDataModel,String>)r -> r.getDoi().toLowerCase(),Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, SDGDataModel, Result>)(k,it) -> {
|
||||
Result r = new Result();
|
||||
r.setId(DHPUtils.generateUnresolvedIdentifier(k, DOI));
|
||||
SDGDataModel first = it.next();
|
||||
List<StructuredProperty>sbjs = new ArrayList<>();
|
||||
sbjs.add(getSubject(first.getSbj(), SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID));
|
||||
it.forEachRemaining(s -> sbjs.add(getSubject(s.getSbj(),SDG_CLASS_ID, SDG_CLASS_NAME, UPDATE_SUBJECT_SDG_CLASS_ID)));
|
||||
r.setSubject(sbjs);
|
||||
return r;
|
||||
},Encoders.bean(Result.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(outputPath + "/sdg");
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -1,6 +1,48 @@
|
|||
package eu.dnetlib.dhp.actionmanager.createunresolvedentities.model;
|
||||
|
||||
import com.opencsv.bean.CsvBindByPosition;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class SDGDataModel implements Serializable {
|
||||
public class SDGDataModel implements Serializable{
|
||||
|
||||
@CsvBindByPosition(position = 0)
|
||||
// @CsvBindByName(column = "doi")
|
||||
private String doi;
|
||||
|
||||
@CsvBindByPosition(position = 1)
|
||||
// @CsvBindByName(column = "sdg")
|
||||
private String sbj;
|
||||
|
||||
|
||||
public SDGDataModel() {
|
||||
|
||||
}
|
||||
|
||||
public SDGDataModel(String doi, String sbj) {
|
||||
this.doi = doi;
|
||||
this.sbj = sbj;
|
||||
|
||||
}
|
||||
|
||||
public static SDGDataModel newInstance(String d, String sbj) {
|
||||
return new SDGDataModel(d, sbj);
|
||||
}
|
||||
|
||||
public String getDoi() {
|
||||
return doi;
|
||||
}
|
||||
|
||||
public void setDoi(String doi) {
|
||||
this.doi = doi;
|
||||
}
|
||||
|
||||
|
||||
public String getSbj() {
|
||||
return sbj;
|
||||
}
|
||||
|
||||
public void setSbj(String sbj) {
|
||||
this.sbj = sbj;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,6 +79,7 @@
|
|||
<fork name="prepareInfo">
|
||||
<path start="prepareBip"/>
|
||||
<path start="getFOS"/>
|
||||
<path start="getSDG"/>
|
||||
</fork>
|
||||
|
||||
<action name="prepareBip">
|
||||
|
@ -107,7 +108,7 @@
|
|||
|
||||
<action name="getFOS">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetFOSData</main-class>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--sourcePath</arg><arg>${fosPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/input/fos</arg>
|
||||
|
@ -142,6 +143,42 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="getSDG">
|
||||
<java>
|
||||
<main-class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.GetInputData</main-class>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--sourcePath</arg><arg>${sdgPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/input/sdg</arg>
|
||||
<arg>--classForName</arg><arg>eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel</arg>
|
||||
</java>
|
||||
<ok to="prepareSDG"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<!-- Runs PrepareSDGSparkJob on the SDG input produced by getSDG and writes under ${workingDir}/prepared -->
<action name="prepareSDG">
    <spark xmlns="uri:oozie:spark-action:0.2">
        <master>yarn</master>
        <mode>cluster</mode>
        <name>Produces the unresolved from SDG!</name>
        <class>eu.dnetlib.dhp.actionmanager.createunresolvedentities.PrepareSDGSparkJob</class>
        <jar>dhp-aggregation-${projectVersion}.jar</jar>
        <spark-opts>
            --executor-memory=${sparkExecutorMemory}
            --executor-cores=${sparkExecutorCores}
            --driver-memory=${sparkDriverMemory}
            --conf spark.extraListeners=${spark2ExtraListeners}
            --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
            --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
            --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
        </spark-opts>
        <arg>--sourcePath</arg><arg>${workingDir}/input/sdg</arg>
        <arg>--outputPath</arg><arg>${workingDir}/prepared</arg>
    </spark>
    <ok to="join"/>
    <error to="Kill"/>
</action>
|
||||
|
||||
|
||||
<join name="join" to="produceUnresolved"/>
|
||||
|
|
|
@ -10,6 +10,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -18,17 +19,13 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
|
||||
import eu.dnetlib.dhp.common.collection.CollectorException;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class PrepareTest {
|
||||
|
@ -159,7 +156,7 @@ public class PrepareTest {
|
|||
.getPath();
|
||||
final String outputPath = workingDir.toString() + "/fos.json";
|
||||
|
||||
new GetFOSData()
|
||||
new GetInputData()
|
||||
.doRewrite(
|
||||
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel",
|
||||
',', fs);
|
||||
|
@ -180,6 +177,8 @@ public class PrepareTest {
|
|||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
void fosPrepareTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
|
@ -204,15 +203,9 @@ public class PrepareTest {
|
|||
|
||||
String doi1 = "unresolved::10.3390/s18072310::doi";
|
||||
|
||||
assertEquals(50, tmp.count());
|
||||
assertEquals(20, tmp.count());
|
||||
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
|
||||
assertTrue(
|
||||
tmp
|
||||
.filter(r -> r.getId().equals(doi1))
|
||||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("engineering and technology"));
|
||||
|
||||
|
||||
assertTrue(
|
||||
tmp
|
||||
|
@ -220,16 +213,16 @@ public class PrepareTest {
|
|||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("nano-technology"));
|
||||
.contains("04 agricultural and veterinary sciences"));
|
||||
assertTrue(
|
||||
tmp
|
||||
.filter(r -> r.getId().equals(doi1))
|
||||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("nanoscience & nanotechnology"));
|
||||
.contains("0404 agricultural biotechnology"));
|
||||
|
||||
String doi = "unresolved::10.1111/1365-2656.12831::doi";
|
||||
String doi = "unresolved::10.1007/s11164-020-04383-6::doi";
|
||||
assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count());
|
||||
assertTrue(
|
||||
tmp
|
||||
|
@ -237,7 +230,7 @@ public class PrepareTest {
|
|||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("psychology and cognitive sciences"));
|
||||
.contains("01 natural sciences"));
|
||||
|
||||
assertTrue(
|
||||
tmp
|
||||
|
@ -245,15 +238,116 @@ public class PrepareTest {
|
|||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("social sciences"));
|
||||
assertFalse(
|
||||
.contains("0104 chemical sciences"));
|
||||
assertTrue(
|
||||
tmp
|
||||
.filter(r -> r.getId().equals(doi))
|
||||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("NULL"));
|
||||
.contains("010402 general chemistry"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void getSDGFileTest() throws IOException, ClassNotFoundException {
|
||||
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg_sbs.csv")
|
||||
.getPath();
|
||||
final String outputPath = workingDir.toString() + "/sdg.json";
|
||||
|
||||
new GetInputData()
|
||||
.doRewrite(
|
||||
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
|
||||
',', fs);
|
||||
|
||||
BufferedReader in = new BufferedReader(
|
||||
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
|
||||
|
||||
String line;
|
||||
int count = 0;
|
||||
while ((line = in.readLine()) != null) {
|
||||
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
|
||||
|
||||
System.out.println(new ObjectMapper().writeValueAsString(sdg));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
assertEquals(37, count);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void sdgPrepareTest() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
|
||||
.getPath();
|
||||
|
||||
PrepareSDGSparkJob
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", sourcePath,
|
||||
|
||||
"-outputPath", workingDir.toString() + "/work"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Result> tmp = sc
|
||||
.textFile(workingDir.toString() + "/work/sdg")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
|
||||
|
||||
String doi1 = "unresolved::10.1001/amaguidesnewsletters.2019.sepoct02::doi";
|
||||
|
||||
assertEquals(32, tmp.count());
|
||||
assertEquals(1, tmp.filter(row -> row.getId().equals(doi1)).count());
|
||||
|
||||
|
||||
assertTrue(
|
||||
tmp
|
||||
.filter(r -> r.getId().equals(doi1))
|
||||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("3. Good health"));
|
||||
assertTrue(
|
||||
tmp
|
||||
.filter(r -> r.getId().equals(doi1))
|
||||
.flatMap(r -> r.getSubject().iterator())
|
||||
.map(sbj -> sbj.getValue())
|
||||
.collect()
|
||||
.contains("8. Economic growth"));
|
||||
|
||||
|
||||
}
|
||||
@Disabled
|
||||
@Test
|
||||
void test2() throws Exception {
|
||||
final String sourcePath = "/Users/miriam.baglioni/Downloads/doi_sdg_results_20_12_21.csv.gz";
|
||||
|
||||
|
||||
final String outputPath = workingDir.toString() + "/sdg.json";
|
||||
|
||||
new GetInputData()
|
||||
.doRewrite(
|
||||
sourcePath, outputPath, "eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel",
|
||||
',', fs);
|
||||
|
||||
BufferedReader in = new BufferedReader(
|
||||
new InputStreamReader(fs.open(new org.apache.hadoop.fs.Path(outputPath))));
|
||||
|
||||
String line;
|
||||
int count = 0;
|
||||
while ((line = in.readLine()) != null) {
|
||||
SDGDataModel sdg = new ObjectMapper().readValue(line, SDGDataModel.class);
|
||||
|
||||
System.out.println(new ObjectMapper().writeValueAsString(sdg));
|
||||
count += 1;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import java.nio.file.Path;
|
|||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.Constants;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -348,4 +349,175 @@ public class ProduceTest {
|
|||
|
||||
}
|
||||
|
||||
|
||||
private JavaRDD<Result> getResultJavaRDDPlusSDG() throws Exception {
|
||||
final String bipPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/bip/bip.json")
|
||||
.getPath();
|
||||
|
||||
PrepareBipFinder
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", bipPath,
|
||||
"--outputPath", workingDir.toString() + "/work"
|
||||
|
||||
});
|
||||
final String fosPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/fos/fos.json")
|
||||
.getPath();
|
||||
|
||||
PrepareFOSSparkJob
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", fosPath,
|
||||
"-outputPath", workingDir.toString() + "/work"
|
||||
});
|
||||
|
||||
final String sdgPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/sdg/sdg.json")
|
||||
.getPath();
|
||||
|
||||
PrepareSDGSparkJob
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", sdgPath,
|
||||
"-outputPath", workingDir.toString() + "/work"
|
||||
});
|
||||
|
||||
SparkSaveUnresolved.main(new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", workingDir.toString() + "/work",
|
||||
|
||||
"-outputPath", workingDir.toString() + "/unresolved"
|
||||
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
return sc
|
||||
.textFile(workingDir.toString() + "/unresolved")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Result.class));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void produceTestSomeNumbersWithSDG() throws Exception {
|
||||
|
||||
final String doi = "unresolved::10.3390/s18072310::doi";
|
||||
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
|
||||
|
||||
Assertions.assertEquals(136, tmp.count());
|
||||
|
||||
Assertions.assertEquals(1, tmp.filter(row -> row.getId().equals(doi)).count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
50, tmp
|
||||
.filter(row -> !row.getId().equals(doi))
|
||||
.filter(row -> row.getSubject() != null)
|
||||
.count());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
85,
|
||||
tmp
|
||||
.filter(row -> !row.getId().equals(doi))
|
||||
.filter(r -> r.getInstance() != null && r.getInstance().size() > 0)
|
||||
.count());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void produceTest7Subjects() throws Exception {
|
||||
final String doi = "unresolved::10.3390/s18072310::doi";
|
||||
|
||||
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
7, tmp
|
||||
.filter(row -> row.getId().equals(doi))
|
||||
.collect()
|
||||
.get(0)
|
||||
.getSubject()
|
||||
.size());
|
||||
|
||||
List<StructuredProperty> sbjs = tmp
|
||||
.filter(row -> row.getId().equals(doi))
|
||||
.flatMap(row -> row.getSubject().iterator())
|
||||
.collect();
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("04 agricultural and veterinary sciences")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0404 agricultural biotechnology")));
|
||||
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("040502 food science")));
|
||||
|
||||
Assertions
|
||||
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("03 medical and health sciences")));
|
||||
Assertions.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("0303 health sciences")));
|
||||
Assertions
|
||||
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("030309 nutrition & dietetics")));
|
||||
Assertions
|
||||
.assertEquals(true, sbjs.stream().anyMatch(sbj -> sbj.getValue().equals("1. No poverty")));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void produceTestSubjectsWithSDG() throws Exception {
|
||||
|
||||
JavaRDD<Result> tmp = getResultJavaRDDPlusSDG();
|
||||
|
||||
List<StructuredProperty> sbjs_sdg = tmp
|
||||
.filter(row -> row.getSubject() != null && row.getSubject().size() > 0)
|
||||
.flatMap(row -> row.getSubject().iterator())
|
||||
.filter(sbj -> sbj.getQualifier().getClassid().equals(Constants.SDG_CLASS_ID))
|
||||
.collect();
|
||||
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("SDG", sbj.getQualifier().getClassid()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals(
|
||||
"Sustainable Development Goals", sbj.getQualifier().getClassname()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemeid()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals(ModelConstants.DNET_SUBJECT_TYPOLOGIES, sbj.getQualifier().getSchemename()));
|
||||
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getDeletedbyinference()));
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(true, sbj.getDataInfo().getInferred()));
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals(false, sbj.getDataInfo().getInvisible()));
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("", sbj.getDataInfo().getTrust()));
|
||||
sbjs_sdg.forEach(sbj -> Assertions.assertEquals("update", sbj.getDataInfo().getInferenceprovenance()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions.assertEquals("subject:sdg", sbj.getDataInfo().getProvenanceaction().getClassid()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals("Inferred by OpenAIRE", sbj.getDataInfo().getProvenanceaction().getClassname()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals(
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, sbj.getDataInfo().getProvenanceaction().getSchemeid()));
|
||||
sbjs_sdg
|
||||
.forEach(
|
||||
sbj -> Assertions
|
||||
.assertEquals(
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
sbj.getDataInfo().getProvenanceaction().getSchemename()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
{"doi":"10.1001/amaguidesnewsletters.2019.mayjun02","sbj":"10. No inequality"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2019.novdec01","sbj":"10. No inequality"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"3. Good health"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2019.sepoct02","sbj":"8. Economic growth"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb01","sbj":"8. Economic growth"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"3. Good health"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.janfeb02","sbj":"8. Economic growth"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.julaug01","sbj":"3. Good health"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.marapr01","sbj":"3. Good health"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun01","sbj":"3. Good health"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2020.mayjun02","sbj":"10. No inequality"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2021.julaug01","sbj":"1. No poverty"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2021.mayjune01","sbj":"10. No inequality"}
|
||||
{"doi":"10.1001/amaguidesnewsletters.2021.mayjune02","sbj":"10. No inequality"}
|
||||
{"doi":"10.4336/2021.pfb.41e201902078","sbj":"15. Life on land"}
|
||||
{"doi":"10.4337/ejeep.2019.00045","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2019.00050","sbj":"1. No poverty"}
|
||||
{"doi":"10.4337/ejeep.2019.0045","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2019.0050","sbj":"1. No poverty"}
|
||||
{"doi":"10.4337/ejeep.2019.0051","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2019.0052","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2020.0058","sbj":"1. No poverty"}
|
||||
{"doi":"10.4337/ejeep.2020.0058","sbj":"10. No inequality"}
|
||||
{"doi":"10.4337/ejeep.2020.0060","sbj":"10. No inequality"}
|
||||
{"doi":"10.4337/ejeep.2020.0065","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2020.02.03","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2020.02.05","sbj":"8. Economic growth"}
|
||||
{"doi":"10.4337/ejeep.2020.02.06","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/ejeep.2020.02.09","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/roke.2020.01.01","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/roke.2020.01.03","sbj":"16. Peace & justice"}
|
||||
{"doi":"10.4337/roke.2020.01.05","sbj":"1. No poverty"}
|
||||
{"doi":"10.4337/roke.2020.01.05","sbj":"8. Economic growth"}
|
||||
{"doi":"10.4337/roke.2020.01.07","sbj":"8. Economic growth"}
|
||||
{"doi":"10.4337/roke.2020.02.03","sbj":"8. Economic growth"}
|
||||
{"doi":"10.3390/s18072310","sbj":"1. No poverty"}
|
|
@ -0,0 +1,37 @@
|
|||
10.1001/amaguidesnewsletters.2019.mayjun02,10. No inequality
|
||||
10.1001/amaguidesnewsletters.2019.novdec01,10. No inequality
|
||||
10.1001/amaguidesnewsletters.2019.sepoct02,3. Good health
|
||||
10.1001/amaguidesnewsletters.2019.sepoct02,8. Economic growth
|
||||
10.1001/amaguidesnewsletters.2020.janfeb01,8. Economic growth
|
||||
10.1001/amaguidesnewsletters.2020.janfeb02,3. Good health
|
||||
10.1001/amaguidesnewsletters.2020.janfeb02,8. Economic growth
|
||||
10.1001/amaguidesnewsletters.2020.julaug01,3. Good health
|
||||
10.1001/amaguidesnewsletters.2020.marapr01,3. Good health
|
||||
10.1001/amaguidesnewsletters.2020.mayjun01,3. Good health
|
||||
10.1001/amaguidesnewsletters.2020.mayjun02,16. Peace & justice
|
||||
10.1001/amaguidesnewsletters.2020.mayjun02,10. No inequality
|
||||
10.1001/amaguidesnewsletters.2021.julaug01,1. No poverty
|
||||
10.1001/amaguidesnewsletters.2021.mayjune01,10. No inequality
|
||||
10.1001/amaguidesnewsletters.2021.mayjune02,10. No inequality
|
||||
10.4336/2021.pfb.41e201902078,15. Life on land
|
||||
10.4337/ejeep.2019.00045,16. Peace & justice
|
||||
10.4337/ejeep.2019.00050,1. No poverty
|
||||
10.4337/ejeep.2019.0045,16. Peace & justice
|
||||
10.4337/ejeep.2019.0050,1. No poverty
|
||||
10.4337/ejeep.2019.0051,16. Peace & justice
|
||||
10.4337/ejeep.2019.0052,16. Peace & justice
|
||||
10.4337/ejeep.2020.0058,1. No poverty
|
||||
10.4337/ejeep.2020.0058,10. No inequality
|
||||
10.4337/ejeep.2020.0060,10. No inequality
|
||||
10.4337/ejeep.2020.0065,16. Peace & justice
|
||||
10.4337/ejeep.2020.02.03,16. Peace & justice
|
||||
10.4337/ejeep.2020.02.05,8. Economic growth
|
||||
10.4337/ejeep.2020.02.06,16. Peace & justice
|
||||
10.4337/ejeep.2020.02.09,16. Peace & justice
|
||||
10.4337/roke.2020.01.01,16. Peace & justice
|
||||
10.4337/roke.2020.01.03,16. Peace & justice
|
||||
10.4337/roke.2020.01.05,1. No poverty
|
||||
10.4337/roke.2020.01.05,8. Economic growth
|
||||
10.4337/roke.2020.01.07,8. Economic growth
|
||||
10.4337/roke.2020.02.03,8. Economic growth
|
||||
10.4337/roke.2020.02.04,1. No poverty
|
|
|
@ -116,22 +116,22 @@ public class ExtractRelationFromEntityTest {
|
|||
.getType());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"context", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getSource()
|
||||
.getType());
|
||||
.assertEquals(
|
||||
"context", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getSource()
|
||||
.getType());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"result", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getTarget()
|
||||
.getType());
|
||||
.assertEquals(
|
||||
"result", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getSource().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getTarget()
|
||||
.getType());
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"IsRelatedTo", verificationDataset
|
||||
|
@ -151,22 +151,22 @@ public class ExtractRelationFromEntityTest {
|
|||
.getType());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"context", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getTarget()
|
||||
.getType());
|
||||
.assertEquals(
|
||||
"context", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getTarget()
|
||||
.getType());
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"result", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getSource()
|
||||
.getType());
|
||||
.assertEquals(
|
||||
"result", verificationDataset
|
||||
.filter((FilterFunction<Relation>) row -> row.getTarget().getId().startsWith("00"))
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getSource()
|
||||
.getType());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue