From c82b15b5f4817348d446fbf53f8337a5e3601085 Mon Sep 17 00:00:00 2001
From: Enrico Ottonello
Date: Tue, 28 Jul 2020 15:23:52 +0200
Subject: [PATCH] migrate configuration to ocean, fix publication dataset
 creation

---
 .../SparkGenEnrichedOrcidWorks.java           | 13 ++--
 .../oozie_app/config-default.xml              | 31 ---------
 .../oozie_app/workflow.xml                    | 68 ++++++++++++++++---
 .../orcid/xml/XMLRecordParserTest.java        |  6 +-
 4 files changed, 68 insertions(+), 50 deletions(-)
 delete mode 100644 dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml

diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
index b0b989463..b24e71615 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
@@ -91,19 +91,18 @@ public class SparkGenEnrichedOrcidWorks {
 						Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
 					.filter(Objects::nonNull)
 					.toJavaRDD();
-				logger.info("Works enriched data created: " + enrichedWorksRDD.count());
 				enrichedWorksRDD.saveAsTextFile(workingPath + outputEnrichedWorksPath);
 				logger.info("Works enriched data saved");
-				JavaRDD<Tuple2<String, Publication>> oafPublicationRDD = enrichedWorksRDD.map(e -> {
+				JavaRDD<Publication> oafPublicationRDD = enrichedWorksRDD.map(e -> {
 					JsonElement j = new JsonParser().parse(e._2());
-					return new Tuple2<>(e._1(), (Publication) PublicationToOaf
-						.generatePublicationActionsFromDump(j.getAsJsonObject()));
-				});
+					return (Publication) PublicationToOaf
+						.generatePublicationActionsFromDump(j.getAsJsonObject());
+				}).filter(p -> p != null);
 
-				Dataset<Tuple2<String, Publication>> publicationDataset = spark
+				Dataset<Publication> publicationDataset = spark
 					.createDataset(
 						oafPublicationRDD.repartition(1).rdd(),
-						Encoders.tuple(Encoders.STRING(), Encoders.bean(Publication.class)));
+						Encoders.bean(Publication.class));
 				publicationDataset.write().mode(SaveMode.Overwrite).save(workingPath + "no_doi_dataset/output");
 			});
 	}
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml
deleted file mode 100644
index 3068562d0..000000000
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/config-default.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<configuration>
-    <property>
-        <name>oozie.action.sharelib.for.java</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>oozie.launcher.mapreduce.user.classpath.first</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.launcher.mapreduce.map.java.opts</name>
-        <value>-Xmx4g</value>
-    </property>
-    <property>
-        <name>jobTracker</name>
-        <value>yarnRM</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://nameservice1</value>
-    </property>
-
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml
index a60af8b45..faed3104a 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/gen_enriched_orcid_works/oozie_app/workflow.xml
@@ -1,11 +1,56 @@
-<workflow-app name="Gen_Enriched_Orcid_Works" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="gen_orcid_no_doi_dataset" xmlns="uri:oozie:workflow:0.5">
     <parameters>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+        <property>
+            <name>oozieActionShareLibForSpark2</name>
+            <description>oozie action sharelib for spark 2.*</description>
+        </property>
+        <property>
+            <name>spark2ExtraListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
+            <description>spark 2.* extra listeners classname</description>
+        </property>
+        <property>
+            <name>spark2SqlQueryExecutionListeners</name>
+            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
+            <description>spark 2.* sql query execution listeners classname</description>
+        </property>
+        <property>
+            <name>spark2YarnHistoryServerAddress</name>
+            <description>spark 2.* yarn history server address</description>
+        </property>
+        <property>
+            <name>spark2EventLogDir</name>
+            <description>spark 2.* event log dir location</description>
+        </property>
         <property>
             <name>workingPath</name>
             <description>the working dir base path</description>
         </property>
     </parameters>
 
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
     <start to="Gen_Enriched_Orcid_Works"/>
 
     <kill name="Kill">
@@ -16,20 +61,25 @@
     <action name="Gen_Enriched_Orcid_Works">
         <spark xmlns="uri:oozie:spark-action:0.2">
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn</master>
+            <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>Gen_Enriched_Orcid_Works</name>
+            <name>GenOrcidNoDoiDataset</name>
             <class>eu.dnetlib.doiboost.orcidnodoi.SparkGenEnrichedOrcidWorks</class>
-            <jar>dhp-doiboost-1.2.4-SNAPSHOT.jar</jar>
-            <spark-opts>--num-executors 10 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" --executor-memory=${sparkExecutorMemory} --executor-cores=${sparkExecutorCores} --driver-memory=${sparkDriverMemory}</spark-opts>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+            </spark-opts>
             <arg>-w</arg><arg>${workingPath}/</arg>
             <arg>-n</arg><arg>${nameNode}</arg>
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
index 4d8237f77..5bf6f27b9 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/orcid/xml/XMLRecordParserTest.java
@@ -12,7 +12,7 @@ import eu.dnetlib.doiboost.orcid.model.WorkData;
 public class XMLRecordParserTest {
 
 	@Test
-	public void testOrcidAuthorDataXMLParser() throws Exception {
+	private void testOrcidAuthorDataXMLParser() throws Exception {
 
 		String xml = IOUtils
 			.toString(this.getClass().getResourceAsStream("summary_0000-0001-6828-479X.xml"));
@@ -27,7 +27,7 @@ public class XMLRecordParserTest {
 	}
 
 	@Test
-	public void testOrcidXMLErrorRecordParser() throws Exception {
+	private void testOrcidXMLErrorRecordParser() throws Exception {
 
 		String xml = IOUtils.toString(this.getClass().getResourceAsStream("summary_error.xml"));
@@ -40,7 +40,7 @@ public class XMLRecordParserTest {
 	}
 
 	@Test
-	public void testOrcidWorkDataXMLParser() throws Exception {
+	private void testOrcidWorkDataXMLParser() throws Exception {
 
 		String xml = IOUtils
 			.toString(