Fixing aggregator

Luca Frosini 2020-03-24 20:19:00 +01:00
parent e730349f8d
commit 8812f66acc
3 changed files with 45 additions and 35 deletions

InsertDocument.java

@@ -2,11 +2,11 @@ package org.gcube.accounting.aggregator.persist;
 import java.io.File;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import org.gcube.accounting.aggregator.elaboration.Elaborator;
@@ -41,14 +41,16 @@ public class InsertDocument extends DocumentElaboration {
 protected List<String> unparsableLines;
 protected boolean serviceUsageRecordElaboration;
-protected File csvFile;
+protected File calledMethodCSVFile;
 public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){
 super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber());
 serviceUsageRecordElaboration = false;
-serviceClassName_calledMethods = new HashMap<String,Set<String>>();
+serviceClassName_calledMethods = new TreeMap<String,Set<String>>();
 unparsableLines = new ArrayList<>();
+File destinationFolder = file.getParentFile();
+calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
 }
 protected String getKey(JsonObject jsonObject) {
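
The hunk above moves the CSV file handle into the constructor, so calledMethodCSVFile is derived once from the backup file's name instead of being computed late in afterElaboration(). A minimal sketch of that derivation; the suffix values below are assumptions for illustration only (the real constants are Elaborator.AGGREGATED_SUFFIX and CSV_FILENAME_SUFFIX, whose values are not shown in this diff):

    import java.io.File;

    public class CsvNameSketch {
        // Assumed values, for illustration only.
        static final String AGGREGATED_SUFFIX = ".aggregated.json";
        static final String CSV_FILENAME_SUFFIX = ".csv";

        public static void main(String[] args) {
            File file = new File("/tmp/backup/ServiceUsageRecord-2020-03.aggregated.json");
            File destinationFolder = file.getParentFile();
            // Same derivation as the constructor: swap the suffix, keep the folder.
            File calledMethodCSVFile = new File(destinationFolder,
                    file.getName().replace(AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
            System.out.println(calledMethodCSVFile); // /tmp/backup/ServiceUsageRecord-2020-03.csv
        }
    }
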
@@ -61,7 +63,7 @@ public class InsertDocument extends DocumentElaboration {
 String key = getKey(jsonObject);
 Set<String> setOfCalledMethods = serviceClassName_calledMethods.get(key);
 if(setOfCalledMethods==null) {
-setOfCalledMethods = new HashSet<>();
+setOfCalledMethods = new TreeSet<>();
 serviceClassName_calledMethods.put(key, setOfCalledMethods);
 }
 String calledMethod = jsonObject.getString(ServiceUsageRecord.CALLED_METHOD);
@@ -91,13 +93,14 @@ public class InsertDocument extends DocumentElaboration {
 @Override
 protected void afterElaboration() {
 if(serviceUsageRecordElaboration) {
-File destinationFolder = file.getParentFile();
-csvFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX));
+if(calledMethodCSVFile.exists()) {
+calledMethodCSVFile.delete();
+}
 for(String key : serviceClassName_calledMethods.keySet()) {
 Set<String> setOfCalledMethods = serviceClassName_calledMethods.get(key);
 for(String calledMethod : setOfCalledMethods) {
 try {
-Utility.printLine(csvFile, key + "," + calledMethod);
+Utility.printLine(calledMethodCSVFile, key + "," + calledMethod);
 } catch(Throwable e) {
 logger.error("Unable to print CSV line : {},{}", key, calledMethod);
 }
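
The new delete-if-exists guard above suggests Utility.printLine appends to the file (each row is written line by line across two loops), so a re-run against a stale CSV would otherwise duplicate rows. A sketch of the append-style writer apparently being assumed; this printLine is a hypothetical stand-in, since the real Utility class is not shown in this diff:

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;

    public class PrintLineSketch {
        // Hypothetical stand-in for Utility.printLine: open in append mode,
        // write one line, close. Deleting the file first gives a clean slate.
        static void printLine(File file, String line) throws IOException {
            try (PrintWriter pw = new PrintWriter(new FileWriter(file, true))) {
                pw.println(line);
            }
        }

        public static void main(String[] args) throws IOException {
            File csv = new File("called-methods.csv");
            if (csv.exists()) {
                csv.delete(); // mirrors the guard added in afterElaboration()
            }
            printLine(csv, "key,calledMethod");
        }
    }
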
@@ -105,17 +108,17 @@ public class InsertDocument extends DocumentElaboration {
 }
 if(!unparsableLines.isEmpty()) {
 try {
-Utility.printLine(csvFile, "");
-Utility.printLine(csvFile, "------------------------------------------------------------------");
-Utility.printLine(csvFile, "Unparsable Lines");
-Utility.printLine(csvFile, "");
+Utility.printLine(calledMethodCSVFile, "");
+Utility.printLine(calledMethodCSVFile, "------------------------------------------------------------------");
+Utility.printLine(calledMethodCSVFile, "Unparsable Lines");
+Utility.printLine(calledMethodCSVFile, "");
 }catch (Throwable e) {
 logger.error("Unable to add separator for unparsable lines in CSV");
 }
 for(String unparsableLine : unparsableLines) {
 try {
-Utility.printLine(csvFile, unparsableLine);
+Utility.printLine(calledMethodCSVFile, unparsableLine);
 }catch(Throwable e) {
 logger.error("Unable to print unparsable line in CSV : {}", unparsableLine);
 }
@@ -128,8 +131,8 @@ public class InsertDocument extends DocumentElaboration {
 this.serviceUsageRecordElaboration = serviceUsageRecordElaboration;
 }
-public File getCSVFile() {
-return csvFile;
+public File getCalledMethodCSVFile() {
+return calledMethodCSVFile;
 }
 }
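
The switch from HashMap/HashSet to TreeMap/TreeSet throughout this file is what makes the generated CSV deterministic: both tree collections iterate in natural (alphabetical) order, so rows no longer come out in unspecified hash-iteration order. A minimal self-contained sketch of the effect; the keys below are placeholders, since getKey(jsonObject) is not shown in this diff:

    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.TreeSet;

    public class SortedCsvSketch {
        public static void main(String[] args) {
            // TreeMap/TreeSet iterate in sorted order, unlike HashMap/HashSet.
            Map<String, Set<String>> serviceClassName_calledMethods = new TreeMap<>();
            // Hypothetical keys; the real key comes from getKey(jsonObject).
            serviceClassName_calledMethods
                    .computeIfAbsent("DataAccess,Resolver", k -> new TreeSet<>())
                    .add("resolve");
            serviceClassName_calledMethods
                    .computeIfAbsent("Accounting,Aggregator", k -> new TreeSet<>())
                    .add("aggregate");
            for (Map.Entry<String, Set<String>> entry : serviceClassName_calledMethods.entrySet()) {
                for (String calledMethod : entry.getValue()) {
                    // Always prints the Accounting row before the DataAccess row.
                    System.out.println(entry.getKey() + "," + calledMethod);
                }
            }
        }
    }
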

Persist.java

@@ -85,13 +85,11 @@ public class Persist {
 DeleteDocument deleteDocument = new DeleteDocument(aggregationStatus, originalRecordsbackupFile, originalRecordBucket);
 deleteDocument.elaborate();
 }
-InsertDocument insertDocument = null;
-boolean serviceUsageRecordElaboration = false;
+InsertDocument insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
+boolean serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.newInstance().getRecordType())==0 ? true : false;
+insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);
 if(AggregationState.canContinue(aggregationStatus.getAggregationState(),AggregationState.DELETED)){
 // For Each aggregated row stored on file it add them to Bucket. At the end of elaboration set AggregationStatus to ADDED
-insertDocument = new InsertDocument(aggregationStatus, aggregateRecordsBackupFile, aggregatedRecordBucket);
-serviceUsageRecordElaboration = recordType.compareTo(ServiceUsageRecord.class.getSimpleName())==0 ? true : false;
-insertDocument.setServiceUsageRecordElaboration(serviceUsageRecordElaboration);
 insertDocument.elaborate();
 }
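
Two fixes are visible in this hunk. First, insertDocument and serviceUsageRecordElaboration are now computed unconditionally, so the later call to getCalledMethodCSVFile() is available even when the AggregationState check skips re-elaboration, and a null insertDocument can never be dereferenced. Second, the record-type test now compares against getRecordType() of an actual ServiceUsageRecord instance rather than the bare class simple name. A minimal sketch of the null-reference shape being avoided, with hypothetical simplified names:

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class NullGuardSketch {
        // Hypothetical stand-in for InsertDocument.
        static class Report {
            File getCalledMethodCSVFile() { return new File("called-methods.csv"); }
            void elaborate() { /* ... */ }
        }

        public static void main(String[] args) {
            boolean canContinue = false; // e.g. the state check fails
            List<File> files = new ArrayList<>();

            // Old shape (fragile): only constructed inside the guard,
            // so any unconditional use afterwards risks NullPointerException:
            //   Report report = null;
            //   if (canContinue) { report = new Report(); }

            // New shape: always construct, guard only the elaboration step.
            Report report = new Report();
            if (canContinue) {
                report.elaborate();
            }
            files.add(report.getCalledMethodCSVFile()); // safe on both paths
            System.out.println(files);
        }
    }
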
@@ -113,7 +111,7 @@ public class Persist {
 }
 if(serviceUsageRecordElaboration) {
-files.add(insertDocument.getCSVFile());
+files.add(insertDocument.getCalledMethodCSVFile());
 }
 WorkSpaceManagement.getInstance().zipAndBackupFiles(targetFolder, zipFilename, files);

WorkSpaceManagement.java

@@ -22,9 +22,9 @@ import org.slf4j.LoggerFactory;
 * @author Luca Frosini (ISTI - CNR)
 */
 public class WorkSpaceManagement {
 public static Logger logger = LoggerFactory.getLogger(WorkSpaceManagement.class);
 private static final String ZIP_SUFFIX = ".zip";
 private static final String ZIP_FILE_DESCRIPTION = "Backup of original records deleted and aggregtaed records inserted.";
 private static final String ZIP_MIMETYPE = "application/zip, application/octet-stream";
@@ -48,7 +48,7 @@ public class WorkSpaceManagement {
 ZipEntry ze = new ZipEntry(file.getName());
 zos.putNextEntry(ze);
 int len;
-while ((len = in.read(buffer)) > 0) {
+while((len = in.read(buffer)) > 0) {
 zos.write(buffer, 0, len);
 }
 zos.closeEntry();
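
This hunk only reflows whitespace, but the loop it touches is the core of the addToZipFile helper invoked as addToZipFile(zos, file) further down. A minimal self-contained sketch of the copy pattern; the buffer size is an illustrative assumption, and try-with-resources stands in for whatever stream handling the real method does outside this hunk:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    public class ZipCopySketch {
        static void addToZipFile(ZipOutputStream zos, File file) throws IOException {
            byte[] buffer = new byte[1024]; // size is an illustrative assumption
            try (FileInputStream in = new FileInputStream(file)) {
                zos.putNextEntry(new ZipEntry(file.getName()));
                int len;
                // read() returns the number of bytes read, or -1 at end of stream
                while ((len = in.read(buffer)) > 0) {
                    zos.write(buffer, 0, len);
                }
                zos.closeEntry();
            }
        }

        public static void main(String[] args) throws IOException {
            File payload = File.createTempFile("record", ".txt");
            File zip = File.createTempFile("backup", ".zip");
            try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zip))) {
                addToZipFile(zos, payload);
            }
            System.out.println("wrote " + zip.getAbsolutePath());
        }
    }
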
@@ -61,7 +61,7 @@ public class WorkSpaceManagement {
 }
 public boolean zipAndBackupFiles(FolderContainer targetFolder, String name, List<File> files) throws Exception {
 try {
 String zipFileName = getZipFileName(name);
@@ -72,27 +72,35 @@ public class WorkSpaceManagement {
 FileOutputStream fos = new FileOutputStream(zipFile);
 ZipOutputStream zos = new ZipOutputStream(fos);
-for(File file : files){
+for(File file : files) {
 addToZipFile(zos, file);
 }
 zos.close();
 FileInputStream zipFileStream = new FileInputStream(zipFile);
-WorkSpaceManagement.getInstance().uploadFile(zipFileStream, zipFileName, ZIP_FILE_DESCRIPTION,
-ZIP_MIMETYPE, targetFolder);
+WorkSpaceManagement.getInstance().uploadFile(zipFileStream, zipFileName, ZIP_FILE_DESCRIPTION, ZIP_MIMETYPE,
+targetFolder);
 logger.debug("Going to delete local zip file {}", zipFile.getAbsolutePath());
 zipFile.delete();
+for(File file : files) {
+if(file.exists()) {
+logger.debug("Going to delete local file {} which was added to the zip file {}",
+file.getAbsolutePath(), zipFile.getAbsolutePath());
+file.delete();
+}
+}
 return true;
-} catch (Exception e) {
+} catch(Exception e) {
 logger.error("Error while trying to save a backup file containg aggregated records", e);
 throw e;
 }
 }
 public FolderContainer getWorkspaceRoot() throws Exception {
 try {
 return storageHubClient.getWSRoot();
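
The loop added in the hunk above removes each local working file once its content is safely inside the uploaded zip. One nuance worth noting: File.delete() returns false instead of throwing when deletion fails, so a variant like the following hypothetical helper (not part of the commit) would surface failed deletions that the committed code silently ignores:

    import java.io.File;
    import java.util.List;

    public class CleanupSketch {
        // Hypothetical variant: report when delete() fails instead of ignoring it.
        static void deleteLocalCopies(List<File> files, File zipFile) {
            for (File file : files) {
                if (file.exists() && !file.delete()) {
                    System.err.printf("Could not delete %s (already zipped into %s)%n",
                            file.getAbsolutePath(), zipFile.getAbsolutePath());
                }
            }
        }
    }
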
@@ -140,16 +148,17 @@ public class WorkSpaceManagement {
 }
 return destinationFolder;
 }
 public FileContainer uploadFile(InputStream inputStream, String fileName, String description, String mimeType,
 FolderContainer parentPath) throws Exception {
 try {
-logger.trace("Going to upload file on WorkSpace name:{}, description:{}, mimetype:{}, parentPath:{}", fileName,
-description, mimeType, parentPath);
+logger.trace("Going to upload file on WorkSpace name:{}, description:{}, mimetype:{}, parentPath:{}",
+fileName, description, mimeType, parentPath);
 FileContainer filecontainer = parentPath.uploadFile(inputStream, fileName, description);
 logger.info("Zip file {} successfully uploaded in workspace", fileName);
 return filecontainer;
-} catch (Exception e) {
+} catch(Exception e) {
 logger.error("Error while uploading file on WorkSpace", e);
 throw e;
 }