new node to elastic search index

This commit is contained in:
Enrico Ottonello 2019-12-16 15:25:55 +01:00
parent b3d8a9193b
commit 98b452c59a
1 changed files with 342 additions and 0 deletions

View File

@ -0,0 +1,342 @@
package eu.dnetlib.ariadneplus.workflows.nodes;
import eu.dnetlib.ariadneplus.rdf.RecordParserHelper;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import net.sf.saxon.s9api.SaxonApiException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.springframework.core.io.ClassPathResource;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.ontology.OntModelSpec;
import com.hp.hpl.jena.rdf.model.InfModel;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.ResIterator;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.vocabulary.RDF;
import com.hp.hpl.jena.vocabulary.RDFS;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import java.util.Collections;
import java.util.Random;
public class ElasticSearchIndexJobNode extends SimpleJobNode {
private static final Log log = LogFactory.getLog(ElasticSearchIndexJobNode.class);
private static final String OAI_NAMESPACE_URI = "http://www.openarchives.org/OAI/2.0/";
private static final String DRI_NAMESPACE_URI = "http://www.driver-repository.eu/namespace/dri";
private String eprParam;
private String indexId;
private String outputEprParam;
private String mappingPolicyProfileId;
private boolean verboseLogging;
// @Autowired
private RecordParserHelper recordParserHelper = new RecordParserHelper();
@Autowired
private ResultSetFactory resultSetFactory;
@Autowired
private UniqueServiceLocator serviceLocator;
// @Autowired
// private SaxonHelper saxonHelper;
@Autowired
private ResultSetClient resultSetClient;
@Override
protected String execute(final Env env) throws Exception {
setup();
setupES();
final ResultSet<?> rsIn = env.getAttribute(this.eprParam, ResultSet.class);
if ((rsIn == null)) { throw new MSROException("InputEprParam (" + this.eprParam + ") not found in ENV"); }
for (String record : getResultSetClient().iter(rsIn, String.class)) {
// log.debug(record);
String rdfBlock = getRDFBlock(record);
try {
String jsonData = prepareJsonData(rdfBlock);
feed(jsonData);
} catch (Exception e) {
log.error(e);
throw e;
}
}
log.info("elastic search indexing completed");
return Arc.DEFAULT_ARC;
}
private String base = "https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/";
private static final Model M_MODEL = ModelFactory.createDefaultModel();
public static final Property has_title = M_MODEL.createProperty("https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/has_title");
private OntModel baseModel;
private void setup() {
baseModel = ModelFactory.createOntologyModel(OntModelSpec.RDFS_MEM_TRANS_INF);
baseModel.read(getInputStream("eu/dnetlib/ariadneplus/rdfs/AO-CAT1.1.1.rdfs"), base);
}
private String prepareJsonData(String rdfBlock) throws Exception{
String jsonData = "";
InfModel model = loadBaseModel();
model.read(IOUtils.toInputStream(rdfBlock, "UTF-8"), base);
Resource AO_Individual_Data_Resource = M_MODEL.createResource("https://www.ariadne-infrastructure.eu/resource/ao/cat/1.1/AO_Individual_Data_Resource");
ResIterator subjects = model.listSubjectsWithProperty(RDF.type, AO_Individual_Data_Resource);
while (subjects.hasNext()) {
Resource subject = subjects.nextResource();
String title = getTitle(subject);
log.debug("\n\nRDF TITLE > " + title);
jsonData = getJsonTitle(title);
log.debug("\n\nJSON DATA > " + jsonData);
}
return jsonData;
}
private String getTitle(final Resource resource) {
final Statement s = resource.getProperty(has_title);
if (s != null) {
RDFNode obj = s.getObject();
if(obj.isLiteral()) return obj.asLiteral().getLexicalForm();
}
return getLabel(resource);
}
private String getLabel(final Resource resource) {
if(resource == null) return "";
if (resource.hasProperty(RDFS.label)) {
return resource.getProperty(RDFS.label).getString().replace("'", "\'");
} else return "";
}
private static InputStream getStream(final String classpath) throws IOException {
return new ClassPathResource(classpath).getInputStream();
}
protected InfModel loadBaseModel() {
return ModelFactory.createRDFSModel(baseModel);
}
private InputStream getInputStream(String classpath){
try {
final ClassPathResource resource = new ClassPathResource(classpath);
return resource.getInputStream();
}catch(IOException e){
return null;
}
}
protected String getJsonTitle(final String title)
throws IOException {
JsonFactory jsonFactory = new JsonFactory();
final ByteArrayOutputStream out = new ByteArrayOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(out);
JsonGenerator jg = jsonFactory.createGenerator(bos, JsonEncoding.UTF8);
jg.writeStartObject();
jg.writeStringField("title", title);
jg.writeEndObject();
jg.close();
return out.toString("UTF-8");
}
// ELASTIC SEARCH CLIENT
private static final String BASE_CFG_URL = "http://%s:9200/%s/%s?pretty";
private String indexHost;
private String indexName;
private int readTimeout = 30000;
private void setupES(){
// indexHost = "elastic-test.ariadne.d4science.org";
indexHost = "localhost";
indexName = "ads-test02";
}
private void feed(String record) throws Exception {
String url = String.format(BASE_CFG_URL, indexHost, indexName);
System.out.println("Sending to: " + url);
System.out.println("record: " + record);
RestTemplate restTemplate = new RestTemplate(getClientHttpRequestFactory());
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(record, headers);
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class);
if(response.getStatusCode().is2xxSuccessful()){
String resolved = response.getBody();
// log.debug(String.format("AriadnePlus resolver resolved %s with %s", resName, resolved ));
System.out.println("SUCCESS :"+ resolved);
}
else{
// log.debug(String.format("AriadnePlus resolver returned %s with cause %s for %s", res.getStatusCodeValue(), res.getStatusCode().getReasonPhrase(), resName));
System.out.println("ERROR " + response.getStatusCode());
}
}
private String generateIdentifier(String value, int iteration) {
int r = getRandomNumberInRange(1, 1000);
return value.concat("-").concat(Integer.toString(r).concat("-").concat(Integer.toString(iteration)));
}
protected ClientHttpRequestFactory getClientHttpRequestFactory() {
HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();
clientHttpRequestFactory.setReadTimeout(readTimeout);
return clientHttpRequestFactory;
}
private static int getRandomNumberInRange(int min, int max) {
Random r = new Random();
return r.ints(min, (max + 1)).limit(1).findFirst().getAsInt();
}
// RDF HELPERS
public String getRDFBlock(final String record) throws SaxonApiException{
recordParserHelper.init();
Model md = null ;
try {
if (StringUtils.isBlank(record)) {
log.warn("Got empty record");
return "";
}
String objIdentifier = recordParserHelper.getObjIdentifier(record);
if (StringUtils.isBlank(objIdentifier)) {
log.warn("Got record with no objIdentifier -- skipping");
return "";
}
log.debug(objIdentifier);
String rdfBlock = recordParserHelper.getRDF(record);
if (StringUtils.isBlank(rdfBlock)) {
log.warn("Missing rdf:RDF in record with objIdentifier " + objIdentifier);
}
log.debug("\n\n" + rdfBlock + "\n\n" );
return rdfBlock;
}catch(Throwable e){
log.error(e);
throw e;
}
}
public String getOutputEprParam() {
return this.outputEprParam;
}
public void setOutputEprParam(final String outputEprParam) {
this.outputEprParam = outputEprParam;
}
public String getMappingPolicyProfileId() {
return mappingPolicyProfileId;
}
public void setMappingPolicyProfileId(final String mappingPolicyProfileId) {
this.mappingPolicyProfileId = mappingPolicyProfileId;
}
public boolean isVerboseLogging() {
return verboseLogging;
}
public void setVerboseLogging(final boolean verboseLogging) {
this.verboseLogging = verboseLogging;
}
public ResultSetFactory getResultSetFactory() {
return resultSetFactory;
}
public void setResultSetFactory(final ResultSetFactory resultSetFactory) {
this.resultSetFactory = resultSetFactory;
}
public UniqueServiceLocator getServiceLocator() {
return serviceLocator;
}
public void setServiceLocator(final UniqueServiceLocator serviceLocator) {
this.serviceLocator = serviceLocator;
}
public String getEprParam() {
return eprParam;
}
public void setEprParam(String eprParam) {
this.eprParam = eprParam;
}
public String getIndexId() {
return indexId;
}
public void setIndexId(String indexId) {
this.indexId = indexId;
}
public ResultSetClient getResultSetClient() {
return resultSetClient;
}
public void setResultSetClient(ResultSetClient resultSetClient) {
this.resultSetClient = resultSetClient;
}
public RecordParserHelper getRecordParserHelper() {
return recordParserHelper;
}
public void setRecordParserHelper(RecordParserHelper recordParserHelper) {
this.recordParserHelper = recordParserHelper;
}
}