diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/AuthorResult.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/AuthorResult.java
index dcc89c5..d4c350a 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/AuthorResult.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/AuthorResult.java
@@ -86,7 +86,7 @@ public class AuthorResult implements Serializable {
 		if (orcid != null) {
 			authorId = DHPUtils.md5(orcid);
 		} else {
-			authorId = DHPUtils.md5(resultId + firstName + lastName + rank);
+			authorId = DHPUtils.md5(resultId + rank);
 		}
 	}
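With this change a non-ORCID author id depends only on the result id and the author's rank, so it no longer shifts when name fields are null or spelled inconsistently. A minimal, self-contained sketch of the resulting scheme; the local md5 helper is a stand-in for DHPUtils.md5, assumed here to be a plain hex-encoded MD5 digest:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public class AuthorIdSketch {

        // stand-in for DHPUtils.md5 (assumption: hex-encoded MD5)
        static String md5(String s) throws Exception {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(s.getBytes(StandardCharsets.UTF_8)))
                sb.append(String.format("%02x", b));
            return sb.toString();
        }

        static String authorId(String orcid, String resultId, String rank) throws Exception {
            if (orcid != null) {
                // ORCID is globally stable: the same person collapses to one id
                // across all of their results
                return md5(orcid);
            }
            // without an ORCID the id is local to the result; the name parts are
            // deliberately no longer part of the key
            return md5(resultId + rank);
        }

        public static void main(String[] args) throws Exception {
            System.out.println(authorId(null, "50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9", "1"));
            System.out.println(authorId("0000-0002-1825-0097", null, null));
        }
    }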
diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/SparkDumpResults.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/SparkDumpResults.java
index 4270e1b..c416334 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/SparkDumpResults.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/csv/SparkDumpResults.java
@@ -1,5 +1,25 @@
+
 package eu.dnetlib.dhp.oa.graph.dump.csv;
 
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.commons.lang3.StringUtils.split;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVRelResAut;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.graph.dump.Utils;
 import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
@@ -7,282 +27,272 @@ import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVPid;
 import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVResult;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
-
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.utils.DHPUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.Dataset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.Tuple2;
-import java.io.Serializable;
-import java.util.*;
-import java.util.stream.Collectors;
-
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
 /**
  * @author miriam.baglioni
  * @Date 04/05/23
  */
+//STEP 3
 public class SparkDumpResults implements Serializable {
+	private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);
+
+	public static void main(String[] args) throws Exception {
+		String jsonConfiguration = IOUtils
+			.toString(
+				SparkDumpResults.class
+					.getResourceAsStream(
+						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json"));
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+		parser.parseArgument(args);
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final String inputPath = parser.get("sourcePath");
+		log.info("inputPath: {}", inputPath);
+
+		final String outputPath = parser.get("outputPath");
+		log.info("outputPath: {}", outputPath);
+
+		final String resultType = parser.get("resultType");
+		log.info("resultType: {}", resultType);
+
+		final String resultClassName = parser.get("resultTableName");
+		log.info("resultTableName: {}", resultClassName);
+
+		final String workingPath = parser.get("workingPath");
+
+		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+		SparkConf conf = new SparkConf();
+
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				Utils.removeOutputDir(spark, outputPath);
+				run(spark, inputPath, outputPath, inputClazz, resultType, workingPath);
+
+			});
+
+	}
+
+	private static <R extends Result> void run(SparkSession spark, String inputPath, String outputPath,
+		Class<R> inputClazz, String resultType, String workingPath) {
+
+		Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
+		Dataset<R> results = Utils
+			.readPath(spark, inputPath + "/" + resultType, inputClazz)
+			.filter(
+				(FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
+
+		// map results
+		resultIds
+			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
+			.map(
+				(MapFunction<Tuple2<String, R>, CSVResult>) t2 -> mapResultInfo(t2._2()),
+				Encoders.bean(CSVResult.class))
+			.write()
+			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
+			.mode(SaveMode.Overwrite)
+			.csv(workingPath + "/" + resultType + "/result");
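This write (and the analogous ones below) now also emits a header row and uses Constants.SEP as the field delimiter. A throwaway local-mode sketch of what those two options change; Constants.SEP's value is not shown in this patch, so a tab character is assumed here:

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class CsvOptionsSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").appName("csv-options").getOrCreate();

            Dataset<Row> ds = spark
                .createDataset(Arrays.asList("50|aaa::0001", "50|aaa::0002"), Encoders.STRING())
                .toDF("id");

            ds
                .write()
                .option("compression", "gzip")
                .option("header", "true") // each part file now starts with the header line "id"
                .option("delimiter", "\t") // stand-in for Constants.SEP, whose value is not in this patch
                .mode(SaveMode.Overwrite)
                .csv("/tmp/csv-options-sketch");

            spark.stop();
        }
    }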
+
+		// map relations between pid and result
+		resultIds
+			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
+			.flatMap((FlatMapFunction<Tuple2<String, R>, CSVPid>) t2 -> {
+				List<CSVPid> pids = new ArrayList<>();
+				if (Optional.ofNullable(t2._2().getPid()).isPresent() && t2._2().getPid().size() > 0) {
+					pids.addAll(mapPid(t2._2().getPid(), t2._1()));
+				}
+				return pids.iterator();
+			}, Encoders.bean(CSVPid.class))
+			.filter(Objects::nonNull)
+			.write()
+			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
+			.mode(SaveMode.Overwrite)
+			.csv(workingPath + "/" + resultType + "/result_pid");
+
+		// map authors from the result
+		// for each author in the result:
+		// if the author has an ORCID, the author id is derived from it (md5(orcid));
+		// otherwise it is built as md5(result_id + author rank); when the rank is
+		// missing, the author's position within the result's author list is used instead
+		Dataset<AuthorResult> authorResult = resultIds
+			.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
+			.flatMap((FlatMapFunction<Tuple2<String, R>, AuthorResult>) t2 -> {
+				int count = 0;
+				List<AuthorResult> arl = new ArrayList<>();
+				for (Author a : t2._2().getAuthor()) {
+					count += 1;
+					AuthorResult ar = new AuthorResult();
+					ar.setResultId(t2._1());
+					if (Optional.ofNullable(a.getRank()).isPresent()) {
+						if (a.getRank() > 0) {
+							ar.setRank(String.valueOf(a.getRank()));
+						} else {
+							ar.setRank(String.valueOf(count));
+						}
+					}
+					ar.setFirstName(a.getName());
+					ar.setLastName(a.getSurname());
+					ar.setFullName(a.getFullname());
+					ar.setOrcid(getOrcid(a.getPid()));
+					ar.autosetId();
+					arl.add(ar);
+				}
+				return arl.iterator();
+			}, Encoders.bean(AuthorResult.class));
-	private static final Logger log = LoggerFactory.getLogger(SparkDumpResults.class);
+
+		// map the relation between author and result
+		authorResult
+			.map(
+				(MapFunction<AuthorResult, CSVRelResAut>) ar -> {
+					CSVRelResAut ret = new CSVRelResAut();
+					ret.setResult_id(ar.getResultId());
+					ret.setAuthor_id(ar.getAuthorId());
+					return ret;
+				},
+				Encoders.bean(CSVRelResAut.class))
+			.write()
+			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
+			.mode(SaveMode.Overwrite)
+			.csv(workingPath + "/" + resultType + "/result_author");
-
-	public static void main(String[] args) throws Exception {
-		String jsonConfiguration = IOUtils
-			.toString(
-				SparkDumpResults.class
-					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_step2.json"));
+
+		// map the authors in the working dir. I do not want to have them repeated
+		authorResult
+			.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
+			.mapGroups(
+				(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> getAuthorDump(it.next()),
+				Encoders.bean(CSVAuthor.class))
+			.write()
+			.option("compression", "gzip")
+			.option("header", "true")
+			.option("delimiter", Constants.SEP)
+			.mode(SaveMode.Overwrite)
+			.csv(workingPath + "/" + resultType + "/author");
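The groupByKey/mapGroups pair above keeps exactly one CSVAuthor row per author id; it.next() picks an arbitrary representative of each group. A plain-collections sketch of the same dedup semantics, with AuthorRow as a made-up stand-in for AuthorResult:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class AuthorDedupSketch {

        static class AuthorRow { // stand-in for AuthorResult
            final String authorId, fullName;

            AuthorRow(String authorId, String fullName) {
                this.authorId = authorId;
                this.fullName = fullName;
            }
        }

        static List<AuthorRow> dedup(List<AuthorRow> rows) {
            Map<String, AuthorRow> byId = new LinkedHashMap<>();
            for (AuthorRow r : rows)
                byId.putIfAbsent(r.authorId, r); // like it.next(): first row of each group wins
            return new ArrayList<>(byId.values());
        }

        public static void main(String[] args) {
            List<AuthorRow> rows = Arrays.asList(
                new AuthorRow("a1", "Miriam B."),
                new AuthorRow("a1", "Miriam B."), // same author on another result
                new AuthorRow("a2", "Someone Else"));
            System.out.println(dedup(rows).size()); // 2
        }
    }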
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
-		parser.parseArgument(args);
+	}
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+	private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
+		return pid.stream().map(p -> p.getQualifier().getClassid().toLowerCase() + "@" + p.getValue().toLowerCase()).distinct().map(p -> {
+			CSVPid ret = new CSVPid();
+			ret.setId(DHPUtils.md5(p));
+			ret.setResult_id(resultId);
+			ret.setPid(split(p, "@")[1]);
+			ret.setType(split(p, "@")[0]);
-		final String inputPath = parser.get("sourcePath");
-		log.info("inputPath: {}", inputPath);
+			return ret;
+		}).collect(Collectors.toList());
-		final String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
+	}
-		final String resultType = parser.get("resultType");
-		log.info("resultType: {}", resultType);
+
+	private static CSVAuthor getAuthorDump(AuthorResult ar) {
+		CSVAuthor ret = new CSVAuthor();
+		ret.setFirstname(ar.getFirstName());
-		final String resultClassName = parser.get("resultTableName");
-		log.info("resultTableName: {}", resultClassName);
+		ret.setId(ar.getAuthorId());
+		ret.setLastname(ar.getLastName());
-		final String workingPath = parser.get("workingPath");
+		ret.setFullname(ar.getFullName());
-		Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
+		if (ar.getOrcid() != null) {
+			ret.setOrcid(ar.getOrcid());
+		} else {
+			ret.setOrcid("");
+		}
-		SparkConf conf = new SparkConf();
+		return ret;
+	}
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				Utils.removeOutputDir(spark, outputPath );
-				run(spark, inputPath, outputPath, inputClazz, resultType, workingPath);
+
+	private static String getOrcid(List<StructuredProperty> pid) {
+		if (!Optional.ofNullable(pid).isPresent())
+			return null;
+		if (pid.size() == 0)
+			return null;
+		for (StructuredProperty p : pid) {
+			if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)) {
+				return p.getValue();
+			}
+		}
+		return null;
+	}
-			});
+
+	private static String getFieldValue(Field<String> input) {
+		if (input != null &&
+			StringUtils.isNotEmpty(input.getValue())) {
+			return input.getValue();
+		} else {
+			return "";
+		}
+	}
+
+	private static <R extends Result> CSVResult mapResultInfo(R r) {
+		CSVResult ret = new CSVResult();
+		ret.setId(r.getId());
+		ret.setType(r.getResulttype().getClassid());
+		ret.setTitle(getTitle(r.getTitle()));
+		ret.setDescription(getAbstract(r.getDescription()));
+		ret.setAccessright(r.getBestaccessright().getClassid());
+		ret.setPublication_date(getFieldValue(r.getDateofacceptance()));
+		ret.setPublisher(getFieldValue(r.getPublisher()));
-	}
-
-	private static <R extends Result> void run(SparkSession spark, String inputPath, String outputPath,
-		Class<R> inputClazz, String resultType, String workingPath) {
-
-		Dataset<String> resultIds = spark.read().textFile(workingPath + "/resultIds");
-		Dataset<R> results = Utils
-			.readPath(spark, inputPath + "/" + resultType, inputClazz)
-			.filter((FilterFunction<R>) p -> !p.getDataInfo().getDeletedbyinference() && !p.getDataInfo().getInvisible());
-
-		// map results
-		resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
-			.map((MapFunction<Tuple2<String, R>, CSVResult>) t2 -> mapResultInfo(t2._2()), Encoders.bean(CSVResult.class) )
-			.write()
-			.option("compression","gzip")
-			.mode(SaveMode.Overwrite)
-			.csv(workingPath + "/" + resultType + "/result");
+		ret.setKeywords(String.join(", ", r.getSubject().stream().map(s -> {
+			if (StringUtils.isNotEmpty(s.getValue()))
+				return s.getValue().toLowerCase();
+			else
+				return null;
+		}).filter(Objects::nonNull).distinct().collect(Collectors.toList())));
-		// map relations between pid and result
-		resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
-			.flatMap((FlatMapFunction<Tuple2<String, R>, CSVPid>) t2 ->
-			{
-				List<CSVPid> pids = new ArrayList<>();
-				if(Optional.ofNullable(t2._2().getPid()).isPresent() && t2._2().getPid().size() > 0){
-					pids.addAll(mapPid(t2._2().getPid(), t2._1()));
-				}
-				return pids.iterator();
-			}, Encoders.bean(CSVPid.class))
-			.filter(Objects::nonNull)
-			.write()
-			.option("compression","gzip")
-			.mode(SaveMode.Overwrite)
-			.csv(workingPath + "/" + resultType + "/result_pid");
+		ret.setCountry(String.join(", ", r.getCountry().stream().map(Country::getClassid).collect(Collectors.toList())));
+		if (StringUtils.isNotEmpty(r.getLanguage().getClassid())) {
+			ret.setLanguage(r.getLanguage().getClassid());
+		} else {
+			ret.setLanguage("");
+		}
-		//map authors from the result
-		//per ogni autore nel result
-		//se l'autore ha un orcid il suo id dipende dall'orcid (tipo md5(orcid))
-		//se non ha orcid il suo id si costruisce come result_id + author_name + authorrank ( se non ha il rank si sua
-		//la sua posizione nell'insieme degli autori) sempre con md5
-		Dataset<AuthorResult> authorResult = resultIds.joinWith(results, resultIds.col("value").equalTo(results.col("id")))
-			.flatMap((FlatMapFunction<Tuple2<String, R>, AuthorResult>) t2 ->
-			{
-				int count = 0;
-				List<AuthorResult> arl = new ArrayList<>();
-				for (Author a : t2._2().getAuthor()) {
-					count += 1;
-					AuthorResult ar = new AuthorResult();
-					ar.setResultId(t2._1());
-					if (Optional.ofNullable(a.getRank()).isPresent()) {
-						if (a.getRank() > 0) {
-							ar.setRank(String.valueOf(a.getRank()));
-						} else {
-							ar.setRank(String.valueOf(count));
-						}
-					}
-					ar.setFirstName(a.getName());
-					ar.setLastName(a.getSurname());
-					ar.setFullName(a.getFullname());
-					ar.setOrcid(getOrcid(a.getPid()));
-					ar.autosetId();
-					arl.add(ar);
-				}
-				return arl.iterator();
-			}
-			, Encoders.bean(AuthorResult.class));
-
-		// map the relation between author and result
-		authorResult.map((MapFunction<AuthorResult, String>) ar -> ar.getResultId() + Constants.SEP + ar.getAuthorId(), Encoders.STRING() )
-			.write()
-			.option("compression","gzip")
-			.mode(SaveMode.Overwrite)
-			.csv(workingPath + "/" + resultType + "/result_author");
-
-
-		// ma the authors in the working dir. I do not want to have them repeated
-		authorResult.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING() )
-			.mapGroups((MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> getAuthorDump(it.next()) , Encoders.bean(CSVAuthor.class))
-			.write()
-			.option("compression", "gzip")
-			.mode(SaveMode.Overwrite)
-			.csv(workingPath + "/" + resultType + "/author");
-
-	}
-
-	private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
-		return pid.stream().map(p -> {
-			CSVPid ret = new CSVPid();
-
-			ret.setId(DHPUtils.md5(p.getQualifier().getClassid() + p.getValue()));
-			ret.setResult_id(resultId);
-			ret.setPid(p.getValue());
-			ret.setType(p.getQualifier().getClassid());
-
-			return ret;
-		}).collect(Collectors.toList());
-
-	}
-
-	private static CSVAuthor getAuthorDump(AuthorResult ar) {
-		CSVAuthor ret = new CSVAuthor();
-		ret.setFirstname(ar.getFirstName());
-
-		ret.setId(ar.getAuthorId());
-		ret.setLastname(ar.getLastName());
-
-		ret.setFullname(ar.getFullName());
-
-		if(ar.getOrcid() != null){
-			ret.setOrcid(ar.getOrcid());
-		}else{
-			ret.setOrcid("");
-		}
-
-		return ret;
-	}
-
-	private static String getOrcid(List<StructuredProperty> pid) {
-		if(!Optional.ofNullable(pid).isPresent())
-			return null;
-		if(pid.size() == 0)
-			return null;
-		for(StructuredProperty p : pid){
-			if (p.getQualifier().getClassid().equals(ModelConstants.ORCID)){
-				return p.getValue();
-			}
-		}
-		return null;
-	}
-
-
-
-	private static <R extends Result> CSVResult mapResultInfo(R r) {
-		CSVResult ret = new CSVResult();
-		ret.setId(r.getId());
-		ret.setType(r.getResulttype().getClassid());
-		ret.setTitle(getTitle(r.getTitle()));
-		ret.setDescription(getAbstract(r.getDescription()));
-		ret.setAccessright(r.getBestaccessright().getClassid());
-		ret.setPublication_date(r.getDateofacceptance().getValue());
-		if (StringUtils.isNotEmpty(r.getPublisher().getValue())) {
-			ret.setPublisher(r.getPublisher().getValue());
-		} else {
-			ret.setPublisher("");
-		}
-
-		StringBuilder sbjs = new StringBuilder();
-		for(StructuredProperty sbj : r.getSubject()){
-			if(StringUtils.isNotEmpty(sbj.getValue())){
-				sbjs.append(sbj.getValue());
-				sbjs.append(",");
-			}
-		}
-		ret.setKeywords(sbjs.toString());
-
-		StringBuilder countries = new StringBuilder();
-
-		for(Country c: r.getCountry()){
-			if(StringUtils.isNotEmpty(c.getClassid())){
-				countries.append(c.getClassid());
-			}
-		}
-		ret.setCountry(countries.toString());
-
-		if(StringUtils.isNotEmpty(r.getLanguage().getClassid())){
-			ret.setLanguage(r.getLanguage().getClassid());
-		}else{
-			ret.setLanguage("");
-		}
-
-		return ret;
-	}
-
-	private static String getAbstract(List<Field<String>> description) {
-		for(Field<String> abs:description){
-			if(StringUtils.isNotEmpty(abs.getValue())){
-				return abs.getValue();
-			}
-		}
-		return "";
-	}
-
-
-	private static String getTitle(List<StructuredProperty> titles) {
-		String firstTitle = null;
-		for(StructuredProperty title : titles){
-			if(StringUtils.isEmpty(firstTitle)){
-				if(StringUtils.isNotEmpty(title.getValue()))
-					firstTitle = title.getValue();
-			}
-			if(title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())){
-				if(StringUtils.isNotEmpty(title.getValue()))
-					return title.getValue();
-			}
-		}
-		return "";
-	}
-
+		return ret;
+	}
+
+	private static String getAbstract(List<Field<String>> description) {
+		if (description == null)
+			return "";
+		for (Field<String> abs : description) {
+			if (StringUtils.isNotEmpty(abs.getValue())) {
+				return abs.getValue();
+			}
+		}
+		return "";
+	}
+
+	private static String getTitle(List<StructuredProperty> titles) {
+		String firstTitle = null;
+		for (StructuredProperty title : titles) {
+			if (StringUtils.isEmpty(firstTitle)) {
+				if (StringUtils.isNotEmpty(title.getValue()))
+					firstTitle = title.getValue();
+			}
+			if (title.getQualifier().getClassid().equals(ModelConstants.MAIN_TITLE_QUALIFIER.getClassid())) {
+				if (StringUtils.isNotEmpty(title.getValue()))
+					return title.getValue();
+			}
+		}
+		return StringUtils.isNotEmpty(firstTitle) ? firstTitle : "";
+	}
 }
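getTitle prefers the main title and otherwise falls back to the first non-empty title (the final return now actually uses the collected firstTitle, which was otherwise dead). A standalone sketch of the same precedence, with Title as a made-up stand-in for StructuredProperty and its qualifier:

    import java.util.Arrays;
    import java.util.List;

    public class TitleSketch {

        static class Title { // stand-in for StructuredProperty
            final String classid, value;

            Title(String classid, String value) {
                this.classid = classid;
                this.value = value;
            }
        }

        // prefer the first non-empty "main title"; fall back to the first
        // non-empty title of any kind; else the empty string
        static String pick(List<Title> titles, String mainClassid) {
            String first = null;
            for (Title t : titles) {
                if (t.value == null || t.value.isEmpty())
                    continue;
                if (mainClassid.equals(t.classid))
                    return t.value;
                if (first == null)
                    first = t.value;
            }
            return first == null ? "" : first;
        }

        public static void main(String[] args) {
            List<Title> titles = Arrays.asList(
                new Title("subtitle", "A subtitle"),
                new Title("main title", "The actual title"));
            System.out.println(pick(titles, "main title")); // The actual title
        }
    }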
diff --git a/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json
new file mode 100644
index 0000000..87a01d8
--- /dev/null
+++ b/dump/src/main/resources/eu/dnetlib/dhp/oa/graph/dump/input_dump_csv_ste3.json
@@ -0,0 +1,41 @@
+[
+
+  {
+    "paramName":"s",
+    "paramLongName":"sourcePath",
+    "paramDescription": "the path of the sequential file to read",
+    "paramRequired": true
+  },
+  {
+    "paramName": "out",
+    "paramLongName": "outputPath",
+    "paramDescription": "the path used to store temporary output files",
+    "paramRequired": true
+  },
+  {
+    "paramName": "ssm",
+    "paramLongName": "isSparkSessionManaged",
+    "paramDescription": "true if the spark session is managed, false otherwise",
+    "paramRequired": false
+  },
+  {
+    "paramName":"wp",
+    "paramLongName":"workingPath",
+    "paramDescription": "the path of the working directory",
+    "paramRequired": true
+  },
+  {
+    "paramName":"rt",
+    "paramLongName":"resultType",
+    "paramDescription": "the result type currently being dumped (e.g. publication)",
+    "paramRequired": true
+  },
+  {
+    "paramName":"rtn",
+    "paramLongName":"resultTableName",
+    "paramDescription": "the name of the result table we are currently working on",
+    "paramRequired": true
+  }
+]
+
+
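For reference, a hypothetical launcher wiring these parameters to SparkDumpResults.main; the --paramLongName argument style follows the ArgumentApplicationParser convention assumed from the rest of the module, and all paths and values below are made up:

    package eu.dnetlib.dhp.oa.graph.dump.csv;

    public class Step3Launcher {
        public static void main(String[] args) throws Exception {
            SparkDumpResults.main(new String[] {
                "--isSparkSessionManaged", "true", // local/test runs manage their own session
                "--sourcePath", "/tmp/prod_provision/graph", // hypothetical graph location
                "--outputPath", "/tmp/dump/csv/output",
                "--workingPath", "/tmp/dump/csv/working",
                "--resultType", "publication",
                "--resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication"
            });
        }
    }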
diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpCommunitiesTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpCommunitiesTest.java
new file mode 100644
index 0000000..605f1ec
--- /dev/null
+++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpCommunitiesTest.java
@@ -0,0 +1,9 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.csv;
+
+/**
+ * @author miriam.baglioni
+ * @Date 11/05/23
+ */
+public class DumpCommunitiesTest {
+}
diff --git a/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpResultTest.java b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpResultTest.java
new file mode 100644
index 0000000..fc40bd5
--- /dev/null
+++ b/dump/src/test/java/eu/dnetlib/dhp/oa/graph/dump/csv/DumpResultTest.java
@@ -0,0 +1,9 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.csv;
+
+/**
+ * @author miriam.baglioni
+ * @Date 11/05/23
+ */
+public class DumpResultTest {
+}
diff --git a/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00000 b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00000
new file mode 100644
index 0000000..e69de29
diff --git a/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00049 b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00049
new file mode 100644
index 0000000..6a5ffaf
--- /dev/null
+++ b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00049
@@ -0,0 +1 @@
+50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d9
diff --git a/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00089 b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00089
new file mode 100644
index 0000000..07020d0
--- /dev/null
+++ b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00089
@@ -0,0 +1 @@
+50|DansKnawCris::0224aae28af558f21768dbc6439c7a95
diff --git a/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00169 b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00169
new file mode 100644
index 0000000..7ceab26
--- /dev/null
+++ b/dump/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/csv/working/resultIds/part-00169
@@ -0,0 +1 @@
+50|DansKnawCris::26780065282e607306372abd0d808245