Fault tolerance
This commit is contained in:
parent
058925de0e
commit
b8f16d152f
|
@ -0,0 +1,72 @@
|
||||||
|
{
|
||||||
|
"profile": "<!--\n/elements/1.1\n\n contributor, coverage, creator, date, description, format, \n\tidentifier, language, publisher, relation, rights, source, subject, \n\ttitle, type -->\n\n<metadataformat type=\"Harvested Object\">\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>contributor</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\t\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>coverage</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>creator</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>date</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>description</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>format</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>identifier</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>language</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>publisher</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>relation</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>rights</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>source</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>subject</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>title</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n\t<metadatafield categoryref=\"Harvested Object\">\n\t\t<fieldName>type</fieldName>\n\t\t<mandatory>false</mandatory>\n\t\t<dataType>String</dataType>\n\t\t<maxOccurs>*</maxOccurs>\n\t</metadatafield>\n</metadataformat>",
|
||||||
|
"item": {
|
||||||
|
"name": "doi_10_15454_000___",
|
||||||
|
"title": "INRA:Beet:AKER_8354",
|
||||||
|
"version": "n.a.",
|
||||||
|
"license_id": "CC-BY-NC-SA-4.0",
|
||||||
|
"author": "Portail Data Inra",
|
||||||
|
"maintainer": "Portail Data Inra",
|
||||||
|
"notes": "Abstract:AKER_8354 is a Beet accession from GnpIS.",
|
||||||
|
"tags": [
|
||||||
|
{
|
||||||
|
"name": "OAI"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"extras": [
|
||||||
|
{
|
||||||
|
"key": "system:type",
|
||||||
|
"value": "Harvested Object"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:contributor",
|
||||||
|
"value": "Rinnova"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:creator",
|
||||||
|
"value": "GnpIS"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:date",
|
||||||
|
"value": "2017-05-08"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:description",
|
||||||
|
"value": "Abstract:AKER_8354 is a Beet accession from GnpIS."
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:identifier",
|
||||||
|
"value": "https://doi.org/10.15454/000PKT"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:language",
|
||||||
|
"value": "English"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:publisher",
|
||||||
|
"value": "Portail Data Inra"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:subject",
|
||||||
|
"value": "Genetic Resource"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:title",
|
||||||
|
"value": "INRA:Beet:AKER_8354"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"key": "Harvested Object:type",
|
||||||
|
"value": "Physical Object"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"private": false
|
||||||
|
},
|
||||||
|
"resources": [
|
||||||
|
{
|
||||||
|
"name": "Record",
|
||||||
|
"url": "https://doi.org/10.15454/000PKT",
|
||||||
|
"format": "https",
|
||||||
|
"description": "Original record"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -15,6 +15,7 @@ import javax.xml.bind.JAXBException;
|
||||||
import javax.xml.bind.Unmarshaller;
|
import javax.xml.bind.Unmarshaller;
|
||||||
import javax.xml.transform.stream.StreamSource;
|
import javax.xml.transform.stream.StreamSource;
|
||||||
|
|
||||||
|
import org.gcube.data.publishing.gFeed.collectors.oai.model.CommunicationException;
|
||||||
import org.gcube.data.publishing.gFeed.collectors.oai.model.DCRecordMetadata;
|
import org.gcube.data.publishing.gFeed.collectors.oai.model.DCRecordMetadata;
|
||||||
import org.gcube.data.publishing.gFeed.collectors.oai.model.MetadataHolder;
|
import org.gcube.data.publishing.gFeed.collectors.oai.model.MetadataHolder;
|
||||||
import org.gcube.data.publishing.gFeed.collectors.oai.model.OAIInteractionException;
|
import org.gcube.data.publishing.gFeed.collectors.oai.model.OAIInteractionException;
|
||||||
|
@ -33,26 +34,28 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
public class OAIClient {
|
public class OAIClient {
|
||||||
|
|
||||||
private static JAXBContext jaxbContext=null;
|
private static JAXBContext jaxbContext=null;
|
||||||
|
private static final int MAX_ATTEMPTS=3;
|
||||||
|
private static final long DELAY_FACTOR=1000;
|
||||||
|
|
||||||
|
|
||||||
private static synchronized JAXBContext getContext() throws JAXBException {
|
private static synchronized JAXBContext getContext() throws JAXBException {
|
||||||
if(jaxbContext==null)
|
if(jaxbContext==null)
|
||||||
jaxbContext = JAXBContext.newInstance(OAIRecord.class,
|
jaxbContext = JAXBContext.newInstance(OAIRecord.class,
|
||||||
MetadataHolder.class,
|
MetadataHolder.class,
|
||||||
OAIMetadata.class,
|
OAIMetadata.class,
|
||||||
DCRecordMetadata.class,
|
DCRecordMetadata.class,
|
||||||
OAI_PMH.class);
|
OAI_PMH.class);
|
||||||
return jaxbContext;
|
return jaxbContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public static final String DC_METADATA_PREFIX="oai_dc";
|
public static final String DC_METADATA_PREFIX="oai_dc";
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
private String baseUrl;
|
private String baseUrl;
|
||||||
|
|
||||||
|
|
||||||
Client client;
|
Client client;
|
||||||
|
|
||||||
private synchronized Client getWebClient() {
|
private synchronized Client getWebClient() {
|
||||||
|
@ -69,57 +72,92 @@ public class OAIClient {
|
||||||
ArrayList<OAIRecord> toReturn=new ArrayList<OAIRecord>();
|
ArrayList<OAIRecord> toReturn=new ArrayList<OAIRecord>();
|
||||||
|
|
||||||
String resumptionToken=null;
|
String resumptionToken=null;
|
||||||
|
|
||||||
// call & iterate
|
// call & iterate
|
||||||
boolean isComplete=false;
|
boolean isComplete=false;
|
||||||
|
int currentAttempt=1;
|
||||||
while(!isComplete) {
|
while(!isComplete) {
|
||||||
|
try {
|
||||||
WebTarget target=getWebClient().target(baseUrl).
|
WebTarget target=getWebClient().target(baseUrl).
|
||||||
queryParam("verb","ListRecords");
|
queryParam("verb","ListRecords");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if(resumptionToken==null)
|
|
||||||
target=target.queryParam("metadataPrefix",metadataPrefix);
|
|
||||||
else
|
|
||||||
target=target.queryParam("resumptionToken", resumptionToken);
|
|
||||||
|
|
||||||
|
|
||||||
Response resp=target.request("application/xml").get();
|
|
||||||
|
|
||||||
OAI_PMH msg=check(resp);
|
|
||||||
|
|
||||||
if(msg.isError()) throw new OAIInteractionException(msg.getError().getCode()+ " : "+msg.getError().getMessage());
|
if(resumptionToken==null)
|
||||||
|
target=target.queryParam("metadataPrefix",metadataPrefix);
|
||||||
toReturn.addAll(msg.getResponseRecords().getRecords());
|
else
|
||||||
|
target=target.queryParam("resumptionToken", resumptionToken);
|
||||||
Token t=msg.getResponseRecords().getResumptionToken();
|
|
||||||
log.debug("Obtained token : "+t);
|
|
||||||
if(t!=null && t.getId()!=null && !t.getId().isEmpty()) {
|
Response resp=target.request("application/xml").get();
|
||||||
resumptionToken=t.getId();
|
|
||||||
}else isComplete=true; //no token = completion
|
OAI_PMH msg=check(resp);
|
||||||
|
//No errors, thus reset attempt counter
|
||||||
|
currentAttempt=1;
|
||||||
|
|
||||||
|
if(msg.isError()) throw new OAIInteractionException(msg.getError().getCode()+ " : "+msg.getError().getMessage());
|
||||||
|
|
||||||
|
toReturn.addAll(msg.getResponseRecords().getRecords());
|
||||||
|
log.debug("Parsed "+toReturn.size()+" records so far.");
|
||||||
|
|
||||||
|
Token t=msg.getResponseRecords().getResumptionToken();
|
||||||
|
log.debug("Obtained token : "+t);
|
||||||
|
|
||||||
|
if(t!=null && t.getId()!=null && !t.getId().isEmpty()) {
|
||||||
|
resumptionToken=t.getId();
|
||||||
|
}else isComplete=true; //no token = completion
|
||||||
|
|
||||||
|
// }catch(CommunicationException e) {
|
||||||
|
// log.warn("Received communication error "+e.getMessage());
|
||||||
|
// log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = ");
|
||||||
|
// isComplete=currentAttempt>MAX_ATTEMPTS;
|
||||||
|
// try {
|
||||||
|
// Thread.sleep(currentAttempt*DELAY_FACTOR);
|
||||||
|
// } catch (InterruptedException e1) {}
|
||||||
|
// currentAttempt++;
|
||||||
|
//
|
||||||
|
// }catch(OAIInteractionException e) {
|
||||||
|
// log.warn("Remote OAI "+baseUrl+" didn't accept request ",e);
|
||||||
|
// log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = ");
|
||||||
|
// isComplete=currentAttempt>MAX_ATTEMPTS;
|
||||||
|
// try {
|
||||||
|
// Thread.sleep(currentAttempt*DELAY_FACTOR);
|
||||||
|
// } catch (InterruptedException e1) {}
|
||||||
|
// currentAttempt++;
|
||||||
|
}catch(Throwable t) {
|
||||||
|
// throw new OAIInteractionException("Unexpected error while harvesting "+baseUrl,t);
|
||||||
|
log.warn("Unexpected ERROR "+t.getMessage());
|
||||||
|
log.debug("Current attempt number = "+currentAttempt," max attempt Number = "+MAX_ATTEMPTS+", attempts delay factor = ");
|
||||||
|
isComplete=currentAttempt>MAX_ATTEMPTS;
|
||||||
|
try {
|
||||||
|
Thread.sleep(currentAttempt*DELAY_FACTOR);
|
||||||
|
} catch (InterruptedException e1) {}
|
||||||
|
currentAttempt++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
log.trace("Obtained "+toReturn.size()+" from "+baseUrl);
|
||||||
return toReturn;
|
return toReturn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void retry() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private static OAI_PMH check(Response resp) throws JAXBException {
|
private static OAI_PMH check(Response resp) throws JAXBException, CommunicationException {
|
||||||
if(resp.getStatus()<200||resp.getStatus()>=300) {
|
if(resp.getStatus()<200||resp.getStatus()>=300) {
|
||||||
// exception
|
// exception
|
||||||
throw new RuntimeException("Implement fault");
|
throw new CommunicationException("Received error message. STATUS "+resp.getStatus()+ ", message : "+resp.readEntity(String.class));
|
||||||
}else {
|
}else {
|
||||||
|
|
||||||
|
|
||||||
String respString=resp.readEntity(String.class);
|
String respString=resp.readEntity(String.class);
|
||||||
Unmarshaller jaxbUnmarshaller = getContext().createUnmarshaller();
|
Unmarshaller jaxbUnmarshaller = getContext().createUnmarshaller();
|
||||||
OAI_PMH obj=(OAI_PMH) jaxbUnmarshaller.unmarshal(new StringReader(respString));
|
OAI_PMH obj=(OAI_PMH) jaxbUnmarshaller.unmarshal(new StringReader(respString));
|
||||||
|
|
||||||
return obj;
|
return obj;
|
||||||
|
|
||||||
// OAI_PMH response = (OAI_PMH) jaxbUnmarshaller.unmarshal(
|
// OAI_PMH response = (OAI_PMH) jaxbUnmarshaller.unmarshal(
|
||||||
// new StreamSource(new StringReader(respString)));
|
// new StreamSource(new StringReader(respString)));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
package org.gcube.data.publishing.gFeed.collectors.oai.model;
|
||||||
|
|
||||||
|
public class CommunicationException extends Exception {
|
||||||
|
|
||||||
|
public CommunicationException() {
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommunicationException(String message) {
|
||||||
|
super(message);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommunicationException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommunicationException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
public CommunicationException(String message, Throwable cause, boolean enableSuppression,
|
||||||
|
boolean writableStackTrace) {
|
||||||
|
super(message, cause, enableSuppression, writableStackTrace);
|
||||||
|
// TODO Auto-generated constructor stub
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue