package org.gcube.accounting.aggregator.persist; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.gcube.accounting.aggregator.elaboration.Elaborator; import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceDst; import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceFactory; import org.gcube.accounting.aggregator.persistence.PostgreSQLConnectorDst; import org.gcube.accounting.aggregator.status.AggregationState; import org.gcube.accounting.aggregator.status.AggregationStatus; import org.gcube.accounting.aggregator.utility.Utility; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.persistence.AccountingPersistenceConfiguration; import org.gcube.com.fasterxml.jackson.core.JsonProcessingException; import org.gcube.com.fasterxml.jackson.databind.JsonNode; import org.gcube.documentstore.persistence.PersistencePostgreSQL; import org.gcube.documentstore.records.DSMapper; import org.gcube.documentstore.records.Record; import org.gcube.documentstore.records.RecordUtility; /** * @author Luca Frosini (ISTI - CNR) */ public class InsertDocument extends DocumentElaboration { public static final String CSV_FILENAME_SUFFIX = "." + ServiceUsageRecord.CALLED_METHOD + "s.csv"; /** * This is used to save a CSV file which allow to made a post analysis of calledMethods * The format of the CSV file is * serviceClass,serviceName,calledMethod * e.g. * SDI,GeoNetwork,create * * There is an entry for each triple */ protected Map> serviceClassName_calledMethods; protected List unparsableLines; protected boolean serviceUsageRecordElaboration; protected File calledMethodCSVFile; protected AggregatorPersistenceDst aggregatorPersistenceDst; protected int count; // public InsertDocument(AggregationStatus aggregationStatus, File file, Bucket bucket){ public InsertDocument(AggregationStatus aggregationStatus, File file) throws Exception{ // super(aggregationStatus, AggregationState.ADDED, file, bucket, aggregationStatus.getAggregatedRecordsNumber()); super(aggregationStatus, AggregationState.ADDED, file, aggregationStatus.getAggregatedRecordsNumber()); serviceUsageRecordElaboration = false; serviceClassName_calledMethods = new TreeMap>(); unparsableLines = new ArrayList<>(); File destinationFolder = file.getParentFile(); calledMethodCSVFile = new File(destinationFolder, file.getName().replace(Elaborator.AGGREGATED_SUFFIX, CSV_FILENAME_SUFFIX)); aggregatorPersistenceDst = AggregatorPersistenceFactory.getAggregatorPersistenceDst(); count = 0; } protected String getKey(JsonNode jsonNode) { String serviceClass = jsonNode.get(ServiceUsageRecord.SERVICE_CLASS).asText(); String serviceName = jsonNode.get(ServiceUsageRecord.SERVICE_NAME).asText(); return serviceClass + "," + serviceName; } protected void addServiceClassName_calledMethods(JsonNode jsonNode) { String key = getKey(jsonNode); String calledMethod = jsonNode.get(ServiceUsageRecord.CALLED_METHOD).asText(); int operationCount = 0; try { operationCount = jsonNode.get(AggregatedServiceUsageRecord.OPERATION_COUNT).asInt(); }catch (Exception e) { logger.error("", e); // the record was not an Aggregated ServiceUsageRecord } Map calledMethodsMap = serviceClassName_calledMethods.get(key); if(calledMethodsMap==null) { calledMethodsMap = new TreeMap<>(); serviceClassName_calledMethods.put(key, calledMethodsMap); } if(!calledMethodsMap.containsKey(calledMethod)) { calledMethodsMap.put(calledMethod, operationCount); }else { int operationCountOnMap = calledMethodsMap.get(calledMethod); int sum = operationCountOnMap + operationCount; calledMethodsMap.put(calledMethod, sum); } } protected JsonNode analyseLine(String line) throws JsonProcessingException, IOException { JsonNode jsonNode = DSMapper.asJsonNode(line); if(serviceUsageRecordElaboration) { try { addServiceClassName_calledMethods(jsonNode); }catch (Throwable e) { unparsableLines.add(line); } } return jsonNode; } @Override protected void elaborateLine(String line) throws Exception { JsonNode jsonNode = analyseLine(line); Record record = RecordUtility.getRecord(jsonNode.toString()); aggregatorPersistenceDst.insert(record); ++count; if(count==100) { aggregatorPersistenceDst.commitAndClose(); count = 0; } } @Override protected void afterElaboration() throws Exception { aggregatorPersistenceDst.commitAndClose(); count = 0; if(serviceUsageRecordElaboration) { if(calledMethodCSVFile.exists()) { calledMethodCSVFile.delete(); } for(String key : serviceClassName_calledMethods.keySet()) { Map calledMethodsMap = serviceClassName_calledMethods.get(key); for(String calledMethod : calledMethodsMap.keySet()) { try { int operationCount = calledMethodsMap.get(calledMethod); Utility.printLine(calledMethodCSVFile, "\"" + key.replace(",", "\",\"") + "\",\"" + calledMethod + "\",\"" + String.valueOf(operationCount) + "\""); } catch(Throwable e) { logger.error("Unable to print CSV line : {},{}", key, calledMethod); } } } if(!unparsableLines.isEmpty()) { try { Utility.printLine(calledMethodCSVFile, ""); Utility.printLine(calledMethodCSVFile, "------------------------------------------------------------------"); Utility.printLine(calledMethodCSVFile, "Unparsable Lines"); Utility.printLine(calledMethodCSVFile, ""); }catch (Throwable e) { logger.error("Unable to add separator for unparsable lines in CSV"); } for(String unparsableLine : unparsableLines) { try { Utility.printLine(calledMethodCSVFile, unparsableLine); }catch(Throwable e) { logger.error("Unable to print unparsable line in CSV : {}", unparsableLine); } } } } } public void setServiceUsageRecordElaboration(boolean serviceUsageRecordElaboration) { this.serviceUsageRecordElaboration = serviceUsageRecordElaboration; } public File getCalledMethodCSVFile() { return calledMethodCSVFile; } }