new classes for generating the report for the orcid cleaning plus new property file

This commit is contained in:
Miriam Baglioni 2020-11-04 13:18:49 +01:00
parent 4ee84ae724
commit fd00d44e1e
3 changed files with 294 additions and 0 deletions

View File

@@ -0,0 +1,202 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
public class MakeReportSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MakeReportSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
MakeReportSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean/input_clean_author_pids.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {} ", preparedInfoPath);
String orcidInputPath = parser.get("orcidInputPath");
log.info("orcidInputPath: {}", orcidInputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends Result> entityClazz = (Class<? extends Result>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
makeReport(spark, inputPath, entityClazz, outputPath, preparedInfoPath, loadAuthoritativeOrcid(spark, orcidInputPath));
});
}
	/**
	 * Loads the sequence file containing, for each author, the ORCID id, name and surname.
	 * It returns a dataset whose model maps the one defined in the class OrcidAuthotitative.
	 * @param spark the spark session
	 * @param orcidInputPath the path of the sequence file to read
	 * @return the dataset of authoritative ORCID records
	 */
private static Dataset<OrcidAuthotitative> loadAuthoritativeOrcid(SparkSession spark, String orcidInputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
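		// each value in the sequence file holds the json serialization of an ORCID author record:
		// deserialize it into an OrcidAuthotitative bean and wrap the resulting RDD into a typed Dataset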
return spark.createDataset(JavaRDD.toRDD(sc.sequenceFile(orcidInputPath, Text.class, Text.class)
.map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), OrcidAuthotitative.class))), Encoders.bean(OrcidAuthotitative.class));
}
	/**
	 * Builds a report on the associations between ORCID ids and authors, comparing each author's name and surname
	 * with the authoritative values read from the ORCID sequence file.
	 * First step: the prepared result information is left-joined with the authoritative information on the ORCID id;
	 * pairs without a matching authoritative record are discarded.
	 * Second step: each remaining pair is classified. If the authoritative record has no surname the association is
	 * reported as "missing"; if the normalised surnames are equal it is reported as "right"; if one surname contains
	 * the other it is reported as "check"; otherwise it is reported as "wrong".
	 * Third step: for each class, the entries are grouped by ORCID id, merging the associated authors, and written to
	 * a dedicated subdirectory of the output path.
	 * @param spark the spark session
	 * @param inputPath the path where to find the results to verify
	 * @param entityClazz the class of the result considered
	 * @param outputPath the path where to write the report
	 * @param preparedInfoPath the path where to find the selected information about the results
	 * @param authoritative the authoritative association between ORCID id, name and surname
	 * @param <I> the result type
	 */
private static <I extends Result> void makeReport(SparkSession spark, String inputPath, Class<I> entityClazz,
String outputPath, String preparedInfoPath,
Dataset<OrcidAuthotitative> authoritative) {
Dataset<ResultInfo> resultInfo = readPath(spark, preparedInfoPath, ResultInfo.class);
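		// left join between the prepared result information and the authoritative ORCID records on the ORCID id,
		// classifying each pair as right / wrong / check / missing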
Dataset<Tuple2<String, ReportInfo>> checkedResult = resultInfo.joinWith(authoritative, authoritative.col("oid")
.equalTo(resultInfo.col("orcid")), "left")
.map((MapFunction<Tuple2<ResultInfo, OrcidAuthotitative>, Tuple2<String, ReportInfo>>) pair -> {
Optional<OrcidAuthotitative> ooa = Optional.ofNullable(pair._2());
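					// no authoritative record matched this ORCID id: drop the pair (filtered out after the map)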
if(!ooa.isPresent()){
return null;
}
OrcidAuthotitative oa = ooa.get();
ResultInfo ri = pair._1();
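					// when the surname is missing from the prepared information, derive name and surname from the fullname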
if (StringUtils.isBlank(ri.getSurname())) {
PacePerson pp = new PacePerson(ri.getFullname(), false);
ri.setSurname(pp.getNormalisedSurname());
ri.setName(pp.getNormalisedFirstName());
}
ReportInfo reportInfo = new ReportInfo();
reportInfo.setOid(oa.getOid());
reportInfo.setOname(oa.getName());
reportInfo.setOsurname(oa.getSurname());
reportInfo.setOcreditname(oa.getCreditname());
reportInfo.setAssociatedAuthors(Arrays.asList(ri));
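					// the authoritative record carries no surname: nothing to compare against, report the association as missing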
if(!Optional.ofNullable(oa.getSurname()).isPresent()){
return new Tuple2<>("missing", reportInfo);
}
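					// normalise both surnames: lower case, accents stripped, hyphens replaced by spaces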
final String handledOsurname = StringUtils.stripAccents(oa.getSurname().toLowerCase().trim()).replace("-", " ");
final String handledSurname = StringUtils.stripAccents(ri.getSurname().toLowerCase().trim()).replace("-", " ");
if (!handledOsurname
.equalsIgnoreCase(handledSurname)) {
if (!handledOsurname.contains(handledSurname) && !handledSurname.contains(handledOsurname)) {
return new Tuple2<>("wrong", reportInfo);
}
return new Tuple2<>("check", reportInfo);
}
return new Tuple2<>("right", reportInfo);
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ReportInfo.class))).filter(Objects::nonNull);
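		// write one report set per classification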
writeSet(checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("wrong")), outputPath + "/wrong");
writeSet(checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("right")), outputPath + "/right");
writeSet(checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("check")), outputPath + "/check");
writeSet(checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("missing")), outputPath + "/missing");
}
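	/**
	 * Groups the report entries by ORCID id, merging the lists of associated authors, and writes them as gzipped json.
	 */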
private static void writeSet(Dataset<Tuple2<String, ReportInfo>> dataset, String outputPath){
dataset.groupByKey((MapFunction<Tuple2<String,ReportInfo>, String>) value -> value._2().getOid() , Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<String, ReportInfo>, ReportInfo>) (oid, tuple2Iterator) ->{
ReportInfo reportInfo = tuple2Iterator.next()._2();
List<ResultInfo> aList = reportInfo.getAssociatedAuthors();
tuple2Iterator.forEachRemaining(tuple -> aList.addAll(tuple._2().getAssociatedAuthors()));
reportInfo.setAssociatedAuthors(aList);
return reportInfo;
}, Encoders.bean(ReportInfo.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
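	/**
	 * Reads a newline-delimited json file, mapping each line onto the given bean class.
	 */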
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@@ -0,0 +1,53 @@
package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import java.io.Serializable;
import java.util.List;
public class ReportInfo implements Serializable {
private String oid;
private String oname;
private String osurname;
private String ocreditname;
private List<ResultInfo> associatedAuthors;
public String getOid() {
return oid;
}
public void setOid(String oid) {
this.oid = oid;
}
public String getOname() {
return oname;
}
public void setOname(String oname) {
this.oname = oname;
}
public String getOsurname() {
return osurname;
}
public void setOsurname(String osurname) {
this.osurname = osurname;
}
public String getOcreditname() {
return ocreditname;
}
public void setOcreditname(String ocreditname) {
this.ocreditname = ocreditname;
}
public List<ResultInfo> getAssociatedAuthors() {
return associatedAuthors;
}
public void setAssociatedAuthors(List<ResultInfo> associatedAuthors) {
this.associatedAuthors = associatedAuthors;
}
}

View File

@@ -0,0 +1,39 @@
[
{
"paramName":"i",
"paramLongName":"inputPath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName": "gtn",
"paramLongName": "graphTableClassName",
"paramDescription": "the table name of the result currently considering",
"paramRequired": true
},
{
"paramName": "pip",
"paramLongName": "preparedInfoPath",
"paramDescription": "The path of the prepared result informaiton",
"paramRequired": true
},
{
"paramName": "oip",
"paramLongName": "orcidInputPath",
"paramDescription": "the path to the authoritative orcid information",
"paramRequired": true
}
]
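
For context, a minimal invocation sketch showing how the parameters defined above could be wired to MakeReportSparkJob. The paths and the chosen result class are placeholder values, not taken from this commit; the long option names are the paramLongName values declared in the json definition.

import eu.dnetlib.dhp.oa.graph.clean.authorpids.MakeReportSparkJob;

// Hypothetical driver: every path below is a placeholder, and Publication is just one possible result type.
public class MakeReportExample {

	public static void main(String[] args) throws Exception {
		MakeReportSparkJob
			.main(
				new String[] {
					"--inputPath", "/tmp/graph/publication", // placeholder graph table path
					"--outputPath", "/tmp/clean_orcid/report", // placeholder report output path
					"--preparedInfoPath", "/tmp/clean_orcid/preparedInfo", // placeholder prepared info path
					"--orcidInputPath", "/tmp/orcid/authors_seqfile", // placeholder ORCID sequence file path
					"--graphTableClassName", "eu.dnetlib.dhp.schema.oaf.Publication"
					// isSparkSessionManaged is omitted: it is optional and defaults to true
				});
	}
}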