index manager to query and save records

This commit is contained in:
Michele Artini 2023-10-26 12:22:48 +02:00
parent eac1514b42
commit d3ac59190e
8 changed files with 466 additions and 12 deletions

View File

@ -19,6 +19,12 @@
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>9.4.0</version>
</dependency>
<!-- Tests -->
<dependency>

View File

@ -0,0 +1,170 @@
package eu.dnetlib.services.index.service;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrQuery.ORDER;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import eu.dnetlib.domain.index.IndexConfiguration;
import eu.dnetlib.domain.index.IndexField;
import eu.dnetlib.domain.index.IndexFieldType;
import eu.dnetlib.errors.DnetException;
import eu.dnetlib.errors.DnetRuntimeException;
import eu.dnetlib.utils.DateUtils;
@Service
public class SolrService {
// https://solr.apache.org/guide/solr/latest/deployment-guide/solrj.html
@Value("${dnet.index.solr.urls}")
private String[] solrUrls;
private static final Log log = LogFactory.getLog(SolrService.class);
public List<Map<String, Object>> query(final IndexConfiguration conf, final String q, final int from, final int limit, final String sortField)
throws DnetException {
try (final SolrClient solr = newSolrClient()) {
final SolrQuery query = new SolrQuery(q);
query.setStart(from);
query.setRows(limit);
conf.getFields()
.stream()
.filter(IndexField::isResult)
.map(IndexField::getName)
.forEach(query::addField);
if (StringUtils.isNotBlank(sortField)) {
query.setSort("id", ORDER.asc);
}
final QueryResponse response = solr.query(conf.getId(), query);
final SolrDocumentList documents = response.getResults();
log.debug("Found " + documents.getNumFound() + " documents");
return documents.stream()
.map(SolrDocument::getFieldValueMap)
.collect(Collectors.toList());
} catch (final Throwable e) {
log.error("error executing query", e);
throw new DnetException("error executing query", e);
}
}
public int indexRecord(final IndexConfiguration conf, final String xml, final boolean commit) throws DnetException {
try (final SolrClient solr = newSolrClient()) {
final UpdateResponse updateResponse = solr.add(conf.getId(), asSolrDocument(conf, xml));
if (commit) {
forceCommit(solr, conf.getId());
}
return updateResponse.getResponse().size();
} catch (final Throwable e) {
log.error("error indexing a record", e);
log.debug(xml);
throw new DnetException("error indexing a record", e);
}
}
public int indexRecords(final IndexConfiguration conf, final Stream<String> inputStream) throws DnetException {
try (final SolrClient solr = newSolrClient()) {
final Iterator<SolrInputDocument> iterator = inputStream.map(s -> asSolrDocument(conf, s)).iterator();
final UpdateResponse updateResponse = solr.add(conf.getId(), iterator);
forceCommit(solr, conf.getId());
return updateResponse.getResponse().size();
} catch (final Throwable e) {
log.error("error indexing a record from the stream", e);
throw new DnetException("error indexing a record from the stream", e);
}
}
public void commit(final IndexConfiguration conf) throws DnetException {
try (final SolrClient solr = newSolrClient()) {
forceCommit(solr, conf.getId());
} catch (final Throwable e) {
log.error("error executing commit", e);
throw new DnetException("error executing commit", e);
}
}
private SolrClient newSolrClient() {
return new CloudSolrClient.Builder(Arrays.asList(solrUrls)).build();
}
private void forceCommit(final SolrClient client, final String solrCollection) throws SolrServerException, IOException {
client.commit(solrCollection);
}
private SolrInputDocument asSolrDocument(final IndexConfiguration conf, final String xml) {
try {
final Document xmlDoc = DocumentHelper.parseText(xml);
final SolrInputDocument doc = new SolrInputDocument();
conf.getFields()
.stream()
.filter(f -> StringUtils.isNotBlank(f.getName()))
.forEach(f -> {
if (StringUtils.isNotBlank(f.getXpath()) && f.isMultiValued()) {
final Object[] arr = xmlDoc.selectNodes(f.getXpath())
.stream()
.map(Node::getText)
.map(String::trim)
.distinct()
.map(s -> convertToType(s, f.getType()))
.toArray();
doc.addField(f.getName(), arr);
} else if (StringUtils.isNotBlank(f.getXpath()) && !f.isMultiValued()) {
doc.addField(f.getName(), convertToType(xmlDoc.valueOf(f.getXpath()), f.getType()));
} else {
doc.addField(f.getName(), convertToType(f.getConstant(), f.getType()));
}
});
return doc;
} catch (final Throwable e) {
log.error("error preparing solr document", e);
log.debug(xml);
throw new DnetRuntimeException("error preparing solr document", e);
}
}
private Object convertToType(final String s, final IndexFieldType type) {
return switch (type) {
case STRING -> s;
case LONG -> NumberUtils.toLong(s);
case DOUBLE -> NumberUtils.toDouble(s);
case BOOLEAN -> BooleanUtils.toBoolean(s);
case DATE -> DateUtils.parseDate(s);
case DATETIME -> DateUtils.parseDateTime(s);
default -> s;
};
}
}

