AriadnePlus/dnet-ariadneplus-graphdb-pu.../src/main/java/eu/dnetlib/ariadneplus/elasticsearch/BulkUpload.java

264 lines
12 KiB
Java

package eu.dnetlib.ariadneplus.elasticsearch;
import eu.dnetlib.ariadneplus.elasticsearch.model.AriadnePlusEntry;
import eu.dnetlib.ariadneplus.elasticsearch.model.AriadneResource;
import eu.dnetlib.ariadneplus.elasticsearch.model.Spatial;
import eu.dnetlib.ariadneplus.reader.ResourceManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.builders.CoordinatesBuilder;
import org.elasticsearch.common.geo.builders.PolygonBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.locationtech.jts.algorithm.Centroid;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.util.GeometryTransformer;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
 * Prepares {@link AriadnePlusEntry} records and writes them to an Elasticsearch index.
 *
 * <p>The target host comes from {@code elasticsearch.hostname} (port 9200, plain HTTP) and the
 * index name from {@code elasticsearch.indexname}, unless overridden via
 * {@link #init(String, String)}.
 */
@Service
public class BulkUpload {

    private static final Log log = LogFactory.getLog(BulkUpload.class);

    /** Port of the Elasticsearch REST endpoint. */
    private static final int ELASTICSEARCH_PORT = 9200;

    /** Result code: an exception was thrown while preparing or sending an entry. */
    private static final int INDEXING_ERROR = -1;
    /** Result code: the client returned no bulk response. */
    private static final int NO_RESPONSE = -3;
    /** Result code: the bulk response reported item failures. */
    private static final int BULK_FAILURES = -7;

    @Value("${elasticsearch.hostname}")
    private String elasticSearchHostName;

    @Value("${elasticsearch.indexname}")
    private String elasticSearchIndexName;

    private RestHighLevelClient client;

    private final WKTReader wktReader = new WKTReader();

    /**
     * (Re)creates the client for an explicit host and index, e.g. when used outside Spring.
     *
     * @param elasticSearchHostName host name of the Elasticsearch node
     * @param elasticSearchIndexName name of the index documents are written to
     * @throws IOException if closing a previously created client fails
     */
    public void init(String elasticSearchHostName, String elasticSearchIndexName) throws IOException {
        this.elasticSearchHostName = elasticSearchHostName;
        this.elasticSearchIndexName = elasticSearchIndexName;
        openClient();
    }

    /**
     * Creates the client from the injected {@code elasticsearch.hostname} property.
     *
     * @throws IOException if closing a previously created client fails
     */
    @PostConstruct
    public void init() throws IOException {
        openClient();
    }

    /** Replaces the current client with one for {@link #elasticSearchHostName}, closing the old one. */
    private void openClient() throws IOException {
        RestHighLevelClient previous = client;
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchHostName, ELASTICSEARCH_PORT, "http")));
        // Without this, calling init(host, index) after the @PostConstruct init would leak the
        // first client's connections.
        if (previous != null) {
            previous.close();
        }
    }

    /**
     * Prepares every entry produced by {@code manager} and indexes it, one bulk request per entry.
     *
     * <p>Entries reported as failed by Elasticsearch do not stop the loop; an exception does.
     *
     * @param manager source of entries; every {@code next()} is cast to {@link AriadnePlusEntry}
     * @param isCollection {@code true} to index the entries as collections, {@code false} as datasets
     * @return the code of the first failed request ({@code -7} item failures, {@code -3} no
     *     response) if any failed; otherwise the HTTP status of the last bulk response;
     *     {@code -1} if an exception was thrown (remaining entries are skipped);
     *     {@code 0} if {@code manager} produced no entries
     */
    public int index(ResourceManager manager, boolean isCollection) {
        int esResponseCode = 0;
        int firstFailureCode = 0;
        while (manager.hasNext()) {
            try {
                AriadnePlusEntry ace = (AriadnePlusEntry) manager.next();
                prepareEntry(ace, isCollection);
                int code = sendToIndex(ace);
                // Keep the first failure so a later success cannot mask it.
                if (code < 0 && firstFailureCode == 0) {
                    firstFailureCode = code;
                }
                esResponseCode = code;
            } catch (Throwable t) {
                log.error("Indexing " + t.getMessage(), t);
                return INDEXING_ERROR;
            }
        }
        return firstFailureCode != 0 ? firstFailureCode : esResponseCode;
    }

    /** Normalises type, part-of, contributor, resource type and spatial fields of {@code ace} in place. */
    private void prepareEntry(AriadnePlusEntry ace, boolean isCollection) {
        AriadneResource type = new AriadneResource();
        type.setUri(ace.getTypeURI());
        type.setLabel(ace.getTypeLabel());
        ace.setHas_type(type);

        String uniqueIsPartOf = ace.getUniqueIsPartOf();
        if (uniqueIsPartOf != null) {
            ace.setIsPartOf(Arrays.asList(uniqueIsPartOf));
        }
        if (ace.getContributor() != null) {
            // NOTE(review): if getContributor() and getCreator() share one list instance,
            // clear() also empties the creators — confirm this is intended.
            ace.getContributor().clear();
            ace.setContributor(ace.getCreator());
        }

        List<Spatial> spatials = ace.getSpatial();
        if (spatials != null) {
            setGeopointsFromLatLon(spatials);
        }

        if (isCollection) {
            ace.setResourceType("collection");
            if (spatials != null) {
                setPolygonsFromWkt(spatials);
            }
        } else {
            ace.setResourceType("dataset");
            if (ace.getDigitalImage() != null) {
                ace.getDigitalImage().stream()
                        .filter(image -> image.getProviderUri() != null)
                        .forEach(image -> image.setPrimary("true"));
            }
            if (spatials != null) {
                setBoundingBoxes(spatials);
                // TODO: polygons given as point lists (getPolygonGeoPoints) are not indexed yet;
                // the previous code parsed them into a CoordinatesBuilder and then discarded it.
                setPolygonsFromWkt(spatials);
            }
        }

        if (spatials != null) {
            ace.setSpatial(mergeAndDeduplicate(spatials));
        }
    }

    /** Sets geopoint and centroid of every spatial that has both a latitude and a longitude. */
    private static void setGeopointsFromLatLon(List<Spatial> spatials) {
        spatials.stream()
                .filter(s -> Objects.nonNull(s.getLat()) && Objects.nonNull(s.getLon()))
                .forEach(s -> {
                    GeoPoint geopoint = new GeoPoint(
                            Double.parseDouble(s.getLat()), Double.parseDouble(s.getLon()));
                    s.setGeopoint(geopoint);
                    s.setCentroid(geopoint);
                });
    }

    /** Sets a WKT bounding-box polygon and its centroid for every spatial with all four bounds. */
    private void setBoundingBoxes(List<Spatial> spatials) {
        spatials.stream()
                .filter(s -> Objects.nonNull(s.getBoundingBoxMaxLat())
                        && Objects.nonNull(s.getBoundingBoxMaxLon())
                        && Objects.nonNull(s.getBoundingBoxMinLat())
                        && Objects.nonNull(s.getBoundingBoxMinLon()))
                .forEach(s -> {
                    double maxlat = Double.parseDouble(s.getBoundingBoxMaxLat());
                    double minlat = Double.parseDouble(s.getBoundingBoxMinLat());
                    double minlon = Double.parseDouble(s.getBoundingBoxMinLon());
                    double maxlon = Double.parseDouble(s.getBoundingBoxMaxLon());
                    // Closed ring in (lon, lat) order: NW -> SW -> SE -> NE -> NW.
                    CoordinatesBuilder ring = new CoordinatesBuilder()
                            .coordinate(minlon, maxlat)
                            .coordinate(minlon, minlat)
                            .coordinate(maxlon, minlat)
                            .coordinate(maxlon, maxlat)
                            .coordinate(minlon, maxlat);
                    String wkt = new PolygonBuilder(ring).toWKT();
                    s.setBoundingbox(wkt);
                    s.setCentroid(calculateCentroid(wkt));
                });
    }

    /** Copies the WKT of every spatial that has one into its polygon field and sets its centroid. */
    private void setPolygonsFromWkt(List<Spatial> spatials) {
        spatials.stream()
                .filter(s -> Objects.nonNull(s.getWkt()))
                .forEach(s -> {
                    s.setPolygon(s.getWkt());
                    s.setCentroid(calculateCentroid(s.getWkt()));
                });
    }

    /**
     * Merges two spatials into one and removes duplicates.
     *
     * <p>A two-element list in which exactly one spatial has a place name and exactly one has a
     * geopoint is merged into a single spatial carrying that name, geopoint and centroid. The
     * result is then passed through {@link #removeDuplicates(List)}.
     *
     * @return a new list; the input list is not modified
     */
    private static List<Spatial> mergeAndDeduplicate(List<Spatial> spatials) {
        List<Spatial> candidates = spatials;
        if (spatials.size() == 2) {
            List<Spatial> named = spatials.stream()
                    .filter(s -> s.getPlaceName() != null)
                    .collect(Collectors.toList());
            List<Spatial> located = spatials.stream()
                    .filter(s -> s.getGeopoint() != null)
                    .collect(Collectors.toList());
            if (named.size() == 1 && located.size() == 1) {
                Spatial merged = new Spatial();
                merged.setPlaceName(named.get(0).getPlaceName());
                merged.setGeopoint(located.get(0).getGeopoint());
                merged.setCentroid(located.get(0).getGeopoint());
                candidates = Arrays.asList(merged);
            }
        }
        return removeDuplicates(candidates);
    }

    /** Serialises {@code ace} and sends it in its own bulk request; returns the resulting code. */
    private int sendToIndex(AriadnePlusEntry ace) throws IOException {
        String[] splits = ace.getIdentifier().split("/");
        String idES = splits[splits.length - 1];
        String json = ace.toJson();
        if (log.isDebugEnabled()) {
            log.debug("JSON >>>> " + json);
        }
        // A new request per entry: reusing one request would re-send every earlier document.
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest(elasticSearchIndexName).id(idES).source(json, XContentType.JSON));

        long start = System.currentTimeMillis();
        BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
        long end = System.currentTimeMillis();

        int code;
        if (bulkResponse == null) {
            code = NO_RESPONSE;
        } else if (bulkResponse.hasFailures()) {
            log.error("FailureMessage: " + bulkResponse.buildFailureMessage());
            code = BULK_FAILURES;
        } else {
            code = bulkResponse.status().getStatus();
        }
        if (log.isDebugEnabled()) {
            log.debug(idES + " es_index_time(sec): " + (end - start) / 1000 + " response_code: " + code);
        }
        return code;
    }

    /**
     * Computes the centroid of a WKT geometry.
     *
     * @param wkt geometry in Well-Known Text
     * @return the centroid as (lat, lon), or {@code null} if the WKT cannot be parsed or the
     *     geometry is empty
     */
    protected GeoPoint calculateCentroid(final String wkt) {
        try {
            Geometry geo = wktReader.read(wkt);
            Coordinate coord = Centroid.getCentroid(geo);
            if (coord == null) {
                // JTS returns no centroid for empty geometries.
                return null;
            }
            return new GeoPoint(coord.getY(), coord.getX());
        } catch (ParseException e) {
            // The ParseException usually has no cause, so report its own message.
            log.fatal("Cannot calculate centroid for WKT " + wkt + "\n Cause: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Removes spatials that share the same place name, latitude and longitude.
     *
     * <p>Only those three attributes are compared. For example, spatials that have neither a place
     * name nor a geopoint are all considered equal. The first occurrence of each group is kept, in
     * input order.
     *
     * @param spatialList spatials to deduplicate; may contain {@code null} elements
     * @return a new mutable list, empty if {@code spatialList} is {@code null}
     */
    public static List<Spatial> removeDuplicates(List<Spatial> spatialList) {
        if (spatialList == null) {
            return new ArrayList<>();
        }
        return getDuplicatesMap(spatialList).values().stream()
                .map(group -> group.get(0))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** Groups spatials by {@link #uniqueAttributes(Spatial)}, preserving first-occurrence order. */
    private static Map<List<String>, List<Spatial>> getDuplicatesMap(List<Spatial> spatialList) {
        return spatialList.stream()
                .collect(Collectors.groupingBy(
                        BulkUpload::uniqueAttributes, LinkedHashMap::new, Collectors.toList()));
    }

    /**
     * Builds the dedup key [placeName, lat, lon], using empty strings for missing values.
     *
     * <p>A list key is used rather than a concatenated string, because concatenation could make
     * distinct spatials collide.
     */
    private static List<String> uniqueAttributes(Spatial spatial) {
        if (Objects.isNull(spatial)) {
            return Arrays.asList(StringUtils.EMPTY, StringUtils.EMPTY, StringUtils.EMPTY);
        }
        String name = Objects.isNull(spatial.getPlaceName()) ? StringUtils.EMPTY : spatial.getPlaceName();
        String lat = StringUtils.EMPTY;
        String lon = StringUtils.EMPTY;
        if (!Objects.isNull(spatial.getGeopoint())) {
            lat = Double.toString(spatial.getGeopoint().getLat());
            lon = Double.toString(spatial.getGeopoint().getLon());
        }
        return Arrays.asList(name, lat, lon);
    }
}