package eu.dnetlib.dhp.oa.graph.dump.csv;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static org.apache.commons.lang3.StringUtils.remove;
import static org.apache.commons.lang3.StringUtils.split;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 04/05/23
 */
// STEP 3
public class SparkDumpResults implements Serializable {

	private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				SparkDumpResults.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String resultType = parser.get("resultType");
		log.info("resultType: {}", resultType);

		final String resultClassName = parser.get("resultTableName");
		log.info("resultTableName: {}", resultClassName);

		final String workingPath = parser.get("workingPath");

		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);

		SparkConf conf = new SparkConf();
		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				// Utils.removeOutputDir(spark, outputPath);
				run(spark, inputPath, inputClazz, resultType, workingPath);

			});

	}

	private static <R extends Result> void run(SparkSession spark, String inputPath, Class<R> inputClazz,
		String resultType, String workingPath) {

		Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
		// resultIds.foreach((ForeachFunction<String>) r -> System.out.println(r));
		Dataset<R> results = Utils
			.readPath(spark, inputPath + "/" + resultType, inputClazz)
			.filter(
				(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());

		resultIds
			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
			.map((MapFunction<Tuple2<String, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingPath + "/" + resultType + "/temp/result");

		// map results
		results = Utils.readPath(spark, workingPath + "/" + resultType + "/temp/result", inputClazz);
		results
			.map(
				(MapFunction<R, CSVResult>) r -> mapResultInfo(r),
				Encoders.bean(CSVResult.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result");

		// map relations between pid and result
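		// one CSVPid row is produced for each distinct (pid, result) pair; the row id computed in mapPid() below
		// is md5(classid@value@resultId), so the same pair always maps to the same identifier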
		results
			.flatMap((FlatMapFunction<R, CSVPid>) r -> {
				List<CSVPid> pids = new ArrayList<>();
				if (Optional.ofNullable(r.getPid()).isPresent() && r.getPid().size() > 0) {
					pids.addAll(mapPid(r.getPid(), r.getId()));
				}
				return pids.iterator();
			}, Encoders.bean(CSVPid.class))
			.filter(Objects::nonNull)
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result_pid");

		// map authors from the result
		// for each author in the result:
		// if the author has an ORCID, the author id is derived from the ORCID (i.e. md5(orcid))
		// if there is no ORCID, the id is built from result_id + author rank (when the rank is missing, the
		// author's position within the author list is used instead), again hashed with md5
		results
			.flatMap((FlatMapFunction<R, AuthorResult>) r -> {
				int count = 0;
				List<AuthorResult> arl = new ArrayList<>();
				Set<String> authorIds = new HashSet<>();
				if (Optional.ofNullable(r.getAuthor()).isPresent()) {
					for (Author a : r.getAuthor()) {
						count += 1;
						AuthorResult ar = new AuthorResult();
						ar.setResultId(r.getId());
						if (Optional.ofNullable(a.getRank()).isPresent()) {
							if (a.getRank() > 0) {
								ar.setRank(String.valueOf(a.getRank()));
							} else {
								ar.setRank(String.valueOf(count));
							}
						}
						ar.setFirstName(replace(a.getName()));
						ar.setLastName(replace(a.getSurname()));
						ar.setFullName(replace(a.getFullname()));
						Tuple2<String, Boolean> orcid = getOrcid(a.getPid());
						if (Optional.ofNullable(orcid).isPresent()) {
							ar.setOrcid(orcid._1());
							ar.setFromOrcid(orcid._2());
						}
						ar.autosetId();
						if (!authorIds.contains(ar.getAuthorId())) {
							arl.add(ar);
							authorIds.add(ar.getAuthorId());
						}
					}
				}
				return arl.iterator();
			}, Encoders.bean(AuthorResult.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingPath + "/" + resultType + "/temp/authorresult");

		Dataset<AuthorResult> authorResult = Utils
			.readPath(spark, workingPath + "/" + resultType + "/temp/authorresult", AuthorResult.class);

		// map the relation between author and result
		authorResult
			.map(
				(MapFunction<AuthorResult, CSVRelResAut>) ar -> {
					CSVRelResAut ret = new CSVRelResAut();
					ret.setResult_id(ar.getResultId());
					ret.setAuthor_id(ar.getAuthorId());
					return ret;
				},
				Encoders.bean(CSVRelResAut.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/result_author");

		// map the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose
		// the one from orcid if any
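		// the author id is assumed to be set by AuthorResult.autosetId() following the rule described above
		// (md5 of the ORCID when one is available, otherwise md5 of result_id + rank): grouping by that id below
		// therefore collapses ORCID-identified authors across results, while authors without an ORCID stay
		// scoped to the result they appear in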
		authorResult
			.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> {
					AuthorResult first = it.next();
					if (!Optional.ofNullable(first.getFromOrcid()).isPresent() || first.getFromOrcid())
						return getAuthorDump(first);
					while (it.hasNext()) {
						AuthorResult ar = it.next();
						if (ar.getFromOrcid())
							return getAuthorDump(ar);
					}
					return getAuthorDump(first);
				},
				Encoders.bean(CSVAuthor.class))
			.write()
			.option("compression", "gzip")
			.mode(SaveMode.Overwrite)
			.json(workingPath + "/" + resultType + "/author");

	}

	private static String replace(String input) {
		if (Optional.ofNullable(input).isPresent())
			return input.replace("\t", " ").replace("\n", " ").replace("\r", " ").replace("\"", " ");
		else
			return "";
	}

	private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
		return pid
			.stream()
			.map(p -> p.getQualifier().getClassid().toLowerCase() + "@" + p.getValue().toLowerCase())
			.distinct()
			.map(p -> {
				CSVPid ret = new CSVPid();
				ret.setId(DHPUtils.md5(p + "@" + resultId));
				ret.setResult_id(resultId);
				ret.setPid(split(p, "@")[1]);
				ret.setType(split(p, "@")[0]);
				return ret;
			})
			.collect(Collectors.toList());
	}

	private static CSVAuthor getAuthorDump(AuthorResult ar) {
		CSVAuthor ret = new CSVAuthor();
		ret.setFirstname(ar.getFirstName());
		ret.setId(ar.getAuthorId());
		ret.setLastname(ar.getLastName());
		ret.setFullname(ar.getFullName());

		if (ar.getOrcid() != null) {
			ret.setOrcid(ar.getOrcid());
			ret.setFromOrcid(ar.getFromOrcid());
		} else {
			ret.setOrcid("");
		}

		return ret;
	}

	private static Tuple2<String, Boolean> getOrcid(List<StructuredProperty> pid) {
		if (!Optional.ofNullable(pid).isPresent())
			return null;
		if (pid.size() == 0)
			return null;
		for (StructuredProperty p : pid) {
			if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
				return new Tuple2<>(p.getValue(), Boolean.TRUE);
			}
		}
		for (StructuredProperty p : pid) {
			if (p.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
				return new Tuple2<>(p.getValue(), Boolean.FALSE);
			}
		}
		return null;
	}

	private static String getFieldValue(Field<String> input) {
		if (input != null && StringUtils.isNotEmpty(input.getValue())) {
			return removeBreaks(input.getValue());
		} else {
			return "";
		}
	}

	private static <R extends Result> CSVResult mapResultInfo(R r) {
		CSVResult ret = new CSVResult();
		ret.setId(removeBreaks(r.getId()));
		ret.setType(removeBreaks(r.getResulttype().getClassid()));
		ret.setTitle(getTitle(r.getTitle()));
		ret.setDescription(getAbstract(r.getDescription()));
		ret.setAccessright(removeBreaks(r.getBestaccessright().getClassid()));
		ret.setPublication_date(removeBreaks(getFieldValue(r.getDateofacceptance())));
		ret.setPublisher(removeBreaks(getFieldValue(r.getPublisher())));

		if (Optional.ofNullable(r.getSubject()).isPresent())
			ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
				if (StringUtils.isNotEmpty(s.getValue()))
					return removeBreaks(s.getValue().toLowerCase());
				else
					return null;
			}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
		else
			ret.setKeywords("");

		if (Optional.ofNullable(r.getCountry()).isPresent())
			ret
				.setCountry(
					String.join(", ", r.getCountry().stream().map(Country::getClassid).collect(Collectors.toList())));
		else
			ret.setCountry("");

		if (Optional.ofNullable(r.getLanguage()).isPresent() && StringUtils.isNotEmpty(r.getLanguage().getClassid())) {
			ret.setLanguage(r.getLanguage().getClassid());
		} else {
			ret.setLanguage("");
		}

		return ret;
	}
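	// the helpers below normalise free-text metadata for the CSV dump: the first non-empty abstract is kept,
	// the main title is preferred over the first available one, and line breaks/quotes are stripped so that
	// every value stays on a single line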
	private static String getAbstract(List<Field<String>> description) {
		if (description == null)
			return "";
		for (Field<String> abs : description) {
			if (StringUtils.isNotEmpty(abs.getValue())) {
				return removeBreaks(abs.getValue());
			}
		}
		return "";
	}

	private static String getTitle(List<StructuredProperty> titles) {
		String firstTitle = null;
		for (StructuredProperty title : titles) {
			if (StringUtils.isEmpty(firstTitle)) {
				if (StringUtils.isNotEmpty(title.getValue()))
					firstTitle = removeBreaks(title.getValue());
			}
			if (title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) {
				if (StringUtils.isNotEmpty(title.getValue()))
					return removeBreaks(title.getValue());
			}
		}
		if (firstTitle != null) {
			return removeBreaks(firstTitle);
		}
		return "";
	}

	private static String removeBreaks(String input) {
		return input
			.replace("\n", " ")
			.replace("\t", " ")
			.replace("\r", " ")
			.replace("\\\"", " ")
			.replace("\"", " ");
	}

}