urlList = Arrays.stream(urls.split(",")).map(String::trim).filter(StringUtils::isNotBlank).toList();
+
+ final CloudSolrClient cloudSolrClient = new CloudSolrClient.Builder(urlList, Optional.of(chroot))
+ .withParallelUpdates(true)
+ .withDefaultCollection(info.getColl())
+ .build();
+
+ return new SolrIndexClient(cloudSolrClient);
+ }
+
+}
diff --git a/src/main/java/eu/dnetlib/app/directindex/solr/SolrRecordMappper.java b/src/main/java/eu/dnetlib/app/directindex/solr/SolrRecordMappper.java
new file mode 100644
index 0000000..e190f3f
--- /dev/null
+++ b/src/main/java/eu/dnetlib/app/directindex/solr/SolrRecordMappper.java
@@ -0,0 +1,10 @@
+package eu.dnetlib.app.directindex.solr;
+
+public class SolrRecordMappper {
+
+	public static String toSolr(final String record) {
+		// TODO: implement the mapping of the input record to its Solr representation
+		return null;
+	}
+
+}
diff --git a/src/main/java/eu/dnetlib/app/directindex/solr/StreamingInputDocumentFactory.java b/src/main/java/eu/dnetlib/app/directindex/solr/StreamingInputDocumentFactory.java
new file mode 100644
index 0000000..185e264
--- /dev/null
+++ b/src/main/java/eu/dnetlib/app/directindex/solr/StreamingInputDocumentFactory.java
@@ -0,0 +1,250 @@
+package eu.dnetlib.app.directindex.solr;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLEventFactory;
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.Namespace;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+
+import org.apache.solr.common.SolrInputDocument;
+
+/**
+ * Optimized version of the document parser, drop-in replacement of InputDocumentFactory.
+ *
+ * Faster because:
+ *
+ * - Doesn't create a DOM for the full document
+ *
+ * - Doesn't execute xpaths against the DOM
+ *
+ * - Quickly serialize the 'result' element directly in a string.
+ *
+ * - Uses less memory: less pressure on GC and allows more threads to process this in parallel
+ *
+ *
+ * This class is fully reentrant and can be invoked in parallel.
+ *
+ * @author claudio
+ */
+public class StreamingInputDocumentFactory {
+
+ private static final String INDEX_FIELD_PREFIX = "__";
+
+ private static final String DS_VERSION = INDEX_FIELD_PREFIX + "dsversion";
+
+ private static final String DS_ID = INDEX_FIELD_PREFIX + "dsid";
+
+ private static final String RESULT = "result";
+
+ private static final String INDEX_RESULT = INDEX_FIELD_PREFIX + RESULT;
+
+ private static final String INDEX_RECORD_ID = INDEX_FIELD_PREFIX + "indexrecordidentifier";
+
+ private static final String DEFAULTDNETRESULT = "dnetResult";
+
+ private static final String TARGETFIELDS = "targetFields";
+
+ private static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
+
+ private static final String ROOT_ELEMENT = "indexRecord";
+
+ private static final int MAX_FIELD_LENGTH = 25000;
+
+	// Restored generic type parameters (they were stripped): without them
+	// inputFactory.get() returns Object and the method-reference initializers
+	// as well as the call sites below would not compile.
+	private final ThreadLocal<XMLInputFactory> inputFactory = ThreadLocal
+		.withInitial(XMLInputFactory::newInstance);
+
+	private final ThreadLocal<XMLOutputFactory> outputFactory = ThreadLocal
+		.withInitial(XMLOutputFactory::newInstance);
+
+	private final ThreadLocal<XMLEventFactory> eventFactory = ThreadLocal
+		.withInitial(XMLEventFactory::newInstance);
+
+ private final String version;
+
+ private String resultName = DEFAULTDNETRESULT;
+
+ public StreamingInputDocumentFactory(final String version) {
+ this(version, DEFAULTDNETRESULT);
+ }
+
+ public StreamingInputDocumentFactory(final String version, final String resultName) {
+ this.version = version;
+ this.resultName = resultName;
+ }
+
+	public SolrInputDocument parseDocument(final String inputDocument) {
+
+		final StringWriter results = new StringWriter();
+		// Restored List<Namespace>: copyResult iterates this list with a typed
+		// enhanced-for, which cannot compile against a raw List.
+		final List<Namespace> nsList = new LinkedList<>();
+		try {
+
+			final XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
+
+			final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
+
+			while (parser.hasNext()) {
+				final XMLEvent event = parser.nextEvent();
+				if (event != null && event.isStartElement()) {
+					final String localName = event.asStartElement().getName().getLocalPart();
+
+					if (ROOT_ELEMENT.equals(localName)) {
+						nsList.addAll(getNamespaces(event));
+					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
+						final XMLEvent text = parser.nextEvent();
+						final String recordId = getText(text);
+						indexDocument.addField(INDEX_RECORD_ID, recordId);
+					} else if (TARGETFIELDS.equals(localName)) {
+						parseTargetFields(indexDocument, parser);
+					} else if (resultName.equals(localName)) {
+						copyResult(indexDocument, results, parser, nsList, resultName);
+					}
+				}
+			}
+
+			if (version != null) {
+				indexDocument.addField(DS_VERSION, version);
+			}
+
+			if (!indexDocument.containsKey(INDEX_RECORD_ID)) { throw new IllegalStateException("cannot extract record ID from: " + inputDocument); }
+
+			return indexDocument;
+		} catch (final XMLStreamException e) {
+			throw new IllegalStateException(e);
+		}
+	}
+
+	// Restored generics: StartElement.getNamespaces() yields Namespace events,
+	// and the caller adds the result into a List<Namespace>.
+	private List<Namespace> getNamespaces(final XMLEvent event) {
+		final List<Namespace> res = new LinkedList<>();
+		final Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
+		while (nsIter.hasNext()) {
+			final Namespace ns = nsIter.next();
+			res.add(ns);
+		}
+		return res;
+	}
+
+ /**
+ * Parse the targetFields block and add fields to the solr document.
+ *
+ * @param indexDocument
+ * @param parser
+ * @throws XMLStreamException
+ */
+ protected void parseTargetFields(
+ final SolrInputDocument indexDocument,
+ final XMLEventReader parser)
+ throws XMLStreamException {
+
+ boolean hasFields = false;
+
+ while (parser.hasNext()) {
+ final XMLEvent targetEvent = parser.nextEvent();
+ if (targetEvent.isEndElement()
+ && TARGETFIELDS.equals(targetEvent.asEndElement().getName().getLocalPart())) {
+ break;
+ }
+
+ if (targetEvent.isStartElement()) {
+ final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
+ final XMLEvent text = parser.nextEvent();
+
+ final String data = getText(text);
+
+ addField(indexDocument, fieldName, data);
+ hasFields = true;
+ }
+ }
+
+ if (!hasFields) {
+ indexDocument.clear();
+ }
+ }
+
+	/**
+	 * Copy the /indexRecord/result element and children, preserving namespace declarations.
+	 * @param indexDocument the target solr document (receives the serialized result)
+	 * @param results accumulates the serialized result XML
+	 * @param parser the event reader positioned inside the result element
+	 * @param nsList namespaces declared on the indexRecord root element
+	 * @param dnetResult local name of the result element being copied
+	 * @throws XMLStreamException on malformed XML
+	 */
+	protected void copyResult(
+		final SolrInputDocument indexDocument,
+		final StringWriter results,
+		final XMLEventReader parser,
+		final List<Namespace> nsList,
+		final String dnetResult)
+		throws XMLStreamException {
+		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
+
+		// NOTE(review): a previous loop called eventFactory.createNamespace(...) here and
+		// discarded the results; the namespaces are actually attached to the new start
+		// element below via nsList.iterator(), so no separate namespace events are needed.
+
+		final StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
+
+		// new root record
+		writer.add(newRecord);
+
+		// copy the rest as it is
+		while (parser.hasNext()) {
+			final XMLEvent resultEvent = parser.nextEvent();
+
+			// TODO: replace with depth tracking instead of close tag tracking.
+			if (resultEvent.isEndElement()
+				&& resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
+				writer.add(eventFactory.get().createEndElement("", null, RESULT));
+				break;
+			}
+
+			writer.add(resultEvent);
+		}
+		writer.close();
+		indexDocument.addField(INDEX_RESULT, results.toString());
+	}
+
+ /**
+ * Helper used to add a field to a solr doc. It avoids adding empty fields.
+ *
+ * @param indexDocument
+ * @param field
+ * @param value
+ */
+ private final void addField(
+ final SolrInputDocument indexDocument,
+ final String field,
+ final String value) {
+ final String cleaned = value.trim();
+ if (!cleaned.isEmpty()) {
+ // log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
+ indexDocument.addField(field.toLowerCase(), cleaned);
+ }
+ }
+
+ /**
+ * Helper used to get the string from a text element.
+ *
+ * @param text
+ * @return the text content (truncated to MAX_FIELD_LENGTH), or an empty string for end elements
+ */
+ protected final String getText(final XMLEvent text) {
+ if (text.isEndElement()) {
+ // text.asEndElement().getName().getLocalPart());
+ return "";
+ }
+
+ final String data = text.asCharacters().getData();
+ if (data != null && data.length() > MAX_FIELD_LENGTH) { return data.substring(0, MAX_FIELD_LENGTH); }
+
+ return data;
+ }
+}
diff --git a/src/main/java/eu/dnetlib/app/directindex/sword/model/SwordMetadataDocument.java b/src/main/java/eu/dnetlib/app/directindex/sword/model/SwordMetadataDocument.java
deleted file mode 100644
index ea3c5e5..0000000
--- a/src/main/java/eu/dnetlib/app/directindex/sword/model/SwordMetadataDocument.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package eu.dnetlib.app.directindex.sword.model;
-
-
-public class SwordMetadataDocument {
-
-}
diff --git a/src/main/java/eu/dnetlib/app/directindex/tasks/OafMapper.java b/src/main/java/eu/dnetlib/app/directindex/tasks/OafMapper.java
new file mode 100644
index 0000000..28e3579
--- /dev/null
+++ b/src/main/java/eu/dnetlib/app/directindex/tasks/OafMapper.java
@@ -0,0 +1,18 @@
+package eu.dnetlib.app.directindex.tasks;
+
+import eu.dnetlib.app.directindex.input.ResultEntry;
+
+public class OafMapper {
+
+	public static String toOAF(final ResultEntry result) {
+		// TODO: serialize the ResultEntry into its OAF record representation
+
+		return null;
+	}
+
+	public static ResultEntry toResultEntry(final String oaf) {
+		// TODO: parse the OAF record and populate a ResultEntry
+		return null;
+	}
+
+}
diff --git a/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java
new file mode 100644
index 0000000..b4a3e42
--- /dev/null
+++ b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java
@@ -0,0 +1,113 @@
+package eu.dnetlib.app.directindex.tasks;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.app.directindex.errors.DirectIndexApiException;
+import eu.dnetlib.app.directindex.input.ResultEntry;
+import eu.dnetlib.app.directindex.is.ISLookupClient;
+import eu.dnetlib.app.directindex.is.IndexDsInfo;
+import eu.dnetlib.app.directindex.repo.PendingAction;
+import eu.dnetlib.app.directindex.repo.PendingActionRepository;
+import eu.dnetlib.app.directindex.solr.SolrIndexClient;
+import eu.dnetlib.app.directindex.solr.SolrIndexClientFactory;
+
+@Component
+@ConditionalOnProperty(value = "directindex.scheduling.enabled", havingValue = "true", matchIfMissing = false)
+public class ScheduledActions {
+
+ private static final Log log = LogFactory.getLog(ScheduledActions.class);
+
+ @Autowired
+ private ISLookupClient isLookupClient;
+
+ @Autowired
+ private SolrIndexClientFactory solrIndexClientFactory;
+
+ @Autowired
+ private PendingActionRepository pendingActionRepository;
+
+	@Scheduled(fixedDelay = 5 * 60 * 1000) // 5 minutes
+	public void indexNewRecords() throws DirectIndexApiException {
+		log.info("Indexing new records...");
+
+		// Restored List<PendingAction>: the stream below maps via PendingAction::getBody,
+		// which cannot compile against a raw List.
+		final List<PendingAction> list = pendingActionRepository.recentActions();
+
+		if (!list.isEmpty()) {
+			final IndexDsInfo info = isLookupClient.currentIndexDsInfo();
+			final SolrIndexClient solr = solrIndexClientFactory.getClient(info);
+
+			final ObjectMapper objectMapper = new ObjectMapper();
+
+			solr.addRecords(list.stream()
+				.map(PendingAction::getBody)
+				.map(s -> {
+					try {
+						return objectMapper.readValue(s, ResultEntry.class);
+					} catch (final Exception e) {
+						log.error(e);
+						return null;
+					}
+				})
+				.filter(Objects::nonNull)
+				.map(OafMapper::toOAF)
+				.filter(StringUtils::isNotBlank));
+
+			solr.commit();
+
+			updateExecutionDate(list);
+		}
+
+		log.info(String.format("done (indexed records: %s)", list.size()));
+
+	}
+
+	@Scheduled(fixedDelay = 30 * 60 * 1000) // 30 minutes
+	public void deleteRecords() {
+		log.info("Deleting records from index...");
+
+		// Restored List<PendingAction> (generics were stripped from this hunk).
+		final List<PendingAction> list = pendingActionRepository.toDeleteRecords();
+
+		if (!list.isEmpty()) {
+			final IndexDsInfo info = isLookupClient.currentIndexDsInfo();
+			final SolrIndexClient solr = solrIndexClientFactory.getClient(info);
+
+			list.stream().map(PendingAction::getId).forEach(id -> {
+				try {
+					final String query = String.format("objidentifier:\"%s\" OR resultdupid:\"%s\"", id, id);
+					solr.deleteByQuery(query);
+				} catch (final DirectIndexApiException e) {
+					log.error(e);
+				}
+			});
+
+			updateExecutionDate(list);
+		}
+
+		log.info(String.format("done (deleted records: %s)", list.size()));
+	}
+
+	// Stamps every action with the same execution time and persists the batch.
+	private void updateExecutionDate(final List<PendingAction> list) {
+		final OffsetDateTime now = OffsetDateTime.now();
+		list.forEach(r -> r.setExecutionDate(now));
+		pendingActionRepository.saveAll(list);
+	}
+
+	@Scheduled(fixedDelay = 24 * 60 * 60 * 1000) // 24 hours
+	public void removeOldRecordsFromDB() {
+		// TODO: purge already-executed pending actions older than a retention window
+
+	}
+
+}