test for XmlIndexingJob based on a local miniSolrCluster

This commit is contained in:
Claudio Atzori 2020-11-18 10:58:05 +01:00
parent cfc01f136e
commit 8177ce7939
15 changed files with 963 additions and 1327 deletions

View File

@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@ -21,7 +19,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

View File

@ -14,6 +14,16 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@ -22,6 +32,12 @@
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>dom4j</groupId>
@ -82,9 +98,6 @@
<groupId>org.codehaus.woodstox</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>*</artifactId>
@ -109,11 +122,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -3,6 +3,10 @@ package eu.dnetlib.dhp.oa.provision;
public class ProvisionConstants {
public static final String LAYOUT = "index";
public static final String INTERPRETATION = "openaire";
public static final String SEPARATOR = "-";
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
@ -11,4 +15,8 @@ public class ProvisionConstants {
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_INSTANCES = 10;
/**
 * Builds the name of the collection associated to the given metadata format.
 *
 * @param format the metadata format name
 * @return {@code format}, {@link #LAYOUT} and {@link #INTERPRETATION} joined by {@link #SEPARATOR}
 */
public static String getCollectionName(String format) {
    return String.join(SEPARATOR, format, LAYOUT, INTERPRETATION);
}
}

View File

@ -14,11 +14,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.oa.provision.utils.ZkServers;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class SolrAdminApplication extends SolrApplication implements Closeable {
public class SolrAdminApplication implements Closeable {
private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class);
@ -54,12 +55,12 @@ public class SolrAdminApplication extends SolrApplication implements Closeable {
.orElse(false);
log.info("commit: {}", commit);
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
final String zkHost = getZkHost(isLookup);
final String zkHost = isLookup.getZkHost();
log.info("zkHost: {}", zkHost);
final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
final String collection = ProvisionConstants.getCollectionName(format);
log.info("collection: {}", collection);
try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {

View File

@ -1,40 +0,0 @@
package eu.dnetlib.dhp.oa.provision;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
/**
 * Base class sharing the collection naming constants and the information system
 * lookup helpers used by the Solr related applications.
 */
public abstract class SolrApplication {

    private static final Logger log = LoggerFactory.getLogger(SolrApplication.class);

    protected static final String LAYOUT = "index";
    protected static final String INTERPRETATION = "openaire";
    protected static final String SEPARATOR = "-";

    /**
     * Timestamp pattern. 'HH' is the 24-hour hour-of-day field: the former 'hh' is the
     * 12-hour clock field (1-12) and, without an AM/PM marker, rendered afternoon hours
     * as morning ones.
     * NOTE(review): the trailing 'Z' is a literal, so the formatter must be set to UTC by
     * whoever uses this pattern for the designator to be truthful.
     */
    protected static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

    /**
     * Method retrieves from the information system the zookeeper quorum of the Solr server
     *
     * @param isLookup the ISLookup service stub
     * @return the zookeeper quorum of the Solr server
     * @throws ISLookUpException when the lookup service fails to evaluate the query
     */
    protected static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
        return doLookup(
            isLookup,
            "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
    }

    /**
     * Runs the given xquery against the information system, logging the query and the
     * first 100 characters of the response.
     *
     * @param isLookup the ISLookup service stub
     * @param xquery the query to evaluate
     * @return the response as returned by the lookup service
     * @throws ISLookUpException when the lookup service fails to evaluate the query
     */
    protected static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
        log.info("running xquery: {}", xquery);
        final String res = isLookup.getResourceProfileByQuery(xquery);
        log.info("got response (100 chars): {} ...", StringUtils.left(res, 100));
        return res;
    }
}

View File

@ -16,31 +16,40 @@ import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public class XmlIndexingJob extends SolrApplication {
public class XmlIndexingJob {
private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
// 'HH' is the 24-hour hour-of-day field; the former 'hh' (12-hour clock, 1-12) carried no AM/PM
// marker, so timestamps produced after noon were indistinguishable from morning ones.
protected static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
private String inputPath;
private String format;
private int batchSize;
private SparkSession spark;
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -60,9 +69,6 @@ public class XmlIndexingJob extends SolrApplication {
final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String format = parser.get("format");
log.info("format: {}", format);
@ -71,16 +77,36 @@ public class XmlIndexingJob extends SolrApplication {
: DEFAULT_BATCH_SIZE;
log.info("batchSize: {}", batchSize);
final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String fields = getLayoutSource(isLookup, format);
final SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
new XmlIndexingJob(spark, inputPath, format, batchSize).run(isLookup);
});
}
/**
 * Creates an indexing job bound to an existing Spark session.
 *
 * @param spark the Spark session used to read the input records
 * @param inputPath the path of the sequence file containing the records to index
 * @param format the metadata format name
 * @param batchSize the batch size handed to the indexer; when {@code null} the
 *        {@code DEFAULT_BATCH_SIZE} is used
 */
public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize) {
    this.spark = spark;
    this.inputPath = inputPath;
    this.format = format;
    // batchSize is boxed while the field is a primitive: fall back to the default
    // rather than failing with a NullPointerException on auto-unboxing
    this.batchSize = batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
}
public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
final String fields = isLookup.getLayoutSource(format);
log.info("fields: {}", fields);
final String xslt = getLayoutTransformer(isLookup);
final String xslt = isLookup.getLayoutTransformer();
final String dsId = getDsId(format, isLookup);
final String dsId = isLookup.getDsId(format);
log.info("dsId: {}", dsId);
final String zkHost = getZkHost(isLookup);
final String zkHost = isLookup.getZkHost();
log.info("zkHost: {}", zkHost);
final String version = getRecordDatestamp();
@ -88,24 +114,17 @@ public class XmlIndexingJob extends SolrApplication {
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
log.info("indexRecordTransformer {}", indexRecordXslt);
final SparkConf conf = new SparkConf();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
RDD<SolrInputDocument> docs = sc
.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
RDD<SolrInputDocument> docs = sc
.sequenceFile(inputPath, Text.class, Text.class)
.map(t -> t._2().toString())
.map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
.map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
.rdd();
final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
});
final String collection = ProvisionConstants.getCollectionName(format);
SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
}
protected static String toIndexRecord(Transformer tr, final String record) {
@ -151,56 +170,4 @@ public class XmlIndexingJob extends SolrApplication {
return new SimpleDateFormat(DATE_FORMAT).format(new Date());
}
/**
* Method retrieves from the information system the list of fields associated to the given MDFormat name
*
* @param isLookup the ISLookup service stub
* @param format the Metadata format name
* @return the string representation of the list of fields to be indexed
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutSource(final ISLookUpService isLookup, final String format)
throws ISLookUpDocumentNotFoundException, ISLookUpException {
return doLookup(
isLookup,
String
.format(
"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']",
format, LAYOUT));
}
/**
* Method retrieves from the information system the openaireLayoutToRecordStylesheet
*
* @param isLookup the ISLookup service stub
* @return the string representation of the XSLT contained in the transformation rule profile
* @throws ISLookUpDocumentNotFoundException
* @throws ISLookUpException
*/
private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException {
return doLookup(
isLookup,
"collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')"
+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
}
/**
* Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
*
* @param format
* @param isLookup
* @return the IndexDS identifier
* @throws ISLookUpException
*/
private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException {
return doLookup(
isLookup,
String
.format(
"collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')"
+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()",
format));
}
}

View File

@ -0,0 +1,95 @@
package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.oa.provision.ProvisionConstants;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
/**
 * Thin client wrapping an {@link ISLookUpService} stub: it exposes the xqueries needed by the
 * provision workflow as named methods, so that they can be replaced as a whole in tests.
 *
 * The {@code format} arguments are interpolated verbatim into the xquery text: callers are
 * expected to pass trusted values that do not contain quote characters.
 */
public class ISLookupClient {

    private static final Logger log = LoggerFactory.getLogger(ISLookupClient.class);

    private ISLookUpService isLookup;

    /**
     * @param isLookup the ISLookup service stub the queries are delegated to
     */
    public ISLookupClient(ISLookUpService isLookup) {
        this.isLookup = isLookup;
    }

    /**
     * Method retrieves from the information system the list of fields associated to the given MDFormat name
     *
     * @param format the Metadata format name
     * @return the string representation of the list of fields to be indexed
     * @throws ISLookUpDocumentNotFoundException
     * @throws ISLookUpException
     */
    public String getLayoutSource(final String format)
        throws ISLookUpDocumentNotFoundException, ISLookUpException {
        final String xquery = String
            .format(
                "collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']",
                format, ProvisionConstants.LAYOUT);
        return doLookup(xquery);
    }

    /**
     * Method retrieves from the information system the openaireLayoutToRecordStylesheet
     *
     * @return the string representation of the XSLT contained in the transformation rule profile
     * @throws ISLookUpException
     */
    public String getLayoutTransformer() throws ISLookUpException {
        return doLookup(
            "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')"
                + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
    }

    /**
     * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
     *
     * @param format the Metadata format name
     * @return the IndexDS identifier
     * @throws ISLookUpException
     */
    public String getDsId(String format) throws ISLookUpException {
        final String xquery = String
            .format(
                "collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')"
                    + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()",
                format);
        return doLookup(xquery);
    }

    /**
     * Method retrieves from the information system the zookeeper quorum of the Solr server
     *
     * @return the zookeeper quorum of the Solr server
     * @throws ISLookUpException
     */
    public String getZkHost() throws ISLookUpException {
        return doLookup(
            "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
    }

    /**
     * Evaluates the xquery through the wrapped service, logging the query and the first
     * 100 characters of the response.
     */
    private String doLookup(String xquery) throws ISLookUpException {
        log.info("running xquery: {}", xquery);
        final String res = getIsLookup().getResourceProfileByQuery(xquery);
        log.info("got response (100 chars): {} ...", StringUtils.left(res, 100));
        return res;
    }

    public ISLookUpService getIsLookup() {
        return isLookup;
    }

    public void setIsLookup(ISLookUpService isLookup) {
        this.isLookup = isLookup;
    }
}

View File

@ -46,11 +46,6 @@ public class StreamingInputDocumentFactory {
private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";
private static final String outFormat = "yyyy-MM-dd'T'hh:mm:ss'Z'";
private static final List<String> dateFormats = Arrays
.asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy");
private static final String DEFAULTDNETRESULT = "dnetResult";
private static final String TARGETFIELDS = "targetFields";
@ -125,13 +120,12 @@ public class StreamingInputDocumentFactory {
}
if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
indexDocument.clear();
System.err.println("missing indexrecord id:\n" + inputDocument);
throw new IllegalStateException("cannot extract record ID from: " + inputDocument);
}
return indexDocument;
} catch (XMLStreamException e) {
return new SolrInputDocument();
throw new IllegalStateException(e);
}
}

View File

@ -1,107 +1,18 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.File;
import java.nio.file.Path;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.Assert;
public class SolrAdminApplicationTest {
private static final Logger log = LoggerFactory.getLogger(SolrAdminApplicationTest.class);
public static final String DEFAULT_COLLECTION = "testCollection";
public static final String CONFIG_NAME = "testConfig";
private static MiniSolrCloudCluster miniCluster;
private static CloudSolrClient cloudSolrClient;
@TempDir
public static Path tempDir;
@BeforeAll
public static void setup() throws Exception {
// random unassigned HTTP port
final int jettyPort = 0;
final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build();
// create a MiniSolrCloudCluster instance
miniCluster = new MiniSolrCloudCluster(2, tempDir, jettyConfig);
// Upload Solr configuration directory to ZooKeeper
String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig";
File configDir = new File(solrZKConfigDir);
miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);
// override settings in the solrconfig include
System.setProperty("solr.tests.maxBufferedDocs", "100000");
System.setProperty("solr.tests.maxIndexingThreads", "-1");
System.setProperty("solr.tests.ramBufferSizeMB", "100");
// use non-test classes so RandomizedRunner isn't necessary
System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
cloudSolrClient = miniCluster.getSolrClient();
cloudSolrClient.setRequestWriter(new RequestWriter());
cloudSolrClient.setParser(new XMLResponseParser());
cloudSolrClient.setDefaultCollection(DEFAULT_COLLECTION);
cloudSolrClient.connect();
log.info(new ConfigSetAdminRequest.List().process(cloudSolrClient).toString());
log.info(CollectionAdminRequest.ClusterStatus.getClusterStatus().process(cloudSolrClient).toString());
createCollection(cloudSolrClient, DEFAULT_COLLECTION, 2, 1, CONFIG_NAME);
}
@AfterAll
public static void shutDown() throws Exception {
miniCluster.shutdown();
}
protected static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
int replicationFactor, String configName) throws Exception {
ModifiableSolrParams modParams = new ModifiableSolrParams();
modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
modParams.set("name", name);
modParams.set("numShards", numShards);
modParams.set("replicationFactor", replicationFactor);
modParams.set("collection.configName", configName);
QueryRequest request = new QueryRequest(modParams);
request.setPath("/admin/collections");
return client.request(request);
}
public class SolrAdminApplicationTest extends SolrTest {
@Test
public void testPing() throws Exception {
SolrPingResponse pingResponse = cloudSolrClient.ping();
SolrPingResponse pingResponse = miniCluster.getSolrClient().ping();
log.info("pingResponse: '{}'", pingResponse.getStatus());
Assert.assertTrue(pingResponse.getStatus() == 0);
}

View File

@ -0,0 +1,108 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.File;
import java.nio.file.Path;
import org.apache.commons.io.FileUtils;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Base class for the tests that need a Solr cluster: it starts a local
 * {@link MiniSolrCloudCluster} before the tests of the concrete class run, uploads the test
 * configuration, creates the {@link #DEFAULT_COLLECTION} and shuts everything down afterwards.
 */
public abstract class SolrTest {

    protected static final Logger log = LoggerFactory.getLogger(SolrTest.class);

    protected static final String FORMAT = "test";
    protected static final String DEFAULT_COLLECTION = FORMAT + "-index-openaire";
    protected static final String CONFIG_NAME = "testConfig";

    protected static MiniSolrCloudCluster miniCluster;

    @TempDir
    public static Path workingDir;

    @BeforeAll
    public static void setup() throws Exception {

        // random unassigned HTTP port
        final int jettyPort = 0;
        final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build();

        log.info("working directory: {}", workingDir);
        System.setProperty("solr.log.dir", workingDir.resolve("logs").toString());

        // create a MiniSolrCloudCluster instance
        miniCluster = new MiniSolrCloudCluster(2, workingDir.resolve("solr"), jettyConfig);

        // Upload Solr configuration directory to ZooKeeper
        String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig";
        File configDir = new File(solrZKConfigDir);

        miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);

        // override settings in the solrconfig include
        // System.setProperty("solr.tests.maxBufferedDocs", "100000");
        // System.setProperty("solr.tests.maxIndexingThreads", "-1");
        // System.setProperty("solr.tests.ramBufferSizeMB", "100");

        // use non-test classes so RandomizedRunner isn't necessary
        // System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
        // System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");

        log.info(new ConfigSetAdminRequest.List().process(miniCluster.getSolrClient()).toString());
        logClusterStatus();

        NamedList<Object> res = createCollection(
            miniCluster.getSolrClient(), DEFAULT_COLLECTION, 4, 2, 20, CONFIG_NAME);
        res.forEach(o -> log.info(o.toString()));

        miniCluster.getSolrClient().setDefaultCollection(DEFAULT_COLLECTION);

        // logged a second time so that the newly created collection shows up
        logClusterStatus();
    }

    @AfterAll
    public static void shutDown() throws Exception {
        try {
            // miniCluster is still null when setup() failed before creating it
            if (miniCluster != null) {
                miniCluster.shutdown();
            }
        } finally {
            // remove the working directory even when the cluster shutdown fails
            if (workingDir != null) {
                FileUtils.deleteDirectory(workingDir.toFile());
            }
        }
    }

    /**
     * Creates a collection by issuing a CREATE action to the collections admin endpoint.
     *
     * @param client the client used to send the request
     * @param name the collection name
     * @param numShards the number of shards
     * @param replicationFactor the replication factor
     * @param maxShardsPerNode the maximum number of shards per node
     * @param configName the name of the (already uploaded) configuration set
     * @return the response returned by the admin request
     * @throws Exception when the request fails
     */
    protected static NamedList<Object> createCollection(CloudSolrClient client, String name, int numShards,
        int replicationFactor, int maxShardsPerNode, String configName) throws Exception {
        ModifiableSolrParams modParams = new ModifiableSolrParams();

        modParams.set(CoreAdminParams.ACTION, CollectionParams.CollectionAction.CREATE.name());
        modParams.set("name", name);
        modParams.set("numShards", numShards);
        modParams.set("replicationFactor", replicationFactor);
        modParams.set("collection.configName", configName);
        modParams.set("maxShardsPerNode", maxShardsPerNode);
        QueryRequest request = new QueryRequest(modParams);
        request.setPath("/admin/collections");
        return client.request(request);
    }

    /** Logs the cluster status as reported by the collections admin API. */
    private static void logClusterStatus() throws Exception {
        log
            .info(
                CollectionAdminRequest.ClusterStatus
                    .getClusterStatus()
                    .process(miniCluster.getSolrClient())
                    .toString());
    }
}

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.oa.provision;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CommonParams;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
/**
 * Runs the {@link XmlIndexingJob} against the local Solr cluster started by {@link SolrTest},
 * replacing the information system lookups with a mocked {@link ISLookupClient}.
 */
@ExtendWith(MockitoExtension.class)
public class XmlIndexingJobTest extends SolrTest {

