diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
index 8a53c3a50..cb34b0cb3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java
@@ -7,8 +7,6 @@ import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.List;
import java.util.Set;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@@ -21,7 +19,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
-import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
diff --git a/dhp-workflows/dhp-graph-provision/pom.xml b/dhp-workflows/dhp-graph-provision/pom.xml
index 8194d4d01..1547056b9 100644
--- a/dhp-workflows/dhp-graph-provision/pom.xml
+++ b/dhp-workflows/dhp-graph-provision/pom.xml
@@ -22,6 +22,12 @@
com.jayway.jsonpath
json-path
+
+
+ org.slf4j
+ slf4j-api
+
+
dom4j
@@ -82,9 +88,6 @@
org.codehaus.woodstox
*
-
-
-
com.github.ben-manes.caffeine
*
@@ -109,11 +112,10 @@
org.apache.hadoop
*
-
-
-
-
-
+
+ org.apache.zookeeper
+ zookeeper
+
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
index 9bc3706cd..d13b54e01 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
@@ -3,6 +3,10 @@ package eu.dnetlib.dhp.oa.provision;
public class ProvisionConstants {
+ public static final String LAYOUT = "index";
+ public static final String INTERPRETATION = "openaire";
+ public static final String SEPARATOR = "-";
+
public static final int MAX_EXTERNAL_ENTITIES = 50;
public static final int MAX_AUTHORS = 200;
public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
@@ -11,4 +15,8 @@ public class ProvisionConstants {
public static final int MAX_ABSTRACT_LENGTH = 100000;
public static final int MAX_INSTANCES = 10;
+	/**
+	 * Builds the name of the Solr collection for a metadata format by joining the format name,
+	 * {@link #LAYOUT} and {@link #INTERPRETATION} with {@link #SEPARATOR}.
+	 *
+	 * @param format the metadata format name
+	 * @return the collection name, i.e. {@code <format>-index-openaire}
+	 */
+	public static String getCollectionName(String format) {
+		return String.join(SEPARATOR, format, LAYOUT, INTERPRETATION);
+	}
+
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
index 8c8947298..5fe452efe 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplication.java
@@ -14,11 +14,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.oa.provision.utils.ZkServers;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-public class SolrAdminApplication extends SolrApplication implements Closeable {
+public class SolrAdminApplication implements Closeable {
private static final Logger log = LoggerFactory.getLogger(SolrAdminApplication.class);
@@ -54,12 +55,12 @@ public class SolrAdminApplication extends SolrApplication implements Closeable {
.orElse(false);
log.info("commit: {}", commit);
- final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
+ final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
- final String zkHost = getZkHost(isLookup);
+ final String zkHost = isLookup.getZkHost();
log.info("zkHost: {}", zkHost);
- final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
+ final String collection = ProvisionConstants.getCollectionName(format);
log.info("collection: {}", collection);
try (SolrAdminApplication app = new SolrAdminApplication(zkHost)) {
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java
deleted file mode 100644
index a824c6c2c..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SolrApplication.java
+++ /dev/null
@@ -1,40 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-
-public abstract class SolrApplication {
-
- private static final Logger log = LoggerFactory.getLogger(SolrApplication.class);
-
- protected static final String LAYOUT = "index";
- protected static final String INTERPRETATION = "openaire";
- protected static final String SEPARATOR = "-";
- protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
-
- /**
- * Method retrieves from the information system the zookeeper quorum of the Solr server
- *
- * @param isLookup
- * @return the zookeeper quorum of the Solr server
- * @throws ISLookUpException
- */
- protected static String getZkHost(ISLookUpService isLookup) throws ISLookUpException {
- return doLookup(
- isLookup,
- "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
- }
-
- protected static String doLookup(ISLookUpService isLookup, String xquery) throws ISLookUpException {
- log.info(String.format("running xquery: %s", xquery));
- final String res = isLookup.getResourceProfileByQuery(xquery);
- log.info(String.format("got response (100 chars): %s", StringUtils.left(res, 100) + " ..."));
- return res;
- }
-
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index 5b5596162..f7a1ee49d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -1,8 +1,31 @@
package eu.dnetlib.dhp.oa.provision;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import com.lucidworks.spark.util.SolrSupport;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
+import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
+import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
@@ -10,37 +33,26 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.rdd.RDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.lucidworks.spark.util.SolrSupport;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-
-public class XmlIndexingJob extends SolrApplication {
+public class XmlIndexingJob {
private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
private static final Integer DEFAULT_BATCH_SIZE = 1000;
+	protected static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'"; // HH: 24-hour clock ('hh' is 1-12 and needs an am/pm marker)
+
+ private String inputPath;
+
+ private String format;
+
+ private int batchSize;
+
+ private String outputPath;
+
+ private SparkSession spark;
+
public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@@ -60,27 +72,50 @@ public class XmlIndexingJob extends SolrApplication {
final String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
- final String isLookupUrl = parser.get("isLookupUrl");
- log.info("isLookupUrl: {}", isLookupUrl);
-
final String format = parser.get("format");
log.info("format: {}", format);
+ final String outputPath = Optional.ofNullable(parser.get("outputPath"))
+ .orElse(null);
+ log.info("outputPath: {}", outputPath);
+
final Integer batchSize = parser.getObjectMap().containsKey("batchSize")
? Integer.valueOf(parser.get("batchSize"))
: DEFAULT_BATCH_SIZE;
log.info("batchSize: {}", batchSize);
- final ISLookUpService isLookup = ISLookupClientFactory.getLookUpService(isLookupUrl);
- final String fields = getLayoutSource(isLookup, format);
+ final SparkConf conf = new SparkConf();
+ conf.registerKryoClasses(new Class[] { SerializableSolrInputDocument.class });
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ final String isLookupUrl = parser.get("isLookupUrl");
+ log.info("isLookupUrl: {}", isLookupUrl);
+ final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
+ new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup);
+ });
+ }
+
+	/**
+	 * Creates an indexing job bound to the given Spark session.
+	 *
+	 * @param spark the Spark session used to read the input records
+	 * @param inputPath path of the sequence file holding the records to be indexed
+	 * @param format the metadata format name
+	 * @param batchSize number of documents per batch sent to Solr; it is unboxed into an {@code int} field,
+	 *                  so it must not be {@code null}
+	 * @param outputPath when not blank, the documents are written there instead of being sent to Solr
+	 */
+	public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) {
+		// where the records come from
+		this.spark = spark;
+		this.inputPath = inputPath;
+		// where the documents go
+		this.outputPath = outputPath;
+		// how the records are processed
+		this.format = format;
+		this.batchSize = batchSize;
+	}
+
+ public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
+ final String fields = isLookup.getLayoutSource(format);
log.info("fields: {}", fields);
- final String xslt = getLayoutTransformer(isLookup);
+ final String xslt = isLookup.getLayoutTransformer();
- final String dsId = getDsId(format, isLookup);
+ final String dsId = isLookup.getDsId(format);
log.info("dsId: {}", dsId);
- final String zkHost = getZkHost(isLookup);
+ final String zkHost = isLookup.getZkHost();
log.info("zkHost: {}", zkHost);
final String version = getRecordDatestamp();
@@ -88,24 +123,25 @@ public class XmlIndexingJob extends SolrApplication {
final String indexRecordXslt = getLayoutTransformer(format, fields, xslt);
log.info("indexRecordTransformer {}", indexRecordXslt);
- final SparkConf conf = new SparkConf();
+ final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- runWithSparkSession(
- conf,
- isSparkSessionManaged,
- spark -> {
- final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ JavaRDD docs = sc
+ .sequenceFile(inputPath, Text.class, Text.class)
+ .map(t -> t._2().toString())
+ .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
+ .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
- RDD docs = sc
- .sequenceFile(inputPath, Text.class, Text.class)
- .map(t -> t._2().toString())
- .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
- .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s))
- .rdd();
-
- final String collection = format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
- SolrSupport.indexDocs(zkHost, collection, batchSize, docs);
- });
+ if (StringUtils.isNotBlank(outputPath)) {
+ spark.createDataset(
+ docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
+ Encoders.kryo(SerializableSolrInputDocument.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+ } else {
+ final String collection = ProvisionConstants.getCollectionName(format);
+ SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+ }
}
protected static String toIndexRecord(Transformer tr, final String record) {
@@ -151,56 +187,4 @@ public class XmlIndexingJob extends SolrApplication {
return new SimpleDateFormat(DATE_FORMAT).format(new Date());
}
- /**
- * Method retrieves from the information system the list of fields associated to the given MDFormat name
- *
- * @param isLookup the ISLookup service stub
- * @param format the Metadata format name
- * @return the string representation of the list of fields to be indexed
- * @throws ISLookUpDocumentNotFoundException
- * @throws ISLookUpException
- */
- private static String getLayoutSource(final ISLookUpService isLookup, final String format)
- throws ISLookUpDocumentNotFoundException, ISLookUpException {
- return doLookup(
- isLookup,
- String
- .format(
- "collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']",
- format, LAYOUT));
- }
-
- /**
- * Method retrieves from the information system the openaireLayoutToRecordStylesheet
- *
- * @param isLookup the ISLookup service stub
- * @return the string representation of the XSLT contained in the transformation rule profile
- * @throws ISLookUpDocumentNotFoundException
- * @throws ISLookUpException
- */
- private static String getLayoutTransformer(ISLookUpService isLookup) throws ISLookUpException {
- return doLookup(
- isLookup,
- "collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')"
- + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
- }
-
- /**
- * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
- *
- * @param format
- * @param isLookup
- * @return the IndexDS identifier
- * @throws ISLookUpException
- */
- private static String getDsId(String format, ISLookUpService isLookup) throws ISLookUpException {
- return doLookup(
- isLookup,
- String
- .format(
- "collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')"
- + "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()",
- format));
- }
-
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
new file mode 100644
index 000000000..05b39ab6f
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SerializableSolrInputDocument.java
@@ -0,0 +1,30 @@
+package eu.dnetlib.dhp.oa.provision.model;
+
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Wrapper class needed to make the SolrInputDocument compatible with the Kryo serialization mechanism.
+ */
+public class SerializableSolrInputDocument extends SolrInputDocument {
+
+	/**
+	 * Creates an empty document whose fields are held in a new {@link HashMap}.
+	 */
+	public SerializableSolrInputDocument() {
+		super(new HashMap<>());
+	}
+
+	/**
+	 * Creates a document from the given field map, which is handed to the {@link SolrInputDocument} constructor as is.
+	 *
+	 * @param fields the document fields (presumably keyed by field name, to be confirmed against the Solr API)
+	 */
+	public SerializableSolrInputDocument(Map<String, SolrInputField> fields) {
+		super(fields);
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java
new file mode 100644
index 000000000..29a51cb29
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/ISLookupClient.java
@@ -0,0 +1,113 @@
+
+package eu.dnetlib.dhp.oa.provision.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.oa.provision.ProvisionConstants;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+
+/**
+ * Client wrapping an {@link ISLookUpService} stub and exposing the XQuery lookups used by the provision
+ * workflow: layout fields, layout transformer, IndexDS identifier and the zookeeper quorum of the Solr server.
+ */
+public class ISLookupClient {
+
+	private static final Logger log = LoggerFactory.getLogger(ISLookupClient.class);
+
+	private ISLookUpService isLookup;
+
+	/**
+	 * Creates a client delegating every query to the given service stub.
+	 *
+	 * @param isLookup the ISLookup service stub
+	 */
+	public ISLookupClient(ISLookUpService isLookup) {
+		this.isLookup = isLookup;
+	}
+
+	/**
+	 * Method retrieves from the information system the list of fields associated to the given MDFormat name
+	 *
+	 * @param format the Metadata format name; it is spliced verbatim into the XQuery, so it must not contain a single quote
+	 * @return the string representation of the list of fields to be indexed
+	 * @throws ISLookUpDocumentNotFoundException declared by the signature, propagated from the lookup call
+	 * @throws ISLookUpException propagated from the lookup call
+	 */
+	public String getLayoutSource(final String format)
+		throws ISLookUpDocumentNotFoundException, ISLookUpException {
+		return doLookup(
+			String
+				.format(
+					"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='%s']//LAYOUT[@name='%s']",
+					format, ProvisionConstants.LAYOUT));
+	}
+
+	/**
+	 * Method retrieves from the information system the openaireLayoutToRecordStylesheet
+	 *
+	 * @return the string representation of the XSLT contained in the transformation rule profile
+	 * @throws ISLookUpException propagated from the lookup call
+	 */
+	public String getLayoutTransformer() throws ISLookUpException {
+		return doLookup(
+			"collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType')"
+				+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/SCRIPT/TITLE/text() = 'openaireLayoutToRecordStylesheet']//CODE/node()");
+	}
+
+	/**
+	 * Method retrieves from the information system the IndexDS profile ID associated to the given MDFormat name
+	 *
+	 * @param format the Metadata format name; it is spliced verbatim into the XQuery, so it must not contain a single quote
+	 * @return the IndexDS identifier
+	 * @throws ISLookUpException propagated from the lookup call
+	 */
+	public String getDsId(String format) throws ISLookUpException {
+		return doLookup(
+			String
+				.format(
+					"collection('/db/DRIVER/IndexDSResources/IndexDSResourceType')"
+						+ "//RESOURCE_PROFILE[./BODY/CONFIGURATION/METADATA_FORMAT/text() = '%s']//RESOURCE_IDENTIFIER/@value/string()",
+					format));
+	}
+
+	/**
+	 * Method retrieves from the information system the zookeeper quorum of the Solr server
+	 *
+	 * @return the zookeeper quorum of the Solr server
+	 * @throws ISLookUpException propagated from the lookup call
+	 */
+	public String getZkHost() throws ISLookUpException {
+		return doLookup(
+			"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
+	}
+
+	/**
+	 * Runs the given XQuery through the lookup service, logging the query and the first 100 characters of the response.
+	 *
+	 * @param xquery the XQuery to evaluate
+	 * @return the response of the lookup service, unmodified
+	 * @throws ISLookUpException propagated from the lookup call
+	 */
+	private String doLookup(String xquery) throws ISLookUpException {
+		// parameterized logging: same message text, without formatting the string eagerly
+		log.info("running xquery: {}", xquery);
+		final String res = getIsLookup().getResourceProfileByQuery(xquery);
+		log.info("got response (100 chars): {} ...", StringUtils.left(res, 100));
+		return res;
+	}
+
+	/** @return the wrapped ISLookup service stub */
+	public ISLookUpService getIsLookup() {
+		return isLookup;
+	}
+
+	/** @param isLookup the ISLookup service stub to be used by the following lookups */
+	public void setIsLookup(ISLookUpService isLookup) {
+		this.isLookup = isLookup;
+	}
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
index 3e8abbd9f..f16ee260f 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/StreamingInputDocumentFactory.java
@@ -46,11 +46,6 @@ public class StreamingInputDocumentFactory {
private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";
- private static final String outFormat = "yyyy-MM-dd'T'hh:mm:ss'Z'";
-
- private static final List dateFormats = Arrays
- .asList("yyyy-MM-dd'T'hh:mm:ss", "yyyy-MM-dd", "dd-MM-yyyy", "dd/MM/yyyy", "yyyy");
-
private static final String DEFAULTDNETRESULT = "dnetResult";
private static final String TARGETFIELDS = "targetFields";
@@ -125,13 +120,12 @@ public class StreamingInputDocumentFactory {
}
if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
- indexDocument.clear();
- System.err.println("missing indexrecord id:\n" + inputDocument);
+ throw new IllegalStateException("cannot extract record ID from: " + inputDocument);
}
return indexDocument;
} catch (XMLStreamException e) {
- return new SolrInputDocument();
+ throw new IllegalStateException(e);
}
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
index 3396020e0..3169648fb 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_update_index.json
@@ -22,5 +22,11 @@
"paramLongName": "batchSize",
"paramDescription": "size of the batch of documents sent to solr",
"paramRequired": false
+ },
+ {
+ "paramName": "o",
+ "paramLongName": "outputPath",
+ "paramDescription": "path on hdfs activating an alternative output for the SolrInputDocuments",
+ "paramRequired": false
}
]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index e2b74b9aa..ee636b68e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -638,6 +638,7 @@
--isLookupUrl${isLookupUrl}
--format${format}
--batchSize${batchSize}
+ --outputPath${outputPath}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
index cbd7b2de2..33def91b3 100644
--- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SolrAdminApplicationTest.java
@@ -1,107 +1,18 @@
package eu.dnetlib.dhp.oa.provision;
-import java.io.File;
-import java.nio.file.Path;
-
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.embedded.JettyConfig;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.XMLResponseParser;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.ConfigSetAdminResponse;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import junit.framework.Assert;
-public class SolrAdminApplicationTest {
-
- private static final Logger log = LoggerFactory.getLogger(SolrAdminApplicationTest.class);
- public static final String DEFAULT_COLLECTION = "testCollection";
- public static final String CONFIG_NAME = "testConfig";
-
- private static MiniSolrCloudCluster miniCluster;
- private static CloudSolrClient cloudSolrClient;
-
- @TempDir
- public static Path tempDir;
-
- @BeforeAll
- public static void setup() throws Exception {
-
- // random unassigned HTTP port
- final int jettyPort = 0;
-
- final JettyConfig jettyConfig = JettyConfig.builder().setPort(jettyPort).build();
-
- // create a MiniSolrCloudCluster instance
- miniCluster = new MiniSolrCloudCluster(2, tempDir, jettyConfig);
-
- // Upload Solr configuration directory to ZooKeeper
- String solrZKConfigDir = "src/test/resources/eu/dnetlib/dhp/oa/provision/solr/conf/testConfig";
- File configDir = new File(solrZKConfigDir);
-
- miniCluster.uploadConfigSet(configDir.toPath(), CONFIG_NAME);
-
- // override settings in the solrconfig include
- System.setProperty("solr.tests.maxBufferedDocs", "100000");
- System.setProperty("solr.tests.maxIndexingThreads", "-1");
- System.setProperty("solr.tests.ramBufferSizeMB", "100");
-
- // use non-test classes so RandomizedRunner isn't necessary
- System.setProperty("solr.tests.mergeScheduler", "org.apache.lucene.index.ConcurrentMergeScheduler");
- System.setProperty("solr.directoryFactory", "solr.RAMDirectoryFactory");
-
- cloudSolrClient = miniCluster.getSolrClient();
- cloudSolrClient.setRequestWriter(new RequestWriter());
- cloudSolrClient.setParser(new XMLResponseParser());
- cloudSolrClient.setDefaultCollection(DEFAULT_COLLECTION);
- cloudSolrClient.connect();
-
- log.info(new ConfigSetAdminRequest.List().process(cloudSolrClient).toString());
- log.info(CollectionAdminRequest.ClusterStatus.getClusterStatus().process(cloudSolrClient).toString());
-
- createCollection(cloudSolrClient, DEFAULT_COLLECTION, 2, 1, CONFIG_NAME);
- }
-
- @AfterAll
- public static void shutDown() throws Exception {
- miniCluster.shutdown();
- }
-
- protected static NamedList