package eu.dnetlib.ariadneplus.elasticsearch; import eu.dnetlib.ariadneplus.elasticsearch.model.*; import eu.dnetlib.ariadneplus.reader.ResourceManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpHost; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentType; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.Arrays; @Service public class BulkUpload { private static final Log log = LogFactory.getLog(BulkUpload.class); @Value("${elasticsearch.hostname}") private String elasticSearchHostName; @Value("${elasticsearch.indexname}") private String elasticSearchIndexName; private RestHighLevelClient client; public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException { this.elasticSearchIndexName = elasticSearchIndexName; client = new RestHighLevelClient( RestClient.builder( new HttpHost(elasticSearchHostName,9200,"http"))); } @PostConstruct public void init() throws IOException { client = new RestHighLevelClient( RestClient.builder( new HttpHost(elasticSearchHostName,9200,"http"))); } public void index(ResourceManager manager, boolean isCollection) { BulkRequest request = new BulkRequest(); while (manager.hasNext()){ try { Object next = manager.next(); AriadneCatalogEntry ace = ((AriadneCatalogEntry) next); if (isCollection) { ace.setResourceType("collection"); if (ace.getSpatial()==null) { ace.setSpatial(Arrays.asList(new Spatial())); } } else { ace.setResourceType("dataset"); Spatial esSpatial = new Spatial(); ace.getSpatial().stream().filter(s -> s.getPlaceName()!=null).forEach(s -> { esSpatial.setPlaceName(s.getPlaceName()); }); ace.getSpatial().stream().filter(s -> s.getLocation()!=null).forEach(s -> { esSpatial.setLocation(s.getLocation()); }); ace.getSpatial().clear(); ace.setSpatial(Arrays.asList(esSpatial)); } if (!isCollection) { String uniqueIsPartOf = ace.getUniqueIsPartOf(); if (uniqueIsPartOf != null) { ace.setIsPartOf(Arrays.asList(uniqueIsPartOf)); } if (ace.getContributor() != null) { ace.getContributor().clear(); ace.setContributor(ace.getCreator()); } Distribution distribution = new Distribution(); AgentInfo distrPublisher = new AgentInfo(); distrPublisher.setEmail(""); distrPublisher.setName(""); distrPublisher.setType(""); distribution.setPublisher(Arrays.asList(distrPublisher)); ace.setDistribution(Arrays.asList(distribution)); ItemMetadataStructure ims = new ItemMetadataStructure(); ace.setHasItemMetadataStructure(Arrays.asList(ims)); MetadataRecord mr = new MetadataRecord(); Dex dex = new Dex(); mr.setConformsTo(Arrays.asList(dex)); ace.setHasMetadataRecord(Arrays.asList(mr)); if (!isCollection) { ace.setKeyword(Arrays.asList(new String(""))); } AgentInfo sr = new AgentInfo(); ace.setScientificResponsible(Arrays.asList(sr)); AgentInfo tr = new AgentInfo(); ace.setTechnicalResponsible(Arrays.asList(tr)); } AgentInfo testPublisher = new AgentInfo(); testPublisher.setName("TEST"); testPublisher.setPhone(null); testPublisher.setEmail(null); ace.getPublisher().add(testPublisher); String[] splits = ace.getIdentifier().split("/"); if (ace.getAatSubjects() != null && ace.getDerivedSubject() != null) { String aatSource = ace.getAatSubjects().get(0).getId(); ace.getDerivedSubject().forEach(d -> { d.setSource(aatSource); }); String [] aatSourceSplit = aatSource.split("/"); String aatSubjectId = aatSourceSplit[aatSourceSplit.length-1]; ace.getAatSubjects().forEach(s -> s.setId(aatSubjectId)); } String idES; if (isCollection) { idES = "10000".concat(ace.getOriginalId()); } else { idES = splits[splits.length-1]; } request.add(new IndexRequest(elasticSearchIndexName).id(idES) .source(ace.toJson(),XContentType.JSON)); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); log.info("Indexing to ES completed with status: "+bulkResponse.status()); if (bulkResponse.hasFailures()) { log.error("FailureMessage: "+bulkResponse.buildFailureMessage()); } } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } }