first version with StorageStatus accounting with UPLOAD, DELETE and COPY OPERATION
git-svn-id: https://svn.d4science.research-infrastructures.eu/gcube/trunk/content-management/storage-manager-trigger@96862 82a268e6-3cf1-43bd-a215-b396298e98cf
This commit is contained in:
parent
2e61305e67
commit
dae808dce6
10
pom.xml
10
pom.xml
|
@ -50,11 +50,11 @@
|
|||
<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
|
||||
</dependency>
|
||||
<!-- INFRA CONNECTION -->
|
||||
<dependency>
|
||||
<groupId>org.gcube.core</groupId>
|
||||
<artifactId>common-gcore-stubs</artifactId>
|
||||
<version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version>
|
||||
</dependency>
|
||||
<!-- <dependency> -->
|
||||
<!-- <groupId>org.gcube.core</groupId> -->
|
||||
<!-- <artifactId>common-gcore-stubs</artifactId> -->
|
||||
<!-- <version>[1.0.0-SNAPSHOT,2.0.0-SNAPSHOT)</version> -->
|
||||
<!-- </dependency> -->
|
||||
<dependency>
|
||||
<groupId>org.gcube.resources.discovery</groupId>
|
||||
<artifactId>ic-client</artifactId>
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package org.gcube.contentmanager.storageserver.accounting;
|
||||
|
||||
import org.gcube.accounting.datamodel.RawUsageRecord;
|
||||
|
||||
public interface Report {
|
||||
|
||||
public void init();
|
||||
|
@ -11,23 +13,18 @@ public interface Report {
|
|||
* @param resourceScope
|
||||
* @return
|
||||
*/
|
||||
public void start(String consumerId, String resourceScope, String creationTime);
|
||||
/**
|
||||
* set start time of the operation
|
||||
* @return
|
||||
*/
|
||||
public void timeUpdate();
|
||||
public RawUsageRecord setGenericProperties(String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner);
|
||||
|
||||
/**
|
||||
* Set end time of operation and other specific properties
|
||||
* @return
|
||||
*/
|
||||
public void ultimate(String owner, String operation, String size, String filePath, String id, String callerIP, String lastAccess);
|
||||
public RawUsageRecord setSpecificProperties(RawUsageRecord sur, String owner, String operation, String size, String filePath, String dataType, String dataCount);
|
||||
|
||||
/**
|
||||
* send report
|
||||
* @return
|
||||
*/
|
||||
public void send();
|
||||
public void send(RawUsageRecord sur);
|
||||
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.slf4j.LoggerFactory;
|
|||
public class ReportAccountingImpl implements Report {
|
||||
final Logger logger = LoggerFactory.getLogger(ReportAccountingImpl.class);
|
||||
// storage usage record
|
||||
public RawUsageRecord sur;
|
||||
// public RawUsageRecord sur;
|
||||
// storage status record
|
||||
public RawUsageRecord ssr;
|
||||
public ResourceAccounting raFactory;
|
||||
|
@ -41,15 +41,16 @@ import org.slf4j.LoggerFactory;
|
|||
}
|
||||
|
||||
@Override
|
||||
public void start(String consumerId, String resourceScope, String creationTime) {
|
||||
logger.info("set accounting properties: consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime);
|
||||
public RawUsageRecord setGenericProperties(String resourceType, String consumerId, String resourceScope, String creationTime, String lastAccess, String owner) {
|
||||
logger.info("set accounting generic properties: resourceType: "+resourceType+" consumerId "+consumerId+" scope: "+resourceScope+ " creationTime "+creationTime+" lastAccess "+lastAccess+" owner "+ owner);
|
||||
if(raFactory==null) init();
|
||||
this.sur = new RawUsageRecord();
|
||||
RawUsageRecord sr = new RawUsageRecord();
|
||||
// generic properties
|
||||
sur.setResourceType("storage-usage");
|
||||
if(consumerId!=null) sur.setConsumerId(consumerId);
|
||||
// sur.setResourceType("storage-usage");
|
||||
sr.setResourceType(resourceType);
|
||||
if(consumerId!=null) sr.setConsumerId(consumerId);
|
||||
// ur.setResourceOwner("paolo.fabriani");
|
||||
if(resourceScope !=null) sur.setResourceScope(resourceScope);
|
||||
if(resourceScope !=null) sr.setResourceScope(resourceScope);
|
||||
//set creation time
|
||||
if(creationTime!=null){
|
||||
SimpleDateFormat formatter = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z");
|
||||
|
@ -59,45 +60,43 @@ import org.slf4j.LoggerFactory;
|
|||
} catch (ParseException e) {
|
||||
logger.error("Error in parsing date: "+creationTime+" exc msg: "+e.getMessage());
|
||||
}
|
||||
sur.setCreateTime(date);
|
||||
sr.setCreateTime(date);
|
||||
// set the mandatory fields
|
||||
try {
|
||||
date = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z").parse(lastAccess);
|
||||
sr.setStartTime(date);
|
||||
sr.setEndTime(date);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
//specific properties
|
||||
if(owner != null) sr.setResourceOwner(owner);
|
||||
// end mandatory files
|
||||
}
|
||||
return sr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void timeUpdate() {
|
||||
setStartTime();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ultimate(String owner, String operation, String size, String filePath, String id, String callerIP, String lastAccess) {
|
||||
logger.info("set accounting properties: owner "+owner+" operation: "+operation+" size: "+size+ " remotePath: "+filePath+" id: "+id+"callerIP "+callerIP+" lastAccess"+lastAccess);
|
||||
if(sur==null) this.sur = new RawUsageRecord();
|
||||
//specific properties
|
||||
if(owner != null) sur.setResourceOwner(owner);
|
||||
public RawUsageRecord setSpecificProperties( RawUsageRecord sur, String operation, String size, String filePath, String callerIP, String dataType, String dataCount) {
|
||||
logger.info("set accounting properties: operation: "+operation+" size: "+size+ " remotePath: "+filePath+" callerIP "+callerIP+" dataType "+dataType+" dataCount "+dataCount);
|
||||
if(sur==null) sur = new RawUsageRecord();
|
||||
if (operation!=null) sur.setResourceSpecificProperty("operationType",operation);
|
||||
if(size!= null) sur.setResourceSpecificProperty("dataVolume", size);
|
||||
if(filePath != null) sur.setResourceSpecificProperty("remotePath", filePath);
|
||||
if(id!= null) sur.setResourceSpecificProperty("id", id);
|
||||
// if(id!= null) sur.setResourceSpecificProperty("id", id);
|
||||
if(callerIP!=null) sur.setResourceSpecificProperty("callerIP", callerIP);
|
||||
if(lastAccess!=null){
|
||||
sur.setResourceSpecificProperty("lastAccess", lastAccess);
|
||||
// set the mandatory fields
|
||||
Date date=null;
|
||||
try {
|
||||
date = new SimpleDateFormat("dd MM yyyy 'at' hh:mm:ss z").parse(lastAccess);
|
||||
sur.setStartTime(date);
|
||||
sur.setEndTime(date);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
// end mandatory files
|
||||
}
|
||||
// set static properties
|
||||
sur.setResourceSpecificProperty("dataType","STORAGE");
|
||||
sur.setResourceSpecificProperty("dataCount", "1");
|
||||
// if(lastAccess!=null){
|
||||
// sur.setResourceSpecificProperty("lastAccess", lastAccess);
|
||||
//
|
||||
// }
|
||||
sur.setResourceSpecificProperty("dataType",dataType);
|
||||
sur.setResourceSpecificProperty("dataCount", dataCount);
|
||||
return sur;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send() {
|
||||
public void send(RawUsageRecord sur) {
|
||||
logger.info("report sending...");
|
||||
if(raFactory!=null)
|
||||
raFactory.sendAccountingMessage(sur);
|
||||
|
@ -105,43 +104,4 @@ import org.slf4j.LoggerFactory;
|
|||
logger.error("Problem on building accounting record: Factory Object is null ");
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private void setIpAddress() {
|
||||
String address=null;
|
||||
try {
|
||||
address=InetAddress.getLocalHost().toString();
|
||||
} catch (UnknownHostException e) {
|
||||
|
||||
}
|
||||
logger.info("caller ip: "+address);
|
||||
if (address!=null) sur.setResourceSpecificProperty("callerIP", address);;
|
||||
}
|
||||
|
||||
private void setStartTime() {
|
||||
//set start time
|
||||
Calendar startTime = new GregorianCalendar();
|
||||
Date time=startTime.getTime();
|
||||
logger.info("set start time: "+time);
|
||||
try {
|
||||
sur.setStartTime(time);
|
||||
}
|
||||
catch (InvalidValueException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
private void setEndTime() {
|
||||
// set end time
|
||||
Calendar endTime = new GregorianCalendar();
|
||||
Date time=endTime.getTime();
|
||||
SimpleDateFormat sdf=new SimpleDateFormat();
|
||||
sdf.applyPattern("dd MM yyyy 'at' hh:mm:ss z");
|
||||
sdf.format(time);
|
||||
logger.info("set end time: "+time);
|
||||
try {
|
||||
sur.setEndTime(time);
|
||||
}
|
||||
catch (InvalidValueException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,17 @@
|
|||
package org.gcube.contentmanager.storageserver.parse;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.bson.types.ObjectId;
|
||||
import org.gcube.accounting.datamodel.RawUsageRecord;
|
||||
import org.gcube.contentmanager.storageserver.accounting.Report;
|
||||
import org.gcube.contentmanager.storageserver.accounting.ReportConfig;
|
||||
import org.gcube.contentmanager.storageserver.accounting.ReportException;
|
||||
import org.gcube.contentmanager.storageserver.accounting.ReportFactory;
|
||||
import org.gcube.contentmanager.storageserver.data.CubbyHole;
|
||||
import org.gcube.contentmanager.storageserver.parse.utils.ValidationUtils;
|
||||
import org.gcube.contentmanager.storageserver.store.MongoDB;
|
||||
import org.gcube.contentmanager.storageserver.store.StorageStatusRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -31,12 +36,31 @@ public class JsonParser extends Thread{
|
|||
private String lastUser;
|
||||
private int linkCount;
|
||||
private String delete;
|
||||
private String id;
|
||||
// private String id;
|
||||
private String callerIp;
|
||||
private String user;
|
||||
private String password;
|
||||
String[] server;
|
||||
// private String previousInsert;
|
||||
public JsonParser(CubbyHole c, int number){
|
||||
|
||||
public JsonParser(String[] srvs, CubbyHole c, int number){
|
||||
this.c=c;
|
||||
this.number=number;
|
||||
this.server=srvs;
|
||||
// init the accounting report
|
||||
try {
|
||||
init();
|
||||
} catch (ReportException e) {
|
||||
throw new RuntimeException("Accounting report Exception initialization");
|
||||
}
|
||||
}
|
||||
|
||||
public JsonParser(String[] srvs, String user, String password, CubbyHole c, int number){
|
||||
this.c=c;
|
||||
this.number=number;
|
||||
this.server=srvs;
|
||||
this.user=user;
|
||||
this.password=password;
|
||||
// init the accounting report
|
||||
try {
|
||||
init();
|
||||
|
@ -60,9 +84,10 @@ public class JsonParser extends Thread{
|
|||
retrieveObjectFields(obj);
|
||||
long length=-1;
|
||||
if(obj.get("length")!=null) length=(long)obj.get("length");
|
||||
logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " id: "+id+" lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
|
||||
logger.info("[recordCheck] operation: "+op+" name: "+name+" type: "+type+" path: "+filename+" length: "+length+" owner: "+owner+"\n\t cretionTime: "+creationTime+ " lastOperation "+lastOperation+" lastUser: "+lastUser+" lastAccess: "+lastAccess);
|
||||
if(((length >0) && (((filename!=null) && (filename.contains("/"))) || (linkCount > 0)))){
|
||||
|
||||
//convert from byte to kb
|
||||
length=length/1024;
|
||||
String scope=null;
|
||||
if((filename!=null)&& (filename.contains("/")))
|
||||
scope=retrieveScopeFromRemoteFilePath(filename);
|
||||
|
@ -81,10 +106,12 @@ public class JsonParser extends Thread{
|
|||
continue;
|
||||
}
|
||||
// operation=mappingOperationField(op, id, delete, lastAccess);
|
||||
MongoDB mongo=new MongoDB(server, user, password);
|
||||
StorageStatusRecord ssr=mongo.update(lastUser, length, 1, lastOperation);
|
||||
//call to the accounting library
|
||||
report( filename, owner, creationTime, id, length, scope, lastOperation, callerIp, lastAccess, lastUser);
|
||||
report( filename, owner, creationTime, length, scope, lastOperation, callerIp, lastAccess, lastUser, ssr.getVolume()+"", ssr.getCount()+"");
|
||||
logger.info(" operation accounted "+lastOperation);
|
||||
logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t id: "+id+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
|
||||
logger.info("\n[accountingCall] operation: "+lastOperation+"\n\t name: "+name+"\n\t type: "+type+"\n\t path: "+filename+"\n\t length: "+length+"\n\t owner: "+owner+"\n\t cretionTime: "+creationTime+"\n\t scope: "+scope+"\n\t lastOperation "+lastOperation+"\n\t lastUser: "+lastUser+"\n\t lastAccess: "+lastAccess+"\n\t callerIp: "+callerIp);
|
||||
}else{
|
||||
logger.info("operation is not accounted: invalid scope: "+scope);
|
||||
}
|
||||
|
@ -112,18 +139,22 @@ public class JsonParser extends Thread{
|
|||
if(obj.get("linkCount") != null) linkCount=(int)obj.get("linkCount");
|
||||
delete = null;
|
||||
if(obj.get("onDeleting") != null) delete=(String)obj.get("onDeleting");
|
||||
ObjectId objectId=(ObjectId)obj.get("_id");
|
||||
id = objectId.toString();
|
||||
// ObjectId objectId=(ObjectId)obj.get("_id");
|
||||
// id = objectId.toString();
|
||||
}
|
||||
|
||||
private void report(String filename, String owner,
|
||||
String creationTime, String id, long length,
|
||||
String scope, String operation, String callerIP, String lastAccess, String lastUser) {
|
||||
//convert from byte to kb
|
||||
length=length/1024;
|
||||
report.start(lastUser, scope, creationTime);
|
||||
report.ultimate(owner, operation, length+"", filename, id, callerIP, lastAccess);
|
||||
report.send();
|
||||
String creationTime, long length,
|
||||
String scope, String operation, String callerIP, String lastAccess, String lastUser, String totVolume, String totCount) {
|
||||
// ACCOUNTING CALL TYPE: STORAGE USAGE
|
||||
RawUsageRecord sur=report.setGenericProperties("storage-usage",lastUser, scope, creationTime, lastAccess, owner);
|
||||
sur=report.setSpecificProperties(sur, operation, length+"", filename, callerIP, "STORAGE", "1");
|
||||
report.send(sur);
|
||||
// ACCOUNTING CALL TYPE: STORAGE STATUS
|
||||
RawUsageRecord ssr=report.setGenericProperties("storage-status",lastUser, scope, creationTime, lastAccess, owner);
|
||||
sur=report.setSpecificProperties(ssr, operation, totVolume, filename, callerIP, "STORAGE", totCount);
|
||||
report.send(ssr);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -19,30 +19,37 @@ public class Startup {
|
|||
|
||||
|
||||
public static void main(String[] args) {
|
||||
if(args.length != 1 && args.length != 3){
|
||||
if(args.length != 2 && args.length != 4){
|
||||
System.out.println("Usage:");
|
||||
System.out.println("\tjava Startup scope user password\n\n");
|
||||
System.out.println("\tjava Startup scope ip user password\n\n");
|
||||
System.out.println("Example:");
|
||||
System.out.println("\tjava Startup /gcube/devsec pippo pluT0\n");
|
||||
System.out.println("\tjava Startup /gcube/devsec localhost pippo pluT0\n");
|
||||
System.out.println("or ");
|
||||
System.out.println("Usage:");
|
||||
System.out.println("\tjava Startup scope \n\n");
|
||||
System.out.println("\tjava Startup scope ip\n\n");
|
||||
System.out.println("Example:");
|
||||
System.out.println("\tjava Startup /gcube \n\n");
|
||||
System.out.println("\tjava Startup /gcube localhost\n\n");
|
||||
|
||||
return;
|
||||
}
|
||||
scope=args[0];
|
||||
user=args[1];
|
||||
password=args[2];
|
||||
String oplogServer= args[1];
|
||||
if(args.length == 4){
|
||||
user=args[2];
|
||||
password=args[3];
|
||||
}
|
||||
String[] server=retrieveConfiguration();
|
||||
CubbyHole c = new CubbyHole();
|
||||
ReadingMongoOplog producer=null;
|
||||
if(args.length == 3)
|
||||
producer=new ReadingMongoOplog( Arrays.asList(server), args[1], args[2], c, 1 );
|
||||
producer=new ReadingMongoOplog( Arrays.asList(oplogServer), args[1], args[2], c, 1 );
|
||||
else
|
||||
producer=new ReadingMongoOplog( Arrays.asList(server), c, 1 );
|
||||
JsonParser consumer=new JsonParser(c, 1);
|
||||
JsonParser consumer=null;
|
||||
if(args.length == 3)
|
||||
consumer=new JsonParser(server, c, 1);
|
||||
else
|
||||
consumer=new JsonParser(server, args[1], args[2], c, 1);
|
||||
producer.start();
|
||||
consumer.start();
|
||||
}
|
||||
|
|
|
@ -110,23 +110,45 @@ public class MongoDB {
|
|||
ssCollection.insert(doc);
|
||||
}
|
||||
|
||||
public void update(String consumer, long volume, int count){
|
||||
public StorageStatusRecord update(String consumer, long volume, int count, String operation){
|
||||
StorageStatusRecord ssr=get(consumer);
|
||||
if(ssr != null){
|
||||
int currentCount=ssr.getCount();
|
||||
currentCount=currentCount+count;
|
||||
ssr.setCount(currentCount);
|
||||
count = setCount(count, currentCount, operation);
|
||||
ssr.setCount(count);
|
||||
long currentVolume=ssr.getVolume();
|
||||
currentVolume=currentVolume+volume;
|
||||
volume = setVolume(volume, currentVolume, operation);
|
||||
final BasicDBObject query = new BasicDBObject("consumer", consumer);
|
||||
// Creating BasicDBObjectBuilder object without arguments
|
||||
DBObject documentBuilder = BasicDBObjectBuilder.start()
|
||||
.add("volume", currentVolume).add("count", currentCount).get();
|
||||
.add("volume", volume).add("count", count).get();
|
||||
// get the dbobject from builder and Inserting document
|
||||
ssCollection.update(query,new BasicDBObject("$set", documentBuilder), true, false);
|
||||
}else{
|
||||
put(consumer, volume, count);
|
||||
}
|
||||
return new StorageStatusRecord(consumer, volume, count);
|
||||
}
|
||||
|
||||
private long setVolume(long volume, long currentVolume, String operation) {
|
||||
logger.info("accounting: operation "+operation+" total Volume "+currentVolume+" current volume "+volume);
|
||||
if(operation.equalsIgnoreCase("UPLOAD") || operation.equalsIgnoreCase("COPY")){
|
||||
currentVolume=currentVolume+volume;
|
||||
}else if(operation.equalsIgnoreCase("DELETE")){
|
||||
currentVolume=currentVolume-volume;
|
||||
}
|
||||
logger.info("new current volume "+currentVolume);
|
||||
return currentVolume;
|
||||
}
|
||||
|
||||
private int setCount(int count, int currentCount, String operation) {
|
||||
logger.info("accounting: operation "+operation+" total count "+currentCount+" current count"+count);
|
||||
if(operation.equalsIgnoreCase("UPLOAD"))
|
||||
currentCount=currentCount+count;
|
||||
else if(operation.equalsIgnoreCase("DELETE"))
|
||||
currentCount=currentCount-count;
|
||||
logger.info("new count calculated: "+currentCount);
|
||||
return currentCount;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -14,6 +14,12 @@ public class StorageStatusRecord {
|
|||
|
||||
private DBObject dbo;
|
||||
|
||||
public StorageStatusRecord(String consumer, long volume, int count){
|
||||
this.consumer=consumer;
|
||||
this.volume=volume;
|
||||
this.count=count;
|
||||
}
|
||||
|
||||
public StorageStatusRecord(String id, String consumer, long volume, int count, DBObject obj){
|
||||
this.id=id;
|
||||
this.consumer=consumer;
|
||||
|
|
|
@ -18,13 +18,8 @@ public class MongoDBTest {
|
|||
|
||||
@Test
|
||||
public void putAndRetrieve(){
|
||||
mongo.update("test.consumer", 100, -1);
|
||||
mongo.update("test.consumer", 100, -1, "UPLOAD");
|
||||
}
|
||||
|
||||
|
||||
// @Test
|
||||
public void test() {
|
||||
fail("Not yet implemented");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue