From 77f59b1b1084cc79ee0bd9e64222fa30eed05f7a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 6 Apr 2020 19:37:27 +0200 Subject: [PATCH] dataset based provision WIP --- ...mlIndexingJob.java => XmlIndexingJob.java} | 10 ++----- .../provision/input_params_update_index.json | 29 +++++++++++++++---- .../dhp/oa/provision/oozie_app/workflow.xml | 6 ++-- 3 files changed, 30 insertions(+), 15 deletions(-) rename dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/{SparkXmlIndexingJob.java => XmlIndexingJob.java} (95%) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java similarity index 95% rename from dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java rename to dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java index eae8cf1a1..84538c924 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SparkXmlIndexingJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.oa.provision; import com.lucidworks.spark.util.SolrSupport; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory; @@ -11,14 +10,11 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.solr.common.SolrInputDocument; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,9 +31,9 @@ import java.util.Optional; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -public class SparkXmlIndexingJob { +public class XmlIndexingJob { - private static final Logger log = LoggerFactory.getLogger(SparkXmlIndexingJob.class); + private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class); private static final Integer DEFAULT_BATCH_SIZE = 1000; @@ -50,7 +46,7 @@ public class SparkXmlIndexingJob { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString( - SparkXmlIndexingJob.class.getResourceAsStream( + XmlIndexingJob.class.getResourceAsStream( "/eu/dnetlib/dhp/oa/provision/input_params_update_index.json"))); parser.parseArgument(args); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json index 146cc9943..3396020e0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json @@ -1,7 +1,26 @@ [ - {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true}, - {"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true}, - {"paramName":"i", "paramLongName":"inputPath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true}, - {"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true}, - {"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false} + { + "paramName": "is", + "paramLongName": "isLookupUrl", + "paramDescription": "URL of the isLookUp Service", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "inputPath", + "paramDescription": "the path of the sequence file to read the XML records", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "format", + "paramDescription": "MDFormat name found in the IS profile", + "paramRequired": true + }, + { + "paramName": "b", + "paramLongName": "batchSize", + "paramDescription": "size of the batch of documents sent to solr", + "paramRequired": false + } ] diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 5bc89396b..e6587ef5e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -393,12 +393,12 @@ yarn cluster to_solr_index - eu.dnetlib.dhp.oa.provision.SparkXmlIndexingJob + eu.dnetlib.dhp.oa.provision.XmlIndexingJob dhp-graph-provision-${projectVersion}.jar - --executor-cores ${sparkExecutorCoresForIndexing} - --executor-memory ${sparkExecutorMemoryForIndexing} + --executor-memory=${sparkExecutorMemoryForIndexing} --driver-memory=${sparkDriverMemoryForIndexing} + --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}