package eu.eudat.elastic.repository; import eu.eudat.elastic.criteria.DatasetCriteria; import eu.eudat.elastic.entities.Dataset; import eu.eudat.elastic.entities.Dmp; import eu.eudat.elastic.entities.Tag; import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.NestedQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilters; import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.nested.ParsedNested; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @Service("datasetRepository") public class DatasetRepository extends ElasticRepository { private final DmpRepository dmpRepository; public DatasetRepository(RestHighLevelClient client, DmpRepository dmpRepository) { super(client); this.dmpRepository = dmpRepository; } @Override public Dataset createOrUpdate(Dataset entity) throws IOException { if (this.getClient() != null) { XContentBuilder builder = XContentFactory.jsonBuilder(); Dmp dmp = this.dmpRepository.findDocument(entity.getDmp().toString()); boolean found = false; if (dmp.getDatasets() != null && !dmp.getDatasets().isEmpty()) { for (int i = 0; i < dmp.getDatasets().size(); i++) { if (dmp.getDatasets().get(i).getId().equals(entity.getId())) { dmp.getDatasets().set(i, entity); found = true; break; } } } if (!found) { if (dmp.getDatasets() == null) { dmp.setDatasets(new ArrayList<>()); } dmp.getDatasets().add(entity); } IndexRequest request = new IndexRequest("dmps").id(dmp.getId().toString()).source(dmp.toElasticEntity(builder));//new IndexRequest("datasets", "doc", entity.getId()).source(entity.toElasticEntity(builder)); this.getClient().index(request, RequestOptions.DEFAULT); return entity; } return null; } @Override public Dataset findDocument(String id) throws IOException { if (this.getClient() != null) { SearchRequest searchRequest = new SearchRequest("dmps"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().should(QueryBuilders.termQuery("datasets.id.keyword", id)); NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery( "datasets", boolQuery, ScoreMode.Avg).innerHit(new InnerHitBuilder()); searchSourceBuilder.query(nestedQueryBuilder); searchRequest.source(searchSourceBuilder); SearchResponse response = this.getClient().search(searchRequest, RequestOptions.DEFAULT); return ((Stream)Arrays.stream(response.getHits().getHits()) .map(hit -> hit.getInnerHits().values()).flatMap(Collection::stream) .map(SearchHits::getHits).flatMap(Arrays::stream) .map(x -> new Dataset().fromElasticEntity(this.transformFromString(x.getSourceAsString(), Map.class)))).findFirst().orElse(null); // GetRequest request = new GetRequest("datasets", id); // GetResponse response = this.getClient().get(request, RequestOptions.DEFAULT); // return new Dataset().fromElasticEntity(response.getSourceAsMap()); } return null; } @Override public List query(DatasetCriteria criteria) throws IOException { if (this.getClient() != null) { SearchRequest searchRequest = new SearchRequest("dmps"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); /*CountRequest countRequest = new CountRequest("dmps").routing("datasets").routing("id"); countRequest.query(QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("datasets.status.keyword", Stream.of(Dataset.Status.DELETED.getValue(), Dataset.Status.CANCELED.getValue()).collect(Collectors.toList())))); CountResponse countResponse = getClient().count(countRequest, RequestOptions.DEFAULT); Long count = countResponse.getCount();*/ SearchRequest countRequest = new SearchRequest("dmps"); NestedAggregationBuilder nestedAggregationBuilder = AggregationBuilders.nested("by_dataset", "datasets"); FiltersAggregationBuilder filtersAggregationBuilder = AggregationBuilders.filters("dataset_query", QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("datasets.status.keyword", Stream.of(Dataset.Status.DELETED.getValue(), Dataset.Status.CANCELED.getValue()).collect(Collectors.toList())))); nestedAggregationBuilder.subAggregation(filtersAggregationBuilder); SearchSourceBuilder countSourceBuilder = new SearchSourceBuilder(); countSourceBuilder.aggregation(nestedAggregationBuilder); countRequest.source(countSourceBuilder); SearchResponse countResponse = getClient().search(countRequest, RequestOptions.DEFAULT); Long count = ((ParsedFilters)((ParsedNested)countResponse.getAggregations().asMap().get("by_dataset")).getAggregations().get("dataset_query")).getBuckets().get(0).getDocCount(); searchSourceBuilder.size(count.intValue()); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().mustNot(QueryBuilders.termsQuery("datasets.status", Stream.of(Dataset.Status.DELETED.getValue(), Dataset.Status.CANCELED.getValue()).collect(Collectors.toList()))); if (criteria.isPublic()) { boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.public", "true")); boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.status", Dataset.Status.FINALISED.getValue())); boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.lastPublicVersion", "true")); } if (criteria.getLike() != null && !criteria.getLike().isEmpty()) { boolQuery = boolQuery.should(QueryBuilders.queryStringQuery(criteria.getLike()).allowLeadingWildcard(true).fields(Stream.of(new Object[][]{ {"datasets.label", 1.0f}, {"datasets.description", 1.0f}, {"datasets.formData", 1.0f} }).collect(Collectors.toMap(data -> (String) data[0], data -> (Float) data[1])))); } if (criteria.getDatasetTemplates() != null && criteria.getDatasetTemplates().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.template", criteria.getDatasetTemplates().stream().map(UUID::toString).collect(Collectors.toList()))); } if (criteria.getStatus() != null) { boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.status", criteria.getStatus().toString())); } if (criteria.getDmps() != null && criteria.getDmps().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.dmp", criteria.getDmps().stream().map(UUID::toString).collect(Collectors.toList()))); } if (criteria.getGroupIds() != null && criteria.getGroupIds().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.group", criteria.getGroupIds().stream().map(UUID::toString).collect(Collectors.toList()))); } if (criteria.getGrants() != null && criteria.getGrants().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.grant", criteria.getGrants().stream().map(UUID::toString).collect(Collectors.toList()))); } if (criteria.getGrantStatus() != null) { boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.grantStatus", criteria.getGrantStatus().toString())); } if (criteria.getCollaborators() != null && criteria.getCollaborators().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.collaborators.id", criteria.getCollaborators().stream().map(UUID::toString).collect(Collectors.toList()))); } if (!criteria.isPublic()) { if (criteria.getAllowAllVersions() != null && !criteria.getAllowAllVersions()) { boolQuery = boolQuery.should(QueryBuilders.termQuery("datasets.lastVersion", "true")); } } if (criteria.getOrganiztions() != null && criteria.getOrganiztions().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.organizations.id", criteria.getOrganiztions())); } if (criteria.getTags() != null && criteria.getTags().size() > 0) { boolQuery = boolQuery.should(QueryBuilders.termsQuery("datasets.tags.name", criteria.getTags().stream().map(Tag::getName).collect(Collectors.toList()))); } if (boolQuery.should().isEmpty() && boolQuery.mustNot().isEmpty()) { boolQuery.should(QueryBuilders.matchAllQuery()); } else { boolQuery.minimumShouldMatch(boolQuery.should().size()); } NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("datasets", boolQuery, ScoreMode.Avg).innerHit(new InnerHitBuilder()); searchSourceBuilder.query(nestedQueryBuilder); searchRequest.source(searchSourceBuilder); SearchResponse response = this.getClient().search(searchRequest, RequestOptions.DEFAULT); return ((Stream)Arrays.stream(response.getHits().getHits()) .map(hit -> hit.getInnerHits().values()).flatMap(Collection::stream) .map(SearchHits::getHits).flatMap(Arrays::stream) .map(x -> new Dataset().fromElasticEntity(this.transformFromString(x.getSourceAsString(), Map.class)))).collect(Collectors.toList()); } return null; } @Override public boolean exists() throws IOException { if (this.getClient() != null) { GetIndexRequest request = new GetIndexRequest("dmps"); // request.indices("datasets"); return this.getClient().indices().exists(request, RequestOptions.DEFAULT); } return false; } @Override public void clear() throws IOException { //DON'T /* if (exists()) { DeleteByQueryRequest delete = new DeleteByQueryRequest("datasets"); delete.setQuery(QueryBuilders.matchAllQuery()); this.getClient().deleteByQuery(delete, RequestOptions.DEFAULT); }*/ } }