package org.gcube.accounting.aggregator.plugin; import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.io.PrintStream; import java.io.Serializable; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.gcube.accounting.aggregator.configuration.ConfigurationServiceEndpoint; import org.gcube.accounting.aggregator.configuration.Constant; import org.gcube.accounting.aggregator.configuration.ManagementFileBackup; import org.gcube.accounting.aggregator.madeaggregation.Aggregation; import org.gcube.accounting.aggregator.madeaggregation.AggregationType; import org.gcube.accounting.aggregator.persistence.AggregatorPersistenceBackendQueryConfiguration; import org.gcube.accounting.aggregator.recovery.RecoveryRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedJobUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedPortletUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedServiceUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedStorageUsageRecord; import org.gcube.accounting.datamodel.aggregation.AggregatedTaskUsageRecord; import org.gcube.accounting.datamodel.usagerecords.JobUsageRecord; import org.gcube.accounting.datamodel.usagerecords.PortletUsageRecord; import org.gcube.accounting.datamodel.usagerecords.ServiceUsageRecord; import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; import org.gcube.accounting.datamodel.usagerecords.TaskUsageRecord; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.documentstore.exception.InvalidValueException; import org.gcube.documentstore.persistence.PersistenceCouchBase; import org.gcube.documentstore.records.AggregatedRecord; import org.gcube.documentstore.records.RecordUtility; import org.gcube.vremanagement.executor.plugin.Plugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.PersistTo; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.json.JsonArray; import com.couchbase.client.java.env.CouchbaseEnvironment; import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.view.ViewQuery; import com.couchbase.client.java.view.ViewResult; import com.couchbase.client.java.view.ViewRow; /** * @author Alessandro Pieve (ISTI - CNR) * */ public class AccountingAggregatorPlugin extends Plugin { private static Logger logger = LoggerFactory.getLogger(AccountingAggregatorPlugin.class); public Bucket accountingBucket; protected Cluster cluster; public Aggregation aggregate; public static final String AGGREGATED = "aggregated"; private final static String LINE_FREFIX = "{"; private final static String LINE_SUFFIX = "}"; private final static String KEY_VALUE_PAIR_SEPARATOR = ","; private final static String KEY_VALUE_LINKER = "="; public static Integer countInsert=0; public static Integer countDelete=0; public static Integer recoveryMode=0; public Boolean backup=true; //value if 0 PersistTo.MASTER if 1 PersistTo.ONE public static Integer typePersisted=0; protected PersistTo persisted ; /** * @param runningPluginEvolution */ public AccountingAggregatorPlugin(AccountingAggregatorPluginDeclaration pluginDeclaration) { super(pluginDeclaration); } /* The environment configuration */ protected static final CouchbaseEnvironment ENV = DefaultCouchbaseEnvironment.builder() .connectTimeout(Constant.CONNECTION_TIMEOUT * 1000) .maxRequestLifetime(Constant.MAX_REQUEST_LIFE_TIME * 1000) .queryTimeout(Constant.CONNECTION_TIMEOUT * 1000) //15 Seconds in milliseconds .viewTimeout(Constant.VIEW_TIMEOUT_BUCKET * 1000)//120 Seconds in milliseconds .keepAliveInterval(3600 * 1000) // 3600 Seconds in milliseconds .kvTimeout(5000) //in ms .build(); /**{@inheritDoc}*/ @Override public void launch(Map inputs) throws Exception { countInsert=0; countDelete=0; if(inputs == null || inputs.isEmpty()){ logger.debug("{} inputs {}", this.getClass().getSimpleName(), inputs); throw new Exception("Inputs null"); } //Type :HOURLY,DAILY,MONTHLY,YEARLY //Interval: Number of hour,day,month,year if (!inputs.containsKey("type") || !inputs.containsKey("interval")) throw new IllegalArgumentException("Interval and type must be defined"); AggregationType aggType =AggregationType.valueOf((String)inputs.get("type")); Integer intervaTot=(Integer)inputs.get("interval"); Integer interval=intervaTot* aggType.getMultiplierFactor(); //new feature for not elaborate the full range but a set of small intervals if (inputs.containsKey("intervalStep")) interval=(Integer)inputs.get("intervalStep"); Integer inputStartTime=null; String pathFile = null; if (inputs.containsKey("startTime")) inputStartTime=(Integer)inputs.get("startTime"); else{ //get start time with file logger.debug("Attention get start Time from file"); if (inputs.containsKey("pathFile")){ //get start time from file pathFile=(String) inputs.get("pathFile"); logger.error("open file:{}",pathFile); BufferedReader reader = new BufferedReader(new FileReader(pathFile)); String line; while ((line = reader.readLine()) != null) { line=line.trim(); String strDate = line; SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd"); Date d1 = fmt.parse(strDate); Date now = new Date(); long millisDiff = now.getTime() - d1.getTime(); inputStartTime= (int) (millisDiff / 86400000); logger.debug("Read Start Time:{}",d1.toString()); logger.debug("Start Time:{}",inputStartTime); } reader.close(); } } Boolean currentScope =false; String scope=null; if (inputs.containsKey("currentScope")) currentScope=(Boolean)inputs.get("currentScope"); if (currentScope) scope=ScopeProvider.instance.get(); if (inputs.containsKey("user")) Constant.user=(String)inputs.get("user"); else Constant.user="service.aggregatorAccounting"; if (inputs.containsKey("recovery")) recoveryMode=(Integer)inputs.get("recovery"); if (inputs.containsKey("backup")) backup=(Boolean)inputs.get("backup"); if (inputs.containsKey("typePersisted")) typePersisted=(Integer)inputs.get("typePersisted"); switch(typePersisted) { case 0: persisted=PersistTo.MASTER; break; case 1: persisted=PersistTo.ONE; break; default: persisted=PersistTo.MASTER; } logger.debug("-Launch with Type:{}, Interval:{}, startTime:{}, Scope:{}, Recovery:{}",aggType.toString(),interval,inputStartTime,scope,recoveryMode); logger.debug("persist:{} backup:{}",persisted.toString(),backup); if(!backup){ logger.warn("Attention backup disabled"); Thread.sleep(20000); } if (inputs.containsKey("intervalStep")){ logger.debug("Interval is not considered, aggregate only :{} step",interval); } //Get Configuration from service end point String url=null; String password =null; List listBucket=new ArrayList(); AggregatorPersistenceBackendQueryConfiguration configuration; try{ configuration = new AggregatorPersistenceBackendQueryConfiguration(PersistenceCouchBase.class); url = configuration.getProperty(ConfigurationServiceEndpoint.URL_PROPERTY_KEY); password = configuration.getProperty(ConfigurationServiceEndpoint.PASSWORD_PROPERTY_KEY); if (inputs.containsKey("bucket")) listBucket.add(inputs.get("bucket").toString()); else{ listBucket.add(configuration.getProperty(ConfigurationServiceEndpoint.BUCKET_STORAGE_NAME_PROPERTY_KEY)); listBucket.add(configuration.getProperty(ConfigurationServiceEndpoint.BUCKET_SERVICE_NAME_PROPERTY_KEY)); listBucket.add(configuration.getProperty(ConfigurationServiceEndpoint.BUCKET_JOB_NAME_PROPERTY_KEY)); listBucket.add(configuration.getProperty(ConfigurationServiceEndpoint.BUCKET_PORTLET_NAME_PROPERTY_KEY)); listBucket.add(configuration.getProperty(ConfigurationServiceEndpoint.BUCKET_TASK_NAME_PROPERTY_KEY)); } } catch (Exception e) { logger.error("launch",e.getLocalizedMessage()); throw e; } Cluster cluster = CouchbaseCluster.create(ENV, url); //Define a type for aggregate RecordUtility.addRecordPackage(PortletUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedPortletUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(JobUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedJobUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(TaskUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedTaskUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(StorageUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedStorageUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(ServiceUsageRecord.class.getPackage()); RecordUtility.addRecordPackage(AggregatedServiceUsageRecord.class.getPackage()); //end define Date today = new Date(); Date endScriptTime = new Date(); if (inputs.containsKey("endScriptTime")){ DateFormat df = new SimpleDateFormat ("MM/dd/yyyy HH:mm"); endScriptTime = df.parse ((today.getMonth()+1)+"/"+today.getDate()+"/"+(today.getYear()+1900) +" "+(String)inputs.get("endScriptTime")); logger.debug("Script Run until :{}"+endScriptTime); } do { logger.debug("--Start Time Loop:{}"+inputStartTime); initFolder(); if ((recoveryMode==2)||(recoveryMode==0)){ logger.debug("Recovery mode enabled"); RecoveryRecord.searchFile(cluster,configuration); } if (recoveryMode!=2){ /* for (String bucket:listBucket){ logger.trace("OpenBucket:{}",bucket); accountingBucket = cluster.openBucket(bucket,password); //elaborate bucket, with scope, type aggregation and interval elaborateBucket(bucket,scope, inputStartTime, interval, aggType); } */ if (inputs.containsKey("pathFile")){ //update a file for new start time FileOutputStream file = new FileOutputStream(pathFile); PrintStream output = new PrintStream(file); logger.debug("Update pathfile:{} with new start time:{}",pathFile,inputStartTime-intervaTot); Date dateNow = new Date(); Calendar data = Calendar.getInstance(); data.setTime(dateNow); data.add(Calendar.DATE,-(inputStartTime-intervaTot)); SimpleDateFormat format1 = new SimpleDateFormat("yyyy/MM/dd"); String formatted = format1.format(data.getTime()); output.println(formatted); inputStartTime=inputStartTime-intervaTot; today = new Date(); } logger.debug("Complete countInsert{}, countDelete{}",countInsert,countDelete); } } while(today.compareTo(endScriptTime)<0); logger.debug("Plugin Terminated"); } /**{@inheritDoc}*/ @Override protected void onStop() throws Exception { logger.trace("{} onStop() function", this.getClass().getSimpleName()); Thread.currentThread().interrupt(); } /** * Init folder for backup file */ public void initFolder(){ Constant.PATH_DIR_BACKUP=System.getProperty(Constant.HOME_SYSTEM_PROPERTY)+"/"+Constant.NAME_DIR_BACKUP; Constant.PATH_DIR_BACKUP_INSERT=Constant.PATH_DIR_BACKUP+"/insert"; Constant.PATH_DIR_BACKUP_DELETE=Constant.PATH_DIR_BACKUP+"/delete"; File DirRoot = new File(Constant.PATH_DIR_BACKUP); if (!DirRoot.exists()) { DirRoot.mkdir(); } logger.debug("init folder:{}",Constant.PATH_DIR_BACKUP); } /** * Elaborate a Bucket from startTime to interval * @param bucket * @param inputStartTime * @param interval * @param aggType * @return * @throws Exception */ protected boolean elaborateBucket(String bucket,String scope ,Integer inputStartTime,Integer interval,AggregationType aggType) throws Exception{ SimpleDateFormat format = new SimpleDateFormat(aggType.getDateformat()); //calculate a start time and end time for map reduce key Calendar now, nowTemp; if (inputStartTime==null){ now= Calendar.getInstance(); nowTemp= Calendar.getInstance(); }else{ now=Calendar.getInstance(); nowTemp= Calendar.getInstance(); switch (aggType.name()) { case "YEARLY": now.add( Calendar.YEAR, -inputStartTime ); nowTemp.add( Calendar.YEAR, -inputStartTime ); break; case "MONTHLY": now.add( Calendar.MONTH, -inputStartTime ); nowTemp.add( Calendar.MONTH, -inputStartTime ); break; case "DAILY": now.add( Calendar.DATE, -inputStartTime ); nowTemp.add( Calendar.DATE, -inputStartTime ); break; case "HOURLY": now.add( Calendar.HOUR, -inputStartTime ); nowTemp.add( Calendar.HOUR, -inputStartTime ); break; } } String endAllKeyString = format.format(now.getTime()); String endKeyString = format.format(now.getTime()); //save a record modified into a file and save into a workspace nowTemp.add(aggType.getCalendarField(), -1*interval); String startAllKeyString = format.format(nowTemp.getTime()); if (backup){ logger.debug("Start Backup"); WorkSpaceManagement.onSaveBackupFile(accountingBucket,bucket,scope,startAllKeyString, endAllKeyString,aggType); } else logger.debug("No Backup required"); List documentElaborate=new ArrayList(); for (int i=0; i documentElaborate) throws Exception{ int i=0; JsonDocument documentJson = null; try { //patch for field of long type String document=row.value().toString().replace("\":", "=").replace("\"", ""); i=1;//1 Map map = getMapFromString(document); i=2;//2 //prepare a document for elaborate String identifier=(String) row.document().content().get("id"); i=3;//3 documentJson = JsonDocument.create(identifier, row.document().content()); i=4;//4 @SuppressWarnings("rawtypes") AggregatedRecord record = (AggregatedRecord)RecordUtility.getRecord(map); i=5;//5 aggregate.aggregate(record); i=6;//6 //insert an elaborate row into list JsonDocument for memory document elaborate documentElaborate.add(documentJson); i=7;//7 return true; } catch(InvalidValueException ex){ logger.warn("InvalidValueException - Record is not valid. Anyway, it will be persisted i:{}",i); logger.warn("Runtime Exception ex",ex); if ((i==5)&&(documentJson!=null)){ documentElaborate.add(documentJson); } return false; } catch(RuntimeException exr){ logger.warn("Runtime Exception -Record is not valid. Anyway, it will be persisted i:{}",i); logger.warn("Runtime Exception exr",exr); if ((i==5)&&(documentJson!=null)){ documentElaborate.add(documentJson); logger.debug("Record is elaborate"); } return false; } catch (Exception e) { logger.error("record is not elaborated:"+row.toString()+" but it will be persisted"); logger.error("error elaborateRow", e); logger.error("i:{}",i); if ((i==5)&&(documentJson!=null)){ documentElaborate.add(documentJson); logger.debug("Record is elaborate"); } return false; } } /** * getMapFromString * @param serializedMap * @return */ protected static Map getMapFromString(String serializedMap){ /* Checking line sanity */ if(!serializedMap.startsWith(LINE_FREFIX) && !serializedMap.endsWith(LINE_SUFFIX)){ return null; } /* Cleaning prefix and suffix to parse line */ serializedMap = serializedMap.replace(LINE_FREFIX, ""); serializedMap = serializedMap.replace(LINE_SUFFIX, ""); Map map = new HashMap(); String[] pairs = serializedMap.split(KEY_VALUE_PAIR_SEPARATOR); for (int i=0;i docs,String nameFile) throws Exception{ if (docs.size()!=0){ Integer index=0; boolean succesfulDelete=false; logger.trace("Start a delete document:{}",docs.size()); //before elaborate a record, create a backup file List notDeleted = docs; List notInserted = aggregate.reallyFlush(); nameFile =nameFile+"-"+UUID.randomUUID(); ManagementFileBackup.getInstance().onCreateStringToFile(notDeleted,Constant.FILE_RECORD_NO_AGGREGATE+"_"+nameFile,false); ManagementFileBackup.getInstance().onCreateStringToFile(notInserted,Constant.FILE_RECORD_AGGREGATE+"_"+nameFile,true); List notDeletedTemp = null; while ((index < Constant.NUM_RETRY) && !succesfulDelete){ notDeletedTemp = new ArrayList(); for (JsonDocument doc: notDeleted){ if (index>0){ logger.trace("delete Start {} pass",index); } countDelete ++; try{ accountingBucket.remove(doc.id(),persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); } catch(Exception e){ logger.warn("doc:{} not deleted retry:{} for error:{}",doc.id(),index,e); Thread.sleep(1500); try{ if (accountingBucket.exists(doc.id())) notDeletedTemp.add(doc); } catch(Exception ext){ logger.warn("doc:{} not verify for delete because timeout, retry:{}",doc.id(),index,ext); Thread.sleep(3000); try{ if (accountingBucket.exists(doc.id())) notDeletedTemp.add(doc); } catch(Exception ex) { logger.error("doc:{} not delete ({}), problem with exist bucket",doc.id(),doc.toString(),ex); logger.error("force insert into list for delete"); notDeletedTemp.add(doc); } } } } if (notDeletedTemp.isEmpty()){ succesfulDelete=true; } else { index++; notDeleted = new ArrayList(notDeletedTemp); Thread.sleep(1000); logger.trace("First pass no delete all succesfulDelete:{} index:{}",succesfulDelete,index); } } if (!succesfulDelete){ logger.error("Error Delete record"); } logger.debug("Delete complete:{}, Start a insert aggregated document:{}",countDelete,notInserted.size()); // delete all record and ready for insert a new aggregated record if (succesfulDelete){ //if successful record delete, delete backup file ManagementFileBackup.getInstance().onDeleteFile(Constant.FILE_RECORD_NO_AGGREGATE+"_"+nameFile,false); index=0; boolean succesfulInsert=false; while ((index < Constant.NUM_RETRY) && !succesfulInsert){ List notInsertedTemp = new ArrayList(); for (JsonDocument document: notInserted){ if (index>0){ logger.trace("insert Start {} pass for document:{}",index,document.toString()); } countInsert ++; try{ //JsonDocument response = accountingBucket.upsert(document,PersistTo.MASTER,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); JsonDocument response = accountingBucket.upsert(document,persisted,Constant.CONNECTION_TIMEOUT_BUCKET, TimeUnit.SECONDS); } catch(Exception e){ logger.warn("record:{} not insert retry:{} for error:{}",document.id(),index,e); Thread.sleep(1500); try{ if (!accountingBucket.exists(document.id())) notInsertedTemp.add(document); } catch(Exception ext){ logger.warn("doc:{} not verify for insert because timeout, retry",document.id(),ext); Thread.sleep(3000); try{ if (!accountingBucket.exists(document.id())) notInsertedTemp.add(document); } catch(Exception ex) { logger.error("doc:{} not insert ({}), problem with exist bucket",document.id(),document.toString(),ex); logger.error("force insert into list for insert"); notInsertedTemp.add(document); } } } } if (notInsertedTemp.isEmpty()){ succesfulInsert=true; } else { index++; notInserted = new ArrayList(notInsertedTemp); Thread.sleep(1000); logger.trace("First pass no insert all succesfulInsert:{} index:{}",succesfulInsert,index); } } if (!succesfulInsert){ //do something clever with the exception logger.error("Error Insert record{}"); } else{ logger.debug("elaborate record aggregate:{} and record not aggregate:{}",countInsert, countDelete); ManagementFileBackup.getInstance().onDeleteFile(Constant.FILE_RECORD_AGGREGATE+"_"+nameFile,true); } } logger.trace("Insert complete"); } return true; } }