package eu.dnetlib.openaire.dsm.dao;

import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
import static com.mongodb.client.model.Filters.regex;
import static com.mongodb.client.model.Projections.fields;

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpStatus;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.primitives.Ints;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;

import eu.dnetlib.DnetOpenaireExporterProperties;
import eu.dnetlib.DnetOpenaireExporterProperties.Datasource;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.openaire.common.Utils;
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
import eu.dnetlib.openaire.exporter.exceptions.DsmApiException;
import eu.dnetlib.openaire.exporter.model.dsm.AggregationInfo;
import eu.dnetlib.openaire.exporter.model.dsm.AggregationInfoV1;
import eu.dnetlib.openaire.exporter.model.dsm.AggregationStage;
import eu.dnetlib.openaire.exporter.model.dsm.CollectionMode;
import eu.dnetlib.openaire.info.JdbcInfoDao;

/**
 * Reads the aggregation history (collect / transform workflow runs) of a datasource
 * from the dnet workflow-logging MongoDB collection and maps the log documents to the
 * DSM API model ({@link AggregationInfoV1} for the deprecated v1 API,
 * {@link AggregationInfo} for v2).
 *
 * Created by claudio on 20/10/2016.
 */
@Component
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
public class MongoLoggerClientImpl implements MongoLoggerClient {

    private static final Log log = LogFactory.getLog(MongoLoggerClientImpl.class);

    @Autowired
    private MongoClient datasourcePublisherMongoClient;

    @Autowired
    private DnetOpenaireExporterProperties config;

    @Autowired
    private JdbcInfoDao jdbcInfoDao;

    /** The single key used with {@link #loadingCache}. */
    private final static String LOADTIME = "loadtime";

    /**
     * One-entry cache holding the timestamp of the latest metadata (oaf) load,
     * refreshed hourly so repeated history requests don't query the database each time.
     */
    private final LoadingCache<String, Instant> loadingCache = CacheBuilder.newBuilder()
            .maximumSize(1)
            .expireAfterWrite(60, TimeUnit.MINUTES)
            .build(new CacheLoader<String, Instant>() {

                // The only cached value is associated to "loadtime"
                @Override
                public Instant load(final String key) {
                    final Instant loadTime = getLoadTime();
                    log.debug("found load time: " + loadTime.toString());
                    return loadTime;
                }
            });

    /** Projection restricting the fetched fields to the ones the mappers actually read. */
    private static final Bson FIELDS = getFields();

    // Lazily initialized by getCollection(). Kept as an instance field so that the
    // instance-level lock taken by the synchronized getCollection() actually guards it
    // (the class is a Spring singleton, so a single collection is still created).
    private MongoCollection<Document> collection = null;

    /**
     * @deprecated v1 flavour of the aggregation history, kept for API backward compatibility;
     *             use {@link #getAggregationHistoryV2(String)} instead.
     */
    @Override
    @Cacheable("dsm-aggregationhistory-cache-v1")
    @Deprecated
    public List<AggregationInfoV1> getAggregationHistoryV1(final String dsId) throws DsmApiException {
        return getAggregationHistoryV1(dsId, queryForAggregationHistoryV1(dsId, "(collect|transform)"), getMapperV1());
    }

    /**
     * Returns the collect/transform history of the given datasource, most recent first.
     *
     * @param dsId the datasource identifier
     * @throws DsmApiException on any failure while querying mongo
     */
    @Override
    @Cacheable("dsm-aggregationhistory-cache-v2")
    public List<AggregationInfo> getAggregationHistoryV2(final String dsId) throws DsmApiException {
        return getAggregationHistoryV2(dsId, queryForAggregationHistoryV2(dsId, "(collect|transform)"), getMapperV2());
    }

