From 34a4b3cbdfe3bc4d07b565b95089c89ddbd25f9c Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Fri, 24 Nov 2023 12:39:58 +0100
Subject: [PATCH] Implemented ORCID Enrichment

---
 .../eu/dnetlib/dhp/oa/merge/AuthorMerger.java | 292 +++++++++++++++++-
 .../eu/dnetlib/oa/merge/AuthorMergerTest.java | 125 ++++++++
 .../collection/orcid/DownloadORCIDTest.java   |  39 ---
 .../orcid/enrich_graph_orcid_parameters.json  |  26 ++
 .../enrich/orcid/oozie_app/config-default.xml |  34 ++
 .../dhp/enrich/orcid/oozie_app/workflow.xml   |  52 ++++
 .../dhp/enrich/orcid/AuthorEnricher.scala     |  37 +++
 .../SparkEnrichGraphWithOrcidAuthors.scala    | 119 +++++++
 .../dhp/enrich/orcid/EnrichOrcidTest.scala    |  12 +
 9 files changed, 696 insertions(+), 40 deletions(-)
 create mode 100644 dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/config-default.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index aea046203..6c3058303 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -1,11 +1,18 @@
 
 package eu.dnetlib.dhp.oa.merge;
 
+import java.io.FileWriter;
+import java.io.IOException;
 import java.text.Normalizer;
 import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jetbrains.annotations.NotNull;
 
 import com.wcohen.ss.JaroWinkler;
 
@@ -14,6 +21,28 @@ import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.pace.model.Person;
 import scala.Tuple2;
 
+class SimilarityCellInfo implements Comparable<SimilarityCellInfo> {
+
+	public int authorPosition = 0;
+	public int orcidPosition = 0;
+
+	public double maxColumnSimilarity = 0.0;
+
+	public SimilarityCellInfo() {
+	}
+
+	public void setValues(final int authPos, final int orcidPos, final double similarity) {
+		this.authorPosition = authPos;
+		this.orcidPosition = orcidPos;
+		this.maxColumnSimilarity = similarity;
+	}
+
+	@Override
+	public int compareTo(@NotNull SimilarityCellInfo o) {
+		return Double.compare(maxColumnSimilarity, o.maxColumnSimilarity);
+	}
+}
+
 public class AuthorMerger {
 
 	private static final Double THRESHOLD = 0.95;
@@ -119,6 +148,267 @@ public class AuthorMerger {
 			});
 	}
 
+	public static String normalizeFullName(final String fullname) {
+		return nfd(fullname)
+			.toLowerCase()
+			// do not compact the regexes into a single expression:
+			// it would cause a StackOverflowError in case of large input strings
+			.replaceAll("(\\W)+", " ")
+			.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
+			.replaceAll("(\\p{Punct})+", " ")
+			.replaceAll("(\\d)+", " ")
+			.replaceAll("(\\n)+", " ")
+			.trim();
+//		return Arrays.stream(fullname.split("[\\s | , | ;]+")).map(String::toLowerCase).sorted().collect(Collectors.joining());
+	}
+
+	private static String generateAuthorkey(final Author a) {
+		if (a.getSurname() == null)
+			return "NOSURNAME";
+
+		return normalize(a.getSurname());
+	}
+
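+	// Illustrative behaviour of generateAuthorkey (the key is only used to cluster
+	// authors, so the exact normalized form does not matter, only its stability):
+	//
+	//   generateAuthorkey(author with surname "La Bruzzo") -> normalize("La Bruzzo")
+	//   generateAuthorkey(author with null surname)        -> "NOSURNAME"
+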
+//
+//	public static List<Author> enrichOrcid2(List<Author> baseAuthor, List<Author> orcidAuthor) {
+//		if (baseAuthor == null || baseAuthor.isEmpty())
+//			return orcidAuthor;
+//
+//		if (orcidAuthor == null || orcidAuthor.isEmpty())
+//			return baseAuthor;
+//
+//		if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
+//			return baseAuthor;
+//
+//		Map<String, List<Author>> pubClusters = baseAuthor.stream().collect(Collectors.toMap(AuthorMerger::generateAuthorkey, Arrays::asList, (a, b) -> {
+//			a.addAll(b);
+//			return a;
+//		}));
+//
+//		Map<String, List<Author>> orcidClusters = baseAuthor.stream().collect(Collectors.toMap(AuthorMerger::generateAuthorkey, Arrays::asList, (a, b) -> {
+//			a.addAll(b);
+//			return a;
+//		}));
+//
+//		System.out.println(pubClusters.keySet().size());
+//		System.out.println(orcidClusters.keySet().size());
+//
+//		return null;
+//	}
+
+	static int hammingDist(String str1, String str2) {
+		if (str1.length() != str2.length())
+			return Math.max(str1.length(), str2.length());
+		int i = 0, count = 0;
+		while (i < str1.length()) {
+			if (str1.charAt(i) != str2.charAt(i))
+				count++;
+			i++;
+		}
+		return count;
+	}
+
+	private static String authorFieldToBeCompared(Author author) {
+		if (StringUtils.isNotBlank(author.getSurname())) {
+			return author.getSurname();
+		}
+		if (StringUtils.isNotBlank(author.getFullname())) {
+			return author.getFullname();
+		}
+		return null;
+	}
+
+	public static boolean checkSimilarity3(final Author left, final Author right) {
+
+		if (StringUtils.isNotBlank(left.getSurname()) && StringUtils.isNotBlank(left.getName())
+			&& StringUtils.isNotBlank(right.getSurname()) && StringUtils.isNotBlank(right.getName()))
+			return left.getSurname().equalsIgnoreCase(right.getSurname())
+				&& left.getName().substring(0, 1).equalsIgnoreCase(right.getName().substring(0, 1));
+
+		final Person pl = parse(left);
+		final Person pr = parse(right);
+
+		// If one of them didn't have a surname the match is false
+		if (!(pl.getSurname() != null && pl.getSurname().stream().anyMatch(StringUtils::isNotBlank) &&
+			pr.getSurname() != null && pr.getSurname().stream().anyMatch(StringUtils::isNotBlank)))
+			return false;
+
+		// The Authors have one surname in common
+		if (pl.getSurname().stream().anyMatch(sl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(sl)))) {
+
+			// If one of them has only a surname and it is the same, we can say they are the same author
+			if ((pl.getName() == null || pl.getName().stream().allMatch(StringUtils::isBlank)) ||
+				(pr.getName() == null || pr.getName().stream().allMatch(StringUtils::isBlank)))
+				return true;
+			// The authors have the same initial of the name in common
+			if (pl
+				.getName()
+				.stream()
+				.anyMatch(
+					nl -> pr
+						.getName()
+						.stream()
+						.anyMatch(nr -> nr.substring(0, 1).equalsIgnoreCase(nl.substring(0, 1)))))
+				return true;
+		}
+		return false;
+	}
+
+	public static boolean checkSimilarity2(final Author left, final Author right) {
+		final Person pl = parse(left);
+		final Person pr = parse(right);
+
+		// If one of them didn't have a surname the match is false
+		if (!(pl.getSurname() != null && pl.getSurname().stream().anyMatch(StringUtils::isNotBlank) &&
+			pr.getSurname() != null && pr.getSurname().stream().anyMatch(StringUtils::isNotBlank)))
+			return false;
+
+		// The Authors have one surname in common
+		if (pl.getSurname().stream().anyMatch(sl -> pr.getSurname().stream().anyMatch(sr -> sr.equalsIgnoreCase(sl)))) {
+
+			// If one of them has only a surname and it is the same, we can say they are the same author
+			if ((pl.getName() == null || pl.getName().stream().allMatch(StringUtils::isBlank)) ||
+				(pr.getName() == null || pr.getName().stream().allMatch(StringUtils::isBlank)))
+				return true;
+			// The authors have the same initial of the name in common
+			if (pl
+				.getName()
+				.stream()
+				.anyMatch(
+					nl -> pr
+						.getName()
+						.stream()
+						.anyMatch(nr -> nr.substring(0, 1).equalsIgnoreCase(nl.substring(0, 1)))))
+				return true;
+		}
+		return false;
+	}
+
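+	// Illustrative outcomes of the checks above (the first case is also asserted
+	// by AuthorMergerTest.checkSimilarityTest):
+	//
+	//   checkSimilarity2("Wu, M."  vs "Xin Wu")   -> false (same surname, different initial)
+	//   checkSimilarity2("Wu, M."  vs "Wu, Ming") -> true  (same surname, same initial)
+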
+	public static boolean checkSimilarity(final Author left, final Author right) {
+
+		if (left.getSurname() == null && left.getFullname() == null)
+			return false;
+		if (right.getSurname() == null && right.getFullname() == null)
+			return false;
+
+		// The Authors have the same surname, or we tolerate one different char (say, one typo)
+		if (StringUtils.isNotBlank(left.getSurname()) && StringUtils.isNotBlank(right.getSurname())) {
+			if (left.getSurname().equalsIgnoreCase(right.getSurname())
+				|| hammingDist(left.getSurname().toLowerCase(), right.getSurname().toLowerCase()) < 2) {
+				// In case one of the two Authors has no given name, the match is true
+				if (StringUtils.isBlank(left.getName()) || StringUtils.isBlank(right.getName()))
+					return true;
+				// If the surname matches, and they have the same name or the name starts with the
+				// same letter, we can say it is the same author
+				if (left.getName().equalsIgnoreCase(right.getName())
+					|| left.getName().substring(0, 1).equalsIgnoreCase(right.getName().substring(0, 1)))
+					return true;
+			}
+			// Different SURNAME
+			else {
+				return false;
+			}
+		} else {
+			// This is the case where the two authors have either the surname or the fullname:
+			// take the first non-blank of surname or fullname on both sides
+			final String l = authorFieldToBeCompared(left);
+			final String r = authorFieldToBeCompared(right);
+			if (l == null || r == null)
+				return false;
+			// The same length means they are the same field
+			if (l.length() == r.length()) {
+				return normalize(l).equals(normalize(r));
+			}
+			// In this case probably l contains the surname and r contains the fullname
+			if (l.length() < r.length())
+				return normalize(r).contains(normalize(l));
+			// In this case probably l contains the fullname and r contains the surname
+			return normalize(l).contains(normalize(r));
+		}
+		return false;
+	}
+
+	public static List<Author> enrichOrcid2(List<Author> baseAuthor, List<Author> orcidAuthor) {
+
+		if (baseAuthor == null || baseAuthor.isEmpty())
+			return orcidAuthor;
+
+		if (orcidAuthor == null || orcidAuthor.isEmpty())
+			return baseAuthor;
+
+		if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
+			return baseAuthor;
+
+		final List<Author> oAuthor = new ArrayList<>();
+		oAuthor.addAll(orcidAuthor);
+
+		baseAuthor.forEach(ba -> {
+			Optional<Author> aMatch = oAuthor.stream().filter(oa -> checkSimilarity2(ba, oa)).findFirst();
+			if (aMatch.isPresent()) {
+				final Author sameAuthor = aMatch.get();
+				addPid(ba, sameAuthor.getPid());
+				oAuthor.remove(sameAuthor);
+			}
+		});
+		return baseAuthor;
+	}
+
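+	// Minimal usage sketch: enrichOrcid2 greedily pairs each graph author with the
+	// first still-unassigned ORCID author that passes checkSimilarity2, then drops
+	// that ORCID author from the candidate pool so it cannot be matched twice:
+	//
+	//   List<Author> enriched = AuthorMerger.enrichOrcid2(publicationAuthors, orcidAuthors);
+	//   // matched publication authors now also carry the pid of their ORCID twin
+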
+	public static List<Author> enrichOrcid(List<Author> baseAuthor, List<Author> orcidAuthor) {
+
+		if (baseAuthor == null || baseAuthor.isEmpty())
+			return orcidAuthor;
+
+		if (orcidAuthor == null || orcidAuthor.isEmpty())
+			return baseAuthor;
+
+		if (baseAuthor.size() == 1 && orcidAuthor.size() > 10)
+			return baseAuthor;
+
+		final Double[][] similarityMatrix = new Double[baseAuthor.size()][orcidAuthor.size()];
+
+		final List<SimilarityCellInfo> maxColumns = new ArrayList<>();
+
+		for (int i = 0; i < orcidAuthor.size(); i++)
+			maxColumns.add(new SimilarityCellInfo());
+
+		for (int i = 0; i < baseAuthor.size(); i++) {
+			for (int j = 0; j < orcidAuthor.size(); j++) {
+				similarityMatrix[i][j] = sim(baseAuthor.get(i), orcidAuthor.get(j));
+				if (maxColumns.get(j).maxColumnSimilarity < similarityMatrix[i][j])
+					maxColumns.get(j).setValues(i, j, similarityMatrix[i][j]);
+			}
+		}
+		maxColumns
+			.stream()
+			.sorted()
+			.filter(si -> si.maxColumnSimilarity > 0.85)
+			.forEach(si -> addPid(baseAuthor.get(si.authorPosition), orcidAuthor.get(si.orcidPosition).getPid()));
+		return baseAuthor;
+	}
+
+	private static void addPid(final Author a, final List<StructuredProperty> pids) {
+
+		if (a.getPid() == null) {
+			a.setPid(new ArrayList<>());
+		}
+
+		a.getPid().addAll(pids);
+	}
+
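+	// Sketch of the matrix-based variant above: every ORCID author (column) keeps
+	// track of its best-scoring graph author (row), and only column maxima above
+	// 0.85 produce an enrichment. With illustrative similarity scores:
+	//
+	//   base:  [ "Baglioni, Miriam", "La Bruzzo, Sandro" ]
+	//   orcid: [ "S. La Bruzzo" ]
+	//   column scores [ 0.41, 0.93 ] -> ORCID pid added to "La Bruzzo, Sandro"
+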
 	public static String pidToComparableString(StructuredProperty pid) {
 		final String classid = pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase()
 			: "";
@@ -171,7 +461,7 @@ public class AuthorMerger {
 		}
 	}
 
-	private static String normalize(final String s) {
+	public static String normalize(final String s) {
 		String[] normalized = nfd(s)
 			.toLowerCase()
 			// do not compact the regexes in a single expression, would cause StackOverflowError
diff --git a/dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java b/dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java
new file mode 100644
index 000000000..7f8d673d1
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/oa/merge/AuthorMergerTest.java
@@ -0,0 +1,125 @@
+
+package eu.dnetlib.oa.merge;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.StringUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.oa.merge.AuthorMerger;
+import eu.dnetlib.dhp.schema.oaf.Author;
+
+public class AuthorMergerTest {
+
+	@Test
+	public void testNormalization() {
+
+		assertEquals("bruzzolasandro", AuthorMerger.normalizeFullName("Sandro, La Bruzzo"));
+		assertEquals("baglionimiriam", AuthorMerger.normalizeFullName("Miriam Baglioni"));
+		assertEquals("baglionimiriam", AuthorMerger.normalizeFullName("Miriam ;Baglioni,"));
+	}
+
+	public void testEnrichAuthor() throws Exception {
+		final ObjectMapper mapper = new ObjectMapper();
+
+		BufferedReader pr = new BufferedReader(new InputStreamReader(
+			AuthorMergerTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_publication.json")));
+		BufferedReader or = new BufferedReader(new InputStreamReader(
+			AuthorMergerTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/merge/authors_orcid.json")));
+
+		TypeReference<List<Author>> aclass = new TypeReference<List<Author>>() {
+		};
+		String pubLine;
+
+		int i = 0;
+		while ((pubLine = pr.readLine()) != null) {
+			final String pubId = pubLine;
+			final String matchPidOrcid = or.readLine();
+			final String pubOrcid = or.readLine();
+
+			final String data = pr.readLine();
+
+			if (StringUtils.isNotBlank(data)) {
+				List<Author> publicationAuthors = mapper.readValue(data, aclass);
+				List<Author> orcidAuthors = mapper.readValue(or.readLine(), aclass);
+				System.out.printf("OAF ID = %s \n", pubId);
+				System.out.printf("ORCID Intersected ID = %s \n", pubOrcid);
+				System.out.printf("OAF Author Size = %d \n", publicationAuthors.size());
+				System.out.printf("Orcid Author Size = %d \n", orcidAuthors.size());
+				System.out.printf("Orcid Matched PID = %s \n", matchPidOrcid);
+
+				long originalAuthorWithPiD = publicationAuthors
+					.stream()
+					.filter(
+						a -> a.getPid() != null && a
+							.getPid()
+							.stream()
+							.anyMatch(
+								p -> p.getQualifier() != null
+									&& p.getQualifier().getClassid().toLowerCase().contains("orcid")))
+					.count();
+				long start = System.currentTimeMillis();
+
+//				final List<Author> enrichedList = AuthorMerger.enrichOrcid(publicationAuthors, orcidAuthors);
+				final List<Author> enrichedList = AuthorMerger.enrichOrcid2(publicationAuthors, orcidAuthors);
+
+				long enrichedAuthorWithPid = enrichedList
+					.stream()
+					.filter(
+						a -> a.getPid() != null && a
+							.getPid()
+							.stream()
+							.anyMatch(
+								p -> p.getQualifier() != null
+									&& p.getQualifier().getClassid().toLowerCase().contains("orcid")))
+					.count();
+
+				long totalTime = (System.currentTimeMillis() - start) / 1000;
+				System.out
+					.printf(
+						"Enriched authors in %d seconds from %d pid to %d pid \n", totalTime, originalAuthorWithPiD,
+						enrichedAuthorWithPid);
+
+				System.out.println("=================");
+
+				if (++i > 30)
+					break;
+			}
+		}
+	}
+
+	@Test
+	public void checkSimilarityTest() {
+		final Author left = new Author();
+		left.setSurname("Wu");
+		left.setName("M.");
+		left.setFullname("Wu, M.");
+
+		System.out.println(AuthorMerger.normalizeFullName(left.getFullname()));
+
+		final Author right = new Author();
+		right.setName("Xin");
+		right.setSurname("Wu");
+		right.setFullname("Xin Wu");
+//		System.out.println(AuthorMerger.normalize(right.getFullname()));
+		boolean same = AuthorMerger.checkSimilarity2(left, right);
+
+		assertFalse(same);
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java
index be5555fc0..868f4e92d 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/orcid/DownloadORCIDTest.java
@@ -32,45 +32,6 @@ import eu.dnetlib.dhp.parser.utility.VtdException;
 public class DownloadORCIDTest {
 	private final Logger log = LoggerFactory.getLogger(DownloadORCIDTest.class);
 
-//	public void test() throws Exception {
-//
-//		Configuration conf = new Configuration();
-//		// Set FileSystem URI
-////		conf.set("fs.defaultFS", "file://");
-//		// Because of Maven
-//		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-//		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
-//
-//		System.setProperty("hadoop.home.dir", "file:///Users/sandro/orcid/");
-//
-//		final FileSystem fileSystem = FileSystem.get(conf);
-//
-//		new ExtractORCIDDump(fileSystem).run("/Users/sandro/orcid/", "/Users/sandro/orcid/extracted");
-//
-////		final GZIPInputStream gzip = new GZIPInputStream(Files.newInputStream(Paths.get("/Users/sandro/orcid/ORCID_2023_10_activities_1.tar.gz")));
-////		try(final TarArchiveInputStream tais = new TarArchiveInputStream(gzip)) {
-////
-////			TarArchiveEntry entry;
-////			while ((entry = tais.getNextTarEntry()) != null) {
-////
-////				if (entry.isFile() && entry.getName().contains("employments")) {
-////
-////					System.out.println(entry.getName());
-////					final String [] items = entry.getName().split("/");
-////
-////					final String res = IOUtils.toString(new BufferedReader(new InputStreamReader(tais)));
-////					System.out.println("res = " + res);
-////
-////					System.out.println(items[items.length-2]);
-////					break;
-////				}
-////
-////
-////			}
-////		}
-//
-//	}
-
 	@Test
 	public void testSummary() throws Exception {
 		final String xml = IOUtils
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json
new file mode 100644
index 000000000..765c0e8ff
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json
@@ -0,0 +1,26 @@
+[
+    {
+        "paramName": "mt",
+        "paramLongName": "master",
+        "paramDescription": "should be local or yarn",
+        "paramRequired": true
+    },
+    {
+        "paramName": "op",
+        "paramLongName": "orcidPath",
+        "paramDescription": "the path of the orcid Table generated by the dump",
+        "paramRequired": true
+    },
+    {
+        "paramName": "gp",
+        "paramLongName": "graphPath",
+        "paramDescription": "the path of the graph we want to apply enrichment",
+        "paramRequired": true
+    },
+    {
+        "paramName": "tp",
+        "paramLongName": "targetPath",
+        "paramDescription": "the output path of the graph enriched",
+        "paramRequired": true
+    }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/config-default.xml
new file mode 100644
index 000000000..8a7bc8942
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/config-default.xml
@@ -0,0 +1,34 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>hiveMetastoreUris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>hiveJdbcUrl</name>
+        <value>jdbc:hive2://iis-cdh5-test-m3.ocean.icm.edu.pl:10000</value>
+    </property>
+    <property>
+        <name>hiveDbName</name>
+        <value>openaire</value>
+    </property>
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
new file mode 100644
index 000000000..1284cceda
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml
@@ -0,0 +1,52 @@
+<workflow-app name="enrich_graph_orcid" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>orcidPath</name>
+            <description>the path of the orcid Table generated by the dump</description>
+        </property>
+        <property>
+            <name>graphPath</name>
+            <description>the path of the graph we want to apply enrichment</description>
+        </property>
+        <property>
+            <name>targetPath</name>
+            <description>the output path of the graph enriched</description>
+        </property>
+    </parameters>
+
+    <start to="EnrichGraph"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="EnrichGraph">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Enrich Graph with ORCID</name>
+            <class>eu.dnetlib.dhp.enrich.orcid.SparkEnrichGraphWithOrcidAuthors</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=2g
+                --conf spark.sql.shuffle.partitions=3000
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--orcidPath</arg><arg>${orcidPath}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--graphPath</arg><arg>${graphPath}/publication</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
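Note: the workflow boils down to a single Spark submission of SparkEnrichGraphWithOrcidAuthors with the four arguments declared in enrich_graph_orcid_parameters.json. A minimal local sketch of the same invocation (the paths are illustrative, not taken from the patch):

    SparkEnrichGraphWithOrcidAuthors.main(
      Array(
        "--master", "local[*]",             // hypothetical local run instead of yarn
        "--orcidPath", "/tmp/orcid_tables", // assumed location of the Authors/Works tables
        "--graphPath", "/tmp/graph/publication",
        "--targetPath", "/tmp/graph_enriched"
      )
    )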
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
new file mode 100644
index 000000000..a67de4b95
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/AuthorEnricher.scala
@@ -0,0 +1,37 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
+import eu.dnetlib.dhp.schema.sx.OafUtils
+import org.apache.spark.sql.Row
+
+import scala.collection.JavaConverters._
+
+object AuthorEnricher extends Serializable {
+
+  def createAuthor(givenName: String, familyName: String, orcid: String): Author = {
+    val a = new Author
+    a.setName(givenName)
+    a.setSurname(familyName)
+    a.setFullname(s"$givenName $familyName")
+    a.setPid(List(OafUtils.createSP(orcid, "ORCID", "ORCID")).asJava)
+    a
+  }
+
+  def toOAFAuthor(r: Row): java.util.List[Author] = {
+    r.getList[Row](1)
+      .asScala
+      .map(s => createAuthor(s.getAs[String]("givenName"), s.getAs[String]("familyName"), s.getAs[String]("orcid")))
+      .toList
+      .asJava
+  }
+
+//  def enrichAuthor(p: Publication, r: Row): Unit = {
+//    val k: Map[String, OAuthor] = r.getList[Row](1).asScala
+//      .map(s => (s.getAs[String]("orcid"), OAuthor(s.getAs[String]("givenName"), s.getAs[String]("familyName"))))
+//      .groupBy(_._1)
+//      .mapValues(_.map(_._2).head)
+//    println(k)
+//  }
+
+}
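For reference, a short sketch of what createAuthor produces (the ORCID value is a placeholder):

    val a = AuthorEnricher.createAuthor("Miriam", "Baglioni", "0000-0000-0000-0000") // placeholder iD
    // a.getFullname == "Miriam Baglioni"
    // a.getPid contains a single StructuredProperty with qualifier "ORCID"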
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
new file mode 100644
index 000000000..e190b2b33
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/enrich/orcid/SparkEnrichGraphWithOrcidAuthors.scala
@@ -0,0 +1,119 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import eu.dnetlib.dhp.oa.merge.AuthorMerger
+import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Publication, StructuredProperty}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.{col, collect_set, concat, explode, expr, first, flatten, lower, size, struct}
+import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.sql.types._
+
+class SparkEnrichGraphWithOrcidAuthors(propertyPath: String, args: Array[String], log: Logger)
+    extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+  /** Here all the Spark applications run this method,
+    * where the whole logic of the Spark node is defined
+    */
+  override def run(): Unit = {
+    val graphPath = parser.get("graphPath")
+    log.info(s"graphPath is '$graphPath'")
+    val orcidPath = parser.get("orcidPath")
+    log.info(s"orcidPath is '$orcidPath'")
+    val targetPath = parser.get("targetPath")
+    log.info(s"targetPath is '$targetPath'")
+    enrichResult(spark, graphPath, orcidPath, targetPath)
+  }
+
+  def enrichResult(spark: SparkSession, graphPath: String, orcidPath: String, outputPath: String): Unit = {
+    val orcidPublication = generateOrcidTable(spark, orcidPath)
+
+    implicit val publicationEncoder = Encoders.bean(classOf[Publication])
+
+    val aschema = new StructType()
+      .add("id", StringType)
+      .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema)
+      .add("author", Encoders.bean(classOf[Author]).schema)
+
+    val schema = new StructType()
+      .add("id", StringType)
+      .add("dataInfo", Encoders.bean(classOf[DataInfo]).schema)
+      .add(
+        "instance",
+        ArrayType(new StructType().add("pid", ArrayType(Encoders.bean(classOf[StructuredProperty]).schema)))
+      )
+
+    val entities = spark.read
+      .schema(schema)
+      .json(graphPath)
+      .where("datainfo.deletedbyinference = false")
+      .drop("datainfo")
+      .withColumn("instances", explode(col("instance")))
+      .withColumn("pids", explode(col("instances.pid")))
+      .select(
+        col("pids.qualifier.classid").alias("pid_schema"),
+        col("pids.value").alias("pid_value"),
+        col("id").alias("dnet_id")
+      )
+
+    val orcidDnet = orcidPublication
+      .join(
+        entities,
+        lower(col("schema")).equalTo(lower(col("pid_schema"))) &&
+          lower(col("value")).equalTo(lower(col("pid_value"))),
+        "inner"
+      )
+      .groupBy(col("dnet_id"))
+      .agg(collect_set(orcidPublication("author")).alias("orcid_authors"))
+      .select("dnet_id", "orcid_authors")
+      .cache()
+
+    val publication = spark.read.schema(publicationEncoder.schema).json(graphPath).as[Publication]
+
+    publication
+      .joinWith(orcidDnet, publication("id").equalTo(orcidDnet("dnet_id")), "left")
+      .map {
+        case (p: Publication, null) =>
+          p
+        case (p: Publication, r: Row) =>
+          p.setAuthor(AuthorMerger.enrichOrcid2(p.getAuthor, AuthorEnricher.toOAFAuthor(r)))
+          p
+      }
+      .write
+      .mode(SaveMode.Overwrite)
+      .option("compression", "gzip")
+      .json(outputPath)
+  }
+
+  def generateOrcidTable(spark: SparkSession, inputPath: String): Dataset[Row] = {
+    val orcidAuthors =
+      spark.read.load(s"$inputPath/Authors").select("orcid", "familyName", "givenName", "creditName", "otherNames")
+    val orcidWorks = spark.read
+      .load(s"$inputPath/Works")
+      .select(col("orcid"), explode(col("pids")).alias("identifier"))
+      .where(
+        "identifier.schema = 'doi' or identifier.schema = 'pmid' or identifier.schema = 'pmc' or identifier.schema = 'arxiv' or identifier.schema = 'handle'"
+      )
+    orcidAuthors
+      .join(orcidWorks, orcidAuthors("orcid").equalTo(orcidWorks("orcid")))
+      .select(
+        col("identifier.schema").alias("schema"),
+        col("identifier.value").alias("value"),
+        struct(orcidAuthors("orcid").alias("orcid"), col("givenName"), col("familyName")).alias("author")
+      )
+  }
+}
+
+object SparkEnrichGraphWithOrcidAuthors {
+
+  val log: Logger = LoggerFactory.getLogger(SparkEnrichGraphWithOrcidAuthors.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new SparkEnrichGraphWithOrcidAuthors("/eu/dnetlib/dhp/enrich/orcid/enrich_graph_orcid_parameters.json", args, log)
+      .initialize()
+      .run()
+  }
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala
new file mode 100644
index 000000000..0ddb7c0aa
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/EnrichOrcidTest.scala
@@ -0,0 +1,12 @@
+package eu.dnetlib.dhp.enrich.orcid
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.junit.jupiter.api.Test
+import org.slf4j.{Logger, LoggerFactory}
+
+class EnrichOrcidTest {
+
+  val log: Logger = LoggerFactory.getLogger(getClass)
+
+}
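EnrichOrcidTest is committed as an empty shell. A minimal local test method, assuming a small sample of the ORCID Authors/Works tables under an illustrative resource path, could look like:

    @Test
    def testGenerateOrcidTable(): Unit = {
      val spark = SparkSession.builder().appName("EnrichOrcidTest").master("local[*]").getOrCreate()
      try {
        val job = new SparkEnrichGraphWithOrcidAuthors(null, null, log) // property path unused here
        // assumed sample data: Authors/Works tables under this test resource path
        val table = job.generateOrcidTable(spark, "src/test/resources/eu/dnetlib/dhp/enrich/orcid/sample")
        table.show(10, truncate = false)
      } finally {
        spark.stop()
      }
    }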