added failed wfs in history

This commit is contained in:
Michele Artini 2022-08-17 11:15:34 +02:00
parent db400dbfe0
commit ec666bf88e
1 changed files with 14 additions and 4 deletions

View File

@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -148,21 +149,25 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
AggregationInfo info = null; AggregationInfo info = null;
final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName")); final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
final boolean success = isCompletedSuccesfully(d);
switch (stage) { switch (stage) {
case COLLECT: case COLLECT:
final CollectionInfo cInfo = new CollectionInfo(); final CollectionInfo cInfo = new CollectionInfo();
cInfo.setAggregationStage(stage); cInfo.setAggregationStage(stage);
cInfo.setCollectionMode(getCollectionMode(d)); cInfo.setCollectionMode(getCollectionMode(d));
cInfo.setNumberOfRecords(getNumberOfRecords(d)); cInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
cInfo.setDate(getDate(d)); cInfo.setDate(getDate(d));
cInfo.setCompletedSuccessfully(success);
info = cInfo; info = cInfo;
break; break;
case TRANSFORM: case TRANSFORM:
final TransformationInfo tInfo = new TransformationInfo(); final TransformationInfo tInfo = new TransformationInfo();
tInfo.setAggregationStage(stage); tInfo.setAggregationStage(stage);
tInfo.setNumberOfRecords(getNumberOfRecords(d)); tInfo.setNumberOfRecords(success ? getNumberOfRecords(d) : 0);
tInfo.setDate(getDate(d)); tInfo.setDate(getDate(d));
tInfo.setCompletedSuccessfully(success);
info = tInfo; info = tInfo;
break; break;
} }
@ -198,8 +203,13 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
return DateFormatUtils.format(new DateUtils().parse(dateString), DsmMappingUtils.DATE_FORMAT); return DateFormatUtils.format(new DateUtils().parse(dateString), DsmMappingUtils.DATE_FORMAT);
} }
private boolean isCompletedSuccesfully(final Document d) {
final String boolString = d.getString("system:isCompletedSuccessfully");
return BooleanUtils.toBoolean(boolString);
}
private static Bson getFields() { private static Bson getFields() {
return fields(eq("system:wfName", 1), eq("system:node:SELECT_MODE:selection", 1), eq("collectionMode", 1), eq("mainlog:sinkSize", 1), eq("mainlog:writeOps", 1), eq("mainlog:total", 1), eq("system:startHumanDate", 1), eq("system:profileName", 1)); return fields(eq("system:wfName", 1), eq("system:node:SELECT_MODE:selection", 1), eq("collectionMode", 1), eq("mainlog:sinkSize", 1), eq("mainlog:writeOps", 1), eq("mainlog:total", 1), eq("system:startHumanDate", 1), eq("system:profileName", 1), eq("system:isCompletedSuccessfully", 1));
} }
private static BasicDBObject dbo(final String key, final Object value) { private static BasicDBObject dbo(final String key, final Object value) {
@ -207,7 +217,7 @@ public class MongoLoggerClientImpl implements MongoLoggerClient {
} }
private Bson queryForAggregationHistory(final String dsId, final String pattern) { private Bson queryForAggregationHistory(final String dsId, final String pattern) {
return and(eq("parentDatasourceId", dsId), eq("system:profileFamily", "aggregator"), eq("system:isCompletedSuccessfully", "true"), regex("system:wfName", pattern, "i")); return and(eq("parentDatasourceId", dsId), eq("system:profileFamily", "aggregator"), regex("system:wfName", pattern, "i"));
} }
private synchronized MongoCollection<Document> getCollection() { private synchronized MongoCollection<Document> getCollection() {