code for the patch of the refereed field for production data

Miriam Baglioni 2020-06-24 09:51:35 +02:00
parent 507f7a94a8
commit 020ffc9734
13 changed files with 576 additions and 0 deletions
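In outline: SparkPrepareResultInstanceList extracts, for every result, the instances that already carry a refereed qualifier; SparkPatchRefereed then joins that prepared info with the production results and copies the refereed value onto equivalent instances that still lack it, falling back to the default "Unknown" qualifier defined in Constants when the prepared value is empty.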

View File

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-patch</artifactId>
<dependencies>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>1.2.4-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-aggregation</artifactId>
<version>1.2.4-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,14 @@
package eu.dnetlib.dhp.patchrefereed;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class Constants {
public static final Qualifier DEFAULT_REFEREED = new Qualifier();
static{
DEFAULT_REFEREED.setClassid("0000");
DEFAULT_REFEREED.setClassname("Unknown");
DEFAULT_REFEREED.setSchemename("dnet:review_levels");
DEFAULT_REFEREED.setSchemeid("dnet:review_levels");
}
}

View File

@@ -0,0 +1,28 @@
package eu.dnetlib.dhp.patchrefereed;
import eu.dnetlib.dhp.schema.oaf.Instance;
import java.io.Serializable;
import java.util.List;
public class ResultInstance implements Serializable {
private String resultId;
private List<Instance> instanceList;
public String getResultId() {
return resultId;
}
public void setResultId(String resultId) {
this.resultId = resultId;
}
public List<Instance> getInstanceList() {
return instanceList;
}
public void setInstanceList(List<Instance> instanceList) {
this.instanceList = instanceList;
}
}

View File

@@ -0,0 +1,158 @@
package eu.dnetlib.dhp.patchrefereed;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkPatchRefereed implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPatchRefereed.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkAtomicActionJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/patchrefereed/patch_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath {}: ", preparedInfoPath);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
patchRefereed(
spark,
inputPath,
preparedInfoPath,
outputPath,
resultClazz);
});
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <R extends Result> void patchRefereed(SparkSession spark,
String inputPath,
String preparedInfoPath,
String outputPath,
Class<R> resultClazz) {
Dataset<R> result = readPath(spark, inputPath, resultClazz);
Dataset<ResultInstance> ri = readPath(spark, preparedInfoPath, ResultInstance.class);
// left join of the production results with the prepared info on the result id:
// matched instances get their refereed value patched, everything else is written untouched
result.joinWith(ri, result.col("id").equalTo(ri.col("resultId")), "left")
.map((MapFunction<Tuple2<R, ResultInstance>, R>)value->{
R r = value._1();
Optional<ResultInstance> oInstanceList = Optional.ofNullable(value._2());
List<Instance> resultInstance = r.getInstance();
if(oInstanceList.isPresent()){
List<Instance> instanceList = oInstanceList.get().getInstanceList();
resultInstance.forEach(i -> checkEquivalence(instanceList, i));
}
return r;
},Encoders.bean(resultClazz))
.write()
.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath);
}
// looks among the prepared instances for one equivalent to i and, if found, patches i's refereed value
private static void checkEquivalence(List<Instance> preparedInstances, Instance i) {
for (Instance ri : preparedInstances) {
if (update(ri, i)) {
return;
}
}
}
// two instances are considered equivalent when they share accessright, collectedfrom, hostedby,
// instancetype and url; in that case the refereed value of the prepared instance ri is copied
// onto the production instance i (when i has none) and true is returned
private static boolean update(Instance ri, Instance i) {
if (ri.getAccessright().equals(i.getAccessright()) &&
ri.getCollectedfrom().equals(i.getCollectedfrom()) &&
ri.getHostedby().equals(i.getHostedby()) &&
ri.getInstancetype().equals(i.getInstancetype()) &&
equals(ri.getUrl(), i.getUrl())) {
Optional<Qualifier> oRefereed = Optional.ofNullable(i.getRefereed());
if (!oRefereed.isPresent()) {
if (ri.getRefereed().getClassid().equals("")) {
// the prepared value is an empty qualifier: fall back to the default "Unknown"
i.setRefereed(Constants.DEFAULT_REFEREED);
} else {
i.setRefereed(ri.getRefereed());
}
}
return true;
}
return false;
}
// one-directional containment check: every url of the first list must also appear in the second
private static boolean equals(List<String> url, List<String> url1) {
for (String u : url) {
if (!url1.contains(u)) {
return false;
}
}
return true;
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}
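To make the direction of the data flow explicit, here is a minimal illustrative sketch of the rule that update() applies to one matched pair of instances; the class and method names are hypothetical, only the rule itself mirrors the code above:

package eu.dnetlib.dhp.patchrefereed;

import eu.dnetlib.dhp.schema.oaf.Instance;

// illustrative restatement of SparkPatchRefereed.update(): prodInstance comes from the
// production dump (inputPath), prepInstance from the prepared info (preparedInfoPath)
public class RefereedRuleSketch {
	static void copyRefereed(Instance prodInstance, Instance prepInstance) {
		// only production instances that still miss a refereed value are touched
		if (prodInstance.getRefereed() == null) {
			if ("".equals(prepInstance.getRefereed().getClassid())) {
				// the prepared qualifier is empty: fall back to the default "Unknown"
				prodInstance.setRefereed(Constants.DEFAULT_REFEREED);
			} else {
				// otherwise copy the prepared refereed value as-is
				prodInstance.setRefereed(prepInstance.getRefereed());
			}
		}
	}
}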

View File

@@ -0,0 +1,125 @@
package eu.dnetlib.dhp.patchrefereed;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
public class SparkPrepareResultInstanceList implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultInstanceList.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkAtomicActionJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/patchrefereed/prepare_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
Class<? extends Result> resultClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
getAtomicActions(
spark,
inputPath,
outputPath,
resultClazz);
});
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <R extends Result> void getAtomicActions(SparkSession spark,
String inputPath,
String outputPath,
Class<R> resultClazz) {
Dataset<R> result = readPath(spark, inputPath, resultClazz);
// despite its name, this step prepares, for each result, the list of its instances that
// already carry a refereed value; results without such instances are mapped to null
result.map((MapFunction<R, ResultInstance>) r -> {
ResultInstance ri = null;
List<Instance> instanceList = Optional.ofNullable(r.getInstance())
.map(instances -> instances
.stream()
.filter(instance -> Optional.ofNullable(instance.getRefereed()).isPresent())
.collect(Collectors.toList()))
.orElse(null);
if ((instanceList != null) && instanceList.size() > 0) {
ri = new ResultInstance();
ri.setResultId(r.getId());
ri.setInstanceList(instanceList);
}
return ri;
}, Encoders.bean(ResultInstance.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(outputPath);
}
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
}

View File

@@ -0,0 +1,31 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},{
"paramName": "pip",
"paramLongName": "preparedInfoPath",
"paramDescription": "dex",
"paramRequired": true
}
]
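As a usage sketch only (the jar name and HDFS paths below are placeholders, not part of this commit), the patch job can be launched with these parameters roughly as follows, pointing preparedInfoPath at the output of SparkPrepareResultInstanceList:

spark-submit \
  --class eu.dnetlib.dhp.patchrefereed.SparkPatchRefereed \
  dhp-patch-1.2.4-SNAPSHOT.jar \
  -isSparkSessionManaged false \
  -inputPath /path/to/production_graph/publication \
  -resultTableName eu.dnetlib.dhp.schema.oaf.Publication \
  -preparedInfoPath /path/to/working_dir/preparedInfo \
  -outputPath /path/to/working_dir/publication_patched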

View File

@@ -0,0 +1,26 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName":"tn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]
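Similarly, a hedged sketch of how the prepare step might be invoked (again, jar name and paths are placeholders); its output directory is what the patch job above expects as preparedInfoPath:

spark-submit \
  --class eu.dnetlib.dhp.patchrefereed.SparkPrepareResultInstanceList \
  dhp-patch-1.2.4-SNAPSHOT.jar \
  -isSparkSessionManaged false \
  -inputPath /path/to/graph/publication \
  -resultTableName eu.dnetlib.dhp.schema.oaf.Publication \
  -outputPath /path/to/working_dir/preparedInfo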

View File

@@ -0,0 +1,146 @@
package eu.dnetlib.dhp.patchrefereed;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.oaf.Publication;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class PatchRefereedTest {
private static final Logger log = LoggerFactory.getLogger(PatchRefereedTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PatchRefereedTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PatchRefereedTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PatchRefereedTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
/**
* Runs the prepare step on a single publication and verifies that exactly one
* ResultInstance record is produced.
*
* @throws Exception
*/
@Test
public void test1() throws Exception {
SparkPrepareResultInstanceList
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath",
getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/publication.json").getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultInstance> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, ResultInstance.class));
Assertions.assertEquals(1, tmp.count());
}
@Test
public void test2() throws Exception {
SparkPatchRefereed
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath",
getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/prod_publication.json").getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication",
"-preparedInfoPath",getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/preparedInfo.json").getPath()
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Publication> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, Publication.class));
Assertions.assertEquals(1, tmp.count());
org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
verificationDataset.createOrReplaceTempView("dataset");
verificationDataset.show(false);
}
@Test
public void test3() throws Exception {
SparkPrepareResultInstanceList
.main(
new String[] {
"-isSparkSessionManaged", Boolean.FALSE.toString(),
"-inputPath",
getClass().getResource("/eu/dnetlib/dhp/patchrefereed/simpleTest/publication2.json").getPath(),
"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
"-outputPath", workingDir.toString() + "/publication"
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<ResultInstance> tmp = sc
.textFile(workingDir.toString() + "/publication")
.map(item -> OBJECT_MAPPER.readValue(item, ResultInstance.class));
Assertions.assertEquals(1, tmp.count());
}
//@Test
}

View File

@@ -0,0 +1 @@
{"instanceList":[{"accessright":{"classid":"UNKNOWN","classname":"UNKNOWN","schemeid":"dnet:access_modes","schemename":"dnet:access_modes"},"collectedfrom":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"key":"10|openaire____::806360c771262b4d6770e7cdf04b5c5a","value":"ORCID"},"dateofacceptance":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"value":"2002-01-01"},"distributionlocation":"","hostedby":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"key":"10|openaire____::55045bd2a65019fd8e6741a755395c8c","value":"Unknown Repository"},"instancetype":{"classid":"0004","classname":"Conference object","schemeid":"dnet:publication_resource","schemename":"dnet:publication_resource"},"license":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"value":""},"processingchargeamount":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"value":""},"processingchargecurrency":{"dataInfo":{"deletedbyinference":false,"inferenceprovenance":"","inferred":false,"invisible":false,"provenanceaction":{"classid":"","classname":"","schemeid":"","schemename":""},"trust":""},"value":""},"refereed":{"classid":"","classname":"","schemeid":"","schemename":""},"url":[]}],"resultId":"50|orcid_______::6c6960227e2775a019090d8f33e0e6e5"}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long