Using StorageHubClient in place of Home Library Webapp HTTP calls.

Added the generation of a CSV file to post-analyze the calledMethods
This commit is contained in:
Luca Frosini 2020-03-12 19:42:34 +01:00
parent e6a4dbc3a7
commit 0ba72759af
6 changed files with 124 additions and 43 deletions

View File

@ -8,7 +8,7 @@ import org.gcube.common.storagehub.client.dsl.FolderContainer;
*/
public class WorkSpaceDirectoryStructure extends DirectoryStructure<FolderContainer>{
// Description applied to every backup folder created by this structure
// (duplicate declaration from the rendered diff removed; keeps the
// corrected "Accounting" spelling).
private static final String BACKUP_FOLDER_DESCRIPTION = "Accounting Aggregator Plugin Backup Folder";
@Override
protected FolderContainer getRoot() throws Exception {
@ -17,7 +17,7 @@ public class WorkSpaceDirectoryStructure extends DirectoryStructure<FolderContai
@Override
protected FolderContainer createDirectory(FolderContainer parent, String name) throws Exception {
	// Uses the idempotent StorageHub-based helper (non-hidden folder); this replaced
	// the old Home Library createFolder call. Duplicate pre-commit line removed.
	return WorkSpaceManagement.getInstance().getOrCreateFolder(parent, name, BACKUP_FOLDER_DESCRIPTION, false);
}

View File

@ -187,7 +187,7 @@ public class Elaborator {
* before midnight and the second after midnight (so in the next day).
*/
if (Utility.isTimeElapsed(now, persistStartTime) && !Utility.isTimeElapsed(now, persistEndTime)) {
Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile);
Persist persist = new Persist(aggregationStatus, srcBucket, dstBucket, originalRecordsbackupFile, aggregateRecordsBackupFile, recordType);
persist.recover();
}else{
logger.info("Cannot delete/insert document before {} and after {}.", AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistStartTime), AccountingAggregatorPlugin.LOCAL_TIME_DATE_FORMAT.format(persistEndTime));

View File

@ -142,8 +142,8 @@ public abstract class DocumentElaboration {
/**
 * Elaborates the backup file line by line and then finalizes the aggregation.
 * The post-elaboration hook runs BEFORE the aggregation state is advanced
 * (post-commit ordering), so any failure in afterElaboration() prevents the
 * state transition. Duplicate pre-commit setAggregationState line removed.
 *
 * @throws Exception if reading the file, the hook, or the state update fails
 */
public void elaborate() throws Exception {
	startTime = Utility.getUTCCalendarInstance();
	readFile();
	afterElaboration();
	aggregationStatus.setAggregationState(finalAggregationState, startTime, true);
}
protected abstract void elaborateLine(String line) throws Exception;

View File

@ -1,11 +1,20 @@
package org.gcube.accounting.aggregator.persist;
import java.io.File;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.gcube.accounting.aggregator.persistence.CouchBaseConnector;
import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
@ -17,13 +26,73 @@ import com.couchbase.client.java.document.json.JsonObject;
*/
public class InsertDocument extends DocumentElaboration {
public static final String CSV_FILENAME;
static {
StringWriter stringWriter = new StringWriter();
stringWriter.append(ServiceUsageRecord.SERVICE_CLASS);
stringWriter.append("_");
stringWriter.append(ServiceUsageRecord.SERVICE_NAME);
stringWriter.append("_");
stringWriter.append(ServiceUsageRecord.CALLED_METHOD);
stringWriter.append(".csv");
CSV_FILENAME = stringWriter.toString();
}
/**
 * This is used to save a CSV file which allows a post-analysis of the calledMethods.
 * The format of the CSV file is
 * serviceClass,serviceName,calledMethod
 * e.g.
 * SDI,GeoNetwork,create
 * 
 * There is an entry for each distinct triple.
 */
protected Map<String,Set<String>> serviceClassName_calledMethods;
// Lines that failed elaboration for the CSV; appended verbatim at the end of the report
protected List<String> unparsableLines;
// When true, each inserted record also feeds the calledMethods CSV report
protected boolean serviceUsageRecordElaboration;
/**
 * Creates an inserter that will move the aggregated records of the given backup
 * file into the target bucket. CSV generation starts disabled; enable it via
 * {@link #setServiceUsageRecordElaboration(boolean)}.
 */
public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
	super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
	this.serviceUsageRecordElaboration = false;
	this.serviceClassName_calledMethods = new HashMap<>();
	this.unparsableLines = new ArrayList<>();
}
/**
 * Builds the "serviceClass,serviceName" prefix of a CSV row from a record.
 *
 * @param jsonObject the record under elaboration
 * @return the first two CSV columns joined by a comma
 */
protected String getKey(JsonObject jsonObject) {
	String serviceClass = jsonObject.getString(ServiceUsageRecord.SERVICE_CLASS);
	// BUG FIX: serviceName was read from SERVICE_CLASS, so every key repeated the
	// service class twice and the CSV never contained the service name.
	String serviceName = jsonObject.getString(ServiceUsageRecord.SERVICE_NAME);
	return serviceClass + "," + serviceName;
}
/**
 * Records the calledMethod of this record under its "serviceClass,serviceName"
 * key, creating the method set on first sight of the key.
 *
 * @param jsonObject the record under elaboration
 */
protected void addServiceClassName_calledMethods(JsonObject jsonObject) {
	String key = getKey(jsonObject);
	String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD);
	// computeIfAbsent replaces the manual get/null-check/put sequence
	serviceClassName_calledMethods.computeIfAbsent(key, k -> new HashSet<>()).add(calledMethod);
}
/**
 * Parses a backup-file line into a JsonObject and, when ServiceUsageRecord
 * elaboration is enabled, feeds it to the calledMethods collector. A line that
 * the collector cannot handle is remembered in {@code unparsableLines} and the
 * parsed object is still returned.
 *
 * @param line one JSON line from the backup file
 * @return the parsed JsonObject
 */
protected JsonObject analyseLine(String line) {
	JsonObject parsed = JsonObject.fromJson(line);
	if(!serviceUsageRecordElaboration) {
		return parsed;
	}
	try {
		addServiceClassName_calledMethods(parsed);
	} catch(Throwable e) {
		unparsableLines.add(line);
	}
	return parsed;
}
@Override
protected void elaborateLine(String line) throws Exception {
JsonObject jsonObject = JsonObject.fromJson(line);
JsonObject jsonObject = analyseLine(line);
String id = jsonObject.getString(ID);
JsonDocument jsonDocument = JsonDocument.create(id, jsonObject);
bucket.upsert(jsonDocument, PersistTo.MASTER, CouchBaseConnector.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS);
@ -31,7 +100,42 @@ public class InsertDocument extends DocumentElaboration {
@Override
protected void afterElaboration() {
	// Writes the calledMethods CSV report, only for ServiceUsageRecord elaborations.
	// (Stale pre-commit "// Nothing to do" line from the rendered diff removed.)
	if(serviceUsageRecordElaboration) {
		// The CSV lands next to the backup file being elaborated
		File destinationFolder = file.getParentFile();
		File csvFile = new File(destinationFolder, CSV_FILENAME);
		for(Map.Entry<String,Set<String>> entry : serviceClassName_calledMethods.entrySet()) {
			String key = entry.getKey();
			for(String calledMethod : entry.getValue()) {
				try {
					// One row per (serviceClass,serviceName,calledMethod) triple
					Utility.printLine(csvFile, key + "," + calledMethod);
				} catch(Throwable e) {
					// Pass the throwable so the cause is not lost (was previously dropped)
					logger.error("Unable to print CSV line : {},{}", key, calledMethod, e);
				}
			}
		}
		if(!unparsableLines.isEmpty()) {
			try {
				// Separator between valid rows and the lines that failed elaboration
				Utility.printLine(csvFile, "");
				Utility.printLine(csvFile, "------------------------------------------------------------------");
				Utility.printLine(csvFile, "Unparsable Lines");
				Utility.printLine(csvFile, "");
			} catch(Throwable e) {
				logger.error("Unable to add separator for unparsable lines in CSV", e);
			}
			for(String unparsableLine : unparsableLines) {
				try {
					Utility.printLine(csvFile, unparsableLine);
				} catch(Throwable e) {
					logger.error("Unable to print unparsable line in CSV : {}", unparsableLine, e);
				}
			}
		}
	}
}
/**
 * Enables or disables the collection of calledMethods data (and the resulting
 * CSV report) during elaboration. Disabled by default; the caller enables it
 * when the elaborated record type is ServiceUsageRecord.
 *
 * @param serviceUsageRecordElaboration true to produce the CSV report
 */
public void setServiceUsageRecordElaboration(boolean serviceUsageRecordElaboration) {
	this.serviceUsageRecordElaboration = serviceUsageRecordElaboration;
}
}

View File

@ -9,6 +9,7 @@ import org.gcube.accounting.aggregator.status.AggregationState;
import org.gcube.accounting.aggregator.status.AggregationStatus;
import org.gcube.accounting.aggregator.utility.Utility;
import org.gcube.accounting.aggregator.workspace.WorkSpaceManagement;
import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord;
import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,9 +31,11 @@ public class Persist {
protected final File originalRecordsbackupFile;
protected final File aggregateRecordsBackupFile;
protected final String recordType;
public Persist(AggregationStatus aggregationStatus,
Bucket originalRecordBucket, Bucket aggregatedRecordBucket,
File originalRecordsbackupFile, File aggregateRecordsBackupFile) {
File originalRecordsbackupFile, File aggregateRecordsBackupFile, String recordType) {
super();
this.aggregationStatus = aggregationStatus;
@ -42,6 +45,8 @@ public class Persist {
this.originalRecordsbackupFile = originalRecordsbackupFile;
this.aggregateRecordsBackupFile = aggregateRecordsBackupFile;
this.recordType = recordType;
}
private void setAggregationStateToCompleted(Calendar now) throws Exception {
@ -84,6 +89,8 @@ public class Persist {
if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.DELETED)){
// For each aggregated row stored in the file, add it to the Bucket. At the end of the elaboration, set the AggregationStatus to ADDED
InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.getSimpleName())==0 ? true : false;
insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);
insertDocument.elaborate();
}

View File

@ -15,6 +15,7 @@ import org.gcube.common.homelibrary.home.HomeManager;
import org.gcube.common.homelibrary.home.HomeManagerFactory;
import org.gcube.common.homelibrary.home.User;
import org.gcube.common.homelibrary.home.workspace.Workspace;
import org.gcube.common.storagehub.client.dsl.FileContainer;
import org.gcube.common.storagehub.client.dsl.FolderContainer;
import org.gcube.common.storagehub.client.dsl.ItemContainer;
import org.gcube.common.storagehub.client.dsl.ListResolverTyped;
@ -120,7 +121,7 @@ public class WorkSpaceManagement {
}
}
protected FolderContainer getOrCreateFolder(FolderContainer parent, String name, String description, boolean hidden)
public FolderContainer getOrCreateFolder(FolderContainer parent, String name, String description, boolean hidden)
throws Exception {
FolderContainer destinationFolder = null;
ListResolverTyped listResolverTyped = parent.list();
@ -142,45 +143,14 @@ public class WorkSpaceManagement {
return destinationFolder;
}
/**
 * Creates (or reuses) a visible folder under the given parent.
 *
 * The old Home Library HTTP implementation was removed and the method was left
 * returning {@code null}, which would NPE any remaining caller. It now delegates
 * to the StorageHub-based {@code getOrCreateFolder} (non-hidden), matching the
 * replacement used by the rest of this commit.
 *
 * @param parentPath        folder to create into
 * @param folderName        name of the folder
 * @param folderDescription description of the folder
 * @return the existing or newly created folder
 * @throws Exception if the StorageHub operation fails
 */
public FolderContainer createFolder(FolderContainer parentPath, String folderName, String folderDescription)
		throws Exception {
	return getOrCreateFolder(parentPath, folderName, folderDescription, false);
}
public void uploadFile(InputStream inputStream, String name, String description, String mimeType,
public FileContainer uploadFile(InputStream inputStream, String fileName, String description, String mimeType,
FolderContainer parentPath) throws Exception {
try {
logger.trace("Going to upload file on WorkSpace name:{}, description:{}, mimetype:{}, parentPath:{}", name,
logger.trace("Going to upload file on WorkSpace name:{}, description:{}, mimetype:{}, parentPath:{}", fileName,
description, mimeType, parentPath);
// TODO
/*
HTTPCall httpCall = new HTTPCall(restEndpointMap.get("Upload"), USER_AGENT);
Map<String, String> parameters = new HashMap<>();
parameters.put("name", name);
parameters.put("description", description);
parameters.put("parentPath", parentPath);
httpCall.call("", HTTPMETHOD.POST, inputStream, parameters, HTTPCall.CONTENT_TYPE_TEXT_PLAIN);
*/
FileContainer filecontainer = parentPath.uploadFile(inputStream, fileName, description);
return filecontainer;
} catch (Exception e) {
logger.error("Error while uploading file on WorkSpace", e);
throw e;