From d9e07a242b8bb7f43231b8b62ec7e2fa2e5525ec Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 18 Nov 2020 14:34:55 +0100
Subject: [PATCH] extended XmlIndexingJob to accept an optional parameter:
 outputPath. When present, it forces the job to write its output to the
 specified HDFS location

---
 .../dhp/oa/provision/XmlIndexingJob.java           | 85 +++++++++++--------
 .../model/SerializableSolrInputDocument.java       |  3 +
 .../provision/input_params_update_index.json       |  6 ++
 .../dhp/oa/provision/oozie_app/workflow.xml        |  1 +
 .../eu/dnetlib/dhp/oa/provision/SolrTest.java      | 11 +--
 .../dhp/oa/provision/XmlIndexingJobTest.java       | 59 +++++++++++--
 6 files changed, 117 insertions(+), 48 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index c51dc3b58..f7a1ee49d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -1,8 +1,31 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import com.lucidworks.spark.util.SolrSupport;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
@@ -10,29 +33,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Optional;
 
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.lucidworks.spark.util.SolrSupport;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
-import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 public class XmlIndexingJob {
@@ -48,6 +49,8 @@ public class XmlIndexingJob {
 
     private int batchSize;
 
+    private String outputPath;
+
     private SparkSession spark;
 
     public static void main(String[] args) throws Exception {
@@ -72,12 +75,17 @@ public class XmlIndexingJob {
         final String format = parser.get("format");
         log.info("format: {}", format);
 
+        final String outputPath = Optional
+            .ofNullable(parser.get("outputPath"))
+            .orElse(null);
+        log.info("outputPath: {}", outputPath);
+
         final Integer batchSize = parser.getObjectMap().containsKey("batchSize")
             ? Integer.valueOf(parser.get("batchSize"))
             : DEFAULT_BATCH_SIZE;
         log.info("batchSize: {}", batchSize);
 
         final SparkConf conf = new SparkConf();
+        conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class });
 
         runWithSparkSession(
             conf,
@@ -86,15 +94,16 @@ public class XmlIndexingJob {
                 final String isLookupUrl = parser.get("isLookupUrl");
                 log.info("isLookupUrl: {}", isLookupUrl);
                 final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
-                new XmlIndexingJob(spark, inputPath, format, batchSize).run(isLookup);
+                new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup);
             });
     }
 
-    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize) {
+    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) {
         this.spark = spark;
         this.inputPath = inputPath;
         this.format = format;
         this.batchSize = batchSize;
+        this.outputPath = outputPath;
     }
 
     public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
@@ -116,15 +125,23 @@ public class XmlIndexingJob {
 
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        RDD<SolrInputDocument> docs = sc
-            .sequenceFile(inputPath, Text.class, Text.class)
-            .map(t -> t._2().toString())
-            .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
-            .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
-            .rdd();
+        JavaRDD<SolrInputDocument> docs = sc
+            .sequenceFile(inputPath, Text.class, Text.class)
+            .map(t -> t._2().toString())
+            .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
+            .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
 
-        final String collection = ProvisionConstants.getCollectionName(format);
-        SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
+        if (StringUtils.isNotBlank(outputPath)) {
+            spark
+                .createDataset(
+                    docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
+                    Encoders.kryo(SerializableSolrInputDocument.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .parquet(outputPath);
+        } else {
+            final String collection = ProvisionConstants.getCollectionName(format);
+            SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+        }
     }
 
     protected static String toIndexRecord(Transformer tr, final String record) {
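Note: the job now has two mutually exclusive outputs, selected by the optional outputPath argument. A minimal usage sketch of the extended constructor (the SparkSession and the ISLookupClient are assumed to be set up as in the tests below; the HDFS path is hypothetical):

    // outputPath == null: previous behaviour, feed the documents straight into Solr
    new XmlIndexingJob(spark, inputPath, format, batchSize, null).run(isLookup);

    // outputPath set: skip Solr and persist the documents as parquet on HDFS
    new XmlIndexingJob(spark, inputPath, format, batchSize, "/user/provision/solr_documents").run(isLookup);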
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
index e8d910e8c..05b39ab6f 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
@@ -6,6 +6,9 @@ import org.apache.solr.common.SolrInputField;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Wrapper class needed to make the SolrInputDocument compatible with the Kryo serialization mechanism.
+ */
 public class SerializableSolrInputDocument extends SolrInputDocument {
 
     public SerializableSolrInputDocument() {
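Note: Spark SQL ships no encoder for SolrInputDocument, which is why the Kryo-registered wrapper above is needed to hold the documents in a Dataset. Reading the parquet output back follows the same pattern used by the new test further down (outputPath is a placeholder):

    Dataset<SerializableSolrInputDocument> docs = spark
        .read()
        .load(outputPath)
        .as(Encoders.kryo(SerializableSolrInputDocument.class));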
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 3396020e0..3169648fb 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -22,5 +22,11 @@
     "paramLongName": "batchSize",
     "paramDescription": "size of the batch of documents sent to solr",
     "paramRequired": false
+  },
+  {
+    "paramName": "o",
+    "paramLongName": "outputPath",
+    "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
+    "paramRequired": false
   }
 ]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index e2b74b9aa..ee636b68e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -638,6 +638,7 @@
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--format</arg><arg>${format}</arg>
             <arg>--batchSize</arg><arg>${batchSize}</arg>
+            <arg>--outputPath</arg><arg>${outputPath}</arg>
         </spark>
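Note: since paramRequired is false and the workflow simply forwards the argument, existing configurations keep their Solr-only behaviour. A sketch of the relevant arguments when launching the job with the alternative output enabled (values are hypothetical, the remaining required arguments are omitted):

    --isLookupUrl http://services.example.org/isLookUp?wsdl
    --format TMF
    --batchSize 1000
    --outputPath /user/provision/solr_documents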
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
index 426c8e678..186cb964a 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrTest.java
@@ -54,13 +54,14 @@ public abstract class SolrTest {
         miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);
 
         // override settings in the solrconfig include
-        // System.setProperty("solr.tests.maxBufferedDocs", "100000");
-        // System.setProperty("solr.tests.maxIndexingThreads", "-1");
-        // System.setProperty("solr.tests.ramBufferSizeMB", "100");
+        System.setProperty("solr.tests.maxBufferedDocs", "100000");
+        System.setProperty("solr.tests.maxIndexingThreads", "-1");
+        System.setProperty("solr.tests.ramBufferSizeMB", "100");
 
         // use non-test classes so RandomizedRunner isn't necessary
-        // System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
-        // System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
+        System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
+        System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
+        System.setProperty("solr.lock.type", "single");
 
         log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString());
         log
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
index 2d62fc0af..b855f4d88 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJobTest.java
@@ -1,26 +1,33 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import java.io.IOException;
-import java.net.URI;
-
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
+import org.dom4j.io.SAXReader;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
 
 @ExtendWith(MockitoExtension.class)
 public class XmlIndexingJobTest extends SolrTest {
@@ -58,6 +65,7 @@ public class XmlIndexingJobTest extends SolrTest {
 
         SparkConf conf = new SparkConf();
         conf.setAppName(XmlIndexingJobTest.class.getSimpleName());
+        conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class });
         conf.setMaster("local[1]");
         conf.set("spark.driver.host", "localhost");
@@ -78,7 +86,7 @@ public class XmlIndexingJobTest extends SolrTest {
     }
 
     @Test
-    public void testXmlIndexingJob() throws Exception {
+    public void testXmlIndexingJob_onSolr() throws Exception {
 
         String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
@@ -87,13 +95,46 @@ public class XmlIndexingJobTest extends SolrTest {
             .sequenceFile(inputPath, Text.class, Text.class)
             .count();
 
-        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize).run(isLookupClient);
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, null).run(isLookupClient);
 
         Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());
 
         QueryResponse rsp = miniCluster.getSolrClient().query(new SolrQuery().add(CommonParams.Q, "*:*"));
 
-        Assertions.assertEquals(nRecord, rsp.getResults().getNumFound());
+        Assertions
+            .assertEquals(
+                nRecord, rsp.getResults().getNumFound(),
+                "the number of indexed records should be equal to the number of input records");
+    }
+
+    @Test
+    public void testXmlIndexingJob_saveOnHDFS() throws Exception {
+        final String ID_XPATH = "//header/*[local-name()='objIdentifier']";
+
+        String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";
+
+        final JavaPairRDD<Text, Text> xmlRecords = JavaSparkContext
+            .fromSparkContext(spark.sparkContext())
+            .sequenceFile(inputPath, Text.class, Text.class);
+        long nRecord = xmlRecords.count();
+
+        long xmlIdUnique = xmlRecords
+            .map(t -> t._2().toString())
+            .map(s -> new SAXReader().read(new StringReader(s)).valueOf(ID_XPATH))
+            .distinct()
+            .count();
+        Assertions.assertEquals(nRecord, xmlIdUnique, "IDs should be unique among input records");
+
+        final String outputPath = workingDir.resolve("outputPath").toAbsolutePath().toString();
+
+        new XmlIndexingJob(spark, inputPath, FORMAT, batchSize, outputPath).run(isLookupClient);
+
+        final Dataset<SerializableSolrInputDocument> solrDocs = spark
+            .read()
+            .load(outputPath)
+            .as(Encoders.kryo(SerializableSolrInputDocument.class));
+
+        long docIdUnique = solrDocs.map((MapFunction<SerializableSolrInputDocument, String>) doc -> {
+            final SolrInputField id = doc.getField("__indexrecordidentifier");
+            return id.getFirstValue().toString();
+        }, Encoders.STRING())
+            .distinct()
+            .count();
+
+        Assertions.assertEquals(xmlIdUnique, docIdUnique, "IDs should be unique among the output records");
+    }
 }
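Note: persisting the SolrInputDocuments decouples their preparation from the actual indexing round. A later job could reload the parquet output and feed it to Solr with the same SolrSupport call used in XmlIndexingJob (a sketch, assuming zkHost, collection and batchSize are resolved as in XmlIndexingJob.run):

    JavaRDD<SolrInputDocument> docs = spark
        .read()
        .load(outputPath)
        .as(Encoders.kryo(SerializableSolrInputDocument.class))
        .toJavaRDD()
        .map(d -> (SolrInputDocument) d); // upcast the wrapper back to the plain Solr type
    SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());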