dataset based provision WIP
This commit is contained in:
parent
e355961997
commit
77f59b1b10
|
@ -2,7 +2,6 @@ package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
import com.lucidworks.spark.util.SolrSupport;
|
import com.lucidworks.spark.util.SolrSupport;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
|
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
|
||||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||||
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
|
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
|
||||||
|
@ -11,14 +10,11 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.rdd.RDD;
|
import org.apache.spark.rdd.RDD;
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -35,9 +31,9 @@ import java.util.Optional;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
public class SparkXmlIndexingJob {
|
public class XmlIndexingJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkXmlIndexingJob.class);
|
private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
|
||||||
|
|
||||||
private static final Integer DEFAULT_BATCH_SIZE = 1000;
|
private static final Integer DEFAULT_BATCH_SIZE = 1000;
|
||||||
|
|
||||||
|
@ -50,7 +46,7 @@ public class SparkXmlIndexingJob {
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils.toString(
|
IOUtils.toString(
|
||||||
SparkXmlIndexingJob.class.getResourceAsStream(
|
XmlIndexingJob.class.getResourceAsStream(
|
||||||
"/eu/dnetlib/dhp/oa/provision/input_params_update_index.json")));
|
"/eu/dnetlib/dhp/oa/provision/input_params_update_index.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
|
@ -1,7 +1,26 @@
|
||||||
[
|
[
|
||||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
{
|
||||||
{"paramName":"is", "paramLongName":"isLookupUrl", "paramDescription": "URL of the isLookUp Service", "paramRequired": true},
|
"paramName": "is",
|
||||||
{"paramName":"i", "paramLongName":"inputPath", "paramDescription": "the path of the sequence file to read the XML records", "paramRequired": true},
|
"paramLongName": "isLookupUrl",
|
||||||
{"paramName":"f", "paramLongName":"format", "paramDescription": "MDFormat name found in the IS profile", "paramRequired": true},
|
"paramDescription": "URL of the isLookUp Service",
|
||||||
{"paramName":"b", "paramLongName":"batchSize", "paramDescription": "size of the batch of documents sent to solr", "paramRequired": false}
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "i",
|
||||||
|
"paramLongName": "inputPath",
|
||||||
|
"paramDescription": "the path of the sequence file to read the XML records",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "f",
|
||||||
|
"paramLongName": "format",
|
||||||
|
"paramDescription": "MDFormat name found in the IS profile",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "b",
|
||||||
|
"paramLongName": "batchSize",
|
||||||
|
"paramDescription": "size of the batch of documents sent to solr",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
]
|
]
|
||||||
|
|
|
@ -393,12 +393,12 @@
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>to_solr_index</name>
|
<name>to_solr_index</name>
|
||||||
<class>eu.dnetlib.dhp.oa.provision.SparkXmlIndexingJob</class>
|
<class>eu.dnetlib.dhp.oa.provision.XmlIndexingJob</class>
|
||||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores ${sparkExecutorCoresForIndexing}
|
--executor-memory=${sparkExecutorMemoryForIndexing}
|
||||||
--executor-memory ${sparkExecutorMemoryForIndexing}
|
|
||||||
--driver-memory=${sparkDriverMemoryForIndexing}
|
--driver-memory=${sparkDriverMemoryForIndexing}
|
||||||
|
--conf spark.dynamicAllocation.enabled=true
|
||||||
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
|
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
|
Loading…
Reference in New Issue