refactoring

This commit is contained in:
Michele Artini 2024-12-09 14:46:49 +01:00
parent c518c18a12
commit 605147e456
13 changed files with 216 additions and 187 deletions

View File

@ -84,7 +84,7 @@ public class CommunityClient implements HasCache {
@CacheEvict(value = { "contexts" }, allEntries = true)
public void clearCache() {}
public class Community implements Serializable {
public static class Community implements Serializable {
private static final long serialVersionUID = 6566834038680683536L;
@ -145,7 +145,7 @@ public class CommunityClient implements HasCache {
}
public class SubCommunity implements Serializable {
public static class SubCommunity implements Serializable {
private static final long serialVersionUID = 6363561947231890039L;
@ -179,7 +179,7 @@ public class CommunityClient implements HasCache {
}
public class ZenodoContextList implements Serializable {
public static class ZenodoContextList implements Serializable {
private static final long serialVersionUID = -8575901008472098218L;
@ -207,7 +207,7 @@ public class CommunityClient implements HasCache {
}
public class ContextInfo implements Serializable {
public static class ContextInfo implements Serializable {
private static final long serialVersionUID = 96456546778111904L;

View File

@ -49,7 +49,7 @@ public class DatasourceManagerClient implements HasCache {
@CacheEvict(value = "datasources", allEntries = true)
public void clearCache() {}
public class DsmDatasourceInfo implements Serializable {
public static class DsmDatasourceInfo implements Serializable {
private static final long serialVersionUID = -593392381920400974L;
@ -83,7 +83,7 @@ public class DatasourceManagerClient implements HasCache {
}
public class DsmSearchRequest implements Serializable {
public static class DsmSearchRequest implements Serializable {
private static final long serialVersionUID = -2532361140043817319L;
@ -104,7 +104,7 @@ public class DatasourceManagerClient implements HasCache {
}
}
public class DsmSearchResponse implements Serializable {
public static class DsmSearchResponse implements Serializable {
private static final long serialVersionUID = 8944122902111813747L;

View File

@ -1,5 +1,7 @@
package eu.dnetlib.app.directindex.clients;
import java.io.Serializable;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -210,4 +212,99 @@ public class ProjectClient implements HasCache {
return "RCUK".equals(funderShortName) ? "UKRI" : funderShortName;
}
public static class ProjectInfo implements Serializable {
private static final long serialVersionUID = 4433787349231982285L;
private String id;
private String acronym;
private String title;
private String code;
private String jurisdiction;
private String funderId;
private String funderShortName;
private String funderName;
private String fundingId;
private String fundingName;
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getAcronym() {
return acronym;
}
public void setAcronym(final String acronym) {
this.acronym = acronym;
}
public String getTitle() {
return title;
}
public void setTitle(final String title) {
this.title = title;
}
public String getCode() {
return code;
}
public void setCode(final String code) {
this.code = code;
}
public String getJurisdiction() {
return jurisdiction;
}
public void setJurisdiction(final String jurisdiction) {
this.jurisdiction = jurisdiction;
}
public String getFunderId() {
return funderId;
}
public void setFunderId(final String funderId) {
this.funderId = funderId;
}
public String getFunderShortName() {
return funderShortName;
}
public void setFunderShortName(final String funderShortName) {
this.funderShortName = funderShortName;
}
public String getFunderName() {
return funderName;
}
public void setFunderName(final String funderName) {
this.funderName = funderName;
}
public String getFundingId() {
return fundingId;
}
public void setFundingId(final String fundingId) {
this.fundingId = fundingId;
}
public String getFundingName() {
return fundingName;
}
public void setFundingName(final String fundingName) {
this.fundingName = fundingName;
}
}
}

View File

@ -1,99 +0,0 @@
package eu.dnetlib.app.directindex.clients;
import java.io.Serializable;
public class ProjectInfo implements Serializable {
private static final long serialVersionUID = 4433787349231982285L;
private String id;
private String acronym;
private String title;
private String code;
private String jurisdiction;
private String funderId;
private String funderShortName;
private String funderName;
private String fundingId;
private String fundingName;
public String getId() {
return id;
}
public void setId(final String id) {
this.id = id;
}
public String getAcronym() {
return acronym;
}
public void setAcronym(final String acronym) {
this.acronym = acronym;
}
public String getTitle() {
return title;
}
public void setTitle(final String title) {
this.title = title;
}
public String getCode() {
return code;
}
public void setCode(final String code) {
this.code = code;
}
public String getJurisdiction() {
return jurisdiction;
}
public void setJurisdiction(final String jurisdiction) {
this.jurisdiction = jurisdiction;
}
public String getFunderId() {
return funderId;
}
public void setFunderId(final String funderId) {
this.funderId = funderId;
}
public String getFunderShortName() {
return funderShortName;
}
public void setFunderShortName(final String funderShortName) {
this.funderShortName = funderShortName;
}
public String getFunderName() {
return funderName;
}
public void setFunderName(final String funderName) {
this.funderName = funderName;
}
public String getFundingId() {
return fundingId;
}
public void setFundingId(final String fundingId) {
this.fundingId = fundingId;
}
public String getFundingName() {
return fundingName;
}
public void setFundingName(final String fundingName) {
this.fundingName = fundingName;
}
}

View File

@ -57,7 +57,7 @@ public class VocabularyClient implements HasCache {
@CacheEvict(value = "vocabularies", allEntries = true)
public void clearCache() {}
public class Vocabulary implements Serializable {
public static class Vocabulary implements Serializable {
private static final long serialVersionUID = -865664758603896385L;
@ -108,7 +108,7 @@ public class VocabularyClient implements HasCache {
}
}
public class VocTerm implements Serializable {
public static class VocTerm implements Serializable {
private static final long serialVersionUID = 1618105318956602387L;

View File

@ -11,14 +11,17 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.app.directindex.clients.CommunityClient;
import eu.dnetlib.app.directindex.clients.CommunityClient.ContextInfo;
import eu.dnetlib.app.directindex.clients.DatasourceManagerClient;
import eu.dnetlib.app.directindex.clients.ProjectClient;
import eu.dnetlib.app.directindex.clients.ProjectInfo;
import eu.dnetlib.app.directindex.clients.ProjectClient.ProjectInfo;
import eu.dnetlib.app.directindex.clients.VocabularyClient;
import eu.dnetlib.app.directindex.errors.DirectIndexApiException;
import eu.dnetlib.app.directindex.input.DatasourceEntry;
@ -46,6 +49,7 @@ import eu.dnetlib.dhp.schema.solr.Result;
import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.schema.solr.SolrRecordHeader;
import eu.dnetlib.dhp.schema.solr.SolrRecordHeader.Status;
import eu.dnetlib.dhp.solr.mapping.SolrInputDocumentMapper;
@Component
public class SolrRecordMapper {
@ -64,6 +68,11 @@ public class SolrRecordMapper {
private static final Log log = LogFactory.getLog(SolrRecordMapper.class);
public SolrInputDocument toSolrInputRecord(final ResultEntry result) throws JsonProcessingException {
final SolrRecord sr = toSolrRecord(result);
return SolrInputDocumentMapper.map(sr, XMLSolrSerializer.generateXML(sr));
}
public SolrRecord toSolrRecord(final ResultEntry result) {
final Provenance collectedFrom = prepareProvenance(result.getCollectedFromId());
final Provenance hostedBy = StringUtils.isNotBlank(result.getHostedById()) ? prepareProvenance(result.getHostedById()) : collectedFrom;
@ -439,4 +448,5 @@ public class SolrRecordMapper {
public void setProjectClient(final ProjectClient projectClient) {
this.projectClient = projectClient;
}
}

View File

@ -36,6 +36,9 @@ public class PendingAction {
@Column(name = "execution_date")
private LocalDateTime executionDate;
@Column(name = "error")
private String error;
public String getId() {
return id;
}
@ -92,4 +95,12 @@ public class PendingAction {
this.executionDate = executionDate;
}
public String getError() {
return error;
}
public void setError(final String error) {
this.error = error;
}
}

View File

@ -49,4 +49,6 @@ public interface PendingActionRepository extends JpaRepository<PendingAction, St
@Query(value = "select max(execution_date) from pending_actions", nativeQuery = true)
LocalDateTime findMaxExecutionDate();
long countByErrorIsNotNull();
}

View File

@ -140,6 +140,8 @@ public class DirectIndexService {
info.put("min_execution_date", pendingActionRepository.findMinExecutionDate());
info.put("max_execution_date", pendingActionRepository.findMaxExecutionDate());
info.put("errors", pendingActionRepository.countByErrorIsNotNull());
return info;
}

View File

@ -5,62 +5,49 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.app.directindex.errors.DirectIndexApiException;
import eu.dnetlib.app.directindex.mapping.XMLSolrSerializer;
import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.solr.mapping.SolrInputDocumentMapper;
public class SolrIndexClient {
private static final Log log = LogFactory.getLog(SolrIndexClient.class);
private final SolrClient solrClient;
public SolrIndexClient(final SolrClient solrClient) {
this.solrClient = solrClient;
}
public void addRecords(final Stream<SolrRecord> records) throws DirectIndexApiException {
public void commit() throws DirectIndexApiException {
try {
final Iterator<SolrInputDocument> iter = records.map(this::prepareSolrDocument)
.map(r -> {
if (log.isDebugEnabled()) {
try {
log.debug("Indexing record: " + new ObjectMapper().writeValueAsString(r));
} catch (final JsonProcessingException e) {
log.debug("The record seems invalid: " + r);
}
}
return r;
})
.filter(o -> o != null)
.iterator();
solrClient.add(iter);
solrClient.commit();
} catch (final Throwable e) {
throw new DirectIndexApiException("Error performing commit", e);
}
}
public UpdateResponse addRecords(final Iterator<SolrInputDocument> iter) throws DirectIndexApiException {
try {
// TODO: parse update response
return solrClient.add(iter);
} catch (final Throwable e) {
throw new DirectIndexApiException("Error creating solr document", e);
}
}
public void addRecords(final SolrRecord... records) throws Exception {
addRecords(Arrays.stream(records));
public void addRecords(final SolrInputDocument... records) throws Exception {
addRecords(Arrays.stream(records).iterator());
}
public void deleteRecord(final String id) throws DirectIndexApiException {
@ -69,21 +56,11 @@ public class SolrIndexClient {
// ClientUtils.escapeQueryChars(id));
final String query = String.format("objidentifier:\"%s\" OR resultdupid:\"%s\"", id, id);
solrClient.deleteByQuery(query);
solrClient.commit();
} catch (SolrServerException | IOException e) {
throw new DirectIndexApiException(e);
}
}
protected SolrInputDocument prepareSolrDocument(final SolrRecord record) {
try {
return SolrInputDocumentMapper.map(record, XMLSolrSerializer.generateXML(record));
} catch (final Throwable e) {
log.error("Error preparing solr record", e);
return null;
}
}
public SolrRecord findRecord(final String id) throws DirectIndexApiException {
try {
final QueryResponse response = solrClient.query(queryParamsForId(id));

View File

@ -1,12 +1,16 @@
package eu.dnetlib.app.directindex.tasks;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
@ -14,7 +18,6 @@ import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.app.directindex.errors.DirectIndexApiException;
import eu.dnetlib.app.directindex.input.ResultEntry;
import eu.dnetlib.app.directindex.mapping.SolrRecordMapper;
import eu.dnetlib.app.directindex.repo.PendingAction;
@ -39,41 +42,54 @@ public class ScheduledActions {
@Autowired
private PendingActionRepository pendingActionRepository;
@Scheduled(initialDelay = 5, fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
public void indexNewRecords() throws DirectIndexApiException {
@Scheduled(initialDelay = 1, fixedDelay = 5, timeUnit = TimeUnit.MINUTES)
public void indexRecords() {
if (!enabled) {
log.info("SKIP");
return;
}
log.info("Indexing new records...");
try {
log.info("Indexing new records...");
final List<PendingAction> list = pendingActionRepository.findInsertOrUpdateOperations();
final List<PendingAction> list = pendingActionRepository.findInsertOrUpdateOperations();
if (list.size() > 0) {
final SolrIndexClient solr = solrIndexClientFactory.getClient();
final Map<String, String> invalids = new HashMap<>();
final ObjectMapper objectMapper = new ObjectMapper();
if (list.size() > 0) {
final SolrIndexClient solr = solrIndexClientFactory.getClient();
solr.addRecords(list.stream()
.map(PendingAction::getBody)
.map(s -> {
try {
return objectMapper.readValue(s, ResultEntry.class);
} catch (final Exception e) {
log.error(e);
return null;
}
})
.filter(Objects::nonNull)
.map(solrRecordMapper::toSolrRecord)
.filter(Objects::nonNull));
final ObjectMapper objectMapper = new ObjectMapper();
updateExecutionDate(list);
final Iterator<SolrInputDocument> iter = list.stream()
.map(pendingAction -> {
final String id = pendingAction.getId();
final String body = pendingAction.getBody();
try {
final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class);
final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry);
return doc;
} catch (final Throwable e) {
invalids.put(id, e.getClass().getName() + ": " + e.getMessage());
log.error(e);
return null;
}
})
.filter(Objects::nonNull)
.iterator();
solr.addRecords(iter);
solr.commit();
updateExecutionDate(list, invalids);
}
log.info(String.format("Indexed records: %s", list.size()));
} catch (final Throwable e) {
log.error("The scheduled task is failed", e);
}
log.info(String.format("Indexed records: %s", list.size()));
}
@Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.MINUTES)
@ -83,30 +99,42 @@ public class ScheduledActions {
return;
}
log.info("Deleting records from index...");
try {
log.info("Deleting records from index...");
final List<PendingAction> list = pendingActionRepository.findDeleteOperations();
final List<PendingAction> list = pendingActionRepository.findDeleteOperations();
final Map<String, String> invalids = new HashMap<>();
if (list.size() > 0) {
final SolrIndexClient solr = solrIndexClientFactory.getClient();
if (list.size() > 0) {
final SolrIndexClient solr = solrIndexClientFactory.getClient();
list.stream().map(PendingAction::getId).forEach(id -> {
try {
solr.deleteRecord(id);
} catch (final DirectIndexApiException e) {
log.error(e);
}
});
list.stream().map(PendingAction::getId).forEach(id -> {
try {
solr.deleteRecord(id);
} catch (final Throwable e) {
invalids.put(id, e.getMessage());
log.error(e);
}
});
updateExecutionDate(list);
solr.commit();
updateExecutionDate(list, invalids);
}
log.info(String.format("Deleted records: %s", list.size()));
} catch (final Throwable e) {
log.error("The scheduled task is failed", e);
}
log.info(String.format("Deleted records: %s", list.size()));
}
private void updateExecutionDate(final List<PendingAction> list) {
private void updateExecutionDate(final List<PendingAction> list, final Map<String, String> invalids) {
final LocalDateTime now = LocalDateTime.now();
list.forEach(r -> r.setExecutionDate(now));
list.forEach(r -> {
r.setError(invalids.getOrDefault(r.getId(), null));
r.setExecutionDate(now);
});
pendingActionRepository.saveAll(list);
}

View File

@ -7,5 +7,6 @@ CREATE TABLE pending_actions(
body text,
created_by text,
creation_date timestamp with time zone NOT NULL DEFAULT now(),
execution_date timestamp with time zone
execution_date timestamp with time zone,
error text
);

View File

@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.app.directindex.clients.CommunityClient;
import eu.dnetlib.app.directindex.clients.DatasourceManagerClient;
import eu.dnetlib.app.directindex.clients.ProjectClient;
import eu.dnetlib.app.directindex.clients.ProjectInfo;
import eu.dnetlib.app.directindex.clients.ProjectClient.ProjectInfo;
import eu.dnetlib.app.directindex.clients.VocabularyClient;
import eu.dnetlib.app.directindex.errors.DirectIndexApiException;
import eu.dnetlib.app.directindex.input.DatasourceEntry;