dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/SparkDumpResults.java

363 lines
11 KiB
Java

package eu.dnetlib.dhp.oa.graph.dump.csv;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.commons.lang3.StringUtils.remove;
import static org.apache.commons.lang3.StringUtils.split;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 04/05/23
*/
//STEP 3
public class SparkDumpResults implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkDumpResults.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String resultType = parser.get("resultType");
log.info("resultType: {}", resultType);
final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName);
final String workingPath = parser.get("workingPath");
Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
// Utils.removeOutputDir(spark, outputPath);
run(spark, inputPath, inputClazz, resultType, workingPath);
});
}
private static <R extends Result> void run(SparkSession spark, String inputPath,
Class<R> inputClazz, String resultType, String workingPath) {
Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
// resultIds.foreach((ForeachFunction<String>) r -> System.out.println(r));
Dataset<R> results = Utils
.readPath(spark, inputPath + "/" + resultType, inputClazz)
.filter(
(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
resultIds
.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
.map((MapFunction<Tuple2<String, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/" + resultType + "/temp/result");
// map results
results = Utils.readPath(spark, workingPath + "/" + resultType + "/temp/result", inputClazz);
results
.map(
(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
Encoders.bean(CSVResult.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result");
// map relations between pid and result
results
.flatMap((FlatMapFunction<R, CSVPid>) r -> {
List<CSVPid> pids = new ArrayList<>();
if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
pids.addAll(mapPid(r.getPid(), r.getId()));
}
return pids.iterator();
}, Encoders.bean(CSVPid.class))
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_pid");
// map authors from the result
// per ogni autore nel result
// se l'autore ha un orcid il suo id dipende dall'orcid (tipo md5(orcid))
// se non ha orcid il suo id si costruisce come result_id + authorrank ( se non ha il rank si sua
// la sua posizione nell'insieme degli autori) sempre con md5
results
.flatMap((FlatMapFunction<R, AuthorResult>) r -> {
int count = 0;
List<AuthorResult> arl = new ArrayList<>();
Set<String> authorIds = new HashSet();
if (Optional.ofNullable(r.getAuthor()).isPresent()) {
for (Author a : r.getAuthor()) {
count += 1;
AuthorResult ar = new AuthorResult();
ar.setResultId(r.getId());
if (Optional.ofNullable(a.getRank()).isPresent()) {
if (a.getRank() > 0) {
ar.setRank(String.valueOf(a.getRank()));
} else {
ar.setRank(String.valueOf(count));
}
}
ar.setFirstName(removeBreaks(a.getName()));
ar.setLastName(removeBreaks(a.getSurname()));
ar.setFullName(removeBreaks(a.getFullname()));
Tuple2<String, Boolean> orcid = getOrcid(a.getPid());
if (Optional.ofNullable(orcid).isPresent()) {
ar.setOrcid(orcid._1());
ar.setFromOrcid(orcid._2());
}
ar.autosetId();
if (!authorIds.contains(ar.getAuthorId())) {
arl.add(ar);
authorIds.add(ar.getAuthorId());
}
}
}
return arl.iterator();
}, Encoders.bean(AuthorResult.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingPath + "/" + resultType + "/temp/authorresult");
Dataset<AuthorResult> authorResult = Utils
.readPath(spark, workingPath + "/" + resultType + "/temp/authorresult", AuthorResult.class);
// map the relation between author and result
authorResult
.map(
(MapFunction<AuthorResult, CSVRelResAut>) ar -> {
CSVRelResAut ret = new CSVRelResAut();
ret.setResult_id(ar.getResultId());
ret.setAuthor_id(ar.getAuthorId());
return ret;
},
Encoders.bean(CSVRelResAut.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/result_author");
// ma the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
// the one from orcid if any
authorResult
.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> {
AuthorResult first = it.next();
if (!Optional.ofNullable(first.getFromOrcid()).isPresent() || first.getFromOrcid())
return getAuthorDump(first);
while (it.hasNext()) {
AuthorResult ar = it.next();
if (ar.getFromOrcid())
return getAuthorDump(ar);
}
return getAuthorDump(first);
},
Encoders.bean(CSVAuthor.class))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "/" + resultType + "/author");
}
private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
return pid
.stream()
.map(p -> p.getQualifier().getClassid().toLowerCase() + "@" + p.getValue().toLowerCase())
.distinct()
.map(p -> {
CSVPid ret = new CSVPid();
ret.setId(DHPUtils.md5(p + "@" + resultId));
ret.setResult_id(resultId);
ret.setPid(split(p, "@")[1]);
ret.setType(split(p, "@")[0]);
return ret;
})
.collect(Collectors.toList());
}
private static CSVAuthor getAuthorDump(AuthorResult ar) {
CSVAuthor ret = new CSVAuthor();
ret.setFirstname(ar.getFirstName());
ret.setId(ar.getAuthorId());
ret.setLastname(ar.getLastName());
ret.setFullname(ar.getFullName());
if (ar.getOrcid() != null) {
ret.setOrcid(ar.getOrcid());
ret.setFromOrcid(ar.getFromOrcid());
} else {
ret.setOrcid("");
}
return ret;
}
private static Tuple2<String, Boolean> getOrcid(List<StructuredProperty> pid) {
if (!Optional.ofNullable(pid).isPresent())
return null;
if (pid.size() == 0)
return null;
for (StructuredProperty p : pid) {
if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
return new Tuple2<>(p.getValue(), Boolean.TRUE);
}
}
for (StructuredProperty p : pid) {
if (p.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
return new Tuple2<>(p.getValue(), Boolean.FALSE);
}
}
return null;
}
private static String getFieldValue(Field<String> input) {
if (input != null &&
StringUtils.isNotEmpty(input.getValue())) {
return removeBreaks(input.getValue());
} else {
return "";
}
}
private static <R extends Result> CSVResult mapResultInfo(R r) {
CSVResult ret = new CSVResult();
ret.setId(removeBreaks(r.getId()));
ret.setType(removeBreaks(r.getResulttype().getClassid()));
ret.setTitle(getTitle(r.getTitle()));
ret.setDescription(getAbstract(r.getDescription()));
ret.setAccessright(removeBreaks(r.getBestaccessright().getClassid()));
ret.setPublication_date(removeBreaks(getFieldValue(r.getDateofacceptance())));
ret.setPublisher(removeBreaks(getFieldValue(r.getPublisher())));
if (Optional.ofNullable(r.getSubject()).isPresent())
ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
if (StringUtils.isNotEmpty(s.getValue()))
return removeBreaks(s.getValue().toLowerCase());
else
return null;
}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
else
ret.setKeywords("");
if (Optional.ofNullable(r.getCountry()).isPresent())
ret
.setCountry(
String.join(", ", r.getCountry().stream().map(Country::getClassid).collect(Collectors.toList())));
else
ret.setCountry("");
if (Optional.ofNullable(r.getLanguage()).isPresent() && StringUtils.isNotEmpty(r.getLanguage().getClassid())) {
ret.setLanguage(r.getLanguage().getClassid());
} else {
ret.setLanguage("");
}
return ret;
}
private static String getAbstract(List<Field<String>> description) {
if (description == null)
return "";
for (Field<String> abs : description) {
if (StringUtils.isNotEmpty(abs.getValue())) {
return removeBreaks(abs.getValue());
}
}
return "";
}
private static String getTitle(List<StructuredProperty> titles) {
String firstTitle = null;
for (StructuredProperty title : titles) {
if (StringUtils.isEmpty(firstTitle)) {
if (StringUtils.isNotEmpty(title.getValue()))
firstTitle = removeBreaks(title.getValue());
}
if (title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) {
if (StringUtils.isNotEmpty(title.getValue()))
return removeBreaks(title.getValue());
}
}
if (firstTitle != null) {
return removeBreaks(firstTitle);
}
return "";
}
private static String removeBreaks(String input) {
if (StringUtils.isNotEmpty(input))
return input
.replace("\n", " ")
.replace("\t", " ")
.replace("\r", " ")
// .replace("\\", " ")
.replace("\"", " ");
return input;
}
}