storage-manager-trigger/src/main/java/org/gcube/contentmanager/storageserver/data/ReadingMongoOplog.java

103 lines
2.9 KiB
Java

package org.gcube.contentmanager.storageserver.data;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import javax.management.RuntimeErrorException;
import com.mongodb.BasicDBObject;
import com.mongodb.Bytes;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import org.bson.types.BSONTimestamp;
import org.gcube.contentmanager.storageserver.parse.JsonParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Thread that tails the {@code oplog.rs} collection of the MongoDB {@code local}
 * database and hands selected oplog records to a {@link CubbyHole}.
 *
 * <p>A record is forwarded via {@code CubbyHole.put} only when it has a non-null
 * {@code o2} value and contains an {@code o} field; every other record is only logged.
 *
 * <p>The thread stops when the oplog is empty at start-up or when it is interrupted
 * while sleeping between two tailing passes.
 */
public class ReadingMongoOplog extends Thread{

    final static Logger logger=LoggerFactory.getLogger(ReadingMongoOplog.class);

    /** Pause between two consecutive tailing passes, in milliseconds. */
    private static final long RETRY_DELAY_MS = 1000L;

    private ServerAddress[] server;
    private MongoClient mongoClient;
    private DB local;
    private DBCollection oplog;
    private final CubbyHole c;
    private final int number;

    /**
     * Resolves the given server addresses and connects to the {@code local.oplog.rs} collection.
     *
     * @param srvs   MongoDB server addresses, one {@link ServerAddress} is built per entry
     * @param c      destination that receives the accepted oplog records
     * @param number identifier of this producer, used only in log messages
     * @throws RuntimeException         if {@code srvs} is {@code null} or empty
     * @throws IllegalArgumentException if one of the addresses cannot be resolved
     */
    public ReadingMongoOplog(List<String> srvs, CubbyHole c, int number){
        this.c=c;
        this.number=number;
        if(srvs == null || srvs.isEmpty()){
            logger.error("MongoDB server not Setted. Please set one or more servers");
            throw new RuntimeException("MongoDB server not Setted. Please set one or more servers");
        }
        server=new ServerAddress[srvs.size()];
        int i=0;
        for(String s : srvs){
            try {
                server[i]=new ServerAddress(s);
            } catch (UnknownHostException e) {
                // Fail fast: continuing would hand an array with null slots to MongoClient.
                throw new IllegalArgumentException("Unknown MongoDB host: " + s, e);
            }
            i++;
        }
        init();
    }

    /** Opens the client on the configured servers and looks up the {@code local.oplog.rs} collection. */
    private void init() {
        mongoClient = new MongoClient(Arrays.asList(server));
        local = mongoClient.getDB("local");
        oplog = local.getCollection("oplog.rs");
    }

    /**
     * Reads the timestamp of the most recent oplog entry, then repeatedly tails the oplog
     * for entries newer than the last timestamp seen, forwarding accepted records.
     */
    @Override
    public void run() {
        BSONTimestamp ts;
        // Newest entry first ($natural descending), only one document needed.
        DBCursor lastCursor = oplog.find().sort(new BasicDBObject("$natural", -1)).limit(1);
        try {
            if (!lastCursor.hasNext()) {
                logger.error("no oplog!");
                return;
            }
            ts = (BSONTimestamp) lastCursor.next().get("ts");
        } finally {
            lastCursor.close();
        }

        while (!Thread.currentThread().isInterrupted()) {
            logger.debug("starting at ts: {}", ts);
            DBCursor cursor = oplog.find(new BasicDBObject("ts", new BasicDBObject("$gt", ts)));
            try {
                cursor.addOption(Bytes.QUERYOPTION_TAILABLE);
                cursor.addOption(Bytes.QUERYOPTION_AWAITDATA);
                while (cursor.hasNext()) {
                    DBObject x = cursor.next();
                    // Remember the position so the next pass resumes after this record.
                    ts = (BSONTimestamp) x.get("ts");
                    dispatch(x);
                }
            } finally {
                // A new cursor is opened on every pass; release the exhausted one.
                cursor.close();
            }
            try {
                Thread.sleep(RETRY_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the only way to end this loop.
                Thread.currentThread().interrupt();
                logger.info("Producer #{} interrupted, stopping oplog reader", number);
                return;
            }
        }
    }

    /** Forwards the record to the CubbyHole when it carries both {@code o2} and {@code o}; logs it otherwise. */
    private void dispatch(DBObject x) {
        if(x.get("o2")==null){
            logger.debug("record discarded: \t{}", x);
            return;
        }
        if(x.containsField("o")){
            c.put(x);
            logger.info("Producer #{} put: {}", number, x);
        }else{
            logger.info("operation is not accounted");
        }
    }
}