api for wf history V2

This commit is contained in:
Michele Artini 2023-01-23 10:27:07 +01:00
parent ff313b5bd1
commit 3dc121045c
13 changed files with 238 additions and 97 deletions

View File

@ -1,29 +1,18 @@
package eu.dnetlib;
import static java.util.Arrays.asList;
import org.springframework.boot.autoconfigure.cache.CacheManagerCustomizer;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.stereotype.Component;
import static java.util.Arrays.asList;
@Component
public class CacheCustomizer implements CacheManagerCustomizer<ConcurrentMapCacheManager> {
@Override
public void customize(final ConcurrentMapCacheManager cacheManager) {
cacheManager.setCacheNames(
asList(
"fundingpath-ids",
"indexdsinfo-cache",
"objectstoreid-cache",
"context-cache",
"context-cache-funder",
"context-cache-community",
"dsm-aggregationhistory-cache",
"dsm-firstharvestdate-cache",
"vocabularies-cache",
"community-cache",
"info"));
}
@Override
public void customize(final ConcurrentMapCacheManager cacheManager) {
cacheManager
.setCacheNames(asList("fundingpath-ids", "indexdsinfo-cache", "objectstoreid-cache", "context-cache", "context-cache-funder", "context-cache-community", "dsm-aggregationhistory-cache-v1", "dsm-aggregationhistory-cache-v2", "dsm-firstharvestdate-cache", "vocabularies-cache", "community-cache", "info"));
}
}
}

View File

@ -29,7 +29,7 @@ import eu.dnetlib.enabling.datasources.common.DsmForbiddenException;
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
import eu.dnetlib.openaire.common.AbstractExporterController;
import eu.dnetlib.openaire.common.OperationManager;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponse;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponseV1;
import eu.dnetlib.openaire.dsm.domain.ApiDetails;
import eu.dnetlib.openaire.dsm.domain.ApiDetailsResponse;
import eu.dnetlib.openaire.dsm.domain.DatasourceDetailResponse;
@ -97,16 +97,16 @@ public class DsmApiController extends AbstractExporterController {
@GetMapping(value = "/ds/aggregationhistory/{dsId}", produces = {
"application/json"
})
@Operation(summary = "search datasources", description = "Returns list of Datasource details.", tags = {
@Operation(summary = "return aggregation hitory", description = "Returns the aggregation history of a datasource", tags = {
DS, R
})
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "500", description = "unexpected error")
})
public AggregationHistoryResponse aggregationHistory(@PathVariable final String dsId) throws DsmException {
public AggregationHistoryResponseV1 aggregationHistory(@PathVariable final String dsId) throws DsmException {
final StopWatch stop = StopWatch.createStarted();
final AggregationHistoryResponse rsp = dsmCore.aggregationhistory(dsId);
final AggregationHistoryResponseV1 rsp = dsmCore.aggregationhistoryV1(dsId);
return prepareResponse(0, rsp.getAggregationInfo().size(), stop, rsp);
}

View File

@ -15,8 +15,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import eu.dnetlib.enabling.datasources.common.DsmException;
import eu.dnetlib.openaire.common.AbstractExporterController;
import eu.dnetlib.openaire.dsm.dao.ResponseUtils;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponseV2;
import eu.dnetlib.openaire.dsm.domain.SimpleDatasourceInfo;
import eu.dnetlib.openaire.dsm.domain.SimpleResponse;
import io.swagger.v3.oas.annotations.Operation;
@ -83,4 +85,20 @@ public class DsmApiControllerV2 extends AbstractExporterController {
return prepareResponse(1, list.size(), stop, rsp);
}
@GetMapping(value = "/aggregationhistory/{dsId}", produces = {
"application/json"
})
@Operation(summary = "Return history", description = "Returns the aggregation history of the datasources (it includes also the failures)", tags = {
DS, R
})
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "OK"),
@ApiResponse(responseCode = "500", description = "unexpected error")
})
public AggregationHistoryResponseV2 aggregationHistory(@PathVariable final String dsId) throws DsmException {
final StopWatch stop = StopWatch.createStarted();
final AggregationHistoryResponseV2 rsp = dsmCore.aggregationhistoryV2(dsId);
return prepareResponse(0, rsp.getAggregationInfo().size(), stop, rsp);
}
}

View File

