From b8f16d152fbcaa1d4f32b2d9c983e163564623a5 Mon Sep 17 00:00:00 2001 From: FabioISTI Date: Thu, 14 May 2020 18:04:29 +0200 Subject: [PATCH] Fault tolerance --- .../catalogues/gCat/oai_dc_full.json | 72 ++++++++++ .../gFeed/collectors/oai/OAIClient.java | 128 ++++++++++++------ .../oai/model/CommunicationException.java | 30 ++++ 3 files changed, 185 insertions(+), 45 deletions(-) create mode 100644 gCat-Controller/src/test/resources/org/gcube/data/publishing/gCatFeeder/catalogues/gCat/oai_dc_full.json create mode 100644 oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/model/CommunicationException.java diff --git a/gCat-Controller/src/test/resources/org/gcube/data/publishing/gCatFeeder/catalogues/gCat/oai_dc_full.json b/gCat-Controller/src/test/resources/org/gcube/data/publishing/gCatFeeder/catalogues/gCat/oai_dc_full.json new file mode 100644 index 0000000..f32c3d7 --- /dev/null +++ b/gCat-Controller/src/test/resources/org/gcube/data/publishing/gCatFeeder/catalogues/gCat/oai_dc_full.json @@ -0,0 +1,72 @@ +{ + "profile": "\n\n\n\t\n\t\tcontributor\n\t\tfalse\n\t\tString\n\t\t*\n\t\t\n\t\n\t\tcoverage\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tcreator\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tdate\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tdescription\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tformat\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tidentifier\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tlanguage\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tpublisher\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\trelation\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\trights\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tsource\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\tsubject\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\ttitle\n\t\tfalse\n\t\tString\n\t\t*\n\t\n\t\n\t\ttype\n\t\tfalse\n\t\tString\n\t\t*\n\t\n", + "item": { + "name": "doi_10_15454_000___", + "title": "INRA:Beet:AKER_8354", + "version": "n.a.", + "license_id": "CC-BY-NC-SA-4.0", + "author": "Portail Data Inra", + "maintainer": "Portail Data Inra", + "notes": "Abstract:AKER_8354 is a Beet accession from GnpIS.", + "tags": [ + { + "name": "OAI" + } + ], + "extras": [ + { + "key": "system:type", + "value": "Harvested Object" + }, + { + "key": "Harvested Object:contributor", + "value": "Rinnova" + }, + { + "key": "Harvested Object:creator", + "value": "GnpIS" + }, + { + "key": "Harvested Object:date", + "value": "2017-05-08" + }, + { + "key": "Harvested Object:description", + "value": "Abstract:AKER_8354 is a Beet accession from GnpIS." + }, + { + "key": "Harvested Object:identifier", + "value": "https://doi.org/10.15454/000PKT" + }, + { + "key": "Harvested Object:language", + "value": "English" + }, + { + "key": "Harvested Object:publisher", + "value": "Portail Data Inra" + }, + { + "key": "Harvested Object:subject", + "value": "Genetic Resource" + }, + { + "key": "Harvested Object:title", + "value": "INRA:Beet:AKER_8354" + }, + { + "key": "Harvested Object:type", + "value": "Physical Object" + } + ], + "private": false + }, + "resources": [ + { + "name": "Record", + "url": "https://doi.org/10.15454/000PKT", + "format": "https", + "description": "Original record" + } + ] +} \ No newline at end of file diff --git a/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/OAIClient.java b/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/OAIClient.java index 392a49b..c557130 100644 --- a/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/OAIClient.java +++ b/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/OAIClient.java @@ -15,6 +15,7 @@ import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.transform.stream.StreamSource; +import org.gcube.data.publishing.gFeed.collectors.oai.model.CommunicationException; import org.gcube.data.publishing.gFeed.collectors.oai.model.DCRecordMetadata; import org.gcube.data.publishing.gFeed.collectors.oai.model.MetadataHolder; import org.gcube.data.publishing.gFeed.collectors.oai.model.OAIInteractionException; @@ -33,26 +34,28 @@ import lombok.extern.slf4j.Slf4j; public class OAIClient { private static JAXBContext jaxbContext=null; + private static final int MAX_ATTEMPTS=3; + private static final long DELAY_FACTOR=1000; private static synchronized JAXBContext getContext() throws JAXBException { if(jaxbContext==null) - jaxbContext = JAXBContext.newInstance(OAIRecord.class, - MetadataHolder.class, - OAIMetadata.class, - DCRecordMetadata.class, - OAI_PMH.class); + jaxbContext = JAXBContext.newInstance(OAIRecord.class, + MetadataHolder.class, + OAIMetadata.class, + DCRecordMetadata.class, + OAI_PMH.class); return jaxbContext; } - - - + + + public static final String DC_METADATA_PREFIX="oai_dc"; - + @NonNull private String baseUrl; - - + + Client client; private synchronized Client getWebClient() { @@ -69,57 +72,92 @@ public class OAIClient { ArrayList toReturn=new ArrayList(); String resumptionToken=null; - + // call & iterate boolean isComplete=false; + int currentAttempt=1; while(!isComplete) { - - WebTarget target=getWebClient().target(baseUrl). - queryParam("verb","ListRecords"); - - - - if(resumptionToken==null) - target=target.queryParam("metadataPrefix",metadataPrefix); - else - target=target.queryParam("resumptionToken", resumptionToken); - - - Response resp=target.request("application/xml").get(); - - OAI_PMH msg=check(resp); + try { + WebTarget target=getWebClient().target(baseUrl). + queryParam("verb","ListRecords"); - if(msg.isError()) throw new OAIInteractionException(msg.getError().getCode()+ " : "+msg.getError().getMessage()); - - toReturn.addAll(msg.getResponseRecords().getRecords()); - - Token t=msg.getResponseRecords().getResumptionToken(); - log.debug("Obtained token : "+t); - if(t!=null && t.getId()!=null && !t.getId().isEmpty()) { - resumptionToken=t.getId(); - }else isComplete=true; //no token = completion + if(resumptionToken==null) + target=target.queryParam("metadataPrefix",metadataPrefix); + else + target=target.queryParam("resumptionToken", resumptionToken); + + + Response resp=target.request("application/xml").get(); + + OAI_PMH msg=check(resp); + //No errors, thus reset attempt counter + currentAttempt=1; + + if(msg.isError()) throw new OAIInteractionException(msg.getError().getCode()+ " : "+msg.getError().getMessage()); + + toReturn.addAll(msg.getResponseRecords().getRecords()); + log.debug("Parsed "+toReturn.size()+" records so far."); + + Token t=msg.getResponseRecords().getResumptionToken(); + log.debug("Obtained token : "+t); + + if(t!=null && t.getId()!=null && !t.getId().isEmpty()) { + resumptionToken=t.getId(); + }else isComplete=true; //no token = completion + +// }catch(CommunicationException e) { +// log.warn("Received communication error "+e.getMessage()); +// log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = "); +// isComplete=currentAttempt>MAX_ATTEMPTS; +// try { +// Thread.sleep(currentAttempt*DELAY_FACTOR); +// } catch (InterruptedException e1) {} +// currentAttempt++; +// +// }catch(OAIInteractionException e) { +// log.warn("Remote OAI "+baseUrl+" didn't accept request ",e); +// log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = "); +// isComplete=currentAttempt>MAX_ATTEMPTS; +// try { +// Thread.sleep(currentAttempt*DELAY_FACTOR); +// } catch (InterruptedException e1) {} +// currentAttempt++; + }catch(Throwable t) { +// throw new OAIInteractionException("Unexpected error while harvesting "+baseUrl,t); + log.warn("Unexpected ERROR "+t.getMessage()); + log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = "); + isComplete=currentAttempt>MAX_ATTEMPTS; + try { + Thread.sleep(currentAttempt*DELAY_FACTOR); + } catch (InterruptedException e1) {} + currentAttempt++; + } } + log.trace("Obtained "+toReturn.size()+" from "+baseUrl); return toReturn; } + private void retry() { + + } - private static OAI_PMH check(Response resp) throws JAXBException { + private static OAI_PMH check(Response resp) throws JAXBException, CommunicationException { if(resp.getStatus()<200||resp.getStatus()>=300) { // exception - throw new RuntimeException("Implement fault"); + throw new CommunicationException("Received error message. STATUS "+resp.getStatus()+ ", message : "+resp.readEntity(String.class)); }else { - - + + String respString=resp.readEntity(String.class); Unmarshaller jaxbUnmarshaller = getContext().createUnmarshaller(); OAI_PMH obj=(OAI_PMH) jaxbUnmarshaller.unmarshal(new StringReader(respString)); - + return obj; - -// OAI_PMH response = (OAI_PMH) jaxbUnmarshaller.unmarshal( -// new StreamSource(new StringReader(respString))); - + + // OAI_PMH response = (OAI_PMH) jaxbUnmarshaller.unmarshal( + // new StreamSource(new StringReader(respString))); + } } diff --git a/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/model/CommunicationException.java b/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/model/CommunicationException.java new file mode 100644 index 0000000..be2c622 --- /dev/null +++ b/oai-harvester/src/main/java/org/gcube/data/publishing/gFeed/collectors/oai/model/CommunicationException.java @@ -0,0 +1,30 @@ +package org.gcube.data.publishing.gFeed.collectors.oai.model; + +public class CommunicationException extends Exception { + + public CommunicationException() { + // TODO Auto-generated constructor stub + } + + public CommunicationException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public CommunicationException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + + public CommunicationException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public CommunicationException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + // TODO Auto-generated constructor stub + } + +}