added custom plugin to ads datasource

This commit is contained in:
Enrico Ottonello 2019-09-19 15:09:44 +02:00
parent c06c0cb7a1
commit 05209e8393
7 changed files with 583 additions and 0 deletions

View File

@ -0,0 +1,281 @@
package eu.dnetlib.ariadneplus.workflows.nodes;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import com.google.common.collect.Maps;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
import eu.dnetlib.miscutils.functional.xml.SaxonHelper;
import eu.dnetlib.miscutils.functional.xml.XMLIndenter;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.nodes.transform.ApplyX3Mapping;
import eu.dnetlib.msro.workflows.nodes.transform.X3MTransformJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.enabling.ISLookUpException;
import eu.dnetlib.rmi.enabling.ISLookUpService;
import eu.dnetlib.rmi.manager.MSROException;
import net.sf.saxon.s9api.SaxonApiException;
import net.sf.saxon.s9api.Serializer.Property;
import net.sf.saxon.s9api.XPathSelector;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
public class X3MTransformAriadnePlusJobNode extends X3MTransformJobNode {
private static final Log log = LogFactory.getLog(X3MTransformAriadnePlusJobNode.class);
private static final String OAI_NAMESPACE_URI = "http://www.openarchives.org/OAI/2.0/";
private static final String DRI_NAMESPACE_URI = "http://www.driver-repository.eu/namespace/dri";
private String inputEprParam;
private String outputEprParam;
private String mappingPolicyProfileId;
private String mappingProfileIds;
private String mappingUrl;
private boolean verboseLogging;
private XPathSelector xpathSelectorMetadata;
private XPathSelector xpathSelectorHeader;
private XPathSelector xpathSelectorFooter;
private XPathSelector xpathSelectorObjIdentifier;
/**
* true to pass the full record to X3m-engine. False to pass only what's in the metadata section.
**/
private boolean passFullRecord;
@Autowired
private ResultSetFactory resultSetFactory;
@Autowired
private UniqueServiceLocator serviceLocator;
@Autowired
private SaxonHelper saxonHelper;
@Override
protected String execute(final Env env) throws Exception {
log.info("Mapping profile ids read from node configuration: " + mappingProfileIds);
log.info("Mapping Policy profile id read from node configuration: " + mappingPolicyProfileId);
log.info("Mapping url read from node configuration: " + mappingUrl);
final String[] mappings = getMappingsCode(mappingProfileIds.split(","));
final String policy = getProfileCode(mappingPolicyProfileId);
LocalDateTime now = LocalDateTime.now();
final ResultSet<?> rsIn = env.getAttribute(this.inputEprParam, ResultSet.class);
if ((rsIn == null)) { throw new MSROException("InputEprParam (" + this.inputEprParam + ") not found in ENV"); }
prepareXpathSelectors();
final ResultSet<String> rsOut = this.resultSetFactory.map(rsIn, String.class, record -> {
//JUST FOR DEBUGGING THE TIMEOUT OF THE MONGO CURSOR: is there a metadata record that it is really slow to transform?
if(log.isDebugEnabled()) {
String objIdentifier = extractFromRecord(record, xpathSelectorObjIdentifier);
log.debug("Transforming record objIdentifier: " + objIdentifier);
}
ApplyX3Mapping mappingFunction = new ApplyX3Mapping(mappings, policy, verboseLogging);
String toTransform = record;
Instant startExtraction = Instant.now();
if(!isPassFullRecord()) {
log.debug("Extracting XML from the metadata block");
toTransform = extractFromRecord(record, xpathSelectorMetadata);
}
String header = extractFromRecord(record, xpathSelectorHeader);
String provenanceFooter = extractFromRecord(record, xpathSelectorFooter);
Instant endExtraction = Instant.now();
Instant startTransform = Instant.now();
String transformed = mappingFunction.apply(toTransform);
Instant endTransform = Instant.now();
if(log.isDebugEnabled()){
log.debug("Extraction took "+ Duration.between(startExtraction, endExtraction).toMillis()+" ms");
log.debug("Transformation took "+ Duration.between(startTransform, endTransform).toMillis()+" ms");
log.debug("Total mapping time: "+Duration.between(startExtraction, endTransform).toMillis()+" ms");
}
String res = buildXML(header, now.toString(), transformed, provenanceFooter);
if(log.isDebugEnabled()) {
log.debug("SOURCE:\n"+toTransform);
log.debug("TRANFORMED:\n"+res);
}
return res;
});
env.setAttribute(this.outputEprParam, rsOut);
return Arc.DEFAULT_ARC;
}
private String[] getMappingsCode(String[] mappingIds) throws ISLookUpException {
String[] mappings = new String[mappingIds.length];
for(int i =0; i < mappingIds.length; i++){
mappings[i] = getProfileCode(mappingIds[i]);
}
return mappings;
}
protected String buildXML(final String header, final String transformationDate, final String metadata, final String provenance) {
Instant start = Instant.now();
try {
XMLIndenter xmlHelper = new XMLIndenter();
DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
// root elements
Document doc = docBuilder.newDocument();
Element rootElement = doc.createElementNS(OAI_NAMESPACE_URI, "oai:record");
Element headerElem = docBuilder.parse(IOUtils.toInputStream(header, "UTF-8")).getDocumentElement();
Node headerNode = doc.importNode(headerElem, true);
rootElement.appendChild(headerNode);
Element transDate = doc.createElementNS(DRI_NAMESPACE_URI, "dri:dateOfTransformation");
transDate.setTextContent(transformationDate);
headerNode.appendChild(transDate);
Element metadataElement = doc.createElementNS(OAI_NAMESPACE_URI, "oai:metadata");
Element contentElem = docBuilder.parse(IOUtils.toInputStream(metadata, "UTF-8")).getDocumentElement();
Node contentNode = doc.importNode(contentElem, true);
metadataElement.appendChild(contentNode);
rootElement.appendChild(metadataElement);
Element aboutElem = docBuilder.parse(IOUtils.toInputStream(provenance, "UTF-8")).getDocumentElement();
Node aboutNode = doc.importNode(aboutElem, true);
rootElement.appendChild(aboutNode);
doc.appendChild(rootElement);
Instant startIndent = Instant.now();
String res = xmlHelper.indent(doc);
Instant end = Instant.now();
if(log.isDebugEnabled()){
log.debug("XML built in "+ Duration.between(start, end).toMillis()+" ms");
log.debug("Serialization with indent took "+ Duration.between(startIndent, end).toMillis()+" ms");
}
return res;
} catch (Exception e) {
throw new RuntimeException("Cannot build the transformed xml file", e);
}
}
private void prepareXpathSelectors() throws SaxonApiException {
Map<String, String> namespaces = Maps.newHashMap();
namespaces.put("oai", OAI_NAMESPACE_URI);
namespaces.put("dri", DRI_NAMESPACE_URI);
xpathSelectorHeader = this.saxonHelper.help().prepareXPathSelector("//oai:header", namespaces);
xpathSelectorMetadata = this.saxonHelper.help().prepareXPathSelector("//oai:metadata/*", namespaces);
xpathSelectorFooter = this.saxonHelper.help().prepareXPathSelector("//oai:about", namespaces);
xpathSelectorObjIdentifier = this.saxonHelper.help().prepareXPathSelector("//oai:header/*[local-name()='objIdentifier']/text()", namespaces);
}
private String extractFromRecord(final String record, final XPathSelector xPathSelector) {
try {
return this.saxonHelper.help().setSerializerProperty(Property.OMIT_XML_DECLARATION, "yes").evaluateSingleAsString(record, xPathSelector);
} catch (SaxonApiException e) {
throw new RuntimeException("Cannot extract content ", e);
}
}
private String getProfileCode(String profId) throws ISLookUpException {
if (StringUtils.isBlank(profId)) return null;
String xquery = "string(collection('/db/DRIVER/TransformationRuleDSResources')//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value ='" +
profId + "']//CODE)";
List<String> res = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
if (res.isEmpty() || StringUtils.isBlank(res.get(0))) {
throw new RuntimeException("Can't find transformation rule CODE for " + profId);
}
return res.get(0);
}
public String getInputEprParam() {
return this.inputEprParam;
}
public void setInputEprParam(final String inputEprParam) {
this.inputEprParam = inputEprParam;
}
public String getOutputEprParam() {
return this.outputEprParam;
}
public void setOutputEprParam(final String outputEprParam) {
this.outputEprParam = outputEprParam;
}
public String getMappingPolicyProfileId() {
return mappingPolicyProfileId;
}
public void setMappingPolicyProfileId(final String mappingPolicyProfileId) {
this.mappingPolicyProfileId = mappingPolicyProfileId;
}
public String getMappingProfileIds() {
return mappingProfileIds;
}
public void setMappingProfileIds(final String mappingProfileIds) {
this.mappingProfileIds = mappingProfileIds;
}
public boolean isVerboseLogging() {
return verboseLogging;
}
public void setVerboseLogging(final boolean verboseLogging) {
this.verboseLogging = verboseLogging;
}
public ResultSetFactory getResultSetFactory() {
return resultSetFactory;
}
public void setResultSetFactory(final ResultSetFactory resultSetFactory) {
this.resultSetFactory = resultSetFactory;
}
public UniqueServiceLocator getServiceLocator() {
return serviceLocator;
}
public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
this.serviceLocator = serviceLocator;
}
public boolean isPassFullRecord() {
return passFullRecord;
}
public void setPassFullRecord(final boolean passFullRecord) {
this.passFullRecord = passFullRecord;
}
public String getMappingUrl() {
return mappingUrl;
}
public void setMappingUrl(String mappingUrl) {
this.mappingUrl = mappingUrl;
}
private String[] getMappingsFromUrl(String url) {
String[] mappings = new String[1];
return mappings;
}
}

View File

@ -0,0 +1,27 @@
package eu.dnetlib.data.collector.plugins.ariadneplus.ads;
import eu.dnetlib.data.collector.plugins.FileCollectorPlugin;
import eu.dnetlib.rmi.data.CollectorServiceException;
import eu.dnetlib.rmi.data.InterfaceDescriptor;
public class ADSCollectorPlugin extends FileCollectorPlugin {
@Override
public Iterable<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
throws CollectorServiceException {
final String baseUrl = interfaceDescriptor.getBaseUrl();
final String suffixToUrl = interfaceDescriptor.getParams().get("suffixToBaseUrl");
return () -> {
try {
return new ADSIterator(super.collect(interfaceDescriptor, fromDate, untilDate).iterator(), baseUrl, suffixToUrl);
} catch (CollectorServiceException e) {
throw new RuntimeException(e);
}
} ;
}
}

View File

@ -0,0 +1,92 @@
package eu.dnetlib.data.collector.plugins.ariadneplus.ads;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
import com.ximpleware.*;
import eu.dnetlib.data.collector.ThreadSafeIterator;
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class ADSIterator extends ThreadSafeIterator {
private static final Log log = LogFactory.getLog(ADSIterator.class);
private Iterator<String> identifiers;
private String baseUrl;
private String suffix;
public ADSIterator(final Iterator<String> idIterator, final String baseUrl, final String suffix){
this.identifiers = idIterator;
this.baseUrl = baseUrl;
this.suffix = suffix;
}
@Override
public boolean doHasNext() {
return identifiers.hasNext();
}
@Override
public String doNext() {
String record = identifiers.next();
try {
return addADSNamespace(record);
} catch (Exception e) {
if(this.hasNext()){
return this.next();
}
else return "";
}
}
protected String addADSNamespace(final String xml) {
String namespaceList = " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:dc=\"http://purl.org/dc/elements/1.1/\"\n" +
" xmlns:dcterms=\"http://purl.org/dc/terms/\"\n" +
" xmlns:ads=\"https://archaeologydataservice.ac.uk/\"";
try {
VTDGen vg = new VTDGen();
vg.setDoc(xml.getBytes());
vg.parse(false); // namespace unaware to all name space nodes addressable using xpath @*
VTDNav vn = vg.getNav();
XMLModifier xm = new XMLModifier(vn);
byte[] attrBytes = namespaceList.getBytes();
vn.toElement(VTDNav.ROOT);
xm.insertAttribute(attrBytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
xm.output(baos);
return baos.toString();
} catch(ParseException | ModifyException | NavException | IOException | TranscodeException e){
log.error("Cannot add namespace declarations to element: "+xml);
throw new CollectorServiceRuntimeException("Cannot add namespace declarations to element", e);
}
}
public Iterator<String> getIdentifiers() {
return identifiers;
}
public void setIdentifiers(final Iterator<String> identifiers) {
this.identifiers = identifiers;
}
public String getBaseUrl() {
return baseUrl;
}
public void setBaseUrl(final String baseUrl) {
this.baseUrl = baseUrl;
}
public String getSuffix() {
return suffix;
}
public void setSuffix(final String suffix) {
this.suffix = suffix;
}
}

View File

@ -0,0 +1,63 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="8dbee0c0-cf01-4450-95a3-b1fb39353a8a_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU="/>
<RESOURCE_TYPE value="RepositoryServiceResourceType"/>
<RESOURCE_KIND value="RepositoryServiceResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2019-09-19T12:42:16+02:00"/>
<PROTOCOL/>
</HEADER>
<BODY>
<CONFIGURATION>
<DATASOURCE_TYPE>dnet:repository</DATASOURCE_TYPE>
<DATASOURCE_ORIGINAL_ID provenance="D-NET">ariadneplus_::ads</DATASOURCE_ORIGINAL_ID>
<DATASOURCE_AGGREGATED>false</DATASOURCE_AGGREGATED>
<ENVIRONMENTS>
<ENVIRONMENT/>
</ENVIRONMENTS>
<TYPOLOGY>null</TYPOLOGY>
<MAX_SIZE_OF_DATASTRUCTURE>0</MAX_SIZE_OF_DATASTRUCTURE>
<AVAILABLE_DISKSPACE>0</AVAILABLE_DISKSPACE>
<MAX_NUMBER_OF_DATASTRUCTURE>0</MAX_NUMBER_OF_DATASTRUCTURE>
<OFFICIAL_NAME>ADS</OFFICIAL_NAME>
<ENGLISH_NAME>ADS</ENGLISH_NAME>
<ICON_URI>null</ICON_URI>
<COUNTRY>EU</COUNTRY>
<LOCATION>
<LONGITUDE>0.0</LONGITUDE>
<LATITUDE>0.0</LATITUDE>
<TIMEZONE>0.0</TIMEZONE>
</LOCATION>
<REPOSITORY_WEBPAGE>http://</REPOSITORY_WEBPAGE>
<REPOSITORY_INSTITUTION>ADS</REPOSITORY_INSTITUTION>
<ADMIN_INFO>alessia.bardi@isti.cnr.it</ADMIN_INFO>
<INTERFACES>
<INTERFACE active="true" compliance="metadata" contentDescription="metadata" id="api_________::ariadneplus_::ads" label="dnet:repository (metadata)" removable="false" typology="dnet:repository">
<ACCESS_PROTOCOL splitOnElement="record" suffixToBaseUrl="bb">ads</ACCESS_PROTOCOL>
<BASE_URL>file:///var/lib/dnet/ads/ariadne_398_part1.xml</BASE_URL>
<INTERFACE_EXTRA_FIELD name="last_collection_date">2019-09-19T12:42:16+02:00</INTERFACE_EXTRA_FIELD>
<INTERFACE_EXTRA_FIELD name="last_collection_mdId">8098455d-e3a4-499b-adf3-4bb55574646f_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==</INTERFACE_EXTRA_FIELD>
<INTERFACE_EXTRA_FIELD name="last_collection_total">100000</INTERFACE_EXTRA_FIELD>
<INTERFACE_EXTRA_FIELD name="metadata_identifier_path">//*[local-name()='record']/*[namespace-uri()='http://purl.org/dc/elements/1.1/' and local-name()='identifier'][2]</INTERFACE_EXTRA_FIELD>
</INTERFACE>
</INTERFACES>
<EXTRA_FIELDS>
<FIELD>
<key>NamespacePrefix</key>
<value>ariadneplus_</value>
</FIELD>
</EXTRA_FIELDS>
<REGISTERED_BY/>
</CONFIGURATION>
<STATUS>
<NUMBER_OF_OBJECTS>0</NUMBER_OF_OBJECTS>
</STATUS>
<QOS>
<AVAILABILITY>0</AVAILABILITY>
<CAPACITY/>
<THROUGHPUT>0</THROUGHPUT>
</QOS>
<SECURITY_PARAMETERS/>
<BLACKBOARD/>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:http="http://cxf.apache.org/transports/http/configuration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="adsCollectorPlugin" class="eu.dnetlib.data.collector.plugins.ariadneplus.ads.ADSCollectorPlugin">
<property name="protocolDescriptor">
<bean class="eu.dnetlib.rmi.data.ProtocolDescriptor" p:name="ads">
<property name="params">
<list>
<bean class="eu.dnetlib.rmi.data.ProtocolParameter"
p:name="suffixToBaseUrl"/>
<bean class="eu.dnetlib.rmi.data.ProtocolParameter"
p:name="splitOnElement"/>
</list>
</property>
</bean>
</property>
</bean>
</beans>

View File

@ -0,0 +1,41 @@
package eu.dnetlib.data.collector.plugins.ariadneplus;
import com.google.common.collect.Lists;
import eu.dnetlib.data.collector.plugins.HttpSimpleCollectorPlugin;
import eu.dnetlib.data.collector.plugins.oai.engine.HttpConnector;
import eu.dnetlib.rmi.data.CollectorServiceException;
import eu.dnetlib.rmi.data.InterfaceDescriptor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@Ignore
public class MappingDownloadTest {
private InterfaceDescriptor apiDescriptor;
private HttpSimpleCollectorPlugin plugin;
private HttpConnector connector;
@Before
public void prepare(){
connector = new HttpConnector();
plugin = new HttpSimpleCollectorPlugin();
plugin.setHttpConnector(connector);
apiDescriptor = new InterfaceDescriptor();
}
@Test
public void test() throws CollectorServiceException {
apiDescriptor.setBaseUrl("http://data.d4science.org/em1EemhBdUZ0bjNGTWJNNjlxVDltcm9acDFmMHlBSVVHbWJQNStIS0N6Yz0");
Iterable<String> res = plugin.collect(apiDescriptor, null, null);
Assert.assertTrue(Lists.newArrayList(res).size() == 1);
for(String r : res){
System.out.println(r);
Assert.assertNotNull(r);
}
}
}

View File

@ -0,0 +1,57 @@
package eu.dnetlib.data.collector.plugins.ariadneplus.ads;
import com.ximpleware.*;
import com.ximpleware.EOFException;
import java.io.*;
import org.junit.Assert;
import org.junit.Test;
public class PreProcessRecordTest {
@Test
public void preprocessXMLRecord2() throws EncodingException, EOFException, EntityException, ParseException, ModifyException, XPathParseException, XPathEvalException, NavException, IOException, TranscodeException {
String namespaceList = " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:dc=\"http://purl.org/dc/elements/1.1/\"\n" +
" xmlns:dcterms=\"http://purl.org/dc/terms/\"\n" +
" xmlns:ads=\"https://archaeologydataservice.ac.uk/\"";
System.out.println("namespaceList: "+ namespaceList + "\n");
// TODO Auto-generated method stub
VTDGen vg = new VTDGen();
String xml="<record> " +
"<dc:title>Part of a cropmark with...</dc:title>\n" +
"<dc:creator>Historic England</dc:creator>\n" +
"<dc:subjectPeriod>\n" +
" <dc:subject>CROPMARK</dc:subject>\n" +
" <dcterms:temporal>PALAEOLITHIC</dcterms:temporal>\n" +
"</dc:subjectPeriod>\n" +
"<dc:subjectPeriod>\n" +
" <dc:subject>HOUSE</dc:subject>\n" +
"</dc:subjectPeriod>\n" +
"<dc:description>Part of a cropmark with internal markings seen on air photograph. Field investigation in 1957 found the marks to be due to natural undulations and vegetational changes, and no traces of antiquity were observed.</dc:description>\n" +
"<dc:identifier>Depositor ID: NT 70 NE 1</dc:identifier>\n" +
"<dc:identifier>NMR_NATINV-3</dc:identifier>\n" +
"<dc:source>https://archaeologydataservice.ac.uk/archsearch/record?titleId=967971</dc:source>\n" +
"<dc:language xsi:type=\"dcterms:ISO639-2\">eng</dc:language>\n" +
"<dcterms:spatial>ALWINTON, ALNWICK, NORTHUMBERLAND, ENGLAND</dcterms:spatial>\n" +
"<dcterms:spatial xsi:type=\"dcterms:POINT\">-2.758696, 55.945225</dcterms:spatial>\n" +
"<dc:rights>http://archaeologydataservice.ac.uk/advice/termsOfUseAndAccess</dc:rights>\n" +
"</record> ";
System.out.println("original record: "+xml + "\n");
vg.setDoc(xml.getBytes());
vg.parse(false); // namespace unaware to all name space nodes addressable using xpath @*
VTDNav vn = vg.getNav();
XMLModifier xm = new XMLModifier(vn);
byte[] attrBytes = namespaceList.getBytes();
vn.toElement(VTDNav.ROOT);
xm.insertAttribute(attrBytes);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
xm.output(baos);
System.out.println("preprocessed record: " + baos.toString() + "\n");
Assert.assertNotNull(baos);
}
}