social-networking-library/src/main/java/org/gcube/portal/databook/server/CassandraClusterConnection....

503 lines
17 KiB
Java

package org.gcube.portal.databook.server;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Massimiliano Assante ISTI-CNR
* @author Ahmed Salah Tawfik Ibrahim ISTI-CNR
*
*/
public class CassandraClusterConnection {
/**
* logger
*/
private static final Logger _log = LoggerFactory.getLogger(CassandraClusterConnection.class);
/**
* keyspace location
*/
private static List<InetSocketAddress> hosts;
private static String datacenterName;
private static String keyspaceName;
private CqlSession myKeyspaceSession;
/**
*
* @param dropSchema set true if you want do drop the current and set up new one
* the connection to cassandra cluster
*/
protected CassandraClusterConnection(boolean dropSchema) {
if (hosts == null || datacenterName == null || keyspaceName == null) {
RunningCluster cluster = RunningCluster.getInstance(null);
//host = cluster.getHost();
hosts = cluster.getHosts();
datacenterName = cluster.getDatacenterName();
keyspaceName = cluster.getKeyspaceName();
}
_log.info(keyspaceName + " KeySpace SetUp ...");
SetUpKeySpaces(dropSchema);
myKeyspaceSession = connect(keyspaceName);
_log.info("CONNECTED! using KeySpace: " + keyspaceName);
}
/**
*
* @param dropSchema set true if you want to drop the current and set up new one
* the connection to cassandra cluster
*/
protected CassandraClusterConnection(boolean dropSchema, String infrastructureName) {
if (hosts == null || datacenterName == null || keyspaceName == null) {
RunningCluster cluster = RunningCluster.getInstance(infrastructureName);
//host = cluster.getHost();
hosts = cluster.getHosts();
datacenterName = cluster.getDatacenterName();
keyspaceName = cluster.getKeyspaceName();
}
_log.info(keyspaceName + " KeySpace SetUp ...");
SetUpKeySpaces(dropSchema);
myKeyspaceSession = connect(keyspaceName);
_log.info("CONNECTED! using KeySpace: " + keyspaceName);
}
public CqlSession getKeyspaceSession(){
if (myKeyspaceSession.isClosed()){
myKeyspaceSession = connect(keyspaceName);
}
return myKeyspaceSession;
}
/**
* @param dropSchema set true if you want to drop the current and set up new one
* the connection to cassandra cluster
*/
public void SetUpKeySpaces(boolean dropSchema) {
boolean createNew = false;
boolean found = false;
CqlSession session = connect();
Metadata metaData = session.getMetadata();
for (KeyspaceMetadata meta : metaData.getKeyspaces().values()) {
if (meta.getName().toString().equals(keyspaceName)){
found = true;
break;
}
}
try {
if (dropSchema && found) {
_log.info("Dropping Keyspace: " + keyspaceName + " ...");
try {
ResultSet returned = dropKeyspace();
Thread.sleep(2000);
if (returned.wasApplied())
_log.info("Dropped " + keyspaceName);
else
_log.info("Couldn't drop " + keyspaceName);
} catch (Exception e) {
_log.error("Dropping Keyspace operation Failed ... " + keyspaceName + " does NOT exists");
return;
}
createNew = true;
}
if (!found || createNew) {
_log.info("Keyspace does not exist, triggering schema creation ... ");
int replicationFactor = 2;
createKeyspace(keyspaceName, replicationFactor);
closeSession(session);
createTables();
_log.info("Using Keyspace " + keyspaceName);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/*
*
********************** CASSANDRA KEYSPACE CREATION ***********************
*
*/
private static CqlSession connect() {
CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withLocalDatacenter(datacenterName)
.build();
_log.info("[OK] Connected to Cassandra Cluster");
return cqlSession;
}
private static CqlSession connect(String KEYSPACE_NAME) {
CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withKeyspace(KEYSPACE_NAME)
.withLocalDatacenter("1")
.build();
_log.info("[OK] Connected to Keyspace {} ", KEYSPACE_NAME);
return cqlSession;
}
public static void closeSession(CqlSession session) {
if (session != null) session.close();
_log.info("[OK]Session is now closed");
}
public void closeConnection(){
if(!myKeyspaceSession.isClosed()){
try{
_log.info("Closing connection");
closeSession(myKeyspaceSession);
_log.info("Connection closed!");
}catch(Exception e){
_log.error("Unable to close connection", e);
}
}
}
private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder){
return cqlSessionBuilder
.withConfigLoader(DriverConfigLoader.programmaticBuilder()
// Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out after PT2S
.withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(240000))
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(240000))
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(240000))
.build());
}
private static void createKeyspace(String keyspaceName, int replicationFactor) {
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withLocalDatacenter("1")
.build()) {
cqlSession.execute(SchemaBuilder.createKeyspace(keyspaceName)
.ifNotExists()
.withSimpleStrategy(replicationFactor)
.withDurableWrites(true)
.build());
_log.info("+ Keyspace '{}' created.", keyspaceName);
closeSession(cqlSession);
}
}
private static ResultSet dropKeyspace(){
ResultSet toreturn;
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withLocalDatacenter("1")
.build()) {
toreturn = cqlSession.execute(SchemaBuilder.dropKeyspace(keyspaceName).ifExists().build());
_log.info("Keyspace {} dropped.", keyspaceName);
closeSession(cqlSession);
}
return toreturn;
}
private void createTables(){
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withLocalDatacenter("1")
.withKeyspace(keyspaceName)
.build()) {
createTableUSERNotificationsPreferences(cqlSession);
createTableUSERNotifications(cqlSession);
createTableVRETimeline(cqlSession);
createTableAppTimeline(cqlSession);
createTableUSERTimeline(cqlSession);
createTableHashtaggedPosts(cqlSession);
createTableHashtaggedComments(cqlSession);
createTableHashtagsCounter(cqlSession);
createTableUSERNotificationsUnread(cqlSession);
createTableUSERLikes(cqlSession);
createTableVREInvites(cqlSession);
createTableEMAILInvites(cqlSession);
createTableAttachments(cqlSession);
createTableInvites(cqlSession);
createTableLikes(cqlSession);
createTableComments(cqlSession);
createTableNotifications(cqlSession);
createTablePosts(cqlSession);
closeSession(cqlSession);
}
}
private void createTableUSERNotificationsPreferences(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserNotificationsPreferences")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("type", DataTypes.TEXT)
.withColumn("preference", DataTypes.TEXT)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsPreferences");
}
private void createTableUSERNotifications(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserNotifications")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withColumn("notid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERNotifications");
}
private void createTableVRETimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("VRETimeline")
.ifNotExists()
.withPartitionKey("vreid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "VRETimeline");
}
private void createTableAppTimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("AppTimeline")
.ifNotExists()
.withPartitionKey("appid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "AppTimeline");
}
private void createTableUSERTimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserTimeline")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERTimeline");
}
private void createTableHashtaggedPosts(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtaggedPosts")
.ifNotExists()
.withPartitionKey("hashtag", DataTypes.TEXT)
.withPartitionKey("postid", DataTypes.UUID)
.withColumn("vreid", DataTypes.TEXT)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedPosts");
}
private void createTableHashtaggedComments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtaggedComments")
.ifNotExists()
.withPartitionKey("hashtag", DataTypes.TEXT)
.withPartitionKey("commentid", DataTypes.UUID)
.withColumn("vreid", DataTypes.TEXT)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedComments");
}
private void createTableHashtagsCounter(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtagsCounter")
.ifNotExists()
.withPartitionKey("vreid", DataTypes.TEXT)
.withPartitionKey("hashtag", DataTypes.TEXT)
.withColumn("count", DataTypes.BIGINT)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "HashtagsCounter");
}
private void createTableUSERNotificationsUnread(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserUnreadNotifications")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withColumn("notid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsUnread");
}
private void createTableUSERLikes(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserLikes")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("likeid", DataTypes.UUID)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERLikes");
}
private void createTableVREInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("VREInvites")
.ifNotExists()
.withPartitionKey("vreid", DataTypes.TEXT)
.withPartitionKey("inviteid", DataTypes.UUID)
.withColumn("status", DataTypes.TEXT)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "VREInvites");
}
private void createTableEMAILInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("EmailInvites")
.ifNotExists()
.withPartitionKey("email", DataTypes.TEXT)
.withPartitionKey("vreid", DataTypes.TEXT)
.withColumn("inviteid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "EMAILInvites");
}
private void createTableAttachments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Attachments")
.ifNotExists()
.withPartitionKey("attachid", DataTypes.UUID)
.withColumn("postid", DataTypes.UUID)
.withColumn("uri", DataTypes.TEXT)
.withColumn("name", DataTypes.TEXT)
.withColumn("description", DataTypes.TEXT)
.withColumn("urithumbnail", DataTypes.TEXT)
.withColumn("mimetype", DataTypes.TEXT)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("post_attach")
.ifNotExists()
.onTable("Attachments")
.andColumn("postid")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Attachments");
}
private void createTableInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Invites")
.ifNotExists()
.withPartitionKey("inviteid", DataTypes.UUID)
.withColumn("senderuserid", DataTypes.TEXT)
.withColumn("vreid", DataTypes.TEXT)
.withColumn("email", DataTypes.TEXT)
.withColumn("controlcode", DataTypes.TEXT)
.withColumn("status", DataTypes.TEXT)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn("senderfullname", DataTypes.TEXT)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("sender")
.ifNotExists()
.onTable("Invites")
.andColumn("senderuserid")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Invites");
}
private void createTableLikes(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Likes")
.ifNotExists()
.withPartitionKey("likeid", DataTypes.UUID)
.withColumn("userid", DataTypes.TEXT)
.withColumn("fullname", DataTypes.TEXT)
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("postid", DataTypes.UUID)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("post_likes")
.ifNotExists()
.onTable("Likes")
.andColumn("postid")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Likes");
}
private void createTableComments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Comments")
.ifNotExists()
.withPartitionKey("commentid", DataTypes.UUID)
.withColumn("userid", DataTypes.TEXT)
.withColumn("fullname", DataTypes.TEXT)
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("comment", DataTypes.TEXT)
.withColumn("postid", DataTypes.UUID)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn("isedit", DataTypes.BOOLEAN)
.withColumn("lastedittime", DataTypes.TIMESTAMP)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("post_comments")
.ifNotExists()
.onTable("Comments")
.andColumn("postid")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Comments");
}
private void createTableNotifications(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Notifications")
.ifNotExists()
.withPartitionKey("notid", DataTypes.UUID)
.withColumn("type", DataTypes.TEXT)
.withColumn("userid", DataTypes.TEXT)
.withColumn("subjectid", DataTypes.TEXT)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn("description", DataTypes.TEXT)
.withColumn("uri", DataTypes.TEXT)
.withColumn("senderid", DataTypes.TEXT)
.withColumn("senderfullname", DataTypes.TEXT)
.withColumn("senderthumbnailurl", DataTypes.TEXT)
.withColumn("isread", DataTypes.BOOLEAN)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("not_type")
.ifNotExists()
.onTable("Notifications")
.andColumn("type")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Notifications");
}
private void createTablePosts(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Posts")
.ifNotExists()
.withPartitionKey("postid", DataTypes.UUID)
.withColumn("linkhost", DataTypes.TEXT)
.withColumn("description", DataTypes.TEXT)
.withColumn("email", DataTypes.TEXT)
.withColumn("likesno", DataTypes.BIGINT)
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("linkdescription", DataTypes.TEXT)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn("uri", DataTypes.TEXT)
.withColumn("isapplicationpost", DataTypes.BOOLEAN)
.withColumn("entityid", DataTypes.TEXT)
.withColumn("privacy", DataTypes.TEXT)
.withColumn("type", DataTypes.TEXT)
.withColumn("urithumbnail", DataTypes.TEXT)
.withColumn("vreid", DataTypes.TEXT)
.withColumn("multifileupload", DataTypes.BOOLEAN)
.withColumn("fullname", DataTypes.TEXT)
.withColumn("commentsno", DataTypes.BIGINT)
.withColumn("linktitle", DataTypes.TEXT)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("posts_privacy")
.ifNotExists()
.onTable("Posts")
.andColumn("privacy")
.build());
_log.info("+ Table '{}' has been created (if needed).", "Posts");
}
}