504 lines
17 KiB
Java
504 lines
17 KiB
Java
package org.gcube.portal.databook.server;
|
|
|
|
import java.net.InetSocketAddress;
|
|
import java.time.Duration;
|
|
import java.util.List;
|
|
|
|
|
|
import com.datastax.oss.driver.api.core.CqlSession;
|
|
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
|
|
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
|
|
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
|
|
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
|
import com.datastax.oss.driver.api.core.metadata.Metadata;
|
|
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
|
|
import com.datastax.oss.driver.api.core.type.DataTypes;
|
|
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
/**
|
|
* @author Massimiliano Assante ISTI-CNR
|
|
* @author Ahmed Salah Tawfik Ibrahim ISTI-CNR
|
|
*
|
|
*/
|
|
public class CassandraClusterConnection {
|
|
/**
|
|
* logger
|
|
*/
|
|
private static final Logger _log = LoggerFactory.getLogger(CassandraClusterConnection.class);
|
|
|
|
/**
|
|
* keyspace location
|
|
*/
|
|
private static List<InetSocketAddress> hosts;
|
|
private static String datacenterName;
|
|
private static String keyspaceName;
|
|
private CqlSession myKeyspaceSession;
|
|
|
|
/**
|
|
*
|
|
* @param dropSchema set true if you want do drop the current and set up new one
|
|
* the connection to cassandra cluster
|
|
*/
|
|
protected CassandraClusterConnection(boolean dropSchema) {
|
|
if (hosts == null || datacenterName == null || keyspaceName == null) {
|
|
RunningCluster cluster = RunningCluster.getInstance(null);
|
|
|
|
//host = cluster.getHost();
|
|
hosts = cluster.getHosts();
|
|
datacenterName = cluster.getDatacenterName();
|
|
keyspaceName = cluster.getKeyspaceName();
|
|
}
|
|
|
|
_log.info(keyspaceName + " KeySpace SetUp ...");
|
|
SetUpKeySpaces(dropSchema);
|
|
myKeyspaceSession = connect(keyspaceName);
|
|
_log.info("CONNECTED! using KeySpace: " + keyspaceName);
|
|
}
|
|
|
|
/**
|
|
*
|
|
* @param dropSchema set true if you want to drop the current and set up new one
|
|
* the connection to cassandra cluster
|
|
*/
|
|
protected CassandraClusterConnection(boolean dropSchema, String infrastructureName) {
|
|
if (hosts == null || datacenterName == null || keyspaceName == null) {
|
|
RunningCluster cluster = RunningCluster.getInstance(infrastructureName);
|
|
//host = cluster.getHost();
|
|
hosts = cluster.getHosts();
|
|
datacenterName = cluster.getDatacenterName();
|
|
keyspaceName = cluster.getKeyspaceName();
|
|
}
|
|
_log.info(keyspaceName + " KeySpace SetUp ...");
|
|
SetUpKeySpaces(dropSchema);
|
|
myKeyspaceSession = connect(keyspaceName);
|
|
_log.info("CONNECTED! using KeySpace: " + keyspaceName);
|
|
}
|
|
|
|
public CqlSession getKeyspaceSession(){
|
|
if (myKeyspaceSession.isClosed()){
|
|
myKeyspaceSession = connect(keyspaceName);
|
|
}
|
|
return myKeyspaceSession;
|
|
}
|
|
|
|
/**
|
|
* @param dropSchema set true if you want to drop the current and set up new one
|
|
* the connection to cassandra cluster
|
|
*/
|
|
public void SetUpKeySpaces(boolean dropSchema) {
|
|
boolean createNew = false;
|
|
boolean found = false;
|
|
CqlSession session = connect();
|
|
Metadata metaData = session.getMetadata();
|
|
for (KeyspaceMetadata meta : metaData.getKeyspaces().values()) {
|
|
if (meta.getName().toString().equals(keyspaceName)){
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
try {
|
|
if (dropSchema && found) {
|
|
_log.info("Dropping Keyspace: " + keyspaceName + " ...");
|
|
try {
|
|
ResultSet returned = dropKeyspace();
|
|
Thread.sleep(2000);
|
|
if (returned.wasApplied())
|
|
_log.info("Dropped " + keyspaceName);
|
|
else
|
|
_log.info("Couldn't drop " + keyspaceName);
|
|
} catch (Exception e) {
|
|
_log.error("Dropping Keyspace operation Failed ... " + keyspaceName + " does NOT exists");
|
|
return;
|
|
}
|
|
createNew = true;
|
|
}
|
|
|
|
if (!found || createNew) {
|
|
_log.info("Keyspace does not exist, triggering schema creation ... ");
|
|
int replicationFactor = 2;
|
|
createKeyspace(keyspaceName, replicationFactor);
|
|
closeSession(session);
|
|
createTables();
|
|
_log.info("Using Keyspace " + keyspaceName);
|
|
}
|
|
} catch (Exception e) {
|
|
e.printStackTrace();
|
|
}
|
|
}
|
|
|
|
/*
|
|
*
|
|
********************** CASSANDRA KEYSPACE CREATION ***********************
|
|
*
|
|
*/
|
|
private static CqlSession connect() {
|
|
CqlSession cqlSession = configBuilder(CqlSession.builder())
|
|
.addContactPoints(hosts)
|
|
.withLocalDatacenter(datacenterName)
|
|
.build();
|
|
_log.info("[OK] Connected to Cassandra Cluster");
|
|
return cqlSession;
|
|
}
|
|
private static CqlSession connect(String KEYSPACE_NAME) {
|
|
CqlSession cqlSession = configBuilder(CqlSession.builder())
|
|
.addContactPoints(hosts)
|
|
.withKeyspace(KEYSPACE_NAME)
|
|
.withLocalDatacenter("1")
|
|
.build();
|
|
_log.info("[OK] Connected to Keyspace {} ", KEYSPACE_NAME);
|
|
return cqlSession;
|
|
}
|
|
|
|
public static void closeSession(CqlSession session) {
|
|
if (session != null) session.close();
|
|
_log.info("[OK]Session is now closed");
|
|
}
|
|
|
|
public void closeConnection(){
|
|
if(!myKeyspaceSession.isClosed()){
|
|
try{
|
|
_log.info("Closing connection");
|
|
closeSession(myKeyspaceSession);
|
|
_log.info("Connection closed!");
|
|
}catch(Exception e){
|
|
_log.error("Unable to close connection", e);
|
|
}
|
|
}
|
|
}
|
|
|
|
private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder){
|
|
return cqlSessionBuilder
|
|
.withConfigLoader(DriverConfigLoader.programmaticBuilder()
|
|
// Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out after PT2S
|
|
.withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(240000))
|
|
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(240000))
|
|
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(240000))
|
|
.build());
|
|
}
|
|
private static void createKeyspace(String keyspaceName, int replicationFactor) {
|
|
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
|
|
.addContactPoints(hosts)
|
|
.withLocalDatacenter("1")
|
|
.build()) {
|
|
cqlSession.execute(SchemaBuilder.createKeyspace(keyspaceName)
|
|
.ifNotExists()
|
|
.withSimpleStrategy(replicationFactor)
|
|
.withDurableWrites(true)
|
|
.build());
|
|
_log.info("+ Keyspace '{}' created.", keyspaceName);
|
|
closeSession(cqlSession);
|
|
}
|
|
}
|
|
|
|
private static ResultSet dropKeyspace(){
|
|
ResultSet toreturn;
|
|
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
|
|
.addContactPoints(hosts)
|
|
.withLocalDatacenter("1")
|
|
.build()) {
|
|
toreturn = cqlSession.execute(SchemaBuilder.dropKeyspace(keyspaceName).ifExists().build());
|
|
_log.info("Keyspace {} dropped.", keyspaceName);
|
|
closeSession(cqlSession);
|
|
}
|
|
return toreturn;
|
|
}
|
|
private void createTables(){
|
|
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
|
|
.addContactPoints(hosts)
|
|
.withLocalDatacenter("1")
|
|
.withKeyspace(keyspaceName)
|
|
.build()) {
|
|
|
|
createTableUSERNotificationsPreferences(cqlSession);
|
|
createTableUSERNotifications(cqlSession);
|
|
createTableVRETimeline(cqlSession);
|
|
createTableAppTimeline(cqlSession);
|
|
createTableUSERTimeline(cqlSession);
|
|
createTableHashtaggedPosts(cqlSession);
|
|
createTableHashtaggedComments(cqlSession);
|
|
createTableHashtagsCounter(cqlSession);
|
|
createTableUSERNotificationsUnread(cqlSession);
|
|
createTableUSERLikes(cqlSession);
|
|
createTableVREInvites(cqlSession);
|
|
createTableEMAILInvites(cqlSession);
|
|
createTableAttachments(cqlSession);
|
|
createTableInvites(cqlSession);
|
|
createTableLikes(cqlSession);
|
|
createTableComments(cqlSession);
|
|
createTableNotifications(cqlSession);
|
|
createTablePosts(cqlSession);
|
|
|
|
closeSession(cqlSession);
|
|
}
|
|
}
|
|
private void createTableUSERNotificationsPreferences(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("UserNotificationsPreferences")
|
|
.ifNotExists()
|
|
.withPartitionKey("userid", DataTypes.TEXT)
|
|
.withPartitionKey("type", DataTypes.TEXT)
|
|
.withColumn("preference", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsPreferences");
|
|
}
|
|
private void createTableUSERNotifications(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("UserNotifications")
|
|
.ifNotExists()
|
|
.withPartitionKey("userid", DataTypes.TEXT)
|
|
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("notid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "USERNotifications");
|
|
}
|
|
private void createTableVRETimeline(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("VRETimeline")
|
|
.ifNotExists()
|
|
.withPartitionKey("vreid", DataTypes.TEXT)
|
|
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "VRETimeline");
|
|
}
|
|
private void createTableAppTimeline(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("AppTimeline")
|
|
.ifNotExists()
|
|
.withPartitionKey("appid", DataTypes.TEXT)
|
|
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "AppTimeline");
|
|
}
|
|
private void createTableUSERTimeline(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("UserTimeline")
|
|
.ifNotExists()
|
|
.withPartitionKey("userid", DataTypes.TEXT)
|
|
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
_log.info("+ Table '{}' has been created (if needed).", "USERTimeline");
|
|
}
|
|
private void createTableHashtaggedPosts(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("HashtaggedPosts")
|
|
.ifNotExists()
|
|
.withPartitionKey("hashtag", DataTypes.TEXT)
|
|
.withPartitionKey("postid", DataTypes.UUID)
|
|
.withColumn("vreid", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedPosts");
|
|
}
|
|
private void createTableHashtaggedComments(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("HashtaggedComments")
|
|
.ifNotExists()
|
|
.withPartitionKey("hashtag", DataTypes.TEXT)
|
|
.withPartitionKey("commentid", DataTypes.UUID)
|
|
.withColumn("vreid", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedComments");
|
|
}
|
|
private void createTableHashtagsCounter(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("HashtagsCounter")
|
|
.ifNotExists()
|
|
.withPartitionKey("vreid", DataTypes.TEXT)
|
|
.withPartitionKey("hashtag", DataTypes.TEXT)
|
|
.withColumn("count", DataTypes.BIGINT)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "HashtagsCounter");
|
|
}
|
|
private void createTableUSERNotificationsUnread(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("UserUnreadNotifications")
|
|
.ifNotExists()
|
|
.withPartitionKey("userid", DataTypes.TEXT)
|
|
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("notid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsUnread");
|
|
}
|
|
private void createTableUSERLikes(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("UserLikes")
|
|
.ifNotExists()
|
|
.withPartitionKey("userid", DataTypes.TEXT)
|
|
.withPartitionKey("likeid", DataTypes.UUID)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "USERLikes");
|
|
}
|
|
private void createTableVREInvites(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("VREInvites")
|
|
.ifNotExists()
|
|
.withPartitionKey("vreid", DataTypes.TEXT)
|
|
.withPartitionKey("inviteid", DataTypes.UUID)
|
|
.withColumn("status", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "VREInvites");
|
|
}
|
|
private void createTableEMAILInvites(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("EmailInvites")
|
|
.ifNotExists()
|
|
.withPartitionKey("email", DataTypes.TEXT)
|
|
.withPartitionKey("vreid", DataTypes.TEXT)
|
|
.withColumn("inviteid", DataTypes.UUID)
|
|
.withCompactStorage()
|
|
.build());
|
|
_log.info("+ Table '{}' has been created (if needed).", "EMAILInvites");
|
|
}
|
|
private void createTableAttachments(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Attachments")
|
|
.ifNotExists()
|
|
.withPartitionKey("attachid", DataTypes.UUID)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withColumn("uri", DataTypes.TEXT)
|
|
.withColumn("name", DataTypes.TEXT)
|
|
.withColumn("description", DataTypes.TEXT)
|
|
.withColumn("urithumbnail", DataTypes.TEXT)
|
|
.withColumn("mimetype", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("post_attach")
|
|
.ifNotExists()
|
|
.onTable("Attachments")
|
|
.andColumn("postid")
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "Attachments");
|
|
}
|
|
private void createTableInvites(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Invites")
|
|
.ifNotExists()
|
|
.withPartitionKey("inviteid", DataTypes.UUID)
|
|
.withColumn("senderuserid", DataTypes.TEXT)
|
|
.withColumn("vreid", DataTypes.TEXT)
|
|
.withColumn("email", DataTypes.TEXT)
|
|
.withColumn("controlcode", DataTypes.TEXT)
|
|
.withColumn("status", DataTypes.TEXT)
|
|
.withColumn("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("senderfullname", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("sender")
|
|
.ifNotExists()
|
|
.onTable("Invites")
|
|
.andColumn("senderuserid")
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "Invites");
|
|
}
|
|
private void createTableLikes(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Likes")
|
|
.ifNotExists()
|
|
.withPartitionKey("likeid", DataTypes.UUID)
|
|
.withColumn("userid", DataTypes.TEXT)
|
|
.withColumn("fullname", DataTypes.TEXT)
|
|
.withColumn("thumbnailurl", DataTypes.TEXT)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withColumn("timestamp", DataTypes.TIMESTAMP)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("post_likes")
|
|
.ifNotExists()
|
|
.onTable("Likes")
|
|
.andColumn("postid")
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "Likes");
|
|
}
|
|
private void createTableComments(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Comments")
|
|
.ifNotExists()
|
|
.withPartitionKey("commentid", DataTypes.UUID)
|
|
.withColumn("userid", DataTypes.TEXT)
|
|
.withColumn("fullname", DataTypes.TEXT)
|
|
.withColumn("thumbnailurl", DataTypes.TEXT)
|
|
.withColumn("comment", DataTypes.TEXT)
|
|
.withColumn("postid", DataTypes.UUID)
|
|
.withColumn("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("isedit", DataTypes.BOOLEAN)
|
|
.withColumn("lastedittime", DataTypes.TIMESTAMP)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("post_comments")
|
|
.ifNotExists()
|
|
.onTable("Comments")
|
|
.andColumn("postid")
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "Comments");
|
|
}
|
|
private void createTableNotifications(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Notifications")
|
|
.ifNotExists()
|
|
.withPartitionKey("notid", DataTypes.UUID)
|
|
.withColumn("type", DataTypes.TEXT)
|
|
.withColumn("userid", DataTypes.TEXT)
|
|
.withColumn("subjectid", DataTypes.TEXT)
|
|
.withColumn("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("description", DataTypes.TEXT)
|
|
.withColumn("uri", DataTypes.TEXT)
|
|
.withColumn("senderid", DataTypes.TEXT)
|
|
.withColumn("senderfullname", DataTypes.TEXT)
|
|
.withColumn("senderthumbnailurl", DataTypes.TEXT)
|
|
.withColumn("isread", DataTypes.BOOLEAN)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("not_type")
|
|
.ifNotExists()
|
|
.onTable("Notifications")
|
|
.andColumn("type")
|
|
.build());
|
|
_log.info("+ Table '{}' has been created (if needed).", "Notifications");
|
|
}
|
|
private void createTablePosts(CqlSession cqlSession) {
|
|
cqlSession.execute(SchemaBuilder.createTable("Posts")
|
|
.ifNotExists()
|
|
.withPartitionKey("postid", DataTypes.UUID)
|
|
.withColumn("linkhost", DataTypes.TEXT)
|
|
.withColumn("description", DataTypes.TEXT)
|
|
.withColumn("email", DataTypes.TEXT)
|
|
.withColumn("likesno", DataTypes.BIGINT)
|
|
.withColumn("thumbnailurl", DataTypes.TEXT)
|
|
.withColumn("linkdescription", DataTypes.TEXT)
|
|
.withColumn("timestamp", DataTypes.TIMESTAMP)
|
|
.withColumn("uri", DataTypes.TEXT)
|
|
.withColumn("isapplicationpost", DataTypes.BOOLEAN)
|
|
.withColumn("entityid", DataTypes.TEXT)
|
|
.withColumn("privacy", DataTypes.TEXT)
|
|
.withColumn("type", DataTypes.TEXT)
|
|
.withColumn("urithumbnail", DataTypes.TEXT)
|
|
.withColumn("vreid", DataTypes.TEXT)
|
|
.withColumn("multifileupload", DataTypes.BOOLEAN)
|
|
.withColumn("fullname", DataTypes.TEXT)
|
|
.withColumn("commentsno", DataTypes.BIGINT)
|
|
.withColumn("linktitle", DataTypes.TEXT)
|
|
.withCompactStorage()
|
|
.build());
|
|
cqlSession.execute(SchemaBuilder.createIndex("posts_privacy")
|
|
.ifNotExists()
|
|
.onTable("Posts")
|
|
.andColumn("privacy")
|
|
.build());
|
|
|
|
_log.info("+ Table '{}' has been created (if needed).", "Posts");
|
|
}
|
|
|
|
}
|