diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index f7a1ee49dd..9ff387c8c1 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -1,14 +1,21 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import com.lucidworks.spark.util.SolrSupport;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
-import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
-import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Optional;
+
+import javax.swing.text.html.Option;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
@@ -22,23 +29,24 @@ import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
-import java.io.IOException;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Optional;
+import com.lucidworks.spark.util.SolrSupport;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 
 public class XmlIndexingJob {
 
 	private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
 
+	public enum OutputFormat {
+		SOLR, HDFS
+	}
+
 	private static final Integer DEFAULT_BATCH_SIZE = 1000;
 
 	protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
@@ -49,6 +57,8 @@ public class XmlIndexingJob {
 
 	private int batchSize;
 
+	private OutputFormat outputFormat;
+
 	private String outputPath;
 
 	private SparkSession spark;
@@ -75,17 +85,28 @@ public class XmlIndexingJob {
 		final String format = parser.get("format");
 		log.info("format: {}", format);
 
-		final String outputPath = Optional.ofNullable(parser.get("outputPath"))
-			.orElse(null);
+		final String outputPath = Optional
+			.ofNullable(parser.get("outputPath"))
+			.map(StringUtils::trim)
+			.orElse(null);
 		log.info("outputPath: {}", outputPath);
 
-		final Integer batchSize = parser.getObjectMap().containsKey("batchSize")
-			? Integer.valueOf(parser.get("batchSize"))
-			: DEFAULT_BATCH_SIZE;
+		final Integer batchSize = Optional
+			.ofNullable(parser.get("batchSize"))
+			.map(Integer::valueOf)
+			.orElse(DEFAULT_BATCH_SIZE);
 		log.info("batchSize: {}", batchSize);
 
+		final OutputFormat outputFormat = Optional
+			.ofNullable(parser.get("outputFormat"))
+			.map(OutputFormat::valueOf)
+			.orElse(OutputFormat.SOLR);
+		log.info("outputFormat: {}", outputFormat);
+
 		final SparkConf conf = new SparkConf();
-		conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class });
+		conf.registerKryoClasses(new Class[] {
+			SerializableSolrInputDocument.class
+		});
 
 		runWithSparkSession(
 			conf,
@@ -94,15 +115,18 @@ public class XmlIndexingJob {
 				final String isLookupUrl = parser.get("isLookupUrl");
 				log.info("isLookupUrl: {}", isLookupUrl);
 				final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
-				new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup);
+				new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup);
 			});
 	}
 
-	public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) {
+	public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize,
+		OutputFormat outputFormat,
+		String outputPath) {
 		this.spark = spark;
 		this.inputPath = inputPath;
 		this.format = format;
 		this.batchSize = batchSize;
+		this.outputFormat = outputFormat;
 		this.outputPath = outputPath;
 	}
@@ -126,21 +150,27 @@ public class XmlIndexingJob {
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<SolrInputDocument> docs = sc
-            .sequenceFile(inputPath, Text.class, Text.class)
-            .map(t -> t._2().toString())
-            .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
-            .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
+			.sequenceFile(inputPath, Text.class, Text.class)
+			.map(t -> t._2().toString())
+			.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
+			.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
 
-		if (StringUtils.isNotBlank(outputPath)) {
-			spark.createDataset(
-				docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
-				Encoders.kryo(SerializableSolrInputDocument.class))
-				.write()
-				.mode(SaveMode.Overwrite)
-				.parquet(outputPath);
-		} else {
-			final String collection = ProvisionConstants.getCollectionName(format);
-			SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+		switch (outputFormat) {
+			case SOLR:
+				final String collection = ProvisionConstants.getCollectionName(format);
+				SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+				break;
+			case HDFS:
+				spark
+					.createDataset(
+						docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
+						Encoders.kryo(SerializableSolrInputDocument.class))
+					.write()
+					.mode(SaveMode.Overwrite)
+					.parquet(outputPath);
+				break;
+			default:
+				throw new IllegalArgumentException("invalid outputFormat: " + outputFormat);
 		}
 	}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
index 05b39ab6f6..bbda1522e0 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
@@ -1,22 +1,23 @@
-package eu.dnetlib.dhp.oa.provision.model;
-
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.SolrInputField;
+package eu.dnetlib.dhp.oa.provision.model;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+
 /**
  * Wrapper class needed to make the SolrInputDocument compatible with the Kryo serialization mechanism.
  */
 public class SerializableSolrInputDocument extends SolrInputDocument {
 
-    public SerializableSolrInputDocument() {
-        super(new HashMap<>());
-    }
+	public SerializableSolrInputDocument() {
+		super(new HashMap<>());
+	}
 
-    public SerializableSolrInputDocument(Map<String, SolrInputField> fields) {
-        super(fields);
-    }
+	public SerializableSolrInputDocument(Map<String, SolrInputField> fields) {
+		super(fields);
+	}
 }
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 3169648fb9..46286e06ad 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -24,7 +24,13 @@
     "paramRequired": false
   },
   {
-    "paramName": "o",
+    "paramName": "of",
+    "paramLongName": "outputFormat",
+    "paramDescription": "decides the job output format, SOLR | HDFS",
+    "paramRequired": false
+  },
+  {
+    "paramName": "op",
     "paramLongName": "outputPath",
     "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
     "paramRequired": false
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index ee636b68e0..9280678c14 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -42,6 +42,7 @@
             <value>*:*</value>
             <description>query used in the deleted by query operation</description>
         </property>
+
         <property>
             <name>sparkDriverMemoryForJoining</name>
             <description>memory for driver process</description>
@@ -638,7 +639,8 @@
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--format</arg><arg>${format}</arg>
             <arg>--batchSize</arg><arg>${batchSize}</arg>
-            <arg>--outputPath</arg><arg>${outputPath}</arg>
+            <arg>--outputFormat</arg><arg>${outputFormat}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/solr_documents</arg>
         </spark>
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
index b855f4d881..d7bcb31851 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
@@ -1,10 +1,10 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
-import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -25,9 +25,10 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.URI;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @ExtendWith(MockitoExtension.class)
 public class XmlIndexingJobTest extends SolrTest {
@@ -65,7 +66,9 @@ public class XmlIndexingJobTest extends SolrTest {
 
 		SparkConf conf = new SparkConf();
 		conf.setAppName(XmlIndexingJobTest.class.getSimpleName());
-		conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class });
+		conf.registerKryoClasses(new Class[] {
+			SerializableSolrInputDocument.class
+		});
 		conf.setMaster("local[1]");
 		conf.set("spark.driver.host", "localhost");
@@ -95,13 +98,16 @@ public class XmlIndexingJobTest extends SolrTest {
 			.sequenceFile(inputPath, Text.class, Text.class)
 			.count();
 
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, null).run(isLookupClient);
+		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.SOLR, null)
+			.run(isLookupClient);
 
 		Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
 		QueryResponse rsp = miniCluster.getSolrClient().query(new SolrQuery().add(CommonParams.Q, "*:*"));
 
-		Assertions.assertEquals(nRecord, rsp.getResults().getNumFound(),
+		Assertions
+			.assertEquals(
+				nRecord, rsp.getResults().getNumFound(),
 				"the number of indexed records should be equal to the number of input records");
 	}
@@ -112,27 +118,30 @@ public class XmlIndexingJobTest extends SolrTest {
 		String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
 
 		final JavaPairRDD<Text, Text> xmlRecords = JavaSparkContext
-            .fromSparkContext(spark.sparkContext())
-            .sequenceFile(inputPath, Text.class, Text.class);
+			.fromSparkContext(spark.sparkContext())
+			.sequenceFile(inputPath, Text.class, Text.class);
 
 		long nRecord = xmlRecords.count();
 
 		long xmlIdUnique = xmlRecords
-            .map(t -> t._2().toString())
-            .map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
-            .distinct().count();
+			.map(t -> t._2().toString())
+			.map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
+			.distinct()
+			.count();
 		Assertions.assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
 
 		final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
 
-		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, outputPath).run(isLookupClient);
+		new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, XmlIndexingJob.OutputFormat.HDFS, outputPath)
+			.run(isLookupClient);
 
-		final Dataset<SerializableSolrInputDocument> solrDocs = spark.read()
-            .load(outputPath)
-            .as(Encoders.kryo(SerializableSolrInputDocument.class));
+		final Dataset<SerializableSolrInputDocument> solrDocs = spark
+			.read()
+			.load(outputPath)
+			.as(Encoders.kryo(SerializableSolrInputDocument.class));
 		long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
 			final SolrInputField id = doc.getField("__indexrecordidentifier");
 			return id.getFirstValue().toString();
-        }, Encoders.STRING())
-            .distinct()
-            .count();
+		}, Encoders.STRING())
+			.distinct()
+			.count();
 		Assertions.assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records");
 	}
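
For reviewers who want to exercise the change outside of Oozie, a minimal driver sketch follows. The paths, the isLookup URL, the metadata format name and the Spark setup are illustrative placeholders; only the XmlIndexingJob constructor signature, the nested OutputFormat enum, the Kryo registration and the ISLookupClient wiring come from the patch above.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.oa.provision.XmlIndexingJob;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;

public class XmlIndexingJobDriverSketch {

	public static void main(String[] args) throws Exception {
		// Kryo registration mirrors what the job itself configures for the HDFS output mode
		SparkConf conf = new SparkConf();
		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
		conf.registerKryoClasses(new Class[] {
			SerializableSolrInputDocument.class
		});

		SparkSession spark = SparkSession
			.builder()
			.appName("xml-indexing-sketch")
			.config(conf)
			.master("local[*]")
			.getOrCreate();

		// placeholder values, normally injected by the Oozie workflow parameters
		String inputPath = "/tmp/xml_index_records";   // sequence files of index records
		String format = "TMF";                         // hypothetical metadata format name
		String isLookupUrl = "http://localhost:8280/is/services/isLookUp";
		String outputPath = "/tmp/solr_documents";     // only read when OutputFormat.HDFS is chosen

		ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));

		// OutputFormat.SOLR indexes into the collection resolved from 'format',
		// OutputFormat.HDFS writes SerializableSolrInputDocuments as parquet to outputPath
		new XmlIndexingJob(spark, inputPath, format, 1000, XmlIndexingJob.OutputFormat.HDFS, outputPath)
			.run(isLookup);

		spark.stop();
	}
}

Note that the job resolves a missing outputFormat argument to OutputFormat.SOLR, so workflow definitions that do not set the new parameter keep the previous indexing behaviour.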