    @Deprecated
    private List<AggregationInfoV1> getAggregationHistoryV1(final String dsId,
            final Bson queryForAggregationHistory,
            final Function<Document, AggregationInfoV1> mapper) throws DsmApiException {
        log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
        final Datasource conf = config.getDatasource();
        try {
            final FindIterable<Document> aggregationDocs = getCollection().find(queryForAggregationHistory)
                    .projection(FIELDS)
                    .limit(conf.getMongoQueryLimit())
                    .sort(dbo("system:startHumanDate", -1));
            final List<AggregationInfoV1> aggregationInfos = Utils.stream(aggregationDocs.iterator())
                    .map(mapper)
                    // the mapper yields null for workflow stages other than COLLECT/TRANSFORM
                    .filter(Objects::nonNull)
                    .filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
                    .collect(Collectors.toList());
            final Instant loadTime = loadingCache.get(LOADTIME);
            if (!Objects.equals(Instant.MIN, loadTime)) {
                // Flag the most recent COLLECT run that precedes the index load time:
                // that is the version currently available in the index.
                for (final AggregationInfoV1 a : aggregationInfos) {
                    if (asInstant(a).isBefore(loadTime) && AggregationStage.COLLECT.equals(a.getAggregationStage())) {
                        a.setIndexedVersion(true);
                        break;
                    }
                }
            }
            return aggregationInfos;
        } catch (final Throwable e) {
            throw new DsmApiException(HttpStatus.SC_INTERNAL_SERVER_ERROR,
                    String.format("error reading aggregation history for '%s'", dsId), e);
        }
    }

    private List<AggregationInfo> getAggregationHistoryV2(final String dsId,
            final Bson queryForAggregationHistory,
            final Function<Document, AggregationInfo> mapper) throws DsmApiException {
        log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
        final Datasource conf = config.getDatasource();
        try {
            final FindIterable<Document> aggregationDocs = getCollection().find(queryForAggregationHistory)
                    .projection(FIELDS)
                    .limit(conf.getMongoQueryLimit())
                    .sort(dbo("system:startHumanDate", -1));
            final List<AggregationInfo> aggregationInfos = Utils.stream(aggregationDocs.iterator())
                    .map(mapper)
                    // the mapper yields null for workflow stages other than COLLECT/TRANSFORM
                    .filter(Objects::nonNull)
                    .filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
                    .collect(Collectors.toList());
            final Instant loadTime = loadingCache.get(LOADTIME);
            if (!Objects.equals(Instant.MIN, loadTime)) {
                // Flag the most recent COLLECT run that precedes the index load time:
                // that is the version currently available in the index.
                for (final AggregationInfo a : aggregationInfos) {
                    if (asInstant(a).isBefore(loadTime) && AggregationStage.COLLECT.equals(a.getAggregationStage())) {
                        a.setIndexedVersion(true);
                        break;
                    }
                }
            }
            return aggregationInfos;
        } catch (final Throwable e) {
            throw new DsmApiException(HttpStatus.SC_INTERNAL_SERVER_ERROR,
                    String.format("error reading aggregation history for '%s'", dsId), e);
        }
    }

    /** Reads the oaf load date from the info db and converts it to an Instant (start of day, system zone). */
    private Instant getLoadTime() {
        log.warn("querying for metadata load time, not using cache");
        final LocalDate date = jdbcInfoDao.getDate(JdbcInfoDao.DATE_INFO.oaf_load_date);
        return date.atStartOfDay(ZoneId.systemDefault()).toInstant();
    }

    // The aggregation date is formatted as yyyy-MM-dd (see getDate); interpret it as UTC midnight.
    private Instant asInstant(final AggregationInfo a) {
        return Instant.parse(a.getDate() + "T00:00:00Z");
    }

    @Deprecated
    private Instant asInstant(final AggregationInfoV1 a) {
        return Instant.parse(a.getDate() + "T00:00:00Z");
    }

    /** Periodically evicts the Spring caches populated by the methods above. */
    @Override
    @CacheEvict(cacheNames = {
            "dsm-aggregationhistory-cache-v1", "dsm-aggregationhistory-cache-v2", "dsm-firstharvestdate-cache"
    }, allEntries = true)
    @Scheduled(fixedDelayString = "${openaire.exporter.cache.ttl}")
    public void dropCache() {
        log.debug("dropped dsManager aggregation history cache");
    }

    /**
     * Maps a workflow log document to the v1 model; returns null for stages other than
     * COLLECT/TRANSFORM (callers must filter nulls out).
     */
    @Deprecated
    private Function<Document, AggregationInfoV1> getMapperV1() {
        return d -> {
            AggregationInfoV1 info = null;
            final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
            final boolean success = isCompletedSuccessfully(d);
            switch (stage) {
            case COLLECT:
                final AggregationInfoV1 cInfo = new AggregationInfoV1();
                cInfo.setAggregationStage(stage);
                cInfo.setCollectionMode(getCollectionMode(d));
                cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
                cInfo.setDate(getDate(d));
                info = cInfo;
                break;
            case TRANSFORM:
                final AggregationInfoV1 tInfo = new AggregationInfoV1();
                tInfo.setAggregationStage(stage);
                tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
                tInfo.setDate(getDate(d));
                info = tInfo;
                break;
            }
            return info;
        };
    }