View File

@ -2,6 +2,7 @@ package eu.dnetlib.utils;
import java.io.StringWriter;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
@ -14,6 +15,9 @@ public class DateUtils {
private static final long HOUR = MINUTE * 60;
private static final long DAY = HOUR * 24;
private static final long YEAR = DAY * 365;
private static final DateTimeFormatter DATEFORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd", Locale.getDefault());
private static final DateTimeFormatter ISO8601FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ", Locale.getDefault());
public static String elapsedTime(long t) {
@ -82,15 +86,12 @@ public class DateUtils {
public static String calculate_ISO8601(final long l) {
final LocalDateTime time =
LocalDateTime.ofInstant(Instant.ofEpochMilli(l), TimeZone
.getDefault()
.toZoneId());
LocalDateTime.ofInstant(Instant.ofEpochMilli(l), TimeZone
.getDefault()
.toZoneId());
String result = time.format(ISO8601FORMAT);
// convert YYYYMMDDTHH:mm:ss+HH00 into YYYYMMDDTHH:mm:ss+HH:00
// - note the added colon for the Timezone
result = result.substring(0, result.length() - 2) + ":" + result.substring(result.length() - 2);
return result;
final String result = time.format(ISO8601FORMAT);
return result.substring(0, result.length() - 2) + ":" + result.substring(result.length() - 2);
}
private static String floor(final long d, final long n) {
@ -101,4 +102,12 @@ public class DateUtils {
return Instant.now().toEpochMilli();
}
public static LocalDate parseDate(final String s) {
return LocalDate.parse(s, DATEFORMAT);
}
public static LocalDateTime parseDateTime(final String s) {
return LocalDateTime.parse(s, ISO8601FORMAT);
}
}

View File

@ -1,10 +1,54 @@
package eu.dnetlib.domain.index;
public class IndexConfiguration {
import java.io.Serializable;
import java.util.Set;
// TODO (MEDIUM PRIORITY)
import jakarta.persistence.CascadeType;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.FetchType;
import jakarta.persistence.Id;
import jakarta.persistence.OneToMany;
import jakarta.persistence.Table;
@Entity
@Table(name = "index_configurations")
public class IndexConfiguration implements Serializable {
private static final long serialVersionUID = -7273938084794386017L;
@Id
@Column(name = "id")
private String id;
@Column(name = "description")
private String description;
@OneToMany(mappedBy = "indexId", cascade = CascadeType.ALL, fetch = FetchType.EAGER)
private Set<IndexField> fields;
public String getId() {
return null;
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getDescription() {
return description;
}
public void setDescription(final String description) {
this.description = description;
}
public Set<IndexField> getFields() {
return fields;
}
public void setFields(final Set<IndexField> fields) {
this.fields = fields;
}
}

View File

@ -0,0 +1,166 @@
package eu.dnetlib.domain.index;
import java.io.Serializable;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.EnumType;
import jakarta.persistence.Enumerated;
import jakarta.persistence.Id;
import jakarta.persistence.IdClass;
import jakarta.persistence.Table;
@Entity
@IdClass(IndexFieldPK.class)
@Table(name = "index_fields")
public class IndexField implements Serializable {
private static final long serialVersionUID = -1414722965422426989L;
@Id
@Column(name = "idx_id")
private String indexId;
@Id
@Column(name = "name")
private String name;
@Column(name = "constant")
private String constant;
@Column(name = "xpath")
private String xpath;
@Enumerated(EnumType.STRING)
@Column(name = "type")
private IndexFieldType type;
@Column(name = "indexable")
private boolean indexable;
@Column(name = "result")
private boolean result;
@Column(name = "header")
private boolean header;
@Column(name = "stat")
private boolean stat;
@Column(name = "tokenizable")
private boolean tokenizable;
@Column(name = "multivalued")
private boolean multiValued;
@Column(name = "stored")
private boolean stored;
@Column(name = "copy")
private boolean copy;
public String getIndexId() {
return indexId;
}
public void setIndexId(final String indexId) {
this.indexId = indexId;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String getXpath() {
return xpath;
}
public void setXpath(final String xpath) {
this.xpath = xpath;
}
public String getConstant() {
return constant;
}
public void setConstant(final String constant) {
this.constant = constant;
}
public IndexFieldType getType() {
return type;
}
public void setType(final IndexFieldType type) {
this.type = type;
}
public boolean isIndexable() {
return indexable;
}
public void setIndexable(final boolean indexable) {
this.indexable = indexable;
}
public boolean isResult() {
return result;
}
public void setResult(final boolean result) {
this.result = result;
}
public boolean isHeader() {
return header;
}
public void setHeader(final boolean header) {
this.header = header;
}
public boolean isStat() {
return stat;
}
public void setStat(final boolean stat) {
this.stat = stat;
}
public boolean isTokenizable() {
return tokenizable;
}
public void setTokenizable(final boolean tokenizable) {
this.tokenizable = tokenizable;
}
public boolean isMultiValued() {
return multiValued;
}
public void setMultiValued(final boolean multiValued) {
this.multiValued = multiValued;
}
public boolean isStored() {
return stored;
}
public void setStored(final boolean stored) {
this.stored = stored;
}
public boolean isCopy() {
return copy;
}
public void setCopy(final boolean copy) {
this.copy = copy;
}
}

View File

@ -0,0 +1,54 @@
package eu.dnetlib.domain.index;
import java.io.Serializable;
import java.util.Objects;
public class IndexFieldPK implements Serializable {
private static final long serialVersionUID = -2193243559770663481L;
private String indexId;
private String field;
public IndexFieldPK() {}
public IndexFieldPK(final String indexId, final String field) {
this.indexId = indexId;
this.field = field;
}
public String getIndexId() {
return indexId;
}
public void setIndexId(final String indexId) {
this.indexId = indexId;
}
public String getField() {
return field;
}
public void setField(final String field) {
this.field = field;
}
@Override
public int hashCode() {
return Objects.hash(field, indexId);
}
@Override
public boolean equals(final Object obj) {
if (this == obj) { return true; }
if ((obj == null) || (getClass() != obj.getClass())) { return false; }
final IndexFieldPK other = (IndexFieldPK) obj;
return Objects.equals(field, other.field) && Objects.equals(indexId, other.indexId);
}
@Override
public String toString() {
return "IndexFieldPK [indexId=" + indexId + ", field=" + field + "]";
}
}

View File

@ -0,0 +1,5 @@
package eu.dnetlib.domain.index;
public enum IndexFieldType {
STRING, DOUBLE, BOOLEAN, LONG, DATE, DATETIME
}

View File

@ -199,7 +199,7 @@ services:
- wfs
solr:
image: solr:9.3.0
image: solr:9.4.0
expose:
- ${SOLR_PORT}
networks: