2022-02-04 10:12:15 +01:00
|
|
|
package eu.dnetlib.openaire.dsm.dao;
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.time.Instant;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Objects;
|
|
|
|
import java.util.Optional;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.function.Function;
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.google.common.cache.*;
|
|
|
|
import com.google.common.primitives.Ints;
|
|
|
|
import com.mongodb.BasicDBObject;
|
|
|
|
import com.mongodb.MongoClient;
|
|
|
|
import com.mongodb.client.FindIterable;
|
|
|
|
import com.mongodb.client.MongoCollection;
|
2022-02-07 10:09:18 +01:00
|
|
|
import eu.dnetlib.DnetOpenaireExporterProperties;
|
|
|
|
import eu.dnetlib.DnetOpenaireExporterProperties.Datasource;
|
2022-02-04 10:12:15 +01:00
|
|
|
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
|
|
|
|
import eu.dnetlib.enabling.datasources.common.AggregationStage;
|
|
|
|
import eu.dnetlib.enabling.datasources.common.DsmException;
|
|
|
|
import eu.dnetlib.miscutils.datetime.DateUtils;
|
|
|
|
import eu.dnetlib.openaire.common.Utils;
|
|
|
|
|
|
|
|
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
|
|
|
|
import eu.dnetlib.openaire.dsm.domain.CollectionInfo;
|
|
|
|
import eu.dnetlib.openaire.dsm.domain.CollectionMode;
|
|
|
|
import eu.dnetlib.openaire.dsm.domain.TransformationInfo;
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
import org.apache.commons.lang3.time.DateFormatUtils;
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
import org.apache.http.HttpStatus;
|
|
|
|
import org.bson.Document;
|
|
|
|
import org.bson.conversions.Bson;
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
|
|
|
import org.springframework.cache.annotation.CacheEvict;
|
|
|
|
import org.springframework.cache.annotation.Cacheable;
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
import static com.mongodb.client.model.Filters.*;
|
|
|
|
import static com.mongodb.client.model.Projections.fields;
|
|
|
|
/**
|
|
|
|
* Created by claudio on 20/10/2016.
|
|
|
|
*/
|
|
|
|
@Component
|
|
|
|
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
|
|
|
|
public class MongoLoggerClientImpl implements MongoLoggerClient {
|
|
|
|
|
|
|
|
private static final Log log = LogFactory.getLog(MongoLoggerClientImpl.class);
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
private MongoClient datasourcePublisherMongoClient;
|
|
|
|
|
|
|
|
@Autowired
|
2022-02-07 10:09:18 +01:00
|
|
|
private DnetOpenaireExporterProperties config;
|
2022-02-04 10:12:15 +01:00
|
|
|
|
|
|
|
private final static String LOADTIME = "loadtime";
|
|
|
|
private final LoadingCache<String, Instant> loadingCache = CacheBuilder.newBuilder()
|
|
|
|
.maximumSize(1)
|
|
|
|
.expireAfterWrite(60, TimeUnit.MINUTES)
|
|
|
|
.build(new CacheLoader<String, Instant>() {
|
|
|
|
// The only cached value is associated to "loadtime"
|
|
|
|
public Instant load(String key) {
|
|
|
|
final Instant loadTime = getLoadTime();
|
|
|
|
log.debug("found load time: " + loadTime.toString());
|
|
|
|
return loadTime;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
|
|
private static final Bson fields = getFields();
|
|
|
|
|
|
|
|
private static MongoCollection<Document> collection = null;
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Cacheable("dsm-aggregationhistory-cache")
|
|
|
|
public List<AggregationInfo> getAggregationHistory(final String dsId) throws DsmException {
|
|
|
|
log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
|
|
|
|
final Datasource conf = config.getDatasource();
|
|
|
|
try {
|
|
|
|
final FindIterable<Document> aggregationDocs = getCollection().find(queryForAggregationHistory(dsId, "(collect|transform)"))
|
|
|
|
.projection(fields)
|
|
|
|
.limit(conf.getMongoQueryLimit())
|
|
|
|
.sort(dbo("system:startHumanDate", -1));
|
|
|
|
|
|
|
|
final List<AggregationInfo> aggregationInfos = Utils.stream(aggregationDocs.iterator())
|
|
|
|
.map(getMapper())
|
|
|
|
.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
|
|
|
|
.collect(Collectors.toList());
|
|
|
|
|
|
|
|
final Instant loadTime = loadingCache.get(LOADTIME);
|
|
|
|
|
|
|
|
if (!Objects.equals(Instant.MIN, loadTime)) {
|
|
|
|
for (final AggregationInfo a : aggregationInfos) {
|
|
|
|
if (asInstant(a).isBefore(loadTime) && AggregationStage.COLLECT.equals(a.getAggregationStage())) {
|
|
|
|
a.setIndexedVersion(true);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return aggregationInfos;
|
|
|
|
} catch (Throwable e) {
|
|
|
|
throw new DsmException(HttpStatus.SC_INTERNAL_SERVER_ERROR, String.format("error reading aggregation history for '%s'", dsId), e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private Instant getLoadTime() {
|
|
|
|
log.warn("querying for metadata load time, not using cache");
|
|
|
|
return Optional.ofNullable(getCollection().find(queryForLastMetadataLoad()))
|
|
|
|
.map(d -> d.sort(dbo("system:startHumanDate", -1)).first())
|
|
|
|
.map(d -> (String) d.getOrDefault("system:startHumanDate", ""))
|
|
|
|
.map(s -> Instant.parse(s.replaceAll("\\+.*", "Z")))
|
|
|
|
.orElse(Instant.MIN);
|
|
|
|
}
|
|
|
|
|
|
|
|
private Instant asInstant(final AggregationInfo a) {
|
|
|
|
return Instant.parse(a.getDate() + "T00:00:00Z");
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@CacheEvict(cacheNames = { "dsm-aggregationhistory-cache", "dsm-firstharvestdate-cache" }, allEntries = true)
|
|
|
|
@Scheduled(fixedDelayString = "${openaire.exporter.cache.ttl}")
|
|
|
|
public void dropCache() {
|
|
|
|
log.debug("dropped dsManager aggregation history cache");
|
|
|
|
}
|
|
|
|
|
|
|
|
private Function<Document, AggregationInfo> getMapper() {
|
|
|
|
return new Function<Document, AggregationInfo>() {
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public AggregationInfo apply(final Document d) {
|
|
|
|
|
|
|
|
AggregationInfo info = null;
|
|
|
|
final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
|
|
|
|
switch (stage) {
|
|
|
|
|
|
|
|
case COLLECT:
|
|
|
|
CollectionInfo cInfo = new CollectionInfo();
|
|
|
|
cInfo.setAggregationStage(stage);
|
|
|
|
cInfo.setCollectionMode(getCollectionMode(d));
|
|
|
|
cInfo.setNumberOfRecords(getNumberOfRecords(d));
|
|
|
|
cInfo.setDate(getDate(d));
|
|
|
|
info = cInfo;
|
|
|
|
break;
|
|
|
|
case TRANSFORM:
|
|
|
|
TransformationInfo tInfo = new TransformationInfo();
|
|
|
|
tInfo.setAggregationStage(stage);
|
|
|
|
tInfo.setNumberOfRecords(getNumberOfRecords(d));
|
|
|
|
tInfo.setDate(getDate(d));
|
|
|
|
info = tInfo;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
return info;
|
|
|
|
}
|
|
|
|
|
|
|
|
private CollectionMode getCollectionMode(Document d) {
|
|
|
|
return Optional.ofNullable(d.getString("system:node:SELECT_MODE:selection"))
|
|
|
|
.map(CollectionMode::valueOf)
|
|
|
|
.orElseGet(() ->
|
|
|
|
Optional.ofNullable(d.getString("collectionMode"))
|
|
|
|
.map(CollectionMode::valueOf)
|
|
|
|
.orElse(null));
|
|
|
|
}
|
|
|
|
|
|
|
|
private Integer getNumberOfRecords(final Document d) {
|
|
|
|
final String sinkSize = d.getString("mainlog:sinkSize");
|
|
|
|
final String total = d.getString("mainlog:total");
|
|
|
|
|
|
|
|
if (StringUtils.isNotBlank(sinkSize)) {
|
|
|
|
return Ints.tryParse(sinkSize);
|
|
|
|
} else if (StringUtils.isNotBlank(total)) {
|
|
|
|
return Ints.tryParse(total);
|
|
|
|
} else {
|
|
|
|
return -1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
private String getDate(final Document d) {
|
|
|
|
final String dateString = d.getString("system:startHumanDate");
|
|
|
|
if (StringUtils.isBlank(dateString)) { return ""; }
|
|
|
|
return DateFormatUtils.format(new DateUtils().parse(dateString), DsmMappingUtils.DATE_FORMAT);
|
|
|
|
}
|
|
|
|
|
|
|
|
private static Bson getFields() {
|
|
|
|
return fields(
|
|
|
|
eq("system:wfName", 1),
|
|
|
|
eq("system:node:SELECT_MODE:selection", 1),
|
|
|
|
eq("collectionMode", 1),
|
|
|
|
eq("mainlog:sinkSize", 1),
|
|
|
|
eq("mainlog:writeOps", 1),
|
|
|
|
eq("mainlog:total", 1),
|
|
|
|
eq("system:startHumanDate", 1),
|
|
|
|
eq("system:profileName", 1));
|
|
|
|
}
|
|
|
|
|
|
|
|
private static BasicDBObject dbo(final String key, final Object value) {
|
|
|
|
return new BasicDBObject(key, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
private Bson queryForAggregationHistory(final String dsId, final String pattern) {
|
|
|
|
return and(
|
|
|
|
eq("parentDatasourceId", dsId),
|
|
|
|
eq("system:profileFamily", "aggregator"),
|
|
|
|
eq("system:isCompletedSuccessfully", "true"),
|
|
|
|
regex("system:wfName", pattern, "i"));
|
|
|
|
}
|
|
|
|
|
|
|
|
private Bson queryForLastMetadataLoad() {
|
|
|
|
try {
|
|
|
|
final String contentLoadQuery = config.getContentLoadQuery();
|
|
|
|
log.debug("parsing content load query: " + contentLoadQuery);
|
|
|
|
return new ObjectMapper().readValue(contentLoadQuery, BasicDBObject.class);
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new IllegalArgumentException(e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
private synchronized MongoCollection<Document> getCollection() {
|
|
|
|
if (collection == null) {
|
|
|
|
log.info("inizializing mongodb collection ...");
|
|
|
|
final Datasource conf = config.getDatasource();
|
|
|
|
collection = datasourcePublisherMongoClient.getDatabase(conf.getMongoDbName()).getCollection(conf.getMongoCollectionName());
|
|
|
|
}
|
|
|
|
return collection;
|
|
|
|
}
|
|
|
|
}
|