2020-01-28 16:53:59 +01:00
|
|
|
package eu.dnetlib.ariadneplus.elasticsearch;
|
|
|
|
|
2020-07-08 10:50:14 +02:00
|
|
|
import eu.dnetlib.ariadneplus.elasticsearch.model.*;
|
2020-06-10 19:39:53 +02:00
|
|
|
import eu.dnetlib.ariadneplus.reader.ResourceManager;
|
2021-03-04 10:59:54 +01:00
|
|
|
import org.apache.commons.lang3.StringUtils;
|
2020-06-12 18:14:41 +02:00
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.apache.http.HttpHost;
|
2021-06-17 17:59:51 +02:00
|
|
|
import org.apache.lucene.spatial3d.geom.GeoPoint;
|
|
|
|
import org.apache.lucene.spatial3d.geom.GeoPolygon;
|
|
|
|
import org.apache.lucene.spatial3d.geom.GeoPolygonFactory;
|
|
|
|
import org.apache.lucene.spatial3d.geom.PlanetModel;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
|
import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
|
import org.elasticsearch.action.index.IndexRequest;
|
|
|
|
import org.elasticsearch.client.RequestOptions;
|
|
|
|
import org.elasticsearch.client.RestClient;
|
|
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
2021-06-17 17:59:51 +02:00
|
|
|
import org.elasticsearch.common.geo.builders.CoordinatesBuilder;
|
|
|
|
import org.elasticsearch.common.geo.builders.EnvelopeBuilder;
|
|
|
|
import org.elasticsearch.common.geo.builders.PolygonBuilder;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.elasticsearch.common.xcontent.XContentType;
|
2021-06-17 17:59:51 +02:00
|
|
|
import org.elasticsearch.geometry.Polygon;
|
|
|
|
import org.locationtech.jts.geom.Coordinate;
|
2020-01-28 16:53:59 +01:00
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
2020-06-12 18:14:41 +02:00
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
import java.io.IOException;
|
2021-06-17 17:59:51 +02:00
|
|
|
import java.io.OutputStream;
|
2021-03-04 10:59:54 +01:00
|
|
|
import java.util.*;
|
2021-02-16 10:51:14 +01:00
|
|
|
import java.util.stream.Collectors;
|
2020-06-12 18:14:41 +02:00
|
|
|
|
2020-01-28 16:53:59 +01:00
|
|
|
@Service
|
|
|
|
public class BulkUpload {
|
|
|
|
|
2020-06-12 18:14:41 +02:00
|
|
|
private static final Log log = LogFactory.getLog(BulkUpload.class);
|
|
|
|
|
|
|
|
@Value("${elasticsearch.hostname}")
|
|
|
|
private String elasticSearchHostName;
|
|
|
|
|
|
|
|
@Value("${elasticsearch.indexname}")
|
|
|
|
private String elasticSearchIndexName;
|
2020-01-28 16:53:59 +01:00
|
|
|
|
|
|
|
private RestHighLevelClient client;
|
|
|
|
|
2020-06-16 02:36:16 +02:00
|
|
|
public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException {
|
|
|
|
this.elasticSearchIndexName = elasticSearchIndexName;
|
|
|
|
client = new RestHighLevelClient(
|
|
|
|
RestClient.builder(
|
|
|
|
new HttpHost(elasticSearchHostName,9200,"http")));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-01-28 16:53:59 +01:00
|
|
|
@PostConstruct
|
2020-06-10 19:39:53 +02:00
|
|
|
public void init() throws IOException {
|
2020-01-28 16:53:59 +01:00
|
|
|
client = new RestHighLevelClient(
|
|
|
|
RestClient.builder(
|
2020-06-12 18:14:41 +02:00
|
|
|
new HttpHost(elasticSearchHostName,9200,"http")));
|
2020-01-28 16:53:59 +01:00
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-07-22 23:13:47 +02:00
|
|
|
public int index(ResourceManager manager, boolean isCollection) {
|
2020-01-28 16:53:59 +01:00
|
|
|
BulkRequest request = new BulkRequest();
|
2020-07-22 23:13:47 +02:00
|
|
|
int esResponseCode = 0;
|
2020-01-28 16:53:59 +01:00
|
|
|
while (manager.hasNext()){
|
2020-06-10 19:39:53 +02:00
|
|
|
|
2020-01-28 16:53:59 +01:00
|
|
|
try {
|
2020-06-10 19:39:53 +02:00
|
|
|
Object next = manager.next();
|
2021-06-15 23:34:59 +02:00
|
|
|
AriadnePlusEntry ace = ((AriadnePlusEntry) next);
|
2020-07-07 13:39:22 +02:00
|
|
|
if (isCollection) {
|
|
|
|
ace.setResourceType("collection");
|
2020-07-13 11:03:21 +02:00
|
|
|
if (ace.getSpatial()==null) {
|
|
|
|
ace.setSpatial(Arrays.asList(new Spatial()));
|
|
|
|
}
|
2020-07-07 13:39:22 +02:00
|
|
|
}
|
|
|
|
else {
|
|
|
|
ace.setResourceType("dataset");
|
2021-02-16 10:51:14 +01:00
|
|
|
if (ace.getSpatial()!=null) {
|
2021-06-15 23:34:59 +02:00
|
|
|
ace.getSpatial()
|
|
|
|
.stream()
|
|
|
|
.filter(s -> Objects.nonNull(s.getLat()) && Objects.nonNull(s.getLon()))
|
|
|
|
.forEach(s -> {
|
|
|
|
double lat = Double.parseDouble(s.getLat());
|
|
|
|
double lon = Double.parseDouble(s.getLon());
|
2021-06-17 17:59:51 +02:00
|
|
|
org.elasticsearch.common.geo.GeoPoint geopoint = new org.elasticsearch.common.geo.GeoPoint(lat, lon);
|
2021-06-15 23:34:59 +02:00
|
|
|
s.setGeopoint(geopoint);
|
|
|
|
});
|
2021-06-17 17:59:51 +02:00
|
|
|
ace.getSpatial()
|
|
|
|
.stream()
|
|
|
|
.filter(s -> Objects.nonNull(s.getBoundingBoxMaxLat())
|
|
|
|
&& Objects.nonNull(s.getBoundingBoxMaxLon())
|
|
|
|
&& Objects.nonNull(s.getBoundingBoxMinLat())
|
|
|
|
&& Objects.nonNull(s.getBoundingBoxMinLon()))
|
|
|
|
.forEach(s -> {
|
|
|
|
double maxlat = Double.parseDouble(s.getBoundingBoxMaxLat());
|
|
|
|
double minlat = Double.parseDouble(s.getBoundingBoxMinLat());
|
|
|
|
double minlon = Double.parseDouble(s.getBoundingBoxMinLon());
|
|
|
|
double maxlon = Double.parseDouble(s.getBoundingBoxMaxLon());
|
|
|
|
CoordinatesBuilder coordinatesBuilder = new CoordinatesBuilder();
|
|
|
|
coordinatesBuilder.coordinate(minlon, maxlat);
|
|
|
|
coordinatesBuilder.coordinate(minlon, minlat);
|
|
|
|
coordinatesBuilder.coordinate(maxlon, minlat);
|
|
|
|
coordinatesBuilder.coordinate(maxlon, maxlat);
|
|
|
|
coordinatesBuilder.coordinate(minlon, maxlat);
|
|
|
|
PolygonBuilder polygonBuilder = new PolygonBuilder(coordinatesBuilder);
|
|
|
|
String wkt = polygonBuilder.toWKT();
|
|
|
|
s.setBoundingbox(wkt);
|
|
|
|
// Coordinate topLeft = new Coordinate(minlon, maxlat);
|
|
|
|
// Coordinate bottomRight = new Coordinate(maxlon, minlat);
|
|
|
|
// EnvelopeBuilder envelopeBuilder = new EnvelopeBuilder(topLeft, bottomRight);
|
|
|
|
// String wkt = envelopeBuilder.toWKT();
|
|
|
|
// s.setBoundingbox(wkt);
|
|
|
|
});
|
2021-06-18 19:32:12 +02:00
|
|
|
ace.getSpatial()
|
|
|
|
.stream()
|
|
|
|
.filter(s -> Objects.nonNull(s.getPolygonGeoPoints())&&s.getPolygonGeoPoints().size()>=4)
|
|
|
|
.forEach(s -> {
|
|
|
|
CoordinatesBuilder coordinatesBuilder = new CoordinatesBuilder();
|
|
|
|
s.getPolygonGeoPoints().forEach(p -> {
|
|
|
|
coordinatesBuilder.coordinate(
|
|
|
|
Double.parseDouble(p.getLon()),
|
|
|
|
Double.parseDouble(p.getLat()));
|
|
|
|
});
|
|
|
|
PolygonBuilder polygonBuilder = new PolygonBuilder(coordinatesBuilder);
|
|
|
|
String wkt = polygonBuilder.toWKT();
|
|
|
|
log.debug("POLYGON "+wkt);
|
|
|
|
s.setPolygon(wkt);
|
|
|
|
});
|
2021-02-16 10:51:14 +01:00
|
|
|
if (ace.getSpatial().size()==2) {
|
|
|
|
Spatial uniqueSpatial = new Spatial();
|
|
|
|
boolean uniquePlaceNameFound = ace.getSpatial().stream().filter(s -> s.getPlaceName()!=null).count()==1;
|
2021-06-15 23:34:59 +02:00
|
|
|
boolean uniqueLocationFound = ace.getSpatial().stream().filter(s -> s.getGeopoint()!=null).count()==1;
|
2021-02-16 10:51:14 +01:00
|
|
|
if (uniquePlaceNameFound&&uniqueLocationFound) {
|
|
|
|
ace.getSpatial().stream().filter(s -> s.getPlaceName()!=null).forEach(s -> {
|
|
|
|
uniqueSpatial.setPlaceName(s.getPlaceName());
|
|
|
|
});
|
2021-06-15 23:34:59 +02:00
|
|
|
ace.getSpatial().stream().filter(s -> s.getGeopoint()!=null).forEach(s -> {
|
|
|
|
uniqueSpatial.setGeopoint(s.getGeopoint());
|
2021-02-16 10:51:14 +01:00
|
|
|
});
|
|
|
|
ace.getSpatial().clear();
|
|
|
|
ace.setSpatial(Arrays.asList(uniqueSpatial));
|
|
|
|
}
|
|
|
|
}
|
2021-03-04 10:59:54 +01:00
|
|
|
// else {
|
|
|
|
// if (ace.getSpatial()!=null) {
|
|
|
|
// Set<String> items = new HashSet<>();
|
|
|
|
// List<Spatial> spatialsNoDup = ace.getSpatial().stream()
|
|
|
|
// .filter(s -> !items.add(s.getPlaceName()))
|
|
|
|
// .collect(Collectors.toList());
|
|
|
|
// ace.getSpatial().clear();
|
|
|
|
// ace.setSpatial(spatialsNoDup);
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// }
|
|
|
|
List<Spatial> dedupSpatials = removeDuplicates(ace.getSpatial());
|
|
|
|
ace.getSpatial().clear();
|
|
|
|
ace.setSpatial(dedupSpatials);
|
|
|
|
|
2021-06-15 23:34:59 +02:00
|
|
|
// TODO update following check according to new model definition
|
|
|
|
// if (ace.getSpatial().size()>1) {
|
|
|
|
// ace.getSpatial().removeIf(s -> (s.getPlaceName()!=null&&s.getPlaceName().equals("Name not provided")&&Objects.isNull(s.getLocation())));
|
|
|
|
// }
|
2021-02-16 10:51:14 +01:00
|
|
|
}
|
|
|
|
else {
|
|
|
|
ace.setSpatial(Arrays.asList(new Spatial()));
|
|
|
|
}
|
2020-07-07 13:39:22 +02:00
|
|
|
}
|
2020-07-08 22:00:28 +02:00
|
|
|
|
2021-06-17 17:59:51 +02:00
|
|
|
// if (ace.getTemporal()!=null) {
|
|
|
|
// ace.getTemporal().stream()
|
|
|
|
// .filter(t->t.getMatchingPeriodOName()!=null)
|
|
|
|
// .forEach(t->{
|
|
|
|
// t.setPeriodName(t.getMatchingPeriodOName());
|
|
|
|
// });
|
|
|
|
// }
|
2021-02-16 18:19:30 +01:00
|
|
|
|
2020-07-13 11:03:21 +02:00
|
|
|
if (!isCollection) {
|
|
|
|
String uniqueIsPartOf = ace.getUniqueIsPartOf();
|
|
|
|
if (uniqueIsPartOf != null) {
|
|
|
|
ace.setIsPartOf(Arrays.asList(uniqueIsPartOf));
|
|
|
|
}
|
|
|
|
if (ace.getContributor() != null) {
|
|
|
|
ace.getContributor().clear();
|
|
|
|
ace.setContributor(ace.getCreator());
|
|
|
|
}
|
2020-07-07 13:39:22 +02:00
|
|
|
}
|
2021-06-15 23:34:59 +02:00
|
|
|
|
2020-06-10 19:39:53 +02:00
|
|
|
String[] splits = ace.getIdentifier().split("/");
|
2020-07-09 16:35:04 +02:00
|
|
|
|
2021-06-17 17:59:51 +02:00
|
|
|
log.debug("JSON >>>> "+ace.toJson());
|
2020-07-13 11:03:21 +02:00
|
|
|
|
2020-07-17 17:42:08 +02:00
|
|
|
String idES = splits[splits.length-1];
|
2020-07-13 11:03:21 +02:00
|
|
|
request.add(new IndexRequest(elasticSearchIndexName).id(idES)
|
2020-01-28 16:53:59 +01:00
|
|
|
.source(ace.toJson(),XContentType.JSON));
|
2020-11-30 13:27:28 +01:00
|
|
|
long start = System.currentTimeMillis();
|
2020-01-28 16:53:59 +01:00
|
|
|
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
|
2020-11-30 13:27:28 +01:00
|
|
|
long end = System.currentTimeMillis();
|
2020-07-22 23:13:47 +02:00
|
|
|
if (bulkResponse!=null) {
|
|
|
|
esResponseCode = bulkResponse.status().getStatus();
|
2020-07-24 12:34:06 +02:00
|
|
|
// log.info("Indexing to ES completed with status: " + bulkResponse.status());
|
2020-07-22 23:13:47 +02:00
|
|
|
if (bulkResponse.hasFailures()) {
|
|
|
|
log.error("FailureMessage: " + bulkResponse.buildFailureMessage());
|
2021-05-31 16:43:21 +02:00
|
|
|
esResponseCode = -7;
|
2020-07-22 23:13:47 +02:00
|
|
|
}
|
2020-06-10 19:39:53 +02:00
|
|
|
}
|
2020-07-23 23:12:07 +02:00
|
|
|
else {
|
|
|
|
esResponseCode = -3;
|
|
|
|
}
|
2020-11-30 13:27:28 +01:00
|
|
|
log.debug(idES+" es_index_time(sec): "+(end-start)/1000+" response_code: "+esResponseCode);
|
|
|
|
} catch (Throwable t) {
|
|
|
|
t.printStackTrace();
|
|
|
|
log.error("Indexing "+t.getMessage());
|
2020-07-22 23:13:47 +02:00
|
|
|
return -1;
|
2020-01-28 16:53:59 +01:00
|
|
|
}
|
|
|
|
}
|
2020-07-22 23:13:47 +02:00
|
|
|
return esResponseCode;
|
2020-01-28 16:53:59 +01:00
|
|
|
}
|
2021-03-04 10:59:54 +01:00
|
|
|
|
|
|
|
public static List<Spatial> removeDuplicates(List<Spatial> spatialList) {
|
|
|
|
Map<String, List<Spatial>> duplicatesMap = getDuplicatesMap(spatialList);
|
|
|
|
return duplicatesMap.values().stream()
|
|
|
|
.filter(spatials -> spatials!=null)
|
|
|
|
.map(spatials -> spatials.get(0))
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
}
|
|
|
|
|
|
|
|
private static Map<String, List<Spatial>> getDuplicatesMap(List<Spatial> spatialList) {
|
|
|
|
return spatialList.stream().collect(Collectors.groupingBy(BulkUpload::uniqueAttributes));
|
|
|
|
}
|
|
|
|
|
|
|
|
private static String uniqueAttributes(Spatial spatial){
|
|
|
|
if(Objects.isNull(spatial)){
|
|
|
|
return StringUtils.EMPTY;
|
|
|
|
}
|
|
|
|
String name = "";
|
|
|
|
if (!Objects.isNull(spatial.getPlaceName())) {
|
|
|
|
name = spatial.getPlaceName();
|
|
|
|
}
|
|
|
|
String lat = "";
|
|
|
|
String lon = "";
|
2021-06-15 23:34:59 +02:00
|
|
|
if (!Objects.isNull(spatial.getGeopoint())) {
|
|
|
|
lat = Double.toString(spatial.getGeopoint().getLat());
|
|
|
|
lon = Double.toString(spatial.getGeopoint().getLon());
|
2021-03-04 10:59:54 +01:00
|
|
|
}
|
|
|
|
String uniqueAttribute = (name) + (lat) + (lon);
|
|
|
|
return uniqueAttribute;
|
|
|
|
}
|
2020-01-28 16:53:59 +01:00
|
|
|
}
|