[dumpCSV] addressing the issues pointed out by the Dare Lab people: repeated relations from author to result, due to the same author being repeated in the data; repeated relations from result to result, due to the same pid being present in more than one result; author table not properly formatted, due to bad formatting of the input data

This commit is contained in:
Miriam Baglioni 2023-07-07 17:44:19 +02:00
parent 8a44653dbe
commit 9d1b708a89
4 changed files with 42 additions and 19 deletions
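In short, the diff below applies two small patterns inside the Spark jobs: tracking already-emitted author ids in a Set so each author-to-result relation is written once, and sanitizing free-text fields before writing tab-separated CSV. A minimal standalone sketch of both patterns, under the assumption that this captures the intent; class and method names here (CsvDumpFixSketch, sanitize, dedupAuthorIds) are illustrative and not part of the dhp codebase:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CsvDumpFixSketch {

    // Blank out characters that would break a tab-separated CSV row;
    // null-safe, like the replace(...) helper introduced by the commit.
    static String sanitize(String input) {
        if (input == null)
            return "";
        return input.replace("\t", " ").replace("\n", " ").replace("\r", " ").replace("\"", " ");
    }

    // Keep only the first occurrence of each author id, so the
    // author-to-result relation is emitted at most once per author.
    static List<String> dedupAuthorIds(List<String> authorIds) {
        Set<String> seen = new HashSet<>();
        List<String> out = new ArrayList<>();
        for (String id : authorIds) {
            if (seen.add(id)) // add() returns false when the id was already seen
                out.add(id);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sanitize("Baglioni\tMiriam\n")); // -> "Baglioni Miriam "
        System.out.println(dedupAuthorIds(Arrays.asList("a1", "a2", "a1"))); // -> [a1, a2]
    }
}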


@@ -137,6 +137,7 @@ public class SparkDumpResults implements Serializable {
 			.flatMap((FlatMapFunction<R, AuthorResult>) r -> {
 				int count = 0;
 				List<AuthorResult> arl = new ArrayList<>();
+				Set<String> authorIds = new HashSet<>();
 				if (Optional.ofNullable(r.getAuthor()).isPresent()) {
 					for (Author a : r.getAuthor()) {
 						count += 1;
@@ -149,9 +150,9 @@ public class SparkDumpResults implements Serializable {
 							ar.setRank(String.valueOf(count));
 						}
 					}
-					ar.setFirstName(a.getName().replace("\t", " ").replace("\n", " ").replace("\r", " "));
-					ar.setLastName(a.getSurname().replace("\t", " ").replace("\n", " ").replace("\r", " "));
-					ar.setFullName(a.getFullname().replace("\t", " ").replace("\n", " ").replace("\r", " "));
+					ar.setFirstName(replace(a.getName()));
+					ar.setLastName(replace(a.getSurname()));
+					ar.setFullName(replace(a.getFullname()));
 					Tuple2<String, Boolean> orcid = getOrcid(a.getPid());
 					if (Optional.ofNullable(orcid).isPresent()) {
 						ar.setOrcid(orcid._1());
@@ -159,7 +160,12 @@ public class SparkDumpResults implements Serializable {
 					}
 					ar.autosetId();
-					arl.add(ar);
+					if (!authorIds.contains(ar.getAuthorId())) {
+						arl.add(ar);
+						authorIds.add(ar.getAuthorId());
+					}
 				}
 			}
@@ -187,11 +193,21 @@ public class SparkDumpResults implements Serializable {
 			.mode(SaveMode.Overwrite)
 			.json(workingPath + "/" + resultType + "/result_author");
 
-		// map the authors in the working dir. I do not want to have them repeated
+		// map the authors in the working dir. I do not want to have them repeated. If I have an orcid as id, I choose the one from orcid, if any
 		authorResult
 			.groupByKey((MapFunction<AuthorResult, String>) ar -> ar.getAuthorId(), Encoders.STRING())
 			.mapGroups(
-				(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> getAuthorDump(it.next()),
+				(MapGroupsFunction<String, AuthorResult, CSVAuthor>) (k, it) -> {
+					AuthorResult first = it.next();
+					if (!Optional.ofNullable(first.getFromOrcid()).isPresent() || first.getFromOrcid())
+						return getAuthorDump(first);
+					while (it.hasNext()) {
+						AuthorResult ar = it.next();
+						if (ar.getFromOrcid())
+							return getAuthorDump(ar);
+					}
+					return getAuthorDump(first);
+				},
 				Encoders.bean(CSVAuthor.class))
 			.write()
 			.option("compression", "gzip")
@@ -200,6 +216,13 @@ public class SparkDumpResults implements Serializable {
 	}
 
+	private static String replace(String input) {
+		if (Optional.ofNullable(input).isPresent())
+			return input.replace("\t", " ").replace("\n", " ").replace("\r", " ").replace("\"", " ");
+		else
+			return "";
+	}
+
 	private static List<CSVPid> mapPid(List<StructuredProperty> pid, String resultId) {
 		return pid
 			.stream()
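The new mapGroups body above prefers, among the rows grouped under the same author id, the one whose data comes from ORCID, falling back to the first row of the group. The same selection extracted from Spark into a plain-Java sketch; AuthorRow and pick are hypothetical stand-ins for AuthorResult and the lambda body, not actual dhp code:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PickOrcidSketch {

    // Minimal stand-in for AuthorResult: only the fields the selection touches.
    static class AuthorRow {
        final String fullName;
        final Boolean fromOrcid; // null when the provenance is unknown

        AuthorRow(String fullName, Boolean fromOrcid) {
            this.fullName = fullName;
            this.fromOrcid = fromOrcid;
        }
    }

    // Mirror of the commit's logic: keep the first row if its provenance is
    // unknown or already ORCID, otherwise scan the rest of the group for an
    // ORCID-backed row and fall back to the first one.
    static AuthorRow pick(Iterator<AuthorRow> it) {
        AuthorRow first = it.next();
        if (first.fromOrcid == null || first.fromOrcid)
            return first;
        while (it.hasNext()) {
            AuthorRow ar = it.next();
            if (Boolean.TRUE.equals(ar.fromOrcid))
                return ar;
        }
        return first;
    }

    public static void main(String[] args) {
        List<AuthorRow> group = Arrays.asList(
            new AuthorRow("M. Baglioni (from graph)", false),
            new AuthorRow("Miriam Baglioni (from ORCID)", true));
        System.out.println(pick(group.iterator()).fullName); // prints the ORCID variant
    }
}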


@@ -10,6 +10,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Optional;
 
+import eu.dnetlib.dhp.oa.graph.dump.csv.model.CSVAuthor;
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -117,7 +118,7 @@ public class DumpResultTest {
 		tmp.show(false);
 
-		Assertions.assertEquals(4, tmp.count());
+		Assertions.assertEquals(5, tmp.count());
 
 		CSVResult row = tmp
 			.filter(
 				(FilterFunction<CSVResult>) r -> r.getId().equals("50|DansKnawCris::0224aae28af558f21768dbc6439c7a95"))
@@ -196,7 +197,7 @@ public class DumpResultTest {
 		SparkDumpResults.main(new String[] {
 			"-isSparkSessionManaged", Boolean.FALSE.toString(),
-			"-outputPath", workingDir.toString() + "/output",
+			"-workingPath", workingDir.toString() + "/working",
 			"-resultType", "publication",
 			"-resultTableName", "eu.dnetlib.dhp.schema.oaf.Publication",
@@ -205,26 +206,23 @@ public class DumpResultTest {
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		Dataset<Row> tmp = spark
-			.read()
-			.option("header", "true")
-			.option("delimiter", Constants.SEP)
-			.csv(workingDir.toString() + "/working/publication/author");
+		Dataset<CSVAuthor> tmp = Utils
+			.readPath(spark, workingDir.toString() + "/working/publication/author", CSVAuthor.class);
 
-		Assertions.assertEquals(5, tmp.count());
+		Assertions.assertEquals(13, tmp.count());
 
 		Assertions.assertEquals(1, tmp.where("firstName == 'Maryam'").count());
 
 		Assertions
 			.assertEquals(
 				DHPUtils.md5("50|DansKnawCris::0224aae28af558f21768dbc6439c7a951"),
-				tmp.where("firstName == 'Maryam'").first().getAs("id"));
+				tmp.where("firstName == 'Maryam'").first().getId());
 
 		Assertions
-			.assertEquals(DHPUtils.md5("0000-0003-2914-2734"), tmp.where("firstName == 'Michael'").first().getAs("id"));
+			.assertEquals(DHPUtils.md5("0000-0003-2914-2734"), tmp.where("firstName == 'Michael'").first().getId());
 
 		Assertions
 			.assertEquals(
-				DHPUtils.md5("50|DansKnawCris::20c414a3b1c742d5dd3851f1b67df2d92"),
-				tmp.where("firstName == 'Mikhail'").first().getAs("id"));
+				DHPUtils.md5("0000-0002-6660-5673"),
+				tmp.where("firstName == 'Mikhail'").first().getId());
 	}

File diff suppressed because one or more lines are too long


@@ -1 +1,2 @@
 50|DansKnawCris::26780065282e607306372abd0d808245
+50|doi_________::16e142b54fbddb2cf1c71ff7460e2792