    protected static SparkSession spark;

    private static final Integer BATCH_SIZE = 100;

    // every lookup is stubbed directly on the client mock, so no ISLookUpService is needed
    @Mock
    private ISLookupClient isLookupClient;

    @BeforeEach
    public void prepareMocks() throws ISLookUpException, IOException {
        // the address is the ZooKeeper one of the mini cluster: only its port is kept and
        // the host is pinned to 127.0.0.1
        final int zkPort = URI.create("http://" + miniCluster.getZkClient().getZkServerAddress()).getPort();

        Mockito
            .when(isLookupClient.getDsId(Mockito.anyString()))
            .thenReturn("313f0381-23b6-466f-a0b8-c72a9679ac4b_SW5kZXhEU1Jlc291cmNlcy9JbmRleERTUmVzb3VyY2VUeXBl");
        Mockito.when(isLookupClient.getZkHost()).thenReturn(String.format("127.0.0.1:%s/solr", zkPort));
        Mockito
            .when(isLookupClient.getLayoutSource(Mockito.anyString()))
            .thenReturn(readResource("fields.xml"));
        Mockito
            .when(isLookupClient.getLayoutTransformer())
            .thenReturn(readResource("layoutToRecordTransformer.xsl"));
    }

    @BeforeAll
    public static void before() {

        SparkConf conf = new SparkConf();
        conf.setAppName(XmlIndexingJobTest.class.getSimpleName());

        conf.setMaster("local[1]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.resolve("spark").toString());

        spark = SparkSession
            .builder()
            .appName(XmlIndexingJobTest.class.getSimpleName())
            .config(conf)
            .getOrCreate();
    }

    @AfterAll
    public static void tearDown() {
        // spark is still null when before() failed to create the session
        if (spark != null) {
            spark.stop();
        }
    }

    @Test
    public void testXmlIndexingJob() throws Exception {

        String inputPath = "src/test/resources/eu/dnetlib/dhp/oa/provision/xml";

        // number of records in the input, i.e. the number of documents expected in the index
        long nRecord = JavaSparkContext
            .fromSparkContext(spark.sparkContext())
            .sequenceFile(inputPath, Text.class, Text.class)
            .count();

        new XmlIndexingJob(spark, inputPath, FORMAT, BATCH_SIZE).run(isLookupClient);

        Assertions.assertEquals(0, miniCluster.getSolrClient().commit().getStatus());

        QueryResponse rsp = miniCluster.getSolrClient().query(new SolrQuery().add(CommonParams.Q, "*:*"));

        Assertions.assertEquals(nRecord, rsp.getResults().getNumFound());
    }

    /**
     * Reads a classpath resource, resolved relative to this class, as a string.
     *
     * @param name the resource name
     * @return the resource content decoded as UTF-8 (presumably the encoding of the test
     *         resources - TODO confirm)
     * @throws IOException when the resource is missing or cannot be read
     */
    private String readResource(String name) throws IOException {
        try (InputStream in = getClass().getResourceAsStream(name)) {
            if (in == null) {
                throw new IOException("test resource not found: " + name);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }
}

View File

@ -0,0 +1,31 @@
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- If this file is found in the config directory, it will only be
loaded once at startup. If it is found in Solr's data
directory, it will be re-loaded every commit.
See http://wiki.apache.org/solr/QueryElevationComponent for more info
-->
<elevate>
<!-- Query elevation examples
<query text="foo bar">
<doc id="1" />
<doc id="2" />
<doc id="3" />
</query>
for use with techproducts example
<query text="ipod">
<doc id="MA147LL/A" /> put the actual ipod at the top
<doc id="IW-02" exclude="true" /> exclude this cable
</query>
-->
</elevate>

View File

@ -83,6 +83,7 @@
<lib dir="${solr.install.dir:../../../..}/contrib/velocity/lib" regex=".*\.jar" />
<lib dir="${solr.install.dir:../../../..}/dist/" regex="solr-velocity-\d.*\.jar" />
<!-- an exact 'path' can be used instead of a 'dir' to specify a
specific jar file. This will cause a serious error to be logged
if it can't be loaded.
@ -112,7 +113,8 @@
One can force a particular implementation via solr.MMapDirectoryFactory,
solr.NIOFSDirectoryFactory, or solr.SimpleFSDirectoryFactory.
solr.RAMDirectoryFactory is memory based and not persistent.
solr.RAMDirectoryFactory is memory based, not
persistent, and doesn't work with replication.
-->
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
@ -204,7 +206,7 @@
More details on the nuances of each LockFactory...
http://wiki.apache.org/lucene-java/AvailableLockFactories
-->
<lockType>${solr.lock.type:single}</lockType>
<lockType>${solr.lock.type:native}</lockType>
<!-- Commit Deletion Policy
Custom deletion policies can be specified here. The class must
@ -331,6 +333,29 @@
postCommit - fired after every commit or optimize command
postOptimize - fired after every optimize command
-->
<!-- The RunExecutableListener executes an external command from a
hook such as postCommit or postOptimize.
exe - the name of the executable to run
dir - dir to use as the current working directory. (default=".")
wait - the calling thread waits until the executable returns.
(default="true")
args - the arguments to pass to the program. (default is none)
env - environment variables to set. (default is none)
-->
<!-- This example shows how RunExecutableListener could be used
with the script based replication...
http://wiki.apache.org/solr/CollectionDistribution
-->
<!--
<listener event="postCommit" class="solr.RunExecutableListener">
<str name="exe">solr/bin/snapshooter</str>
<str name="dir">.</str>
<bool name="wait">true</bool>
<arr name="args"> <str>arg1</str> <str>arg2</str> </arr>
<arr name="env"> <str>MYVAR=val1</str> </arr>
</listener>
-->
</updateHandler>
@ -366,14 +391,22 @@
Query section - these settings control query time things like caches
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<query>
<!-- Max Boolean Clauses
Maximum number of clauses in each BooleanQuery, an exception
is thrown if exceeded.
** WARNING **
This option actually modifies a global Lucene property that
will affect all SolrCores. If multiple solrconfig.xml files
disagree on this property, the value at any given moment will
be based on the last SolrCore to be initialized.
<!-- Maximum number of clauses in each BooleanQuery, an exception
is thrown if exceeded. It is safe to increase or remove this setting,
since it is purely an arbitrary limit to try and catch user errors where
large boolean queries may not be the best implementation choice.
-->
<maxBooleanClauses>1024</maxBooleanClauses>
<!-- Solr Internal Query Caches
There are two implementations of cache available for Solr,
@ -575,8 +608,21 @@
This section contains instructions for how the SolrDispatchFilter
should behave when processing requests for this SolrCore.
handleSelect is a legacy option that affects the behavior of requests
such as /select?qt=XXX
handleSelect="true" will cause the SolrDispatchFilter to process
the request and dispatch the query to a handler specified by the
"qt" param, assuming "/select" isn't already registered.
handleSelect="false" will cause the SolrDispatchFilter to
ignore "/select" requests, resulting in a 404 unless a handler
is explicitly registered with the name "/select"
handleSelect="true" is not recommended for new users, but is the default
for backwards compatibility
-->
<requestDispatcher>
<requestDispatcher handleSelect="false" >
<!-- Request Parsing
These settings indicate how Solr Requests may be parsed, and
@ -602,14 +648,15 @@
plugins.
*** WARNING ***
Before enabling remote streaming, you should make sure your
system has authentication enabled.
The settings below authorize Solr to fetch remote files. You
should make sure your system has some authentication before
using enableRemoteStreaming="true".
<requestParsers enableRemoteStreaming="false"
multipartUploadLimitInKB="-1"
formdataUploadLimitInKB="-1"
addHttpRequestToContext="false"/>
-->
<requestParsers enableRemoteStreaming="true"
multipartUploadLimitInKB="2048000"
formdataUploadLimitInKB="2048"
addHttpRequestToContext="false"/>
<!-- HTTP Caching
@ -673,6 +720,14 @@
Incoming queries will be dispatched to a specific handler by name
based on the path specified in the request.
Legacy behavior: If the request path uses "/select" but no Request
Handler has that name, and if handleSelect="true" has been specified in
the requestDispatcher, then the Request Handler is dispatched based on
the qt parameter. Handlers without a leading '/' are accessed this way
like so: http://host/app/[core/]select?qt=name If no qt is
given, then the requestHandler that declares default="true" will be
used or the one named "standard".
If a Request Handler is declared with startup="lazy", then it will
not be initialized until the first request that uses it.
@ -692,13 +747,9 @@
-->
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="q.op">AND</str>
<int name="rows">10</int>
<!-- Default search field
<str name="df">text</str>
-->
<!-- Change from JSON to XML format (the default prior to Solr 7.0)
<str name="wt">xml</str>
-->
<!-- <str name="df">text</str> -->
</lst>
<!-- In addition to defaults, "appends" params can be specified
to identify values which should be appended to the list of
@ -781,10 +832,18 @@
<initParams path="/update/**,/query,/select,/tvrh,/elevate,/spell,/browse">
<lst name="defaults">
<str name="df">_text_</str>
<str name="df">__all</str>
</lst>
</initParams>
<!-- This enables schemaless mode
<initParams path="/update/**">
<lst name="defaults">
<str name="update.chain">add-unknown-fields-to-the-schema</str>
</lst>
</initParams>
-->
<!-- Solr Cell Update Request Handler
http://wiki.apache.org/solr/ExtractingRequestHandler
@ -796,10 +855,9 @@
<lst name="defaults">
<str name="lowernames">true</str>
<str name="fmap.meta">ignored_</str>
<str name="fmap.content">_text_</str>
<str name="fmap.content">__all</str>
</lst>
</requestHandler>
<!-- Search Components
Search components are registered to SolrCore and used by
@ -861,7 +919,7 @@
<!-- a spellchecker built from a field of the main index -->
<lst name="spellchecker">
<str name="name">default</str>
<str name="field">_text_</str>
<str name="field">__all</str>
<str name="classname">solr.DirectSolrSpellChecker</str>
<!-- the spellcheck distance measure used, the default is the internal levenshtein -->
<str name="distanceMeasure">internal</str>
@ -986,6 +1044,7 @@
<searchComponent name="elevator" class="solr.QueryElevationComponent" >
<!-- pick a fieldType to analyze queries -->
<str name="queryFieldType">string</str>
<str name="config-file">elevate.xml</str>
</searchComponent>
<!-- A request handler for demonstrating the elevator component -->
@ -1116,81 +1175,70 @@
<!-- Add unknown fields to the schema
Field type guessing update processors that will
An example field type guessing update processor that will
attempt to parse string-typed field values as Booleans, Longs,
Doubles, or Dates, and then add schema fields with the guessed
field types. Text content will be indexed as "text_general" as
well as a copy to a plain string version in *_str.
field types.
These require that the schema is both managed and mutable, by
This requires that the schema is both managed and mutable, by
declaring schemaFactory as ManagedIndexSchemaFactory, with
mutable specified as true.
See http://wiki.apache.org/solr/GuessingFieldTypes
-->
<updateProcessor class="solr.UUIDUpdateProcessorFactory" name="uuid"/>
<updateProcessor class="solr.RemoveBlankFieldUpdateProcessorFactory" name="remove-blank"/>
<updateProcessor class="solr.FieldNameMutatingUpdateProcessorFactory" name="field-name-mutating">
<str name="pattern">[^\w-\.]</str>
<str name="replacement">_</str>
</updateProcessor>
<updateProcessor class="solr.ParseBooleanFieldUpdateProcessorFactory" name="parse-boolean"/>
<updateProcessor class="solr.ParseLongFieldUpdateProcessorFactory" name="parse-long"/>
<updateProcessor class="solr.ParseDoubleFieldUpdateProcessorFactory" name="parse-double"/>
<updateProcessor class="solr.ParseDateFieldUpdateProcessorFactory" name="parse-date">
<arr name="format">
<str>yyyy-MM-dd'T'HH:mm:ss.SSSZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss,SSSZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss.SSS</str>
<str>yyyy-MM-dd'T'HH:mm:ss,SSS</str>
<str>yyyy-MM-dd'T'HH:mm:ssZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss</str>
<str>yyyy-MM-dd'T'HH:mmZ</str>
<str>yyyy-MM-dd'T'HH:mm</str>
<str>yyyy-MM-dd HH:mm:ss.SSSZ</str>
<str>yyyy-MM-dd HH:mm:ss,SSSZ</str>
<str>yyyy-MM-dd HH:mm:ss.SSS</str>
<str>yyyy-MM-dd HH:mm:ss,SSS</str>
<str>yyyy-MM-dd HH:mm:ssZ</str>
<str>yyyy-MM-dd HH:mm:ss</str>
<str>yyyy-MM-dd HH:mmZ</str>
<str>yyyy-MM-dd HH:mm</str>
<str>yyyy-MM-dd</str>
</arr>
</updateProcessor>
<updateProcessor class="solr.AddSchemaFieldsUpdateProcessorFactory" name="add-schema-fields">
<lst name="typeMapping">
<str name="valueClass">java.lang.String</str>
<str name="fieldType">text_general</str>
<lst name="copyField">
<str name="dest">*_str</str>
<int name="maxChars">256</int>
<updateRequestProcessorChain name="add-unknown-fields-to-the-schema">
<!-- UUIDUpdateProcessorFactory will generate an id if none is present in the incoming document -->
<processor class="solr.UUIDUpdateProcessorFactory" />
<processor class="solr.RemoveBlankFieldUpdateProcessorFactory"/>
<processor class="solr.FieldNameMutatingUpdateProcessorFactory">
<str name="pattern">[^\w-\.]</str>
<str name="replacement">_</str>
</processor>
<processor class="solr.ParseBooleanFieldUpdateProcessorFactory"/>
<processor class="solr.ParseLongFieldUpdateProcessorFactory"/>
<processor class="solr.ParseDoubleFieldUpdateProcessorFactory"/>
<processor class="solr.ParseDateFieldUpdateProcessorFactory">
<arr name="format">
<str>yyyy-MM-dd'T'HH:mm:ss.SSSZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss,SSSZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss.SSS</str>
<str>yyyy-MM-dd'T'HH:mm:ss,SSS</str>
<str>yyyy-MM-dd'T'HH:mm:ssZ</str>
<str>yyyy-MM-dd'T'HH:mm:ss</str>
<str>yyyy-MM-dd'T'HH:mmZ</str>
<str>yyyy-MM-dd'T'HH:mm</str>
<str>yyyy-MM-dd HH:mm:ss.SSSZ</str>
<str>yyyy-MM-dd HH:mm:ss,SSSZ</str>
<str>yyyy-MM-dd HH:mm:ss.SSS</str>
<str>yyyy-MM-dd HH:mm:ss,SSS</str>
<str>yyyy-MM-dd HH:mm:ssZ</str>
<str>yyyy-MM-dd HH:mm:ss</str>
<str>yyyy-MM-dd HH:mmZ</str>
<str>yyyy-MM-dd HH:mm</str>
<str>yyyy-MM-dd</str>
</arr>
</processor>
<processor class="solr.AddSchemaFieldsUpdateProcessorFactory">
<str name="defaultFieldType">strings</str>
<lst name="typeMapping">
<str name="valueClass">java.lang.Boolean</str>
<str name="fieldType">booleans</str>
</lst>
<!-- Use as default mapping instead of defaultFieldType -->
<bool name="default">true</bool>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.lang.Boolean</str>
<str name="fieldType">booleans</str>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.util.Date</str>
<str name="fieldType">pdates</str>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.lang.Long</str>
<str name="valueClass">java.lang.Integer</str>
<str name="fieldType">plongs</str>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.lang.Number</str>
<str name="fieldType">pdoubles</str>
</lst>
</updateProcessor>
<lst name="typeMapping">
<str name="valueClass">java.util.Date</str>
<str name="fieldType">tdates</str>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.lang.Long</str>
<str name="valueClass">java.lang.Integer</str>
<str name="fieldType">tlongs</str>
</lst>
<lst name="typeMapping">
<str name="valueClass">java.lang.Number</str>
<str name="fieldType">tdoubles</str>
</lst>
</processor>
<!-- The update.autoCreateFields property can be set to false to disable schemaless mode -->
<updateRequestProcessorChain name="add-unknown-fields-to-the-schema" default="${update.autoCreateFields:true}"
processor="uuid,remove-blank,field-name-mutating,parse-boolean,parse-long,parse-double,parse-date,add-schema-fields">
<processor class="solr.LogUpdateProcessorFactory"/>
<processor class="solr.DistributedUpdateProcessorFactory"/>
<processor class="solr.RunUpdateProcessorFactory"/>
@ -1313,7 +1361,7 @@
<!-- Query Parsers
https://lucene.apache.org/solr/guide/query-syntax-and-parsing.html
https://cwiki.apache.org/confluence/display/solr/Query+Syntax+and+Parsing
Multiple QParserPlugins can be registered by name, and then
used in either the "defType" param for the QueryComponent (used