Improved aggregator

This commit is contained in:
Luca Frosini 2024-02-13 16:03:10 +01:00
parent a11a3ba8e8
commit d2a67855c1
14 changed files with 328 additions and 163 deletions

View File

@ -71,6 +71,11 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.gcube.common</groupId>
<artifactId>authorization-utils</artifactId>
<version>[2.2.0, 3.0.0-SNAPSHOT)</version>
</dependency>
<!-- Test Dependencies --> <!-- Test Dependencies -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@ -24,8 +24,8 @@ public abstract class DocumentElaboration {
protected static final String ID = Record.ID; protected static final String ID = Record.ID;
protected static final int THRESHOLD_FOR_FIVE_PERCENT = 100000; protected static final int THRESHOLD_FOR_FIVE_PERCENT = 10000;
protected static final int THRESHOLD_FOR_ONE_PERCENT = 1000000; protected static final int THRESHOLD_FOR_ONE_PERCENT = 100000;
public static final int MAX_RETRY = 7; public static final int MAX_RETRY = 7;
@ -34,6 +34,7 @@ public abstract class DocumentElaboration {
protected final AggregationState finalAggregationState; protected final AggregationState finalAggregationState;
protected final int rowToBeElaborated; protected final int rowToBeElaborated;
protected int currentlyElaborated;
protected Calendar startTime; protected Calendar startTime;
@ -42,8 +43,9 @@ public abstract class DocumentElaboration {
this.finalAggregationState = finalAggregationState; this.finalAggregationState = finalAggregationState;
this.file = file; this.file = file;
this.rowToBeElaborated = rowToBeElaborated; this.rowToBeElaborated = rowToBeElaborated;
this.currentlyElaborated = 0;
} }
protected void readFile() throws Exception { protected void readFile() throws Exception {
FileInputStream fstream = null; FileInputStream fstream = null;
DataInputStream in = null; DataInputStream in = null;
@ -66,7 +68,12 @@ public abstract class DocumentElaboration {
percentOfNumberOfRows = percentOfNumberOfRows / 5; percentOfNumberOfRows = percentOfNumberOfRows / 5;
} }
int elaborated = 0; currentlyElaborated = 0;
int restartFrom = aggregationStatus.getRestartFrom();
if(restartFrom>0) {
logger.info("The elaboration will be restarted from record number {}", aggregationStatus.getRestartFrom());
}
String line; String line;
// Read File Line By Line // Read File Line By Line
while((line = br.readLine()) != null) { while((line = br.readLine()) != null) {
@ -75,6 +82,9 @@ public abstract class DocumentElaboration {
int i = 0; int i = 0;
while(elaborate) { while(elaborate) {
++i; ++i;
if(currentlyElaborated<restartFrom) {
break;
}
try { try {
elaborateLine(line); elaborateLine(line);
TimeUnit.MILLISECONDS.sleep(3); TimeUnit.MILLISECONDS.sleep(3);
@ -98,23 +108,26 @@ public abstract class DocumentElaboration {
} }
} }
++elaborated; ++currentlyElaborated;
if(elaborated % percentOfNumberOfRows == 0) { if(currentlyElaborated % percentOfNumberOfRows == 0) {
int elaboratedPercentage = elaborated * 100 / rowToBeElaborated; if(currentlyElaborated>=restartFrom) {
aggregationStatus.setRestartFrom(currentlyElaborated, true);
}
int elaboratedPercentage = currentlyElaborated * 100 / rowToBeElaborated;
logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(), logger.info("{} - Elaborated {} rows of {} (about {}%)", aggregationStatus.getAggregationInfo(),
elaborated, rowToBeElaborated, elaboratedPercentage); currentlyElaborated, rowToBeElaborated, elaboratedPercentage);
} }
if(elaborated > rowToBeElaborated) { if(currentlyElaborated > rowToBeElaborated) {
throw new Exception("Elaborated file line is number " + elaborated + " > " + rowToBeElaborated throw new Exception("Elaborated file line is number " + currentlyElaborated + " > " + rowToBeElaborated
+ " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); + " (total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
} }
} }
if(elaborated != rowToBeElaborated) { if(currentlyElaborated != rowToBeElaborated) {
throw new Exception("Elaborated file line is number " + elaborated + " != " + rowToBeElaborated throw new Exception("Elaborated file line is number " + currentlyElaborated + " != " + rowToBeElaborated
+ "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution"); + "(total number of rows to elaborate). This is really strange and should not occur. Stopping execution");
} }
logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), elaborated, logger.info("{} - Elaborated {} rows of {} ({}%)", aggregationStatus.getAggregationInfo(), currentlyElaborated,
rowToBeElaborated, 100); rowToBeElaborated, 100);
} catch(Exception e) { } catch(Exception e) {
@ -137,7 +150,11 @@ public abstract class DocumentElaboration {
public void elaborate() throws Exception { public void elaborate() throws Exception {
startTime = Utility.getUTCCalendarInstance(); startTime = Utility.getUTCCalendarInstance();
readFile(); try {
readFile();
}catch (Exception e) {
}
afterElaboration(); afterElaboration();
aggregationStatus.setAggregationState(finalAggregationState, startTime, true); aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
} }

View File

@ -10,16 +10,13 @@ import java.util.TreeMap;
import org.gcube.accounting.aggregator.elaboration.Elaborator; import org.gcube.accounting.aggregator.elaboration.Elaborator;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceDst; import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceDst;
import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory;
import org.gcube.accounting.aggregator.persistence.PostgreSQLConnectorDst;
import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.com.fasterxml.jackson.core.JsonProcessingException; import org.gcube.com.fasterxml.jackson.core.JsonProcessingException;
import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.com.fasterxml.jackson.databind.JsonNode;
import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.DSMapper;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility; import org.gcube.documentstore.records.RecordUtility;

View File

@ -61,9 +61,15 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
} }
protected Connection getConnection() throws Exception { protected Connection getConnection() throws Exception {
if(connection==null) { if(connection==null || connection.isClosed()) {
Class.forName("org.postgresql.Driver");
String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY); String url = configuration.getProperty(AccountingPersistenceQueryPostgreSQL.URL_PROPERTY_KEY);
if(connection!=null && connection.isClosed()) {
logger.warn("The connection was closed. We should investigate why. Going to reconnect to {}.", url);
}
Class.forName("org.postgresql.Driver");
String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY); String username = configuration.getProperty(AccountingPersistenceConfiguration.USERNAME_PROPERTY_KEY);
String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY); String password = configuration.getProperty(AccountingPersistenceConfiguration.PASSWORD_PROPERTY_KEY);
@ -126,6 +132,7 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
stringBuffer.append(" (id, "); stringBuffer.append(" (id, ");
stringBuffer.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, "); stringBuffer.append("record_type, aggregation_type, aggregation_start_date, aggregation_end_date, ");
stringBuffer.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, "); stringBuffer.append("original_records_number, aggregated_records_number, recovered_records_number, malformed_records_number, percentage, ");
stringBuffer.append("restart_from, ");
stringBuffer.append("context, current_aggregation_state, last_update_time, previous)"); stringBuffer.append("context, current_aggregation_state, last_update_time, previous)");
stringBuffer.append(" VALUES ("); stringBuffer.append(" VALUES (");
stringBuffer.append(getValue(aggregationStatus.getUUID().toString())); stringBuffer.append(getValue(aggregationStatus.getUUID().toString()));
@ -150,6 +157,8 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
stringBuffer.append(getValue(aggregationStatus.getMalformedRecordNumber())); stringBuffer.append(getValue(aggregationStatus.getMalformedRecordNumber()));
stringBuffer.append(", "); stringBuffer.append(", ");
stringBuffer.append(getValue(aggregationStatus.getPercentage())); stringBuffer.append(getValue(aggregationStatus.getPercentage()));
stringBuffer.append(", ");
stringBuffer.append(getValue(aggregationStatus.getRestartFrom()));
stringBuffer.append(", "); stringBuffer.append(", ");
stringBuffer.append(getValue(aggregationStatus.getContext())); stringBuffer.append(getValue(aggregationStatus.getContext()));
@ -168,8 +177,14 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
if(upsert) { if(upsert) {
stringBuffer.append(") ON CONFLICT (id) DO UPDATE SET "); stringBuffer.append(") ON CONFLICT (id) DO UPDATE SET ");
stringBuffer.append("original_records_number=EXCLUDED.original_records_number, aggregated_records_number=EXCLUDED.aggregated_records_number, recovered_records_number=EXCLUDED.recovered_records_number, malformed_records_number=EXCLUDED.malformed_records_number, percentage=EXCLUDED.percentage, "); stringBuffer.append("original_records_number=EXCLUDED.original_records_number, ");
stringBuffer.append("current_aggregation_state=EXCLUDED.current_aggregation_state, last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;"); stringBuffer.append("aggregated_records_number=EXCLUDED.aggregated_records_number, ");
stringBuffer.append("recovered_records_number=EXCLUDED.recovered_records_number, ");
stringBuffer.append("malformed_records_number=EXCLUDED.malformed_records_number, ");
stringBuffer.append("percentage=EXCLUDED.percentage, ");
stringBuffer.append("restart_from=EXCLUDED.restart_from, ");
stringBuffer.append("current_aggregation_state=EXCLUDED.current_aggregation_state, ");
stringBuffer.append("last_update_time=EXCLUDED.last_update_time, previous=EXCLUDED.previous;");
}else { }else {
stringBuffer.append(");"); stringBuffer.append(");");
} }
@ -242,6 +257,9 @@ public class PostgreSQLConnector extends AccountingPersistenceQueryPostgreSQL im
int malformedRecordNumber = resultSet.getInt("malformed_records_number"); int malformedRecordNumber = resultSet.getInt("malformed_records_number");
aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber); aggregationStatus.setRecordNumbers(originalRecordsNumber, aggregatedRecordsNumber, malformedRecordNumber);
int restartFrom = resultSet.getInt("restart_from");
aggregationStatus.setRestartFrom(restartFrom, false);
String context = resultSet.getString("context"); String context = resultSet.getString("context");
aggregationStatus.setContext(context); aggregationStatus.setContext(context);

View File

@ -1,22 +1,7 @@
package org.gcube.accounting.aggregator.persistence; package org.gcube.accounting.aggregator.persistence;
import java.util.Calendar;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import org.gcube.accounting.analytics.Filter;
import org.gcube.accounting.analytics.Info;
import org.gcube.accounting.analytics.NumberedFilter;
import org.gcube.accounting.analytics.TemporalConstraint;
import org.gcube.accounting.analytics.UsageValue;
import org.gcube.accounting.analytics.exception.DuplicatedKeyFilterException;
import org.gcube.accounting.analytics.exception.KeyException;
import org.gcube.accounting.analytics.exception.ValueException;
import org.gcube.accounting.analytics.persistence.AccountingPersistenceBackendQueryConfiguration;
import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration;
import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.persistence.PersistencePostgreSQL;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.Record;
/** /**
@ -27,6 +12,7 @@ public class PostgreSQLConnectorDst implements AggregatorPersistenceDst {
protected PersistencePostgreSQL persistencePostgreSQL; protected PersistencePostgreSQL persistencePostgreSQL;
protected PostgreSQLConnectorDst() throws Exception { protected PostgreSQLConnectorDst() throws Exception {
//AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PostgreSQLConnectorDst.class);
AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class); AccountingPersistenceConfiguration accountingPersistenceConfiguration = new AccountingPersistenceConfiguration(PersistencePostgreSQL.class);
persistencePostgreSQL = new PersistencePostgreSQL(); persistencePostgreSQL = new PersistencePostgreSQL();
persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration); persistencePostgreSQL.prepareConnection(accountingPersistenceConfiguration);

View File

@ -1,12 +1,14 @@
package org.gcube.accounting.aggregator.persistence; package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc { public class PostgreSQLConnectorSrc extends PostgreSQLConnector implements AggregatorPersistenceSrc {
protected PostgreSQLConnectorSrc() throws Exception { protected PostgreSQLConnectorSrc() throws Exception {
super(AggregatorPersistenceSrc.class); super(AccountingPersistenceQueryPostgreSQL.class);
} }
} }

View File

@ -1,12 +1,14 @@
package org.gcube.accounting.aggregator.persistence; package org.gcube.accounting.aggregator.persistence;
import org.gcube.accounting.analytics.persistence.postgresql.AccountingPersistenceQueryPostgreSQL;
/** /**
* @author Luca Frosini (ISTI-CNR) * @author Luca Frosini (ISTI-CNR)
*/ */
public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceSrc { public class PostgreSQLConnectorStatus extends PostgreSQLConnector implements AggregatorPersistenceSrc {
protected PostgreSQLConnectorStatus() throws Exception { protected PostgreSQLConnectorStatus() throws Exception {
super(AggregatorPersistenceStatus.class); super(AccountingPersistenceQueryPostgreSQL.class);
} }
} }

View File

@ -46,6 +46,9 @@ public class AggregationStatus {
@JsonProperty(required = false) @JsonProperty(required = false)
protected String context; protected String context;
@JsonProperty
protected int restartFrom;
@JsonProperty(required = false) @JsonProperty(required = false)
protected AggregationStatus previous; protected AggregationStatus previous;
@ -90,6 +93,7 @@ public class AggregationStatus {
this.uuid = UUID.randomUUID(); this.uuid = UUID.randomUUID();
this.malformedRecordNumber = 0; this.malformedRecordNumber = 0;
this.previous = null; this.previous = null;
this.restartFrom = 0;
} }
public AggregationStatus(AggregationStatus aggregationStatus) throws Exception { public AggregationStatus(AggregationStatus aggregationStatus) throws Exception {
@ -98,6 +102,7 @@ public class AggregationStatus {
this.uuid = aggregationStatus.getUUID(); this.uuid = aggregationStatus.getUUID();
this.malformedRecordNumber = 0; this.malformedRecordNumber = 0;
this.previous = aggregationStatus; this.previous = aggregationStatus;
this.restartFrom = 0;
} }
public AggregationInfo getAggregationInfo() { public AggregationInfo getAggregationInfo() {
@ -116,6 +121,8 @@ public class AggregationStatus {
this.aggregationState = aggregationState; this.aggregationState = aggregationState;
this.lastUpdateTime = endTime; this.lastUpdateTime = endTime;
this.restartFrom = 0;
AggregationStateEvent aggregationStatusEvent = new AggregationStateEvent(aggregationState, startTime, endTime); AggregationStateEvent aggregationStatusEvent = new AggregationStateEvent(aggregationState, startTime, endTime);
aggregationStateEvents.add(aggregationStatusEvent); aggregationStateEvents.add(aggregationStatusEvent);
@ -185,6 +192,18 @@ public class AggregationStatus {
this.context = context; this.context = context;
} }
public int getRestartFrom() {
return restartFrom;
}
public void setRestartFrom(int restartFrom, boolean sync) throws Exception {
this.restartFrom = restartFrom;
if (sync) {
AggregatorPersistenceFactory.getAggregatorPersistenceStatus().upsertAggregationStatus(this);
}
}
public AggregationStatus getPrevious() { public AggregationStatus getPrevious() {
return previous; return previous;
} }

View File

@ -14,12 +14,7 @@ import java.util.TimeZone;
import org.gcube.accounting.aggregator.aggregation.AggregationType; import org.gcube.accounting.aggregator.aggregation.AggregationType;
import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin; import org.gcube.accounting.aggregator.plugin.AccountingAggregatorPlugin;
import org.gcube.common.authorization.client.Constants; import org.gcube.common.authorization.utils.manager.SecretManagerProvider;
import org.gcube.common.authorization.library.AuthorizationEntry;
import org.gcube.common.authorization.library.provider.AuthorizationProvider;
import org.gcube.common.authorization.library.provider.ClientInfo;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider;
import org.gcube.common.authorization.library.utils.Caller;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,8 +26,7 @@ public class Utility {
private static Logger logger = LoggerFactory.getLogger(Utility.class); private static Logger logger = LoggerFactory.getLogger(Utility.class);
public static String getCurrentContext() throws Exception { public static String getCurrentContext() throws Exception {
String token = SecurityTokenProvider.instance.get(); return SecretManagerProvider.instance.get().getContext();
return Constants.authorizationService().get(token).getContext();
} }
@ -170,29 +164,8 @@ public class Utility {
return aggregationEndDate.getTime(); return aggregationEndDate.getTime();
} }
protected static ClientInfo getClientInfo() throws Exception { public static String getUsername() {
Caller caller = AuthorizationProvider.instance.get(); return SecretManagerProvider.instance.get().getUser().getUsername();
if(caller!=null){
return caller.getClient();
}else{
String token = SecurityTokenProvider.instance.get();
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
return authorizationEntry.getClientInfo();
}
}
public static String getUsername() throws Exception{
try {
ClientInfo clientInfo = getClientInfo();
String clientId = clientInfo.getId();
if (clientId != null && clientId.compareTo("") != 0) {
return clientId;
}
throw new Exception("Username null or empty");
} catch (Exception e) {
logger.error("Unable to retrieve user.");
throw new Exception("Unable to retrieve user.", e);
}
} }

View File

@ -7,14 +7,14 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Properties; import java.util.Properties;
import org.gcube.common.authorization.client.Constants; import org.gcube.common.authorization.utils.manager.SecretManager;
import org.gcube.common.authorization.client.exceptions.ObjectNotFound; import org.gcube.common.authorization.utils.manager.SecretManagerProvider;
import org.gcube.common.authorization.library.AuthorizationEntry; import org.gcube.common.authorization.utils.secret.JWTSecret;
import org.gcube.common.authorization.library.provider.AuthorizationProvider; import org.gcube.common.authorization.utils.secret.Secret;
import org.gcube.common.authorization.library.provider.ClientInfo; import org.gcube.common.authorization.utils.secret.SecretUtility;
import org.gcube.common.authorization.library.provider.SecurityTokenProvider; import org.gcube.common.keycloak.KeycloakClientFactory;
import org.gcube.common.authorization.library.utils.Caller; import org.gcube.common.keycloak.KeycloakClientHelper;
import org.gcube.common.scope.api.ScopeProvider; import org.gcube.common.keycloak.model.TokenResponse;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -22,82 +22,152 @@ import org.slf4j.LoggerFactory;
/** /**
* @author Luca Frosini (ISTI - CNR) * @author Luca Frosini (ISTI - CNR)
*
*/ */
public class ContextTest { public class ContextTest {
private static final Logger logger = LoggerFactory.getLogger(ContextTest.class); private static final Logger logger = LoggerFactory.getLogger(ContextTest.class);
protected static Properties properties; protected static final String CONFIG_INI_FILENAME = "config.ini";
protected static final String PROPERTIES_FILENAME = "token.properties";
public static final String ROOT_DEV_SCOPE; public static final String DEFAULT_TEST_SCOPE;
public static final String VO_DEFAULT_TEST_SCOPE;
public static final String VRE_DEFAULT_TEST_SCOPE;
public static final String VO_DEFAULT_TEST_SCOPE_ANOTHER_USER; public static final String GCUBE;
public static final String DEVNEXT;
public static final String NEXTNEXT;
public static final String DEVSEC;
public static final String DEVVRE;
public static final String ROOT_PROD;
protected static final Properties properties;
public static final String TYPE_PROPERTY_KEY = "type";
public static final String USERNAME_PROPERTY_KEY = "username";
public static final String PASSWORD_PROPERTY_KEY = "password";
public static final String CLIENT_ID_PROPERTY_KEY = "clientId";
static { static {
properties = new Properties(); GCUBE = "/gcube";
InputStream input = ContextTest.class.getClassLoader().getResourceAsStream(PROPERTIES_FILENAME); DEVNEXT = GCUBE + "/devNext";
NEXTNEXT = DEVNEXT + "/NextNext";
DEVSEC = GCUBE + "/devsec";
DEVVRE = DEVSEC + "/devVRE";
ROOT_PROD = "/d4science.research-infrastructures.eu";
DEFAULT_TEST_SCOPE = GCUBE;
properties = new Properties();
InputStream input = ContextTest.class.getClassLoader().getResourceAsStream(CONFIG_INI_FILENAME);
try { try {
// load the properties file // load the properties file
properties.load(input); properties.load(input);
} catch(IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// PARENT_DEFAULT_TEST_SCOPE = "/pred4s" }
// DEFAULT_TEST_SCOPE_NAME = PARENT_DEFAULT_TEST_SCOPE + "/preprod";
// ALTERNATIVE_TEST_SCOPE = DEFAULT_TEST_SCOPE_NAME + "/preVRE"; private enum Type{
USER, CLIENT_ID
};
public static void set(Secret secret) throws Exception {
SecretManagerProvider.instance.reset();
SecretManager secretManager = new SecretManager();
secretManager.addSecret(secret);
SecretManagerProvider.instance.set(secretManager);
SecretManagerProvider.instance.get().set();
}
public static void setContextByName(String fullContextName) throws Exception {
logger.debug("Going to set credentials for context {}", fullContextName);
Secret secret = getSecretByContextName(fullContextName);
set(secret);
}
private static TokenResponse getJWTAccessToken(String context) throws Exception {
Type type = Type.valueOf(properties.get(TYPE_PROPERTY_KEY).toString());
TokenResponse tr = null;
ROOT_DEV_SCOPE = "/gcube"; int index = context.indexOf('/', 1);
VO_DEFAULT_TEST_SCOPE = ROOT_DEV_SCOPE + "/devNext"; String root = context.substring(0, index == -1 ? context.length() : index);
VRE_DEFAULT_TEST_SCOPE = VO_DEFAULT_TEST_SCOPE + "/NextNext";
VO_DEFAULT_TEST_SCOPE_ANOTHER_USER = "lucio.lelii_" + VO_DEFAULT_TEST_SCOPE; switch (type) {
case CLIENT_ID:
String clientId = properties.getProperty(CLIENT_ID_PROPERTY_KEY);
String clientSecret = properties.getProperty(root);
tr = KeycloakClientFactory.newInstance().queryUMAToken(context, clientId, clientSecret, context, null);
break;
case USER:
default:
String username = properties.getProperty(USERNAME_PROPERTY_KEY);
String password = properties.getProperty(PASSWORD_PROPERTY_KEY);
switch (root) {
case "/gcube":
default:
clientId = "next.d4science.org";
break;
case "/pred4s":
clientId = "pre.d4science.org";
break;
case "/d4science.research-infrastructures.eu":
clientId = "services.d4science.org";
break;
}
clientSecret = null;
tr = KeycloakClientHelper.getTokenForUser(context, username, password);
break;
}
return tr;
}
public static Secret getSecretByContextName(String context) throws Exception {
TokenResponse tr = getJWTAccessToken(context);
Secret secret = new JWTSecret(tr.getAccessToken());
return secret;
}
public static void setContext(String token) throws Exception {
Secret secret = getSecret(token);
set(secret);
}
private static Secret getSecret(String token) throws Exception {
Secret secret = SecretUtility.getSecretByTokenString(token);
return secret;
}
public static String getUser() {
String user = "UNKNOWN";
try { try {
setContextByName(VO_DEFAULT_TEST_SCOPE); user = SecretManagerProvider.instance.get().getUser().getUsername();
} catch(Exception e) { } catch(Exception e) {
throw new RuntimeException(e); logger.error("Unable to retrieve user. {} will be used", user);
} }
} return user;
public static String getCurrentScope(String token) throws ObjectNotFound, Exception {
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
String context = authorizationEntry.getContext();
logger.info("Context of token {} is {}", token, context);
return context;
}
public static void setContextByName(String fullContextName) throws ObjectNotFound, Exception {
String token = ContextTest.properties.getProperty(fullContextName);
setContext(token);
}
private static void setContext(String token) throws ObjectNotFound, Exception {
SecurityTokenProvider.instance.set(token);
AuthorizationEntry authorizationEntry = Constants.authorizationService().get(token);
ClientInfo clientInfo = authorizationEntry.getClientInfo();
logger.debug("User : {} - Type : {}", clientInfo.getId(), clientInfo.getType().name());
String qualifier = authorizationEntry.getQualifier();
Caller caller = new Caller(clientInfo, qualifier);
AuthorizationProvider.instance.set(caller);
ScopeProvider.instance.set(getCurrentScope(token));
} }
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
setContextByName(VO_DEFAULT_TEST_SCOPE); setContextByName(DEFAULT_TEST_SCOPE);
} }
@AfterClass @AfterClass
public static void afterClass() throws Exception { public static void afterClass() throws Exception {
SecurityTokenProvider.instance.reset(); SecretManagerProvider.instance.reset();
ScopeProvider.instance.reset();
} }
} }

View File

@ -27,6 +27,13 @@ public class PostgreSQLConnectorTest extends ContextTest {
aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus(); aggregatorPersistenceStatus = AggregatorPersistenceFactory.getAggregatorPersistenceStatus();
} }
@Test
public void getAggregatorPersistenceDst() throws Exception {
ContextTest.setContextByName(ROOT_PROD);
AggregatorPersistenceDst dst = AggregatorPersistenceFactory.getAggregatorPersistenceDst();
dst.commitAndClose();
}
@Test @Test
public void getLastTest() throws Exception { public void getLastTest() throws Exception {
AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null); AggregationStatus aggregationStatus = aggregatorPersistenceStatus.getLast(ServiceUsageRecord.class.getSimpleName(), AggregationType.DAILY, null, null);
@ -68,7 +75,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus)); logger.debug("{}", DSMapper.getObjectMapper().writeValueAsString(aggregationStatus));
} }
@Test //@Test
public void aggregationStatusTest() throws Exception { public void aggregationStatusTest() throws Exception {
int toRemove = -36; int toRemove = -36;
@ -121,7 +128,7 @@ public class PostgreSQLConnectorTest extends ContextTest {
} }
@Test //@Test
public void createStartedElaboration() throws Exception { public void createStartedElaboration() throws Exception {
Calendar start = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15); Calendar start = Utility.getAggregationStartCalendar(2017, Calendar.JUNE, 15);

View File

@ -2,9 +2,7 @@ package org.gcube.accounting.aggregator.plugin;
import java.util.Calendar; import java.util.Calendar;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.ContextTest; import org.gcube.accounting.aggregator.ContextTest;
@ -27,7 +25,7 @@ public class AccountingAggregatorPluginTest extends ContextTest {
public static final String ROOT_PROD = "/d4science.research-infrastructures.eu"; public static final String ROOT_PROD = "/d4science.research-infrastructures.eu";
private void aggregate(String recordType, AggregationType aggregationType, Calendar aggregationStartCalendar, private void aggregate(String recordType, AggregationType aggregationType, Calendar aggregationStartCalendar,
Calendar aggregationEndCalendar) throws Exception { Calendar aggregationEndCalendar, boolean forceRerun, boolean forceEarlyAggregation) throws Exception {
Map<String, Object> inputs = new HashMap<String, Object>(); Map<String, Object> inputs = new HashMap<String, Object>();
inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name()); inputs.put(AccountingAggregatorPlugin.AGGREGATION_TYPE_INPUT_PARAMETER, aggregationType.name());
@ -38,8 +36,8 @@ public class AccountingAggregatorPluginTest extends ContextTest {
inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER, inputs.put(AccountingAggregatorPlugin.RESTART_FROM_LAST_AGGREGATION_DATE_INPUT_PARAMETER,
aggregationStartCalendar == null); aggregationStartCalendar == null);
inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, false); inputs.put(AccountingAggregatorPlugin.FORCE_EARLY_AGGREGATION, forceEarlyAggregation);
inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, false); inputs.put(AccountingAggregatorPlugin.FORCE_RERUN, forceRerun);
inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false); inputs.put(AccountingAggregatorPlugin.FORCE_RESTART, false);
if (aggregationStartCalendar != null) { if (aggregationStartCalendar != null) {
@ -80,12 +78,14 @@ public class AccountingAggregatorPluginTest extends ContextTest {
@JsonIgnore @JsonIgnore
@Test @Test
public void aggregateJobUsageRecord() throws Exception { public void aggregateJobUsageRecord() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
String recordType = JobUsageRecord.class.newInstance().getRecordType(); String recordType = JobUsageRecord.class.newInstance().getRecordType();
boolean allAgregationTypes = true; boolean allAgregationTypes = true;
boolean forceRerun = false;
boolean forceEarlyAggregation = false;
if (!allAgregationTypes) { if (!allAgregationTypes) {
AggregationType aggregationType = AggregationType.DAILY; AggregationType aggregationType = AggregationType.DAILY;
@ -95,10 +95,11 @@ public class AccountingAggregatorPluginTest extends ContextTest {
// aggregationStartCalendar, 1); // aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11); Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRerun, forceEarlyAggregation);
} else { } else {
for (AggregationType at : AggregationType.values()) { for (AggregationType at : AggregationType.values()) {
aggregate(recordType, at, null, null); aggregate(recordType, at, null, null, forceRerun, forceEarlyAggregation);
} }
} }
} }
@ -106,12 +107,14 @@ public class AccountingAggregatorPluginTest extends ContextTest {
@JsonIgnore @JsonIgnore
@Test @Test
public void aggregateStorageStatusRecord() throws Exception { public void aggregateStorageStatusRecord() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
String recordType = StorageStatusRecord.class.newInstance().getRecordType(); String recordType = StorageStatusRecord.class.newInstance().getRecordType();
boolean allAgregationTypes = false; boolean allAgregationTypes = false;
boolean forceRerun = false;
boolean forceEarlyAggregation = false;
if (!allAgregationTypes) { if (!allAgregationTypes) {
AggregationType aggregationType = AggregationType.DAILY; AggregationType aggregationType = AggregationType.DAILY;
@ -121,10 +124,10 @@ public class AccountingAggregatorPluginTest extends ContextTest {
// aggregationStartCalendar, 1); // aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11); Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRerun, forceEarlyAggregation);
} else { } else {
for (AggregationType at : AggregationType.values()) { for (AggregationType at : AggregationType.values()) {
aggregate(recordType, at, null, null); aggregate(recordType, at, null, null, forceRerun, forceEarlyAggregation);
} }
} }
} }
@ -132,13 +135,16 @@ public class AccountingAggregatorPluginTest extends ContextTest {
@JsonIgnore @JsonIgnore
@Test @Test
public void aggregateStorageUsageRecord() throws Exception { public void aggregateStorageUsageRecord() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
String recordType = StorageUsageRecord.class.newInstance().getRecordType(); String recordType = StorageUsageRecord.class.newInstance().getRecordType();
boolean allAgregationTypes = false; boolean allAgregationTypes = false;
boolean forceRerun = false;
boolean forceEarlyAggregation = false;
if (!allAgregationTypes) { if (!allAgregationTypes) {
AggregationType aggregationType = AggregationType.DAILY; AggregationType aggregationType = AggregationType.DAILY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10); Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10);
@ -147,64 +153,126 @@ public class AccountingAggregatorPluginTest extends ContextTest {
// aggregationStartCalendar, 1); // aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11); Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRerun, forceEarlyAggregation);
} else { } else {
for (AggregationType at : AggregationType.values()) { for (AggregationType at : AggregationType.values()) {
aggregate(recordType, at, null, null); aggregate(recordType, at, null, null, forceRerun, forceEarlyAggregation);
} }
} }
} }
@JsonIgnore // @JsonIgnore
@Test @Test
public void aggregateService() throws Exception { public void aggregateService() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); // ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
String recordType = ServiceUsageRecord.class.newInstance().getRecordType(); String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
boolean allAgregationTypes = true; boolean allAgregationTypes = false;
boolean forceRerun = true;
boolean forceEarlyAggregation = false;
if (!allAgregationTypes) { if (!allAgregationTypes) {
AggregationType aggregationType = AggregationType.DAILY; AggregationType aggregationType = AggregationType.YEARLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 10); Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2019, Calendar.JANUARY, 1);
// Calendar aggregationEndCalendar = Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
// Utility.getEndCalendarFromStartCalendar(aggregationType, // Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2015, Calendar.JANUARY, 1);
// aggregationStartCalendar, 1);
Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2021, Calendar.NOVEMBER, 11);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar); aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRerun, forceEarlyAggregation);
} else { } else {
for (AggregationType at : AggregationType.values()) { for (AggregationType at : AggregationType.values()) {
aggregate(recordType, at, null, null); aggregate(recordType, at, null, null, forceRerun, forceEarlyAggregation);
} }
} }
} }
@JsonIgnore
@Test @Test
public void aggregateAll() throws Exception { public void aggregateAprilService() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); // ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
Set<String> allRecordTypes = new HashSet<>(); String recordType = ServiceUsageRecord.class.newInstance().getRecordType();
allRecordTypes.add(ServiceUsageRecord.class.newInstance().getRecordType());
allRecordTypes.add(JobUsageRecord.class.newInstance().getRecordType());
allRecordTypes.add(StorageUsageRecord.class.newInstance().getRecordType());
allRecordTypes.add(StorageStatusRecord.class.newInstance().getRecordType());
for (AggregationType at : AggregationType.values()) { boolean allAgregationTypes = false;
for(String recordType : allRecordTypes) { boolean forceRerun = true;
aggregate(recordType, at, null, null); boolean forceEarlyAggregation = false;
if (!allAgregationTypes) {
AggregationType aggregationType = AggregationType.MONTHLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2023, Calendar.APRIL, 1);
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
// Calendar aggregationEndCalendar = Utility.getAggregationStartCalendar(2015, Calendar.JANUARY, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRerun, forceEarlyAggregation);
} else {
for (AggregationType at : AggregationType.values()) {
aggregate(recordType, at, null, null, forceRerun, forceEarlyAggregation);
} }
} }
}
// @JsonIgnore
@Test
public void aggregateAllStorageUsageRecord() throws Exception {
// ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD);
String recordType = StorageUsageRecord.class.newInstance().getRecordType();
aggregateAll(recordType);
} }
public void aggregateAll(String recordType) throws Exception {
boolean forceRestart = true;
boolean forceEarlyAggregation = false;
AggregationType aggregationType = AggregationType.YEARLY;
Calendar aggregationStartCalendar = Utility.getAggregationStartCalendar(2020, Calendar.JANUARY, 1);
Calendar end = Utility.getAggregationStartCalendar(2021, Calendar.JANUARY, 1);
// while (aggregationStartCalendar.before(end)) {
// Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
// aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
// aggregationStartCalendar = Calendar.getInstance();
// aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
//
// }
aggregationType = AggregationType.MONTHLY;
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(end.getTimeInMillis());
end = Utility.getAggregationStartCalendar(2023, Calendar.NOVEMBER, 1);
while (aggregationStartCalendar.before(end)) {
Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, forceEarlyAggregation);
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
}
aggregationType = AggregationType.DAILY;
aggregationStartCalendar = Calendar.getInstance();
aggregationStartCalendar.setTimeInMillis(end.getTimeInMillis());
aggregationStartCalendar = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 6);
end = Utility.getAggregationStartCalendar(2024, Calendar.FEBRUARY, 9);
// while (aggregationStartCalendar.before(end)) {
// Calendar aggregationEndCalendar = Utility.getEndCalendarFromStartCalendar(aggregationType, aggregationStartCalendar, 1);
// aggregate(recordType, aggregationType, aggregationStartCalendar, aggregationEndCalendar, forceRestart, true);
// aggregationStartCalendar = Calendar.getInstance();
// aggregationStartCalendar.setTimeInMillis(aggregationEndCalendar.getTimeInMillis());
//
// }
}
@JsonIgnore @JsonIgnore
@Test @Test
public void testRecovery() throws Exception { public void testRecovery() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(GCUBE);
// ContextTest.setContextByName(ROOT_PROD); // ContextTest.setContextByName(ROOT_PROD);
Map<String, Object> inputs = new HashMap<String, Object>(); Map<String, Object> inputs = new HashMap<String, Object>();

View File

@ -9,7 +9,7 @@ public class WorkSpaceManagementTest extends ContextTest {
@Test @Test
public void testCreatedirectory() throws Exception { public void testCreatedirectory() throws Exception {
ContextTest.setContextByName(ROOT_DEV_SCOPE); ContextTest.setContextByName(GCUBE);
//ContextTest.setContextByName(AccountingAggregatorPluginTest.ROOT_PROD); //ContextTest.setContextByName(AccountingAggregatorPluginTest.ROOT_PROD);
WorkSpaceManagement workSpaceManagement = WorkSpaceManagement.getInstance(); WorkSpaceManagement workSpaceManagement = WorkSpaceManagement.getInstance();

View File

@ -5,3 +5,4 @@
/token.properties /token.properties
/d4science.research-infrastructures.eu.gcubekey /d4science.research-infrastructures.eu.gcubekey
/CalledMethods/ /CalledMethods/
/config.ini