@ -40,7 +40,9 @@ import eu.dnetlib.openaire.dsm.dao.MongoLoggerClient;
import eu.dnetlib.openaire.dsm.dao.ResponseUtils;
import eu.dnetlib.openaire.dsm.dao.VocabularyClient;
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponse;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponseV1;
import eu.dnetlib.openaire.dsm.domain.AggregationHistoryResponseV2;
import eu.dnetlib.openaire.dsm.domain.AggregationInfoV1;
import eu.dnetlib.openaire.dsm.domain.ApiDetails;
import eu.dnetlib.openaire.dsm.domain.ApiDetailsResponse;
import eu.dnetlib.openaire.dsm.domain.DatasourceDetailResponse;
@ -313,9 +315,17 @@ public class DsmCore {
}
}
public AggregationHistoryResponse aggregationhistory(final String dsId) throws DsmException {
final List<AggregationInfo> history = mongoLoggerClient.getAggregationHistory(dsId);
final AggregationHistoryResponse rsp = new AggregationHistoryResponse(history);
@Deprecated
public AggregationHistoryResponseV1 aggregationhistoryV1(final String dsId) throws DsmException {
final List<AggregationInfoV1> history = mongoLoggerClient.getAggregationHistoryV1(dsId);
final AggregationHistoryResponseV1 rsp = new AggregationHistoryResponseV1(history);
rsp.setHeader(ResponseUtils.header(history.size()));
return rsp;
}
public AggregationHistoryResponseV2 aggregationhistoryV2(final String dsId) throws DsmException {
final List<AggregationInfo> history = mongoLoggerClient.getAggregationHistoryV2(dsId);
final AggregationHistoryResponseV2 rsp = new AggregationHistoryResponseV2(history);
rsp.setHeader(ResponseUtils.header(history.size()));
return rsp;

View File

@ -1,13 +1,16 @@
package eu.dnetlib.openaire.dsm.dao;
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
import eu.dnetlib.enabling.datasources.common.DsmException;
import java.util.List;
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
import eu.dnetlib.enabling.datasources.common.DsmException;
import eu.dnetlib.openaire.dsm.domain.AggregationInfoV1;
public interface MongoLoggerClient {
List<AggregationInfo> getAggregationHistory(final String dsId) throws DsmException;
List<AggregationInfoV1> getAggregationHistoryV1(final String dsId) throws DsmException;
List<AggregationInfo> getAggregationHistoryV2(final String dsId) throws DsmException;
void dropCache();

View File

@ -47,9 +47,12 @@ import eu.dnetlib.enabling.datasources.common.DsmException;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.openaire.common.Utils;
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
import eu.dnetlib.openaire.dsm.domain.CollectionInfo;
import eu.dnetlib.openaire.dsm.domain.AggregationInfoV1;
import eu.dnetlib.openaire.dsm.domain.CollectionInfoV1;
import eu.dnetlib.openaire.dsm.domain.CollectionInfoV2;
import eu.dnetlib.openaire.dsm.domain.CollectionMode;
import eu.dnetlib.openaire.dsm.domain.TransformationInfo;
import eu.dnetlib.openaire.dsm.domain.TransformationInfoV1;
import eu.dnetlib.openaire.dsm.domain.TransformationInfoV2;
import eu.dnetlib.openaire.info.JdbcInfoDao;
/**
@ -71,10 +74,8 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
private JdbcInfoDao jdbcInfoDao;
private final static String LOADTIME = "loadtime";
private final LoadingCache<String, Instant> loadingCache = CacheBuilder.newBuilder()
.maximumSize(1)
.expireAfterWrite(60, TimeUnit.MINUTES)
.build(new CacheLoader<String, Instant>() {
private final LoadingCache<String, Instant> loadingCache =
CacheBuilder.newBuilder().maximumSize(1).expireAfterWrite(60, TimeUnit.MINUTES).build(new CacheLoader<String, Instant>() {
// The only cached value is associated to "loadtime"
@Override
@ -90,18 +91,31 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
private static MongoCollection<Document> collection = null;
@Override
@Cacheable("dsm-aggregationhistory-cache")
public List<AggregationInfo> getAggregationHistory(final String dsId) throws DsmException {
@Cacheable("dsm-aggregationhistory-cache-v1")
@Deprecated
public List<AggregationInfoV1> getAggregationHistoryV1(final String dsId) throws DsmException {
return getAggregationHistory(dsId, queryForAggregationHistoryV1(dsId, "(collect|transform)"), getMapperV1());
}
@Override
@Cacheable("dsm-aggregationhistory-cache-v2")
public List<AggregationInfo> getAggregationHistoryV2(final String dsId) throws DsmException {
return getAggregationHistory(dsId, queryForAggregationHistoryV2(dsId, "(collect|transform)"), getMapperV2());
}
private <T extends AggregationInfo> List<T> getAggregationHistory(final String dsId,
final Bson queryForAggregationHistory,
final Function<Document, T> mapper) throws DsmException {
log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
final Datasource conf = config.getDatasource();
try {
final FindIterable<Document> aggregationDocs = getCollection().find(queryForAggregationHistory(dsId, "(collect|transform)"))
final FindIterable<Document> aggregationDocs = getCollection().find(queryForAggregationHistory)
.projection(fields)
.limit(conf.getMongoQueryLimit())
.sort(dbo("system:startHumanDate", -1));
final List<AggregationInfo> aggregationInfos = Utils.stream(aggregationDocs.iterator())
.map(getMapper())
final List<T> aggregationInfos = Utils.stream(aggregationDocs.iterator())
.map(mapper)
.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
.collect(Collectors.toList());
@ -134,69 +148,97 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
@Override
@CacheEvict(cacheNames = {
"dsm-aggregationhistory-cache", "dsm-firstharvestdate-cache"
"dsm-aggregationhistory-cache-v1", "dsm-aggregationhistory-cache-v2", "dsm-firstharvestdate-cache"
}, allEntries = true)
@Scheduled(fixedDelayString = "${openaire.exporter.cache.ttl}")
public void dropCache() {
log.debug("dropped dsManager aggregation history cache");
}
private Function<Document, AggregationInfo> getMapper() {
return new Function<Document, AggregationInfo>() {
@Deprecated
private Function<Document, AggregationInfoV1> getMapperV1() {
return d -> {
@Override
public AggregationInfo apply(final Document d) {
AggregationInfoV1 info = null;
final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
final boolean success = isCompletedSuccesfully(d);
AggregationInfo info = null;
final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
final boolean success = isCompletedSuccesfully(d);
switch (stage) {
switch (stage) {
case COLLECT:
final CollectionInfo cInfo = new CollectionInfo();
cInfo.setAggregationStage(stage);
cInfo.setCollectionMode(getCollectionMode(d));
cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
cInfo.setDate(getDate(d));
cInfo.setCompletedSuccessfully(success);
info = cInfo;
break;
case TRANSFORM:
final TransformationInfo tInfo = new TransformationInfo();
tInfo.setAggregationStage(stage);
tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
tInfo.setDate(getDate(d));
tInfo.setCompletedSuccessfully(success);
info = tInfo;
break;
}
return info;
}
private CollectionMode getCollectionMode(final Document d) {
return Optional.ofNullable(d.getString("system:node:SELECT_MODE:selection"))
.map(CollectionMode::valueOf)
.orElseGet(() -> Optional.ofNullable(d.getString("collectionMode"))
.map(CollectionMode::valueOf)
.orElse(null));
}
private Integer getNumberOfRecords(final Document d) {
final String sinkSize = d.getString("mainlog:sinkSize");
final String total = d.getString("mainlog:total");
if (StringUtils.isNotBlank(sinkSize)) {
return Ints.tryParse(sinkSize);
} else if (StringUtils.isNotBlank(total)) {
return Ints.tryParse(total);
} else {
return -1;
}
case COLLECT:
final CollectionInfoV1 cInfo = new CollectionInfoV1();
cInfo.setAggregationStage(stage);
cInfo.setCollectionMode(getCollectionMode(d));
cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
cInfo.setDate(getDate(d));
cInfo.setCompletedSuccessfully(success);
info = cInfo;
break;
case TRANSFORM:
final TransformationInfoV1 tInfo = new TransformationInfoV1();
tInfo.setAggregationStage(stage);
tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
tInfo.setDate(getDate(d));
tInfo.setCompletedSuccessfully(success);
info = tInfo;
break;
}
return info;
};
}
private Function<Document, AggregationInfo> getMapperV2() {
return d -> {
AggregationInfo info = null;
final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
final boolean success = isCompletedSuccesfully(d);
switch (stage) {
case COLLECT:
final CollectionInfoV2 cInfo = new CollectionInfoV2();
cInfo.setAggregationStage(stage);
cInfo.setCollectionMode(getCollectionMode(d));
cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
cInfo.setDate(getDate(d));
cInfo.setCompletedSuccessfully(success);
info = cInfo;
break;
case TRANSFORM:
final TransformationInfoV2 tInfo = new TransformationInfoV2();
tInfo.setAggregationStage(stage);
tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
tInfo.setDate(getDate(d));
tInfo.setCompletedSuccessfully(success);
info = tInfo;
break;
}
return info;
};
}
private CollectionMode getCollectionMode(final Document d) {
return Optional.ofNullable(d.getString("system:node:SELECT_MODE:selection"))
.map(CollectionMode::valueOf)
.orElseGet(() -> Optional.ofNullable(d.getString("collectionMode"))
.map(CollectionMode::valueOf)
.orElse(null));
}
private Integer getNumberOfRecords(final Document d) {
final String sinkSize = d.getString("mainlog:sinkSize");
final String total = d.getString("mainlog:total");
if (StringUtils.isNotBlank(sinkSize)) {
return Ints.tryParse(sinkSize);
} else if (StringUtils.isNotBlank(total)) {
return Ints.tryParse(total);
} else {
return -1;
}
}
private String getDate(final Document d) {
final String dateString = d.getString("system:startHumanDate");
if (StringUtils.isBlank(dateString)) { return ""; }
@ -216,7 +258,12 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
return new BasicDBObject(key, value);
}
private Bson queryForAggregationHistory(final String dsId, final String pattern) {
@Deprecated
private Bson queryForAggregationHistoryV1(final String dsId, final String pattern) {
return and(eq("parentDatasourceId", dsId), eq("system:profileFamily", "aggregator"), eq("system:isCompletedSuccessfully", "true"), regex("system:wfName", pattern, "i"));
}
private Bson queryForAggregationHistoryV2(final String dsId, final String pattern) {
return and(eq("parentDatasourceId", dsId), eq("system:profileFamily", "aggregator"), regex("system:wfName", pattern, "i"));
}

View File

@ -0,0 +1,25 @@
package eu.dnetlib.openaire.dsm.domain;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
@JsonAutoDetect
@Deprecated
public class AggregationHistoryResponseV1 extends Response {
private List<AggregationInfoV1> aggregationInfo;
public AggregationHistoryResponseV1(final List<AggregationInfoV1> aggregationInfo) {
super();
this.aggregationInfo = aggregationInfo;
}
public List<AggregationInfoV1> getAggregationInfo() {
return aggregationInfo;
}
public void setAggregationInfo(final List<AggregationInfoV1> aggregationInfo) {
this.aggregationInfo = aggregationInfo;
}
}

View File

@ -7,11 +7,11 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
@JsonAutoDetect
public class AggregationHistoryResponse extends Response {
public class AggregationHistoryResponseV2 extends Response {
private List<AggregationInfo> aggregationInfo;
public AggregationHistoryResponse(final List<AggregationInfo> aggregationInfo) {
public AggregationHistoryResponseV2(final List<AggregationInfo> aggregationInfo) {
super();
this.aggregationInfo = aggregationInfo;
}

View File

@ -0,0 +1,15 @@
package eu.dnetlib.openaire.dsm.domain;
import com.fasterxml.jackson.annotation.JsonIgnore;
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
public class AggregationInfoV1 extends AggregationInfo {
@Override
@JsonIgnore
public boolean isCompletedSuccessfully() {
return super.isCompletedSuccessfully();
}
}

View File

@ -0,0 +1,22 @@
package eu.dnetlib.openaire.dsm.domain;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
/**
* Created by claudio on 29/11/2016.
*/
@JsonAutoDetect
@Deprecated
public class CollectionInfoV1 extends AggregationInfoV1 {
private CollectionMode collectionMode;
public CollectionMode getCollectionMode() {
return collectionMode;
}
public void setCollectionMode(final CollectionMode collectionMode) {
this.collectionMode = collectionMode;
}
}

View File

@ -9,7 +9,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
* Created by claudio on 29/11/2016.
*/
@JsonAutoDetect
public class CollectionInfo extends AggregationInfo {
public class CollectionInfoV2 extends AggregationInfo {
private CollectionMode collectionMode;

View File

@ -0,0 +1,12 @@
package eu.dnetlib.openaire.dsm.domain;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
/**
* Created by claudio on 29/11/2016.
*/
@JsonAutoDetect
@Deprecated
public class TransformationInfoV1 extends AggregationInfoV1 {
}

View File

@ -8,6 +8,6 @@ import eu.dnetlib.enabling.datasources.common.AggregationInfo;
* Created by claudio on 29/11/2016.
*/
@JsonAutoDetect
public class TransformationInfo extends AggregationInfo {
public class TransformationInfoV2 extends AggregationInfo {
}