package eu.dnetlib.ariadneplus.elasticsearch; import eu.dnetlib.ariadneplus.elasticsearch.model.AgentInfo; import eu.dnetlib.ariadneplus.elasticsearch.model.AriadneCatalogEntry; import eu.dnetlib.ariadneplus.reader.ResourceManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpHost; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @Service public class BulkUpload { private static final Log log = LogFactory.getLog(BulkUpload.class); @Value("${elasticsearch.hostname}") private String elasticSearchHostName; @Value("${elasticsearch.indexname}") private String elasticSearchIndexName; private RestHighLevelClient client; @PostConstruct public void init() throws IOException { client = new RestHighLevelClient( RestClient.builder( new HttpHost(elasticSearchHostName,9200,"http"))); } public void index(ResourceManager manager) { BulkRequest request = new BulkRequest(); while (manager.hasNext()){ try { Object next = manager.next(); AriadneCatalogEntry ace = ((AriadneCatalogEntry) next); AgentInfo testPublisher = new AgentInfo(); testPublisher.setName("TEST"); ace.getPublisher().add(testPublisher); String[] splits = ace.getIdentifier().split("/"); request.add(new IndexRequest(elasticSearchIndexName).id(splits[splits.length-1]) .source(ace.toJson(),XContentType.JSON)); log.debug("Indexing to ES: "+ace.toJson()); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); log.info("Indexing to ES completed with status: "+bulkResponse.status()); if (bulkResponse.hasFailures()) { log.error("FailureMessage: "+bulkResponse.buildFailureMessage()); } } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }