package eu.dnetlib.ariadneplus.elasticsearch;

import eu.dnetlib.ariadneplus.elasticsearch.model.*;
import eu.dnetlib.ariadneplus.reader.ResourceManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;

/**
 * Spring service that takes {@link AriadneCatalogEntry} objects from a
 * {@link ResourceManager}, normalises a few of their fields and indexes them
 * into Elasticsearch through a {@link RestHighLevelClient}.
 */
@Service
public class BulkUpload {

    private static final Log log = LogFactory.getLog(BulkUpload.class);

    /** Port used for every Elasticsearch connection opened by this service. */
    private static final int ELASTICSEARCH_PORT = 9200;

    /** URI scheme used for every Elasticsearch connection opened by this service. */
    private static final String ELASTICSEARCH_SCHEME = "http";

    @Value("${elasticsearch.hostname}")
    private String elasticSearchHostName;

    @Value("${elasticsearch.indexname}")
    private String elasticSearchIndexName;

    private RestHighLevelClient client;

    /**
     * Re-targets this service at the given host and index.
     *
     * <p>A client that was already open (for example the one created by
     * {@link #init()}) is closed after the new one has been created, so its
     * connections are not leaked.
     *
     * @param elasticSearchHostName  host name of the Elasticsearch node
     * @param elasticSearchIndexName name of the index that {@link #index} writes to
     * @throws IOException if closing the previously open client fails
     */
    public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException {
        this.elasticSearchHostName = elasticSearchHostName;
        this.elasticSearchIndexName = elasticSearchIndexName;
        connect(elasticSearchHostName);
    }

    /**
     * Creates the client from the injected {@code elasticsearch.hostname}
     * property. Invoked by the container after dependency injection.
     *
     * @throws IOException if closing a previously open client fails
     */
    @PostConstruct
    public void init() throws IOException {
        connect(elasticSearchHostName);
    }

    /**
     * Indexes every entry supplied by {@code manager}, one bulk call per entry.
     *
     * @param manager      source of the entries; each element returned by
     *                     {@code next()} is cast to {@link AriadneCatalogEntry}
     * @param isCollection {@code true} to index the entries as collections,
     *                     {@code false} to index them as datasets
     * @return the HTTP status of the last bulk call; {@code 0} when the manager
     *         supplied no entries; {@code -3} when the last bulk call returned
     *         no response; {@code -1} as soon as any entry fails with an
     *         exception (the remaining entries are then not processed)
     */
    public int index(ResourceManager manager, boolean isCollection) {
        int esResponseCode = 0;
        while (manager.hasNext()) {
            try {
                AriadneCatalogEntry ace = (AriadneCatalogEntry) manager.next();
                if (isCollection) {
                    prepareCollection(ace);
                } else {
                    prepareDataset(ace);
                }

                // The document id is the last path segment of the identifier.
                String[] splits = ace.getIdentifier().split("/");
                normalizeAatSubjects(ace);
                String idES = splits[splits.length - 1];

                // A fresh request per entry: reusing a single request across
                // iterations would re-send every previously added document
                // on each call.
                BulkRequest request = new BulkRequest();
                request.add(new IndexRequest(elasticSearchIndexName).id(idES)
                        .source(ace.toJson(), XContentType.JSON));

                long start = System.currentTimeMillis();
                BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
                long end = System.currentTimeMillis();

                if (bulkResponse != null) {
                    esResponseCode = bulkResponse.status().getStatus();
                    if (bulkResponse.hasFailures()) {
                        log.error("FailureMessage: " + bulkResponse.buildFailureMessage());
                    }
                } else {
                    esResponseCode = -3;
                }
                log.debug(idES + " es_index_time(sec): " + (end - start) / 1000 + " response_code: " + esResponseCode);
            } catch (Throwable t) {
                // Pass the throwable to the logger so the stack trace ends up
                // in the application log instead of on stderr.
                log.error("Indexing " + t.getMessage(), t);
                return -1;
            }
        }
        return esResponseCode;
    }

    /** Opens a client for {@code hostName} and closes the one it replaces, if any. */
    private void connect(String hostName) throws IOException {
        RestHighLevelClient previous = client;
        client = new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(hostName, ELASTICSEARCH_PORT, ELASTICSEARCH_SCHEME)));
        if (previous != null) {
            previous.close();
        }
    }

    /** Marks the entry as a collection and gives it an empty spatial element when it has none. */
    private void prepareCollection(AriadneCatalogEntry ace) {
        ace.setResourceType("collection");
        if (ace.getSpatial() == null) {
            ace.setSpatial(Arrays.asList(new Spatial()));
        }
    }

    /**
     * Marks the entry as a dataset, collapses its spatial elements into a
     * single one, sets isPartOf from the unique parent and replaces the
     * contributors with the creators.
     */
    private void prepareDataset(AriadneCatalogEntry ace) {
        ace.setResourceType("dataset");

        // Merge all spatial elements into one: the last non-null place name
        // and the last non-null location win.
        Spatial esSpatial = new Spatial();
        if (ace.getSpatial() != null) {
            for (Spatial s : ace.getSpatial()) {
                if (s.getPlaceName() != null) {
                    esSpatial.setPlaceName(s.getPlaceName());
                }
                if (s.getLocation() != null) {
                    esSpatial.setLocation(s.getLocation());
                }
            }
            ace.getSpatial().clear();
        }
        // An entry without spatial data gets a single empty element, like collections do.
        ace.setSpatial(Arrays.asList(esSpatial));

        String uniqueIsPartOf = ace.getUniqueIsPartOf();
        if (uniqueIsPartOf != null) {
            ace.setIsPartOf(Arrays.asList(uniqueIsPartOf));
        }
        if (ace.getContributor() != null) {
            // NOTE(review): if getContributor() and getCreator() can return the
            // same list instance, this clear() also empties the creators - confirm.
            ace.getContributor().clear();
            ace.setContributor(ace.getCreator());
        }
    }

    /**
     * Copies the id of the first AAT subject into the source of every derived
     * subject, then replaces the id of every AAT subject with the last path
     * segment of that first id. Does nothing when either list is null, when
     * there is no AAT subject, or when the first AAT subject has no id.
     */
    private void normalizeAatSubjects(AriadneCatalogEntry ace) {
        if (ace.getAatSubjects() == null || ace.getDerivedSubject() == null) {
            return;
        }
        if (ace.getAatSubjects().isEmpty()) {
            return;
        }
        String aatSource = ace.getAatSubjects().get(0).getId();
        if (aatSource == null) {
            return;
        }
        ace.getDerivedSubject().forEach(d -> d.setSource(aatSource));

        String[] aatSourceSplit = aatSource.split("/");
        String aatSubjectId = aatSourceSplit[aatSourceSplit.length - 1];
        // NOTE(review): every AAT subject receives the id derived from the
        // FIRST subject; presumably entries carry a single AAT subject - confirm.
        ace.getAatSubjects().forEach(s -> s.setId(aatSubjectId));
    }
}