diff --git a/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ElasticSearchIndexJobNode.java b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ElasticSearchIndexJobNode.java new file mode 100644 index 0000000..0c1efb7 --- /dev/null +++ b/dnet-ariadneplus/src/main/java/eu/dnetlib/ariadneplus/workflows/nodes/ElasticSearchIndexJobNode.java @@ -0,0 +1,342 @@ +package eu.dnetlib.ariadneplus.workflows.nodes; + +import eu.dnetlib.ariadneplus.rdf.RecordParserHelper; +import eu.dnetlib.enabling.locators.UniqueServiceLocator; +import eu.dnetlib.enabling.resultset.client.ResultSetClient; +import eu.dnetlib.enabling.resultset.factory.ResultSetFactory; +import eu.dnetlib.msro.workflows.graph.Arc; +import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; +import eu.dnetlib.msro.workflows.procs.Env; +import eu.dnetlib.rmi.common.ResultSet; +import eu.dnetlib.rmi.manager.MSROException; +import net.sf.saxon.s9api.SaxonApiException; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.annotation.Autowired; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import org.springframework.core.io.ClassPathResource; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.hp.hpl.jena.ontology.OntModel; +import com.hp.hpl.jena.ontology.OntModelSpec; +import com.hp.hpl.jena.rdf.model.InfModel; +import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; +import com.hp.hpl.jena.rdf.model.Property; +import com.hp.hpl.jena.rdf.model.RDFNode; +import com.hp.hpl.jena.rdf.model.ResIterator; +import com.hp.hpl.jena.rdf.model.Resource; +import com.hp.hpl.jena.rdf.model.Statement; +import com.hp.hpl.jena.vocabulary.RDF; +import com.hp.hpl.jena.vocabulary.RDFS; + +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.ClientHttpRequestFactory; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +import java.util.Collections; +import java.util.Random; + + +public class ElasticSearchIndexJobNode extends SimpleJobNode { + + private static final Log log = LogFactory.getLog(ElasticSearchIndexJobNode.class); + private static final String OAI_NAMESPACE_URI = "http://www.openarchives.org/OAI/2.0/"; + private static final String DRI_NAMESPACE_URI = "http://www.driver-repository.eu/namespace/dri"; + + private String eprParam; + private String indexId; + private String outputEprParam; + + private String mappingPolicyProfileId; + + private boolean verboseLogging; + +// @Autowired + private RecordParserHelper recordParserHelper = new RecordParserHelper(); + + + @Autowired + private ResultSetFactory resultSetFactory; + @Autowired + private UniqueServiceLocator serviceLocator; +// @Autowired +// private SaxonHelper saxonHelper; + @Autowired + private ResultSetClient resultSetClient; + + @Override + protected String execute(final Env env) throws Exception { + + setup(); + setupES(); + + final ResultSet rsIn = env.getAttribute(this.eprParam, ResultSet.class); + if ((rsIn == null)) { throw new MSROException("InputEprParam (" + this.eprParam + ") not found in ENV"); } + + for (String record : getResultSetClient().iter(rsIn, String.class)) { +// log.debug(record); + String rdfBlock = getRDFBlock(record); + try { + String jsonData = prepareJsonData(rdfBlock); + feed(jsonData); + } catch (Exception e) { + log.error(e); + throw e; + } + } + + log.info("elastic search indexing completed"); + return Arc.DEFAULT_ARC; + } + + private String base = "https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/"; + private static final Model M_MODEL = ModelFactory.createDefaultModel(); + public static final Property has_title = M_MODEL.createProperty("https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/has_title"); + + private OntModel baseModel; + + + private void setup() { + baseModel = ModelFactory.createOntologyModel(OntModelSpec.RDFS_MEM_TRANS_INF); + baseModel.read(getInputStream("eu/dnetlib/ariadneplus/rdfs/AO-CAT1.1.1.rdfs"), base); + } + + private String prepareJsonData(String rdfBlock) throws Exception{ + String jsonData = ""; + InfModel model = loadBaseModel(); + model.read(IOUtils.toInputStream(rdfBlock, "UTF-8"), base); + Resource AO_Individual_Data_Resource = M_MODEL.createResource("https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/AO_Individual_Data_Resource"); + ResIterator subjects = model.listSubjectsWithProperty(RDF.type, AO_Individual_Data_Resource); + while (subjects.hasNext()) { + Resource subject = subjects.nextResource(); + String title = getTitle(subject); + log.debug("\n\nRDF TITLE > " + title); + jsonData = getJsonTitle(title); + log.debug("\n\nJSON DATA > " + jsonData); + } + return jsonData; + } + + private String getTitle(final Resource resource) { + final Statement s = resource.getProperty(has_title); + if (s != null) { + RDFNode obj = s.getObject(); + if(obj.isLiteral()) return obj.asLiteral().getLexicalForm(); + } + return getLabel(resource); + } + + private String getLabel(final Resource resource) { + if(resource == null) return ""; + if (resource.hasProperty(RDFS.label)) { + return resource.getProperty(RDFS.label).getString().replace("'", "\'"); + } else return ""; + } + + private static InputStream getStream(final String classpath) throws IOException { + return new ClassPathResource(classpath).getInputStream(); + } + + protected InfModel loadBaseModel() { + return ModelFactory.createRDFSModel(baseModel); + } + + private InputStream getInputStream(String classpath){ + try { + final ClassPathResource resource = new ClassPathResource(classpath); + return resource.getInputStream(); + }catch(IOException e){ + return null; + } + } + + protected String getJsonTitle(final String title) + throws IOException { + JsonFactory jsonFactory = new JsonFactory(); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + BufferedOutputStream bos = new BufferedOutputStream(out); + JsonGenerator jg = jsonFactory.createGenerator(bos, JsonEncoding.UTF8); + jg.writeStartObject(); + + jg.writeStringField("title", title); + + jg.writeEndObject(); + jg.close(); + return out.toString("UTF-8"); + } + + + // ELASTIC SEARCH CLIENT + + private static final String BASE_CFG_URL = "http://%s:9200/%s/%s?pretty"; + + private String indexHost; + private String indexName; + private int readTimeout = 30000; + + + private void setupES(){ +// indexHost = "elastic-test.ariadne.d4science.org"; + indexHost = "localhost"; + indexName = "ads-test02"; + } + + private void feed(String record) throws Exception { + String url = String.format(BASE_CFG_URL, indexHost, indexName); + System.out.println("Sending to: " + url); + System.out.println("record: " + record); + RestTemplate restTemplate = new RestTemplate(getClientHttpRequestFactory()); + HttpHeaders headers = new HttpHeaders(); + headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON)); + headers.setContentType(MediaType.APPLICATION_JSON); + HttpEntity entity = new HttpEntity<>(record, headers); + + ResponseEntity response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class); + if(response.getStatusCode().is2xxSuccessful()){ + String resolved = response.getBody(); +// log.debug(String.format("AriadnePlus resolver resolved %s with %s", resName, resolved )); + System.out.println("SUCCESS :"+ resolved); + } + else{ +// log.debug(String.format("AriadnePlus resolver returned %s with cause %s for %s", res.getStatusCodeValue(), res.getStatusCode().getReasonPhrase(), resName)); + System.out.println("ERROR " + response.getStatusCode()); + } + + } + + private String generateIdentifier(String value, int iteration) { + int r = getRandomNumberInRange(1, 1000); + return value.concat("-").concat(Integer.toString(r).concat("-").concat(Integer.toString(iteration))); + } + + protected ClientHttpRequestFactory getClientHttpRequestFactory() { + HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(); + clientHttpRequestFactory.setReadTimeout(readTimeout); + return clientHttpRequestFactory; + } + + private static int getRandomNumberInRange(int min, int max) { + + Random r = new Random(); + return r.ints(min, (max + 1)).limit(1).findFirst().getAsInt(); + + } + + + // RDF HELPERS + public String getRDFBlock(final String record) throws SaxonApiException{ + recordParserHelper.init(); + Model md = null ; + try { + if (StringUtils.isBlank(record)) { + log.warn("Got empty record"); + return ""; + } + String objIdentifier = recordParserHelper.getObjIdentifier(record); + if (StringUtils.isBlank(objIdentifier)) { + log.warn("Got record with no objIdentifier -- skipping"); + return ""; + } + log.debug(objIdentifier); + String rdfBlock = recordParserHelper.getRDF(record); + if (StringUtils.isBlank(rdfBlock)) { + log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier); + } + log.debug("\n\n" + rdfBlock + "\n\n" ); + return rdfBlock; + }catch(Throwable e){ + log.error(e); + throw e; + } + } + + + + public String getOutputEprParam() { + return this.outputEprParam; + } + + public void setOutputEprParam(final String outputEprParam) { + this.outputEprParam = outputEprParam; + } + + public String getMappingPolicyProfileId() { + return mappingPolicyProfileId; + } + + public void setMappingPolicyProfileId(final String mappingPolicyProfileId) { + this.mappingPolicyProfileId = mappingPolicyProfileId; + } + + public boolean isVerboseLogging() { + return verboseLogging; + } + + public void setVerboseLogging(final boolean verboseLogging) { + this.verboseLogging = verboseLogging; + } + + public ResultSetFactory getResultSetFactory() { + return resultSetFactory; + } + + public void setResultSetFactory(final ResultSetFactory resultSetFactory) { + this.resultSetFactory = resultSetFactory; + } + + public UniqueServiceLocator getServiceLocator() { + return serviceLocator; + } + + public void setServiceLocator(final UniqueServiceLocator serviceLocator) { + this.serviceLocator = serviceLocator; + } + + public String getEprParam() { + return eprParam; + } + + public void setEprParam(String eprParam) { + this.eprParam = eprParam; + } + + public String getIndexId() { + return indexId; + } + + public void setIndexId(String indexId) { + this.indexId = indexId; + } + + public ResultSetClient getResultSetClient() { + return resultSetClient; + } + + public void setResultSetClient(ResultSetClient resultSetClient) { + this.resultSetClient = resultSetClient; + } + + public RecordParserHelper getRecordParserHelper() { + return recordParserHelper; + } + + public void setRecordParserHelper(RecordParserHelper recordParserHelper) { + this.recordParserHelper = recordParserHelper; + } +} +