From 605147e4568fc387c49d185116ec075a2f84f0d1 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 9 Dec 2024 14:46:49 +0100 Subject: [PATCH] refactoring --- .../directindex/clients/CommunityClient.java | 8 +- .../clients/DatasourceManagerClient.java | 6 +- .../directindex/clients/ProjectClient.java | 97 ++++++++++++++++ .../app/directindex/clients/ProjectInfo.java | 99 ---------------- .../directindex/clients/VocabularyClient.java | 4 +- .../directindex/mapping/SolrRecordMapper.java | 12 +- .../app/directindex/repo/PendingAction.java | 11 ++ .../repo/PendingActionRepository.java | 2 + .../service/DirectIndexService.java | 2 + .../app/directindex/solr/SolrIndexClient.java | 49 +++----- .../directindex/tasks/ScheduledActions.java | 108 +++++++++++------- src/main/resources/sql/schema.sql | 3 +- .../mapping/SolrRecordMapperTest.java | 2 +- 13 files changed, 216 insertions(+), 187 deletions(-) delete mode 100644 src/main/java/eu/dnetlib/app/directindex/clients/ProjectInfo.java diff --git a/src/main/java/eu/dnetlib/app/directindex/clients/CommunityClient.java b/src/main/java/eu/dnetlib/app/directindex/clients/CommunityClient.java index 2445d90..b2445d6 100644 --- a/src/main/java/eu/dnetlib/app/directindex/clients/CommunityClient.java +++ b/src/main/java/eu/dnetlib/app/directindex/clients/CommunityClient.java @@ -84,7 +84,7 @@ public class CommunityClient implements HasCache { @CacheEvict(value = { "contexts" }, allEntries = true) public void clearCache() {} - public class Community implements Serializable { + public static class Community implements Serializable { private static final long serialVersionUID = 6566834038680683536L; @@ -145,7 +145,7 @@ public class CommunityClient implements HasCache { } - public class SubCommunity implements Serializable { + public static class SubCommunity implements Serializable { private static final long serialVersionUID = 6363561947231890039L; @@ -179,7 +179,7 @@ public class CommunityClient implements HasCache { } - public class ZenodoContextList implements Serializable { + public static class ZenodoContextList implements Serializable { private static final long serialVersionUID = -8575901008472098218L; @@ -207,7 +207,7 @@ public class CommunityClient implements HasCache { } - public class ContextInfo implements Serializable { + public static class ContextInfo implements Serializable { private static final long serialVersionUID = 96456546778111904L; diff --git a/src/main/java/eu/dnetlib/app/directindex/clients/DatasourceManagerClient.java b/src/main/java/eu/dnetlib/app/directindex/clients/DatasourceManagerClient.java index e4fd7e0..b301a3c 100644 --- a/src/main/java/eu/dnetlib/app/directindex/clients/DatasourceManagerClient.java +++ b/src/main/java/eu/dnetlib/app/directindex/clients/DatasourceManagerClient.java @@ -49,7 +49,7 @@ public class DatasourceManagerClient implements HasCache { @CacheEvict(value = "datasources", allEntries = true) public void clearCache() {} - public class DsmDatasourceInfo implements Serializable { + public static class DsmDatasourceInfo implements Serializable { private static final long serialVersionUID = -593392381920400974L; @@ -83,7 +83,7 @@ public class DatasourceManagerClient implements HasCache { } - public class DsmSearchRequest implements Serializable { + public static class DsmSearchRequest implements Serializable { private static final long serialVersionUID = -2532361140043817319L; @@ -104,7 +104,7 @@ public class DatasourceManagerClient implements HasCache { } } - public class DsmSearchResponse implements Serializable { + public static class DsmSearchResponse implements Serializable { private static final long serialVersionUID = 8944122902111813747L; diff --git a/src/main/java/eu/dnetlib/app/directindex/clients/ProjectClient.java b/src/main/java/eu/dnetlib/app/directindex/clients/ProjectClient.java index 63b7d06..87c1e32 100644 --- a/src/main/java/eu/dnetlib/app/directindex/clients/ProjectClient.java +++ b/src/main/java/eu/dnetlib/app/directindex/clients/ProjectClient.java @@ -1,5 +1,7 @@ package eu.dnetlib.app.directindex.clients; +import java.io.Serializable; + import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -210,4 +212,99 @@ public class ProjectClient implements HasCache { return "RCUK".equals(funderShortName) ? "UKRI" : funderShortName; } + public static class ProjectInfo implements Serializable { + + private static final long serialVersionUID = 4433787349231982285L; + + private String id; + private String acronym; + private String title; + private String code; + private String jurisdiction; + private String funderId; + private String funderShortName; + private String funderName; + private String fundingId; + private String fundingName; + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getAcronym() { + return acronym; + } + + public void setAcronym(final String acronym) { + this.acronym = acronym; + } + + public String getTitle() { + return title; + } + + public void setTitle(final String title) { + this.title = title; + } + + public String getCode() { + return code; + } + + public void setCode(final String code) { + this.code = code; + } + + public String getJurisdiction() { + return jurisdiction; + } + + public void setJurisdiction(final String jurisdiction) { + this.jurisdiction = jurisdiction; + } + + public String getFunderId() { + return funderId; + } + + public void setFunderId(final String funderId) { + this.funderId = funderId; + } + + public String getFunderShortName() { + return funderShortName; + } + + public void setFunderShortName(final String funderShortName) { + this.funderShortName = funderShortName; + } + + public String getFunderName() { + return funderName; + } + + public void setFunderName(final String funderName) { + this.funderName = funderName; + } + + public String getFundingId() { + return fundingId; + } + + public void setFundingId(final String fundingId) { + this.fundingId = fundingId; + } + + public String getFundingName() { + return fundingName; + } + + public void setFundingName(final String fundingName) { + this.fundingName = fundingName; + } + } } diff --git a/src/main/java/eu/dnetlib/app/directindex/clients/ProjectInfo.java b/src/main/java/eu/dnetlib/app/directindex/clients/ProjectInfo.java deleted file mode 100644 index 7f7da78..0000000 --- a/src/main/java/eu/dnetlib/app/directindex/clients/ProjectInfo.java +++ /dev/null @@ -1,99 +0,0 @@ -package eu.dnetlib.app.directindex.clients; - -import java.io.Serializable; - -public class ProjectInfo implements Serializable { - - private static final long serialVersionUID = 4433787349231982285L; - - private String id; - private String acronym; - private String title; - private String code; - private String jurisdiction; - private String funderId; - private String funderShortName; - private String funderName; - private String fundingId; - private String fundingName; - - public String getId() { - return id; - } - - public void setId(final String id) { - this.id = id; - } - - public String getAcronym() { - return acronym; - } - - public void setAcronym(final String acronym) { - this.acronym = acronym; - } - - public String getTitle() { - return title; - } - - public void setTitle(final String title) { - this.title = title; - } - - public String getCode() { - return code; - } - - public void setCode(final String code) { - this.code = code; - } - - public String getJurisdiction() { - return jurisdiction; - } - - public void setJurisdiction(final String jurisdiction) { - this.jurisdiction = jurisdiction; - } - - public String getFunderId() { - return funderId; - } - - public void setFunderId(final String funderId) { - this.funderId = funderId; - } - - public String getFunderShortName() { - return funderShortName; - } - - public void setFunderShortName(final String funderShortName) { - this.funderShortName = funderShortName; - } - - public String getFunderName() { - return funderName; - } - - public void setFunderName(final String funderName) { - this.funderName = funderName; - } - - public String getFundingId() { - return fundingId; - } - - public void setFundingId(final String fundingId) { - this.fundingId = fundingId; - } - - public String getFundingName() { - return fundingName; - } - - public void setFundingName(final String fundingName) { - this.fundingName = fundingName; - } -} diff --git a/src/main/java/eu/dnetlib/app/directindex/clients/VocabularyClient.java b/src/main/java/eu/dnetlib/app/directindex/clients/VocabularyClient.java index 55a0f31..872bfa3 100644 --- a/src/main/java/eu/dnetlib/app/directindex/clients/VocabularyClient.java +++ b/src/main/java/eu/dnetlib/app/directindex/clients/VocabularyClient.java @@ -57,7 +57,7 @@ public class VocabularyClient implements HasCache { @CacheEvict(value = "vocabularies", allEntries = true) public void clearCache() {} - public class Vocabulary implements Serializable { + public static class Vocabulary implements Serializable { private static final long serialVersionUID = -865664758603896385L; @@ -108,7 +108,7 @@ public class VocabularyClient implements HasCache { } } - public class VocTerm implements Serializable { + public static class VocTerm implements Serializable { private static final long serialVersionUID = 1618105318956602387L; diff --git a/src/main/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapper.java b/src/main/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapper.java index 7849b48..17e4676 100644 --- a/src/main/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapper.java +++ b/src/main/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapper.java @@ -11,14 +11,17 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.solr.common.SolrInputDocument; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import com.fasterxml.jackson.core.JsonProcessingException; + import eu.dnetlib.app.directindex.clients.CommunityClient; import eu.dnetlib.app.directindex.clients.CommunityClient.ContextInfo; import eu.dnetlib.app.directindex.clients.DatasourceManagerClient; import eu.dnetlib.app.directindex.clients.ProjectClient; -import eu.dnetlib.app.directindex.clients.ProjectInfo; +import eu.dnetlib.app.directindex.clients.ProjectClient.ProjectInfo; import eu.dnetlib.app.directindex.clients.VocabularyClient; import eu.dnetlib.app.directindex.errors.DirectIndexApiException; import eu.dnetlib.app.directindex.input.DatasourceEntry; @@ -46,6 +49,7 @@ import eu.dnetlib.dhp.schema.solr.Result; import eu.dnetlib.dhp.schema.solr.SolrRecord; import eu.dnetlib.dhp.schema.solr.SolrRecordHeader; import eu.dnetlib.dhp.schema.solr.SolrRecordHeader.Status; +import eu.dnetlib.dhp.solr.mapping.SolrInputDocumentMapper; @Component public class SolrRecordMapper { @@ -64,6 +68,11 @@ public class SolrRecordMapper { private static final Log log = LogFactory.getLog(SolrRecordMapper.class); + public SolrInputDocument toSolrInputRecord(final ResultEntry result) throws JsonProcessingException { + final SolrRecord sr = toSolrRecord(result); + return SolrInputDocumentMapper.map(sr, XMLSolrSerializer.generateXML(sr)); + } + public SolrRecord toSolrRecord(final ResultEntry result) { final Provenance collectedFrom = prepareProvenance(result.getCollectedFromId()); final Provenance hostedBy = StringUtils.isNotBlank(result.getHostedById()) ? prepareProvenance(result.getHostedById()) : collectedFrom; @@ -439,4 +448,5 @@ public class SolrRecordMapper { public void setProjectClient(final ProjectClient projectClient) { this.projectClient = projectClient; } + } diff --git a/src/main/java/eu/dnetlib/app/directindex/repo/PendingAction.java b/src/main/java/eu/dnetlib/app/directindex/repo/PendingAction.java index 640ed38..a8db97d 100644 --- a/src/main/java/eu/dnetlib/app/directindex/repo/PendingAction.java +++ b/src/main/java/eu/dnetlib/app/directindex/repo/PendingAction.java @@ -36,6 +36,9 @@ public class PendingAction { @Column(name = "execution_date") private LocalDateTime executionDate; + @Column(name = "error") + private String error; + public String getId() { return id; } @@ -92,4 +95,12 @@ public class PendingAction { this.executionDate = executionDate; } + public String getError() { + return error; + } + + public void setError(final String error) { + this.error = error; + } + } diff --git a/src/main/java/eu/dnetlib/app/directindex/repo/PendingActionRepository.java b/src/main/java/eu/dnetlib/app/directindex/repo/PendingActionRepository.java index 0b20c79..b92121d 100644 --- a/src/main/java/eu/dnetlib/app/directindex/repo/PendingActionRepository.java +++ b/src/main/java/eu/dnetlib/app/directindex/repo/PendingActionRepository.java @@ -49,4 +49,6 @@ public interface PendingActionRepository extends JpaRepository records) throws DirectIndexApiException { + public void commit() throws DirectIndexApiException { try { - final Iterator iter = records.map(this::prepareSolrDocument) - .map(r -> { - if (log.isDebugEnabled()) { - try { - log.debug("Indexing record: " + new ObjectMapper().writeValueAsString(r)); - } catch (final JsonProcessingException e) { - log.debug("The record seems invalid: " + r); - } - } - return r; - }) - .filter(o -> o != null) - .iterator(); - - solrClient.add(iter); solrClient.commit(); + } catch (final Throwable e) { + throw new DirectIndexApiException("Error performing commit", e); + } + } + + public UpdateResponse addRecords(final Iterator iter) throws DirectIndexApiException { + try { + // TODO: parse update response + return solrClient.add(iter); } catch (final Throwable e) { throw new DirectIndexApiException("Error creating solr document", e); } } - public void addRecords(final SolrRecord... records) throws Exception { - addRecords(Arrays.stream(records)); + public void addRecords(final SolrInputDocument... records) throws Exception { + addRecords(Arrays.stream(records).iterator()); } public void deleteRecord(final String id) throws DirectIndexApiException { @@ -69,21 +56,11 @@ public class SolrIndexClient { // ClientUtils.escapeQueryChars(id)); final String query = String.format("objidentifier:\"%s\" OR resultdupid:\"%s\"", id, id); solrClient.deleteByQuery(query); - solrClient.commit(); } catch (SolrServerException | IOException e) { throw new DirectIndexApiException(e); } } - protected SolrInputDocument prepareSolrDocument(final SolrRecord record) { - try { - return SolrInputDocumentMapper.map(record, XMLSolrSerializer.generateXML(record)); - } catch (final Throwable e) { - log.error("Error preparing solr record", e); - return null; - } - } - public SolrRecord findRecord(final String id) throws DirectIndexApiException { try { final QueryResponse response = solrClient.query(queryParamsForId(id)); diff --git a/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java index 16cdc25..6d8c2ba 100644 --- a/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java +++ b/src/main/java/eu/dnetlib/app/directindex/tasks/ScheduledActions.java @@ -1,12 +1,16 @@ package eu.dnetlib.app.directindex.tasks; import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.solr.common.SolrInputDocument; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; @@ -14,7 +18,6 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.app.directindex.errors.DirectIndexApiException; import eu.dnetlib.app.directindex.input.ResultEntry; import eu.dnetlib.app.directindex.mapping.SolrRecordMapper; import eu.dnetlib.app.directindex.repo.PendingAction; @@ -39,41 +42,54 @@ public class ScheduledActions { @Autowired private PendingActionRepository pendingActionRepository; - @Scheduled(initialDelay = 5, fixedDelay = 5, timeUnit = TimeUnit.MINUTES) - public void indexNewRecords() throws DirectIndexApiException { + @Scheduled(initialDelay = 1, fixedDelay = 5, timeUnit = TimeUnit.MINUTES) + public void indexRecords() { if (!enabled) { log.info("SKIP"); return; } - log.info("Indexing new records..."); + try { + log.info("Indexing new records..."); - final List list = pendingActionRepository.findInsertOrUpdateOperations(); + final List list = pendingActionRepository.findInsertOrUpdateOperations(); - if (list.size() > 0) { - final SolrIndexClient solr = solrIndexClientFactory.getClient(); + final Map invalids = new HashMap<>(); - final ObjectMapper objectMapper = new ObjectMapper(); + if (list.size() > 0) { + final SolrIndexClient solr = solrIndexClientFactory.getClient(); - solr.addRecords(list.stream() - .map(PendingAction::getBody) - .map(s -> { - try { - return objectMapper.readValue(s, ResultEntry.class); - } catch (final Exception e) { - log.error(e); - return null; - } - }) - .filter(Objects::nonNull) - .map(solrRecordMapper::toSolrRecord) - .filter(Objects::nonNull)); + final ObjectMapper objectMapper = new ObjectMapper(); - updateExecutionDate(list); + final Iterator iter = list.stream() + .map(pendingAction -> { + final String id = pendingAction.getId(); + final String body = pendingAction.getBody(); + + try { + final ResultEntry resultEntry = objectMapper.readValue(body, ResultEntry.class); + final SolrInputDocument doc = solrRecordMapper.toSolrInputRecord(resultEntry); + return doc; + } catch (final Throwable e) { + invalids.put(id, e.getClass().getName() + ": " + e.getMessage()); + log.error(e); + return null; + } + }) + .filter(Objects::nonNull) + .iterator(); + + solr.addRecords(iter); + solr.commit(); + + updateExecutionDate(list, invalids); + } + + log.info(String.format("Indexed records: %s", list.size())); + } catch (final Throwable e) { + log.error("The scheduled task is failed", e); } - log.info(String.format("Indexed records: %s", list.size())); - } @Scheduled(initialDelay = 10, fixedDelay = 30, timeUnit = TimeUnit.MINUTES) @@ -83,30 +99,42 @@ public class ScheduledActions { return; } - log.info("Deleting records from index..."); + try { + log.info("Deleting records from index..."); - final List list = pendingActionRepository.findDeleteOperations(); + final List list = pendingActionRepository.findDeleteOperations(); + final Map invalids = new HashMap<>(); - if (list.size() > 0) { - final SolrIndexClient solr = solrIndexClientFactory.getClient(); + if (list.size() > 0) { + final SolrIndexClient solr = solrIndexClientFactory.getClient(); - list.stream().map(PendingAction::getId).forEach(id -> { - try { - solr.deleteRecord(id); - } catch (final DirectIndexApiException e) { - log.error(e); - } - }); + list.stream().map(PendingAction::getId).forEach(id -> { + try { + solr.deleteRecord(id); + } catch (final Throwable e) { + invalids.put(id, e.getMessage()); + log.error(e); + } + }); - updateExecutionDate(list); + solr.commit(); + + updateExecutionDate(list, invalids); + } + + log.info(String.format("Deleted records: %s", list.size())); + + } catch (final Throwable e) { + log.error("The scheduled task is failed", e); } - - log.info(String.format("Deleted records: %s", list.size())); } - private void updateExecutionDate(final List list) { + private void updateExecutionDate(final List list, final Map invalids) { final LocalDateTime now = LocalDateTime.now(); - list.forEach(r -> r.setExecutionDate(now)); + list.forEach(r -> { + r.setError(invalids.getOrDefault(r.getId(), null)); + r.setExecutionDate(now); + }); pendingActionRepository.saveAll(list); } diff --git a/src/main/resources/sql/schema.sql b/src/main/resources/sql/schema.sql index f2933aa..383a5f8 100644 --- a/src/main/resources/sql/schema.sql +++ b/src/main/resources/sql/schema.sql @@ -7,5 +7,6 @@ CREATE TABLE pending_actions( body text, created_by text, creation_date timestamp with time zone NOT NULL DEFAULT now(), - execution_date timestamp with time zone + execution_date timestamp with time zone, + error text ); diff --git a/src/test/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapperTest.java b/src/test/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapperTest.java index 5207627..edf4a36 100644 --- a/src/test/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapperTest.java +++ b/src/test/java/eu/dnetlib/app/directindex/mapping/SolrRecordMapperTest.java @@ -20,7 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.app.directindex.clients.CommunityClient; import eu.dnetlib.app.directindex.clients.DatasourceManagerClient; import eu.dnetlib.app.directindex.clients.ProjectClient; -import eu.dnetlib.app.directindex.clients.ProjectInfo; +import eu.dnetlib.app.directindex.clients.ProjectClient.ProjectInfo; import eu.dnetlib.app.directindex.clients.VocabularyClient; import eu.dnetlib.app.directindex.errors.DirectIndexApiException; import eu.dnetlib.app.directindex.input.DatasourceEntry;