    /**
     * Maps a workflow log document to the v2 model; unlike v1 it also carries the
     * completedSuccessfully flag. Returns null for stages other than COLLECT/TRANSFORM.
     */
    private Function<Document, AggregationInfo> getMapperV2() {
        return d -> {
            AggregationInfo info = null;
            final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
            final boolean success = isCompletedSuccessfully(d);
            switch (stage) {
            case COLLECT:
                final AggregationInfo cInfo = new AggregationInfo();
                cInfo.setAggregationStage(stage);
                cInfo.setCollectionMode(getCollectionMode(d));
                cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
                cInfo.setDate(getDate(d));
                cInfo.setCompletedSuccessfully(success);
                info = cInfo;
                break;
            case TRANSFORM:
                final AggregationInfo tInfo = new AggregationInfo();
                tInfo.setAggregationStage(stage);
                tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
                tInfo.setDate(getDate(d));
                tInfo.setCompletedSuccessfully(success);
                info = tInfo;
                break;
            }
            return info;
        };
    }

    // Newer logs expose the mode under "system:node:SELECT_MODE:selection", older ones
    // under "collectionMode"; null when neither is present.
    private CollectionMode getCollectionMode(final Document d) {
        return Optional.ofNullable(d.getString("system:node:SELECT_MODE:selection"))
                .map(CollectionMode::valueOf)
                .orElseGet(() -> Optional.ofNullable(d.getString("collectionMode"))
                        .map(CollectionMode::valueOf)
                        .orElse(null));
    }

    // Prefers "mainlog:sinkSize" over "mainlog:total"; -1 means "unknown" and is
    // filtered out by the history pipelines above.
    private Integer getNumberOfRecords(final Document d) {
        final String sinkSize = d.getString("mainlog:sinkSize");
        final String total = d.getString("mainlog:total");
        if (StringUtils.isNotBlank(sinkSize)) {
            return Ints.tryParse(sinkSize);
        }
        if (StringUtils.isNotBlank(total)) {
            return Ints.tryParse(total);
        }
        return -1;
    }

    // Re-formats "system:startHumanDate" with DsmMappingUtils.DATE_FORMAT; empty string
    // when missing (filtered out by the history pipelines above).
    private String getDate(final Document d) {
        final String dateString = d.getString("system:startHumanDate");
        if (StringUtils.isBlank(dateString)) {
            return "";
        }
        return DateFormatUtils.format(new DateUtils().parse(dateString), DsmMappingUtils.DATE_FORMAT);
    }

    // BooleanUtils.toBoolean is lenient ("true"/"on"/"yes", case-insensitive) and false for null.
    private boolean isCompletedSuccessfully(final Document d) {
        final String boolString = d.getString("system:isCompletedSuccessfully");
        return BooleanUtils.toBoolean(boolString);
    }

    private static Bson getFields() {
        return fields(
                eq("system:wfName", 1),
                eq("system:node:SELECT_MODE:selection", 1),
                eq("collectionMode", 1),
                eq("mainlog:sinkSize", 1),
                eq("mainlog:writeOps", 1),
                eq("mainlog:total", 1),
                eq("system:startHumanDate", 1),
                eq("system:profileName", 1),
                eq("system:isCompletedSuccessfully", 1));
    }

    private static BasicDBObject dbo(final String key, final Object value) {
        return new BasicDBObject(key, value);
    }

    // v1 only returned successfully completed runs; v2 (below) returns failed ones too.
    @Deprecated
    private Bson queryForAggregationHistoryV1(final String dsId, final String pattern) {
        return and(
                eq("parentDatasourceId", dsId),
                eq("system:profileFamily", "aggregator"),
                eq("system:isCompletedSuccessfully", "true"),
                regex("system:wfName", pattern, "i"));
    }

    private Bson queryForAggregationHistoryV2(final String dsId, final String pattern) {
        return and(
                eq("parentDatasourceId", dsId),
                eq("system:profileFamily", "aggregator"),
                regex("system:wfName", pattern, "i"));
    }

    /** Lazily opens the configured mongo collection; synchronized to create it only once. */
    private synchronized MongoCollection<Document> getCollection() {
        if (collection == null) {
            log.info("inizializing mongodb collection ...");
            final Datasource conf = config.getDatasource();
            collection = datasourcePublisherMongoClient.getDatabase(conf.getMongoDbName())
                    .getCollection(conf.getMongoCollectionName());
        }
        return collection;
    }
}