From b7b6be12a51c81b2b7469684cf18bc8a3014aec4 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Mon, 29 Jun 2020 18:03:16 +0200 Subject: [PATCH] fixed enriched works generation --- .../doiboost/orcid/json/JsonHelper.java | 6 +-- .../orcidnodoi/ActivitiesDumpReader.java | 4 +- .../orcidnodoi/GenOrcidAuthorWork.java | 1 + .../SparkGenEnrichedOrcidWorks.java | 29 +++++------ .../orcidnodoi/similarity/AuthorMatcher.java | 48 +++++-------------- .../orcidnodoi/xml/XMLRecordParserNoDoi.java | 4 +- .../oozie_app/config-default.xml | 17 +++++-- .../oozie_app/workflow.xml | 24 +++++++--- 8 files changed, 66 insertions(+), 67 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/json/JsonHelper.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/json/JsonHelper.java index bfd6f7447d..94f7d8c913 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/json/JsonHelper.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/json/JsonHelper.java @@ -2,16 +2,12 @@ package eu.dnetlib.doiboost.orcid.json; import com.google.gson.Gson; -import com.google.gson.JsonObject; import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi; public class JsonHelper { public static String createOidWork(WorkDataNoDoi workData) { - JsonObject oidWork = new JsonObject(); - oidWork.addProperty("oid", workData.getOid()); - oidWork.addProperty("work", new Gson().toJson(workData)); - return oidWork.toString(); + return new Gson().toJson(workData); } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/ActivitiesDumpReader.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/ActivitiesDumpReader.java index 506641b813..bf63568d8b 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/ActivitiesDumpReader.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/ActivitiesDumpReader.java @@ -26,8 +26,8 @@ import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi; public class ActivitiesDumpReader { - private static final int MAX_XML_WORKS_PARSED = -1; - private static final int XML_WORKS_PARSED_COUNTER_LOG_INTERVAL = 100000; + private static final int MAX_XML_WORKS_PARSED = 100; + private static final int XML_WORKS_PARSED_COUNTER_LOG_INTERVAL = 10; public static void parseGzActivities(Configuration conf, String inputUri, Path outputPath) throws Exception { diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/GenOrcidAuthorWork.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/GenOrcidAuthorWork.java index bbaa5acca3..8dcee796c2 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/GenOrcidAuthorWork.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/GenOrcidAuthorWork.java @@ -45,6 +45,7 @@ public class GenOrcidAuthorWork extends OrcidDSManager { Log.info("HDFS URI: " + hdfsServerUri); workingPath = parser.get("workingPath"); Log.info("Working Path: " + workingPath); + hdfsOrcidDefaultPath = workingPath; activitiesFileNameTarGz = parser.get("activitiesFileNameTarGz"); Log.info("Activities File Name: " + activitiesFileNameTarGz); outputWorksPath = parser.get("outputWorksPath"); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java index 9d9c5bc4a4..ae1e4dae61 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java @@ -24,6 +24,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.doiboost.orcid.json.JsonHelper; import eu.dnetlib.doiboost.orcid.model.AuthorData; import eu.dnetlib.doiboost.orcidnodoi.model.WorkDataNoDoi; import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher; @@ -31,9 +32,9 @@ import scala.Tuple2; public class SparkGenEnrichedOrcidWorks { + static Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class); + public static void main(String[] args) throws IOException, Exception { - Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class); - logger.info("[ SparkGenerateDoiAuthorList STARTED]"); final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils @@ -46,13 +47,9 @@ public class SparkGenEnrichedOrcidWorks { .ofNullable(parser.get("isSparkSessionManaged")) .map(Boolean::valueOf) .orElse(Boolean.TRUE); - logger.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String workingPath = parser.get("workingPath"); - logger.info("workingPath: ", workingPath); final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath"); - logger.info("outputEnrichedWorksPath: ", outputEnrichedWorksPath); final String outputWorksPath = parser.get("outputWorksPath"); - logger.info("outputWorksPath: ", outputWorksPath); SparkConf conf = new SparkConf(); runWithSparkSession( @@ -67,30 +64,33 @@ public class SparkGenEnrichedOrcidWorks { .createDataset( summariesRDD.map(seq -> loadAuthorFromJson(seq._1(), seq._2())).rdd(), Encoders.bean(AuthorData.class)); + logger.info("Authors data loaded: " + summariesDataset.count()); JavaPairRDD activitiesRDD = sc - .sequenceFile(workingPath + outputWorksPath + "works_X.seq", Text.class, Text.class); + .sequenceFile(workingPath + outputWorksPath + "*.seq", Text.class, Text.class); Dataset activitiesDataset = spark .createDataset( activitiesRDD.map(seq -> loadWorkFromJson(seq._1(), seq._2())).rdd(), Encoders.bean(WorkDataNoDoi.class)); + logger.info("Works data loaded: " + activitiesDataset.count()); - activitiesDataset + JavaRDD> enrichedWorksRDD = activitiesDataset .joinWith( summariesDataset, activitiesDataset.col("oid").equalTo(summariesDataset.col("oid")), "inner") .map( - (MapFunction, Tuple2>) value -> { + (MapFunction, Tuple2>) value -> { WorkDataNoDoi w = value._1; AuthorData a = value._2; AuthorMatcher.match(a, w.getContributors()); - return new Tuple2<>(a.getOid(), w); + return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w)); }, - Encoders.tuple(Encoders.STRING(), Encoders.bean(WorkDataNoDoi.class))) + Encoders.tuple(Encoders.STRING(), Encoders.STRING())) .filter(Objects::nonNull) - .toJavaRDD() - .saveAsTextFile(workingPath + outputEnrichedWorksPath); - ; + .toJavaRDD(); + logger.info("Works enriched data created: " + enrichedWorksRDD.count()); + enrichedWorksRDD.repartition(10).saveAsTextFile(workingPath + outputEnrichedWorksPath); + logger.info("Works enriched data saved"); }); } @@ -105,6 +105,7 @@ public class SparkGenEnrichedOrcidWorks { } private static WorkDataNoDoi loadWorkFromJson(Text orcidId, Text json) { + WorkDataNoDoi workData = new Gson().fromJson(json.toString(), WorkDataNoDoi.class); return workData; } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/similarity/AuthorMatcher.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/similarity/AuthorMatcher.java index 09fd8b36b3..1e4c38bef5 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/similarity/AuthorMatcher.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/similarity/AuthorMatcher.java @@ -33,15 +33,13 @@ public class AuthorMatcher { List matchCounters = Arrays.asList(matchCounter); Contributor contributor = null; contributors.forEach(c -> { - if (normalize(c.getCreditName()).contains(normalize(author.getName())) || - normalize(c.getCreditName()).contains(normalize(author.getSurname())) || - ((author.getOtherName() != null) - && normalize(c.getCreditName()).contains(normalize(author.getOtherName())))) { + if (simpleMatch(c.getCreditName(), author.getName()) || + simpleMatch(c.getCreditName(), author.getSurname()) || + simpleMatch(c.getCreditName(), author.getOtherName())) { matchCounters.set(0, matchCounters.get(0) + 1); c.setSimpleMatch(true); } }); - logger.info("match counter: " + Integer.toString(matchCounters.get(0))); if (matchCounters.get(0) == 1) { updateAuthorsSimpleMatch(contributors, author); } else if (matchCounters.get(0) > 1) { @@ -50,7 +48,6 @@ public class AuthorMatcher { .filter(c -> c.isSimpleMatch()) .map(c -> { c.setScore(bestMatch(author.getName(), author.getSurname(), c.getCreditName())); - logger.debug("nella map: " + c.getCreditName() + " score: " + c.getScore()); return c; }) .filter(c -> c.getScore() >= threshold) @@ -59,24 +56,21 @@ public class AuthorMatcher { if (optCon.isPresent()) { bestMatchContributor = optCon.get(); bestMatchContributor.setBestMatch(true); - logger.info("best match: " + bestMatchContributor.getCreditName()); updateAuthorsSimilarityMatch(contributors, author); } } - logger.info("UPDATED contributors: "); - contributors.forEach(c -> { - logger - .info( - c.getOid() + " - " + c.getCreditName() + " - " + - c.getName() + " - " + c.getSurname() + " - " + - c.getRole() + " - " + c.getSequence()); - }); + } + + private static boolean simpleMatch(String name, String searchValue) { + if (searchValue == null) { + return false; + } + return normalize(name).contains(normalize(searchValue)); } private static Double bestMatch(String authorSurname, String authorName, String contributor) { - logger.debug(authorSurname + " " + authorName + " vs " + contributor); String[] contributorSplitted = contributor.split(" "); if (contributorSplitted.length == 0) { return 0.0; @@ -90,10 +84,6 @@ public class AuthorMatcher { } contributorSurname = joiner.toString(); } - logger - .debug( - "contributorName: " + contributorName + - " contributorSurname: " + contributorSurname); String authorNameNrm = normalize(authorName); String authorSurnameNrm = normalize(authorSurname); String contributorNameNrm = normalize(contributorName); @@ -108,8 +98,6 @@ public class AuthorMatcher { private static Double similarity(String nameA, String surnameA, String nameB, String surnameB) { Double score = similarityJaroWinkler(nameA, surnameA, nameB, surnameB); - logger - .debug(nameA + ", " + surnameA + " <> " + nameB + ", " + surnameB + " score: " + Double.toString(score)); return score; } @@ -118,6 +106,9 @@ public class AuthorMatcher { } private static String normalize(final String s) { + if (s == null) { + return new String(""); + } return nfd(s) .toLowerCase() // do not compact the regexes in a single expression, would cause StackOverflowError @@ -142,7 +133,6 @@ public class AuthorMatcher { private static void updateAuthorsSimpleMatch(List contributors, AuthorData author) { contributors.forEach(c -> { if (c.isSimpleMatch()) { - logger.info("simple match on : " + c.getCreditName()); c.setName(author.getName()); c.setSurname(author.getSurname()); c.setOid(author.getOid()); @@ -152,21 +142,10 @@ public class AuthorMatcher { } private static void updateAuthorsSimilarityMatch(List contributors, AuthorData author) { - logger.info("inside updateAuthorsSimilarityMatch ..."); - contributors.forEach(c -> { - logger - .info( - c.getOid() + " - " + c.getCreditName() + " - " + - c.getName() + " - " + c.getSurname() + " - " + - c.getRole() + " - " + c.getSequence() + " - best: " + c.isBestMatch() + " - simpe: " - + c.isSimpleMatch()); - }); - contributors .stream() .filter(c -> c.isBestMatch()) .forEach(c -> { - logger.info("similarity match on : " + c.getCreditName()); c.setName(author.getName()); c.setSurname(author.getSurname()); c.setOid(author.getOid()); @@ -184,7 +163,6 @@ public class AuthorMatcher { c.getSequence().equals("additional"))) .count() > 0) { seqFound = true; - logger.info("sequence data found"); } if (!seqFound) { List seqIds = Arrays.asList(0); diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/xml/XMLRecordParserNoDoi.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/xml/XMLRecordParserNoDoi.java index 6e5771547a..ae96a322f3 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/xml/XMLRecordParserNoDoi.java +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/xml/XMLRecordParserNoDoi.java @@ -41,7 +41,6 @@ public class XMLRecordParserNoDoi { public static WorkDataNoDoi VTDParseWorkData(byte[] bytes) throws VtdException, EncodingException, EOFException, EntityException, ParseException, XPathParseException, NavException, XPathEvalException { - logger.info("parsing xml ..."); final VTDGen vg = new VTDGen(); vg.setDoc(bytes); vg.parse(true); @@ -191,6 +190,9 @@ public class XMLRecordParserNoDoi { nameIndex++; } } + if (contributors.size() == 0) { + return contributors; + } int sequenceIndex = 0; ap.selectXPath("//work:contributor/work:contributor-attributes/work:contributor-sequence"); diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml index f2d51e260f..3068562d06 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml @@ -8,15 +8,24 @@ true - oozie.launcher.mapreduce.map.java.opts - -Xmx4g + oozie.launcher.mapreduce.map.java.opts + -Xmx4g jobTracker - hadoop-rm3.garr-pa1.d4science.org:8032 + yarnRM nameNode - hdfs://hadoop-rm1.garr-pa1.d4science.org:8020 + hdfs://nameservice1 + + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 \ No newline at end of file diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml index 33fbdf8756..df5e0e76f7 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml @@ -71,10 +71,9 @@ the shell command that downloads and puts to hdfs orcid activity file X - + - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -133,6 +132,7 @@ -n${nameNode} -fORCID_2019_activites_0.tar.gz -owno_doi_works/works_0.seq + -oewno_doi_enriched_works/ @@ -169,6 +169,7 @@ -n${nameNode} -fORCID_2019_activites_1.tar.gz -owno_doi_works/works_1.seq + -oewno_doi_enriched_works/ @@ -205,6 +206,7 @@ -n${nameNode} -fORCID_2019_activites_2.tar.gz -owno_doi_works/works_2.seq + -oewno_doi_enriched_works/ @@ -241,6 +243,7 @@ -n${nameNode} -fORCID_2019_activites_3.tar.gz -owno_doi_works/works_3.seq + -oewno_doi_enriched_works/ @@ -277,6 +280,7 @@ -n${nameNode} -fORCID_2019_activites_4.tar.gz -owno_doi_works/works_4.seq + -oewno_doi_enriched_works/ @@ -313,6 +317,7 @@ -n${nameNode} -fORCID_2019_activites_5.tar.gz -owno_doi_works/works_5.seq + -oewno_doi_enriched_works/ @@ -349,6 +354,7 @@ -n${nameNode} -fORCID_2019_activites_6.tar.gz -owno_doi_works/works_6.seq + -oewno_doi_enriched_works/ @@ -386,6 +392,7 @@ -n${nameNode} -fORCID_2019_activites_7.tar.gz -owno_doi_works/works_7.seq + -oewno_doi_enriched_works/ @@ -422,6 +429,7 @@ -n${nameNode} -fORCID_2019_activites_8.tar.gz -owno_doi_works/works_8.seq + -oewno_doi_enriched_works/ @@ -458,6 +466,7 @@ -n${nameNode} -fORCID_2019_activites_9.tar.gz -owno_doi_works/works_9.seq + -oewno_doi_enriched_works/ @@ -494,11 +503,12 @@ -n${nameNode} -fORCID_2019_activites_X.tar.gz -owno_doi_works/works_X.seq + -oewno_doi_enriched_works/ - + @@ -509,12 +519,14 @@ cluster Gen_Enriched_Orcid_Works eu.dnetlib.doiboost.orcidnodoi.SparkGenEnrichedOrcidWorks - dhp-doiboost-1.2.2-SNAPSHOT.jar + dhp-doiboost-1.2.4-SNAPSHOT.jar --num-executors 10 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory} -w${workingPath}/ + -n${nameNode} + -f- -owno_doi_works/ - -oewno_doi_enriched_works/ + -oewno_doi_enriched_works/output