[DUMP CSV] Dump of the results, the authors, and the relations between results and authors and between results and PIDs

Miriam Baglioni 2023-05-17 16:56:28 +02:00
parent 7563499740
commit 66873c1744
9 changed files with 319 additions and 247 deletions

View File

@@ -86,7 +86,7 @@ public class AuthorResult implements Serializable {
 		if (orcid != null) {
 			authorId = DHPUtils.md5(orcid);
 		} else {
-			authorId = DHPUtils.md5(resultId + firstName + lastName + rank);
+			authorId = DHPUtils.md5(resultId + rank);
 		}
 	}
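
For orientation: the hunk above changes the author identifier rule. With an ORCID the id stays md5(orcid), stable across results; without one it is now hashed from resultId + rank only, so the spelling of the name no longer affects the id. A minimal sketch of the rule, assuming DHPUtils.md5 is a plain MD5 hex digest (the md5 helper below is a hypothetical stand-in):

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class AuthorIdSketch {

	// hypothetical stand-in for DHPUtils.md5: MD5 digest as a 32-char hex string
	static String md5(String s) throws Exception {
		byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
		return String.format("%032x", new BigInteger(1, d));
	}

	static String authorId(String orcid, String resultId, String rank) throws Exception {
		if (orcid != null) {
			return md5(orcid); // same person, same id, across all results
		}
		return md5(resultId + rank); // id local to the result
	}

	public static void main(String[] args) throws Exception {
		System.out.println(authorId("0000-0002-1825-0097", null, null));
		System.out.println(authorId(null, "50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9", "1"));
	}
}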

View File

@@ -1,5 +1,25 @@
 package eu.dnetlib.dhp.oa.graph.dump.csv;
 
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.commons.lang3.StringUtils.split;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
@@ -7,282 +27,272 @@ import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
 import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.utils.DHPUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.Dataset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
-import java.io.Serializable;
-import java.util.*;
-import java.util.stream.Collectors;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 /**
  * @author miriam.baglioni
  * @Date 04/05/23
  */
 //STEP 3
 public class SparkDumpResults implements Serializable {
 
 	private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);
 
 	public static void main(String[] args) throws Exception {
 		String jsonConfiguration = IOUtils
 			.toString(
 				SparkDumpResults.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step2.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
 
 		Boolean isSparkSessionManaged = Optional
 			.ofNullable(parser.get("isSparkSessionManaged"))
 			.map(Boolean::valueOf)
 			.orElse(Boolean.TRUE);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
 		final String inputPath = parser.get("sourcePath");
 		log.info("inputPath: {}", inputPath);
 
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
 		final String resultType = parser.get("resultType");
 		log.info("resultType: {}", resultType);
 
 		final String resultClassName = parser.get("resultTableName");
 		log.info("resultTableName: {}", resultClassName);
 
 		final String workingPath = parser.get("workingPath");
 
 		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
 
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
 			spark -> {
 				Utils.removeOutputDir(spark, outputPath);
 				run(spark, inputPath, outputPath, inputClazz, resultType, workingPath);
 			});
 	}
 
 	private static <R extends Result> void run(SparkSession spark, String inputPath, String outputPath,
 		Class<R> inputClazz, String resultType, String workingPath) {
 
 		Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
 		Dataset<R> results = Utils
 			.readPath(spark, inputPath + "/" + resultType, inputClazz)
 			.filter(
 				(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
 
 		// map results
 		resultIds
 			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
 			.map(
 				(MapFunction<Tuple2<String, R>, CSVResult>) t2 -> mapResultInfo(t2._2()),
 				Encoders.bean(CSVResult.class))
 			.write()
 			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
 			.mode(SaveMode.Overwrite)
 			.csv(workingPath + "/" + resultType + "/result");
 
 		// map relations between pid and result
 		resultIds
 			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
 			.flatMap((FlatMapFunction<Tuple2<String, R>, CSVPid>) t2 -> {
 				List<CSVPid> pids = new ArrayList<>();
 				if (Optional.ofNullable(t2._2().getPid()).isPresent() && t2._2().getPid().size() > 0) {
 					pids.addAll(mapPid(t2._2().getPid(), t2._1()));
 				}
 				return pids.iterator();
 			}, Encoders.bean(CSVPid.class))
 			.filter(Objects::nonNull)
 			.write()
 			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
 			.mode(SaveMode.Overwrite)
 			.csv(workingPath + "/" + resultType + "/result_pid");
 
 		// map authors from the result
 		// for each author in the result:
 		// if the author has an ORCID, the author id is derived from the ORCID (i.e. md5(orcid))
-		// if there is no ORCID, the id is built as result_id + author_name + author rank (if the rank is
-		// not set, the author's position within the author list is used), again hashed with md5
+		// if there is no ORCID, the id is built as result_id + author rank (if the rank is not set,
+		// the author's position within the author list is used), again hashed with md5
 		Dataset<AuthorResult> authorResult = resultIds
 			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
 			.flatMap((FlatMapFunction<Tuple2<String, R>, AuthorResult>) t2 -> {
 				int count = 0;
 				List<AuthorResult> arl = new ArrayList<>();
 				for (Author a : t2._2().getAuthor()) {
 					count += 1;
 					AuthorResult ar = new AuthorResult();
 					ar.setResultId(t2._1());
 					if (Optional.ofNullable(a.getRank()).isPresent()) {
 						if (a.getRank() > 0) {
 							ar.setRank(String.valueOf(a.getRank()));
 						} else {
 							ar.setRank(String.valueOf(count));
 						}
 					}
 					ar.setFirstName(a.getName());
 					ar.setLastName(a.getSurname());
 					ar.setFullName(a.getFullname());
 					ar.setOrcid(getOrcid(a.getPid()));
 					ar.autosetId();
 					arl.add(ar);
 				}
 				return arl.iterator();
 			}, Encoders.bean(AuthorResult.class));
 
 		// map the relation between author and result
 		authorResult
 			.map(
-				(MapFunction<AuthorResult, String>) ar -> ar.getResultId() + Constats.SEP + ar.getAuthorId(),
-				Encoders.STRING())
+				(MapFunction<AuthorResult, CSVRelResAut>) ar -> {
+					CSVRelResAut ret = new CSVRelResAut();
+					ret.setResult_id(ar.getResultId());
+					ret.setAuthor_id(ar.getAuthorId());
+					return ret;
+				},
+				Encoders.bean(CSVRelResAut.class))
 			.write()
 			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
 			.mode(SaveMode.Overwrite)
 			.csv(workingPath + "/" + resultType + "/result_author");
 
 		// map the authors in the working dir. I do not want to have them repeated
 		authorResult
 			.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
 			.mapGroups(
 				(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> getAuthorDump(it.next()),
 				Encoders.bean(CSVAuthor.class))
 			.write()
 			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
 			.mode(SaveMode.Overwrite)
 			.csv(workingPath + "/" + resultType + "/author");
 	}
 
 	private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
-		return pid.stream().map(p -> {
-			CSVPid ret = new CSVPid();
-			ret.setId(DHPUtils.md5(p.getQualifier().getClassid() + p.getValue()));
-			ret.setResult_id(resultId);
-			ret.setPid(p.getValue());
-			ret.setType(p.getQualifier().getClassid());
-			return ret;
-		}).collect(Collectors.toList());
+		return pid
+			.stream()
+			.map(p -> p.getQualifier().getClassid().toLowerCase() + "@" + p.getValue().toLowerCase())
+			.distinct()
+			.map(p -> {
+				CSVPid ret = new CSVPid();
+				ret.setId(DHPUtils.md5(p));
+				ret.setResult_id(resultId);
+				ret.setPid(split(p, "@")[1]);
+				ret.setType(split(p, "@")[0]);
+				return ret;
+			})
+			.collect(Collectors.toList());
 	}
 
 	private static CSVAuthor getAuthorDump(AuthorResult ar) {
 		CSVAuthor ret = new CSVAuthor();
 		ret.setFirstname(ar.getFirstName());
 		ret.setId(ar.getAuthorId());
 		ret.setLastname(ar.getLastName());
 		ret.setFullname(ar.getFullName());
 		if (ar.getOrcid() != null) {
 			ret.setOrcid(ar.getOrcid());
 		} else {
 			ret.setOrcid("");
 		}
 		return ret;
 	}
 
 	private static String getOrcid(List<StructuredProperty> pid) {
 		if (!Optional.ofNullable(pid).isPresent())
 			return null;
 		if (pid.size() == 0)
 			return null;
 		for (StructuredProperty p : pid) {
 			if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
 				return p.getValue();
 			}
 		}
 		return null;
 	}
 
+	private static String getFieldValue(Field<String> input) {
+		if (input != null &&
+			StringUtils.isNotEmpty(input.getValue())) {
+			return input.getValue();
+		} else {
+			return "";
+		}
+	}
 
 	private static <R extends Result> CSVResult mapResultInfo(R r) {
 		CSVResult ret = new CSVResult();
 		ret.setId(r.getId());
 		ret.setType(r.getResulttype().getClassid());
 		ret.setTitle(getTitle(r.getTitle()));
 		ret.setDescription(getAbstract(r.getDescription()));
 		ret.setAccessright(r.getBestaccessright().getClassid());
-		ret.setPublication_date(r.getDateofacceptance().getValue());
-		if (StringUtils.isNotEmpty(r.getPublisher().getValue())) {
-			ret.setPublisher(r.getPublisher().getValue());
-		} else {
-			ret.setPublisher("");
-		}
-		StringBuilder sbjs = new StringBuilder();
-		for (StructuredProperty sbj : r.getSubject()) {
-			if (StringUtils.isNotEmpty(sbj.getValue())) {
-				sbjs.append(sbj.getValue());
-				sbjs.append(",");
-			}
-		}
-		ret.setKeywords(sbjs.toString());
-		StringBuilder countries = new StringBuilder();
-		for (Country c : r.getCountry()) {
-			if (StringUtils.isNotEmpty(c.getClassid())) {
-				countries.append(c.getClassid());
-			}
-		}
-		ret.setCountry(countries.toString());
+		ret.setPublication_date(getFieldValue(r.getDateofacceptance()));
+		ret.setPublisher(getFieldValue(r.getPublisher()));
+		ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
+			if (StringUtils.isNotEmpty(s.getValue()))
+				return s.getValue().toLowerCase();
+			else
+				return null;
+		}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
+		ret.setCountry(
+			String.join(", ", r.getCountry().stream().map(Country::getClassid).collect(Collectors.toList())));
 		if (StringUtils.isNotEmpty(r.getLanguage().getClassid())) {
 			ret.setLanguage(r.getLanguage().getClassid());
 		} else {
 			ret.setLanguage("");
 		}
 		return ret;
 	}
 
 	private static String getAbstract(List<Field<String>> description) {
+		if (description == null)
+			return "";
 		for (Field<String> abs : description) {
 			if (StringUtils.isNotEmpty(abs.getValue())) {
 				return abs.getValue();
 			}
 		}
 		return "";
 	}
 
 	private static String getTitle(List<StructuredProperty> titles) {
 		String firstTitle = null;
 		for (StructuredProperty title : titles) {
 			if (StringUtils.isEmpty(firstTitle)) {
 				if (StringUtils.isNotEmpty(title.getValue()))
 					firstTitle = title.getValue();
 			}
 			if (title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) {
 				if (StringUtils.isNotEmpty(title.getValue()))
 					return title.getValue();
 			}
 		}
 		return "";
 	}
 }
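
For orientation: the reworked mapPid above de-duplicates PIDs by normalising each one to a lower-case "type@value" key, applying distinct(), and only then building the CSV rows. A self-contained sketch of that technique (hypothetical class name and sample values; String.split with a limit stands in for the commons-lang3 split used in the real code):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PidDedupSketch {
	public static void main(String[] args) {
		List<String[]> pids = Arrays.asList(
			new String[] { "DOI", "10.1234/ABC" },
			new String[] { "doi", "10.1234/abc" }, // same pid, different case
			new String[] { "pmid", "123456" });

		List<String[]> deduped = pids
			.stream()
			.map(p -> p[0].toLowerCase() + "@" + p[1].toLowerCase()) // normalise to "type@value"
			.distinct() // collapse case-variant duplicates
			.map(p -> p.split("@", 2)) // back to [type, value]
			.collect(Collectors.toList());

		deduped.forEach(p -> System.out.println(p[0] + " -> " + p[1])); // two lines: doi and pmid
	}
}

Because the row id is the md5 of the whole normalised key, case variants of the same PID now map to a single id.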

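The author table is written through the same de-duplication pattern: groupByKey on the author id, then mapGroups keeping one representative per group. A runnable sketch of that pattern, with a minimal hypothetical bean in place of AuthorResult/CSVAuthor:

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class AuthorDedupSketch {

	// hypothetical bean standing in for AuthorResult
	public static class AuthorRow implements Serializable {
		private String authorId;
		private String fullName;
		public AuthorRow() {}
		public AuthorRow(String id, String name) { authorId = id; fullName = name; }
		public String getAuthorId() { return authorId; }
		public void setAuthorId(String v) { authorId = v; }
		public String getFullName() { return fullName; }
		public void setFullName(String v) { fullName = v; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("author-dedup").getOrCreate();

		Dataset<AuthorRow> authors = spark.createDataset(
			Arrays.asList(
				new AuthorRow("a1", "Jane Roe"),
				new AuthorRow("a1", "Jane Roe"), // duplicate: same author on another result
				new AuthorRow("a2", "John Doe")),
			Encoders.bean(AuthorRow.class));

		// one representative row per author id, as in the dump above
		Dataset<AuthorRow> unique = authors
			.groupByKey((MapFunction<AuthorRow, String>) AuthorRow::getAuthorId, Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, AuthorRow, AuthorRow>) (k, it) -> it.next(),
				Encoders.bean(AuthorRow.class));

		unique.show(); // two rows: a1 and a2
		spark.stop();
	}
}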
View File

@@ -0,0 +1,41 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
},
{
"paramName":"wp",
"paramLongName":"workingPath",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"rt",
"paramLongName":"resultType",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
},
{
"paramName":"rtn",
"paramLongName":"resultTableName",
"paramDescription": "the name of the result table we are currently working on",
"paramRequired": true
}
]
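
Each paramLongName above becomes a command-line flag parsed by ArgumentApplicationParser. An illustrative invocation (the paths and the table class are made-up examples) might pass:

--sourcePath /tmp/graph
--outputPath /tmp/dump/output
--workingPath /tmp/dump/working
--resultType publication
--resultTableName eu.dnetlib.dhp.schema.oaf.Publication
--isSparkSessionManaged true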

View File

@@ -0,0 +1,9 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class DumpCommunitiesTest {
}

View File

@@ -0,0 +1,9 @@
package eu.dnetlib.dhp.oa.graph.dump.csv;
/**
* @author miriam.baglioni
* @Date 11/05/23
*/
public class DumpResultTest {
}

View File

@@ -0,0 +1 @@
50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9

View File

@@ -0,0 +1 @@
50|DansKnawCris::0224aae28af558f21768dbc6439c7a95

View File

@@ -0,0 +1 @@
50|DansKnawCris::26780065282e607306372abd0d808245