package org.gcube.contentmanager.storageserver.consumer; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.List; import org.bson.types.ObjectId; import org.gcube.accounting.datamodel.usagerecords.StorageUsageRecord; import org.gcube.common.authorization.library.provider.AccessTokenProvider; import org.gcube.common.scope.api.ScopeProvider; import org.gcube.contentmanager.storageserver.accounting.Report; import org.gcube.contentmanager.storageserver.accounting.ReportConfig; import org.gcube.contentmanager.storageserver.accounting.ReportException; import org.gcube.contentmanager.storageserver.accounting.ReportFactory; import org.gcube.contentmanager.storageserver.data.CubbyHole; import org.gcube.contentmanager.storageserver.data.OpLogRemoteObject; import org.gcube.contentmanager.storageserver.parse.utils.ValidationUtils; import org.gcube.contentmanager.storageserver.store.MongoDB; import org.gcube.contentmanager.storageserver.store.StorageStatusObject; import org.gcube.contentmanager.storageserver.utils.Utils; import org.gcube.oidc.rest.JWTToken; import org.gcube.oidc.rest.OpenIdConnectRESTHelper; import org.gcube.oidc.rest.OpenIdConnectRESTHelperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mongodb.DBObject; public class UserAccountingConsumer extends Thread{ final static Logger logger=LoggerFactory.getLogger(UserAccountingConsumer.class); final static int MINUTE_DECREMENT=-2; private CubbyHole c; private int number; private Report report; // object fields private String op; private String user; private String password; private String clientId; private String secret; private String oidcEndpoint; String[] server; List dtsHosts; // private String id; public UserAccountingConsumer(String[] srvs, CubbyHole c, int number,List dtsHosts, String clientId, String secret, String authEndpoint){ this.c=c; this.number=number; this.server=srvs; this.dtsHosts=dtsHosts; this.clientId=clientId; this.secret=secret; this.oidcEndpoint=authEndpoint; // init the accounting report try { init(); } catch (ReportException e) { throw new RuntimeException("Accounting report Exception initialization"); } } public UserAccountingConsumer(String[] srvs, String user, String password, CubbyHole c, int number, List dtsHosts, String clientId, String secret, String authEndpoint){ this.c=c; this.number=number; this.server=srvs; this.dtsHosts=dtsHosts; this.user=user; this.password=password; this.clientId=clientId; this.secret=secret; this.oidcEndpoint=authEndpoint; // init the accounting report try { init(); } catch (ReportException e) { throw new RuntimeException("Accounting report Exception initialization"); } } private void init() throws ReportException{ report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE); report.init(); } public void run() { logger.debug("Consuming SU started "); MongoDB mongo=null; while(true){ DBObject x=null; try{ logger.debug("SU waiting next record..."); x=c.get(); logger.debug("Consumer #" + this.number + " got: " + x ); //get operation op = (String) x.get("op"); // retrieve object fields DBObject obj=(DBObject)x.get("o"); OpLogRemoteObject record=retrieveObjectFields(obj); // set object dimension logger.debug("[recordCheck] operation: "+op+" name: "+record.getName()+" type: "+record.getType()+" path: "+record.getFilename()+" dir Path: "+record.getDir()+" length: "+record.getLength()+" owner: "+record.getOwner()+ " lastOperation "+record.getLastOperation()+"\n\t lastUser: "+record.getLastUser()+" lastAccess: "+record.getLastAccess()); if(((record.getLength() >0) && (((record.getFilename() !=null) && (record.getFilename().length()>0) && (record.getDir().length()>0)&& (record.getDir().contains("/"))) || (record.getLinkCount() > 0)))){ String id=((DBObject)x.get("o")).get("_id").toString(); logger.debug("Consumer id " + id ); //convert from byte to kb record.setLength(record.getLength()/1024); // check scope String scope=null; String pathString=(String)obj.get("onScope"); logger.debug("[recordCheck] it is a link to: "+pathString); if((record.getDir()!=null)&& (record.getDir().contains("/"))){ scope=retrieveScopeFromRemoteFilePath(record.getDir()); }else{ // field added on storage manager library for retrieve scope. Used only if it is a link delete scope=retrieveScopeFromRemoteFilePath(pathString); } logger.debug("scope retrieved "); boolean validScope=ValidationUtils.validationScope(scope); if(validScope){ if(record.getDelete() != null){ record.setLastOperation("DELETE"); }else if ((record.getLastOperation() != null) && (op != null) && (record.getLastOperation().equalsIgnoreCase("LINK")) && (op.equalsIgnoreCase("u"))){ //add check on operation // it is an update on a link object this operation doesn't be accounted logger.debug("[recordCheck] update on link object is not accounted. Skip next "); continue; }else if((record.getLastOperation()==null) || (record.getLastUser()==null)){ logger.warn("lastOperation: "+record.getLastOperation()+" lastUser: "+record.getLastUser()+". These values cannot be null. Skip next "); continue; }else{ // if the lastoperation is download and the caller contains a dts host then it isn't a real user but it is a workspace accounting record if((dtsHosts != null) && (record.getLastOperation() != null) && (record.getLastOperation().equalsIgnoreCase("DOWNLOAD"))){ logger.debug("[recordCheck] check if the caller is from dts. CallerIP: "+record.getCallerIp()+" dts value: "+dtsHosts); for(String host: dtsHosts){ logger.debug("scan "+host); if(record.getCallerIp().contains(host)){ record.setLastOperation("workspace.accounting"); logger.debug("[recordCheck] the caller is dts service: caller "+record.getCallerIp()+ " dts host: "+host+" the new user is: "+record.getLastUser()); } } }else if((record.getLastOperation() != null) && (record.getLastOperation().equalsIgnoreCase("UPLOAD")) && op.equalsIgnoreCase("u")){ //it is only a metadata update on a previous insert so it is not need to send the record logger.info("[recordCheck] metadata update on a previous insert. Record discarded: "+id); continue; } } logger.debug("[recordCheck] accounting operation "+record.getLastOperation()); StorageStatusObject ssr=null; if(isNeedSSReport(record.getLastOperation())){ try{ logger.debug("[recordCheck] update SS record yet"); if (mongo== null) mongo=new MongoDB(server, user, password); if(record.getLastOperation().equalsIgnoreCase("COPY") || record.getLastOperation().equalsIgnoreCase("SOFT_COPY")) record.setOwner(record.getLastUser()); ssr=new StorageStatusObject(record.getOwner(), record.getLength(), 1); ssr=mongo.updateUserVolume(ssr, record.getLastOperation()); // mongo.close(); }catch(Exception e){ e.printStackTrace(); mongo.close(); logger.error("[recordCheck] Problem when updating storage status record: "+e.getMessage()); } } try{ // scope set add in 1.3 for new accounting libs ScopeProvider.instance.set(scope); //call to the accounting library if (ssr !=null) report( record, scope, ssr.getVolume()+"", ssr.getCount()+""); else report( record, scope, null, null); }catch(Exception e){ logger.error("[recordCheck] Problem sending accounting report. Exception message: "+e.getMessage()); } }else{ logger.warn("[recordCheck] operation "+record.getLastOperation()+" is not accounted: invalid scope: "+scope); } }else{ logger.debug("operation "+record.getLastOperation()+" is not accounted"); } }catch(Exception e){ logger.error("ERROR Processing record: "+x+" Exception throws: "+e.getMessage()); logger.info("skip to next record "); if(mongo!=null) mongo.close(); } } } /** * check if the lastOperation is a valid operation for storage status information record * @param lastOperation * @return */ private boolean isNeedSSReport(String lastOperation) { logger.trace("Last operation is "+lastOperation); if(lastOperation.equalsIgnoreCase("UPLOAD") || lastOperation.equalsIgnoreCase("COPY") || lastOperation.equalsIgnoreCase("SOFT_COPY")|| lastOperation.equalsIgnoreCase("DELETE")) return true; return false; } public void runWithoutThread(DBObject x){ try { report=ReportFactory.getReport(ReportConfig.ACCOUNTING_TYPE); } catch (ReportException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } report.init(); run(); } private OpLogRemoteObject retrieveObjectFields(DBObject obj) { OpLogRemoteObject record = new OpLogRemoteObject(); record.setFilename((String) obj.get("filename")); record.setType((String) obj.get("type")); record.setName( (String) obj.get("name")); record.setOwner((String) obj.get("owner")); record.setCreationTime((String) obj.get("creationTime")); record.setDir((String) obj.get("dir")); if(obj.get("lastAccess") != null) record.setLastAccess((String)obj.get("lastAccess")); if(obj.get("callerIP") != null) record.setCallerIp((String)obj.get("callerIP")); if(obj.get("lastOperation") != null) record.setLastOperation((String)obj.get("lastOperation")); if(obj.get("lastUser") != null) record.setLastUser((String)obj.get("lastUser")); if(obj.get("linkCount") != null) record.setLinkCount((int)obj.get("linkCount")); if(obj.get("onDeleting") != null) record.setDelete((String)obj.get("onDeleting")); if(obj.get("_id") != null){ ObjectId id= (ObjectId)obj.get("_id"); if(ObjectId.isValid(id.toString())) record.setId(id.toString()); } long length=-1; if(obj.get("length")!=null) record.setLength((long)obj.get("length")); return record; } private void report(OpLogRemoteObject record, String scope, String totVolume, String totCount){ // ACCOUNTING CALL TYPE: STORAGE USAGE StorageUsageRecord sur=report.setGenericProperties(null, "storage-usage", record.getLastUser(), scope, record.getCreationTime(), record.getLastAccess(), record.getOwner(), record.getLastOperation(), record.getLength()+""); sur=report.setSpecificProperties(sur, record.getFilename(), "STORAGE", record.getCallerIp(), record.getId()); logger.info("storage usage record ready"); logger.debug("try to call: "+oidcEndpoint); URL oidcAddress; try { logger.trace("preparing URL"); oidcAddress = new URL(oidcEndpoint); logger.trace("URL ready: "+oidcAddress); Utils.setToken(oidcAddress, clientId, secret); logger.trace("token ready"); } catch (MalformedURLException e) { // TODO Auto-generated catch block e.printStackTrace(); logger.error("Malformed URL ",e); } catch (OpenIdConnectRESTHelperException e) { // TODO Auto-generated catch block e.printStackTrace(); logger.error("OpenIdConnectRESTHelperException ",e); } logger.debug("sending record to the accounting"); report.printRecord(sur); report.send(sur); } private String retrieveScopeFromRemoteFilePath(String filename) { String[] split=filename.split("/"); if(split.length>0){ String scope=null; int i=1; if(split[1].equals("VOLATILE")){ i=2; } scope="/"+split[i]; i++; while((!split[i].equals("home")) && (!split[i].equals("public"))){ scope=scope+"/"+split[i]; logger.debug("scope building: "+scope); i++; } logger.debug("[recordCheck] scope retrieved: "+scope); return scope; }else logger.error("Scope bad format: scope not retrieved from string: "+filename); return null; } }