package org.gcube.portal.databook.server;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Optional;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Opens the connection to the Cassandra cluster and, when needed, creates the
 * keyspace together with all of its tables and secondary indexes.
 *
 * <p>The cluster coordinates (contact points, datacenter name, keyspace name) are
 * read once from {@code RunningCluster} and cached in static fields; the lazy
 * initialisation of those fields is not synchronized.
 *
 * @author Massimiliano Assante ISTI-CNR
 * @author Ahmed Ibrahim ISTI-CNR
 */
public class CassandraClusterConnection {

    /** Class logger. */
    private static final Logger _log = LoggerFactory.getLogger(CassandraClusterConnection.class);

    /** Timeout applied to schema metadata requests, connection init queries and regular requests. */
    private static final Duration DRIVER_TIMEOUT = Duration.ofMillis(240000);

    /** Replication factor used when the keyspace has to be created. */
    private static final int DEFAULT_REPLICATION_FACTOR = 2;

    /** Pause after a keyspace drop, in milliseconds. */
    private static final long DROP_SETTLE_MILLIS = 2000L;

    /** Contact points of the cluster. */
    private static List<InetSocketAddress> hosts;
    /** Name of the local datacenter passed to the driver. */
    private static String datacenterName;
    /** Name of the keyspace this class connects to. */
    private static String keyspaceName;

    /** Session bound to {@link #keyspaceName}; re-opened on demand by {@link #getKeyspaceSession()}. */
    private CqlSession myKeyspaceSession;

    /**
     * Connects to the cluster resolved by {@code RunningCluster.getInstance(null)}.
     *
     * @param dropSchema {@code true} to drop the current keyspace (if present) and set up a new one
     * @throws Exception if the cluster cannot be resolved or the keyspace session cannot be opened
     */
    protected CassandraClusterConnection(boolean dropSchema) throws Exception {
        this(dropSchema, null);
    }

    /**
     * Connects to the cluster resolved by {@code RunningCluster.getInstance(infrastructureName)}.
     *
     * <p>The cluster is looked up only if the static coordinates have not been cached yet.
     *
     * @param dropSchema         {@code true} to drop the current keyspace (if present) and set up a new one
     * @param infrastructureName value handed to {@code RunningCluster.getInstance}; may be {@code null}
     * @throws Exception if the cluster cannot be resolved or the keyspace session cannot be opened
     */
    protected CassandraClusterConnection(boolean dropSchema, String infrastructureName) throws Exception {
        if (hosts == null || datacenterName == null || keyspaceName == null) {
            RunningCluster cluster = RunningCluster.getInstance(infrastructureName);
            hosts = cluster.getHosts();
            datacenterName = cluster.getDatacenterName();
            keyspaceName = cluster.getKeyspaceName();
        }
        _log.info(keyspaceName + " KeySpace SetUp ...");
        SetUpKeySpaces(dropSchema);
        myKeyspaceSession = connect(keyspaceName);
        _log.info("CONNECTED! using KeySpace: " + keyspaceName);
    }

    /**
     * Returns the keyspace session, opening a new one first if the current one has been closed.
     *
     * @return an open session bound to the configured keyspace
     */
    public CqlSession getKeyspaceSession() {
        if (myKeyspaceSession.isClosed()) {
            myKeyspaceSession = connect(keyspaceName);
        }
        return myKeyspaceSession;
    }

    /**
     * Makes sure the keyspace and its tables exist, optionally dropping the keyspace first.
     *
     * <p>Failures while dropping or creating the schema are logged and not propagated;
     * a failure to open the initial probing session does propagate.
     *
     * @param dropSchema {@code true} to drop the current keyspace (if present) and set up a new one
     */
    public void SetUpKeySpaces(boolean dropSchema) {
        // The probing session is opened and closed inside keyspaceExists(), so no
        // session stays open when the keyspace already exists or when the drop fails.
        boolean found = keyspaceExists();
        boolean createNew = false;
        try {
            if (dropSchema && found) {
                _log.info("Dropping Keyspace: " + keyspaceName + " ...");
                try {
                    ResultSet returned = dropKeyspace();
                    // NOTE(review): presumably gives the cluster time to settle after the drop - confirm.
                    Thread.sleep(DROP_SETTLE_MILLIS);
                    if (returned.wasApplied()) {
                        _log.info("Dropped " + keyspaceName);
                    } else {
                        _log.info("Couldn't drop " + keyspaceName);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can observe the interruption.
                    Thread.currentThread().interrupt();
                    _log.error("Interrupted while waiting after dropping Keyspace " + keyspaceName, e);
                    return;
                } catch (Exception e) {
                    _log.error("Dropping Keyspace operation Failed ... " + keyspaceName + " does NOT exists", e);
                    return;
                }
                createNew = true;
            }
            if (!found || createNew) {
                _log.info("Keyspace does not exist, triggering schema creation ... ");
                createKeyspace(keyspaceName, DEFAULT_REPLICATION_FACTOR);
                createTables();
                _log.info("Using Keyspace " + keyspaceName);
            }
        } catch (Exception e) {
            // Best effort, as before: report through the logger instead of stderr.
            _log.error("Keyspace set up failed for " + keyspaceName, e);
        }
    }

    /**
     * Checks whether the configured keyspace is known to the cluster metadata.
     *
     * <p>The name is resolved by {@link Metadata#getKeyspace(String)}, i.e. with the same
     * CQL identifier rules the driver applies when the keyspace is created, dropped or
     * connected to by name elsewhere in this class.
     *
     * @return {@code true} if the keyspace exists
     */
    private static boolean keyspaceExists() {
        CqlSession session = connect();
        try {
            Metadata metaData = session.getMetadata();
            Optional<KeyspaceMetadata> existing = metaData.getKeyspace(keyspaceName);
            return existing.isPresent();
        } finally {
            closeSession(session);
        }
    }

    /*
     * ********************** CASSANDRA KEYSPACE CREATION ***********************
     */

    /**
     * Builder preconfigured with the driver timeouts, the contact points and the local datacenter.
     *
     * @return a session builder that is not yet bound to any keyspace
     */
    private static CqlSessionBuilder newSessionBuilder() {
        return configBuilder(CqlSession.builder())
                .addContactPoints(hosts)
                .withLocalDatacenter(datacenterName);
    }

    /**
     * Opens a session that is not bound to any keyspace.
     *
     * @return the new session; the caller is responsible for closing it
     */
    private static CqlSession connect() {
        CqlSession cqlSession = newSessionBuilder().build();
        _log.info("[OK] Connected to Cassandra Cluster");
        return cqlSession;
    }

    /**
     * Opens a session bound to the given keyspace.
     *
     * @param KEYSPACE_NAME keyspace to bind the session to
     * @return the new session; the caller is responsible for closing it
     */
    private static CqlSession connect(String KEYSPACE_NAME) {
        CqlSession cqlSession = newSessionBuilder()
                .withKeyspace(KEYSPACE_NAME)
                .build();
        _log.info("[OK] Connected to Keyspace {} ", KEYSPACE_NAME);
        return cqlSession;
    }

    /**
     * Closes the given session if it is not {@code null}; the closing message is logged either way.
     *
     * @param session session to close, may be {@code null}
     */
    public static void closeSession(CqlSession session) {
        if (session != null) {
            session.close();
        }
        _log.info("[OK]Session is now closed");
    }

    /**
     * Closes the keyspace session held by this instance if it is still open.
     * Errors raised while closing are logged and not propagated.
     */
    public void closeConnection() {
        if (!myKeyspaceSession.isClosed()) {
            try {
                _log.info("Closing connection");
                closeSession(myKeyspaceSession);
                _log.info("Connection closed!");
            } catch (Exception e) {
                _log.error("Unable to close connection", e);
            }
        }
    }

    /**
     * Applies the driver timeouts to the given builder.
     *
     * @param cqlSessionBuilder builder to configure
     * @return the same builder, with a programmatic config loader installed
     */
    private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder) {
        return cqlSessionBuilder
                .withConfigLoader(DriverConfigLoader.programmaticBuilder()
                        // Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out after PT2S
                        .withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, DRIVER_TIMEOUT)
                        .withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, DRIVER_TIMEOUT)
                        .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, DRIVER_TIMEOUT)
                        .build());
    }

    /**
     * Creates the keyspace (if it does not exist) with the simple replication strategy
     * and durable writes enabled.
     *
     * @param keyspaceName      name of the keyspace to create
     * @param replicationFactor replication factor for the simple strategy
     */
    private static void createKeyspace(String keyspaceName, int replicationFactor) {
        // try-with-resources closes the session; no explicit close is needed.
        try (CqlSession cqlSession = newSessionBuilder().build()) {
            cqlSession.execute(SchemaBuilder.createKeyspace(keyspaceName)
                    .ifNotExists()
                    .withSimpleStrategy(replicationFactor)
                    .withDurableWrites(true)
                    .build());
            _log.info("+ Keyspace '{}' created.", keyspaceName);
        }
    }

    /**
     * Drops the configured keyspace if it exists.
     *
     * @return the result of the {@code DROP KEYSPACE} statement
     */
    private static ResultSet dropKeyspace() {
        // try-with-resources closes the session; no explicit close is needed.
        try (CqlSession cqlSession = newSessionBuilder().build()) {
            ResultSet toreturn = cqlSession.execute(
                    SchemaBuilder.dropKeyspace(keyspaceName).ifExists().build());
            _log.info("Keyspace {} dropped.", keyspaceName);
            return toreturn;
        }
    }

    /**
     * Creates every table (and its secondary index, where one is defined) inside the
     * configured keyspace, using a dedicated session that is closed afterwards.
     */
    private void createTables() {
        // try-with-resources closes the session; no explicit close is needed.
        try (CqlSession cqlSession = newSessionBuilder().withKeyspace(keyspaceName).build()) {
            createTableUSERNotificationsPreferences(cqlSession);
            createTableUSERNotifications(cqlSession);
            createTableVRETimeline(cqlSession);
            createTableAppTimeline(cqlSession);
            createTableUSERTimeline(cqlSession);
            createTableHashtaggedPosts(cqlSession);
            createTableHashtaggedComments(cqlSession);
            createTableHashtagsCounter(cqlSession);
            createTableUSERNotificationsUnread(cqlSession);
            createTableUSERLikes(cqlSession);
            createTableVREInvites(cqlSession);
            createTableEMAILInvites(cqlSession);
            createTableAttachments(cqlSession);
            createTableInvites(cqlSession);
            createTableLikes(cqlSession);
            createTableComments(cqlSession);
            createTableNotifications(cqlSession);
            createTablePosts(cqlSession);
        }
    }

    /** Creates table {@code UserNotificationsPreferences} if it does not exist. */
    private void createTableUSERNotificationsPreferences(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("UserNotificationsPreferences")
                .ifNotExists()
                .withPartitionKey("userid", DataTypes.TEXT)
                .withPartitionKey("type", DataTypes.TEXT)
                .withColumn("preference", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "USERNotificationsPreferences");
    }

    /** Creates table {@code UserNotifications} if it does not exist. */
    private void createTableUSERNotifications(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("UserNotifications")
                .ifNotExists()
                .withPartitionKey("userid", DataTypes.TEXT)
                .withPartitionKey("timestamp", DataTypes.TIMESTAMP)
                .withColumn("notid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "USERNotifications");
    }

    /** Creates table {@code VRETimeline} if it does not exist. */
    private void createTableVRETimeline(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("VRETimeline")
                .ifNotExists()
                .withPartitionKey("vreid", DataTypes.TEXT)
                .withPartitionKey("timestamp", DataTypes.TIMESTAMP)
                .withColumn("postid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "VRETimeline");
    }

    /** Creates table {@code AppTimeline} if it does not exist. */
    private void createTableAppTimeline(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("AppTimeline")
                .ifNotExists()
                .withPartitionKey("appid", DataTypes.TEXT)
                .withPartitionKey("timestamp", DataTypes.TIMESTAMP)
                .withColumn("postid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "AppTimeline");
    }

    /** Creates table {@code UserTimeline} if it does not exist. */
    private void createTableUSERTimeline(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("UserTimeline")
                .ifNotExists()
                .withPartitionKey("userid", DataTypes.TEXT)
                .withPartitionKey("timestamp", DataTypes.TIMESTAMP)
                .withColumn("postid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "USERTimeline");
    }

    /** Creates table {@code HashtaggedPosts} if it does not exist. */
    private void createTableHashtaggedPosts(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("HashtaggedPosts")
                .ifNotExists()
                .withPartitionKey("hashtag", DataTypes.TEXT)
                .withPartitionKey("postid", DataTypes.UUID)
                .withColumn("vreid", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "HashtaggedPosts");
    }

    /** Creates table {@code HashtaggedComments} if it does not exist. */
    private void createTableHashtaggedComments(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("HashtaggedComments")
                .ifNotExists()
                .withPartitionKey("hashtag", DataTypes.TEXT)
                .withPartitionKey("commentid", DataTypes.UUID)
                .withColumn("vreid", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "HashtaggedComments");
    }

    /** Creates table {@code HashtagsCounter} if it does not exist. */
    private void createTableHashtagsCounter(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("HashtagsCounter")
                .ifNotExists()
                .withPartitionKey("vreid", DataTypes.TEXT)
                .withPartitionKey("hashtag", DataTypes.TEXT)
                .withColumn("count", DataTypes.BIGINT)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "HashtagsCounter");
    }

    /** Creates table {@code UserUnreadNotifications} if it does not exist. */
    private void createTableUSERNotificationsUnread(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("UserUnreadNotifications")
                .ifNotExists()
                .withPartitionKey("userid", DataTypes.TEXT)
                .withPartitionKey("timestamp", DataTypes.TIMESTAMP)
                .withColumn("notid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "USERNotificationsUnread");
    }

    /** Creates table {@code UserLikes} if it does not exist. */
    private void createTableUSERLikes(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("UserLikes")
                .ifNotExists()
                .withPartitionKey("userid", DataTypes.TEXT)
                .withPartitionKey("likeid", DataTypes.UUID)
                .withColumn("postid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "USERLikes");
    }

    /** Creates table {@code VREInvites} if it does not exist. */
    private void createTableVREInvites(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("VREInvites")
                .ifNotExists()
                .withPartitionKey("vreid", DataTypes.TEXT)
                .withPartitionKey("inviteid", DataTypes.UUID)
                .withColumn("status", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "VREInvites");
    }

    /** Creates table {@code EmailInvites} if it does not exist. */
    private void createTableEMAILInvites(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("EmailInvites")
                .ifNotExists()
                .withPartitionKey("email", DataTypes.TEXT)
                .withPartitionKey("vreid", DataTypes.TEXT)
                .withColumn("inviteid", DataTypes.UUID)
                .withCompactStorage()
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "EMAILInvites");
    }

    /** Creates table {@code Attachments} and index {@code post_attach} on {@code postid} if they do not exist. */
    private void createTableAttachments(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Attachments")
                .ifNotExists()
                .withPartitionKey("attachid", DataTypes.UUID)
                .withColumn("postid", DataTypes.UUID)
                .withColumn("uri", DataTypes.TEXT)
                .withColumn("name", DataTypes.TEXT)
                .withColumn("description", DataTypes.TEXT)
                .withColumn("urithumbnail", DataTypes.TEXT)
                .withColumn("mimetype", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("post_attach")
                .ifNotExists()
                .onTable("Attachments")
                .andColumn("postid")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Attachments");
    }

    /** Creates table {@code Invites} and index {@code sender} on {@code senderuserid} if they do not exist. */
    private void createTableInvites(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Invites")
                .ifNotExists()
                .withPartitionKey("inviteid", DataTypes.UUID)
                .withColumn("senderuserid", DataTypes.TEXT)
                .withColumn("vreid", DataTypes.TEXT)
                .withColumn("email", DataTypes.TEXT)
                .withColumn("controlcode", DataTypes.TEXT)
                .withColumn("status", DataTypes.TEXT)
                .withColumn("timestamp", DataTypes.TIMESTAMP)
                .withColumn("senderfullname", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("sender")
                .ifNotExists()
                .onTable("Invites")
                .andColumn("senderuserid")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Invites");
    }

    /** Creates table {@code Likes} and index {@code post_likes} on {@code postid} if they do not exist. */
    private void createTableLikes(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Likes")
                .ifNotExists()
                .withPartitionKey("likeid", DataTypes.UUID)
                .withColumn("userid", DataTypes.TEXT)
                .withColumn("fullname", DataTypes.TEXT)
                .withColumn("thumbnailurl", DataTypes.TEXT)
                .withColumn("postid", DataTypes.UUID)
                .withColumn("timestamp", DataTypes.TIMESTAMP)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("post_likes")
                .ifNotExists()
                .onTable("Likes")
                .andColumn("postid")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Likes");
    }

    /** Creates table {@code Comments} and index {@code post_comments} on {@code postid} if they do not exist. */
    private void createTableComments(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Comments")
                .ifNotExists()
                .withPartitionKey("commentid", DataTypes.UUID)
                .withColumn("userid", DataTypes.TEXT)
                .withColumn("fullname", DataTypes.TEXT)
                .withColumn("thumbnailurl", DataTypes.TEXT)
                .withColumn("comment", DataTypes.TEXT)
                .withColumn("postid", DataTypes.UUID)
                .withColumn("timestamp", DataTypes.TIMESTAMP)
                .withColumn("isedit", DataTypes.BOOLEAN)
                .withColumn("lastedittime", DataTypes.TIMESTAMP)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("post_comments")
                .ifNotExists()
                .onTable("Comments")
                .andColumn("postid")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Comments");
    }

    /** Creates table {@code Notifications} and index {@code not_type} on {@code type} if they do not exist. */
    private void createTableNotifications(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Notifications")
                .ifNotExists()
                .withPartitionKey("notid", DataTypes.UUID)
                .withColumn("type", DataTypes.TEXT)
                .withColumn("userid", DataTypes.TEXT)
                .withColumn("subjectid", DataTypes.TEXT)
                .withColumn("timestamp", DataTypes.TIMESTAMP)
                .withColumn("description", DataTypes.TEXT)
                .withColumn("uri", DataTypes.TEXT)
                .withColumn("senderid", DataTypes.TEXT)
                .withColumn("senderfullname", DataTypes.TEXT)
                .withColumn("senderthumbnailurl", DataTypes.TEXT)
                .withColumn("isread", DataTypes.BOOLEAN)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("not_type")
                .ifNotExists()
                .onTable("Notifications")
                .andColumn("type")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Notifications");
    }

    /** Creates table {@code Posts} and index {@code posts_privacy} on {@code privacy} if they do not exist. */
    private void createTablePosts(CqlSession cqlSession) {
        cqlSession.execute(SchemaBuilder.createTable("Posts")
                .ifNotExists()
                .withPartitionKey("postid", DataTypes.UUID)
                .withColumn("linkhost", DataTypes.TEXT)
                .withColumn("description", DataTypes.TEXT)
                .withColumn("email", DataTypes.TEXT)
                .withColumn("likesno", DataTypes.BIGINT)
                .withColumn("thumbnailurl", DataTypes.TEXT)
                .withColumn("linkdescription", DataTypes.TEXT)
                .withColumn("timestamp", DataTypes.TIMESTAMP)
                .withColumn("uri", DataTypes.TEXT)
                .withColumn("isapplicationpost", DataTypes.BOOLEAN)
                .withColumn("entityid", DataTypes.TEXT)
                .withColumn("privacy", DataTypes.TEXT)
                .withColumn("type", DataTypes.TEXT)
                .withColumn("urithumbnail", DataTypes.TEXT)
                .withColumn("vreid", DataTypes.TEXT)
                .withColumn("multifileupload", DataTypes.BOOLEAN)
                .withColumn("fullname", DataTypes.TEXT)
                .withColumn("commentsno", DataTypes.BIGINT)
                .withColumn("linktitle", DataTypes.TEXT)
                .withCompactStorage()
                .build());
        cqlSession.execute(SchemaBuilder.createIndex("posts_privacy")
                .ifNotExists()
                .onTable("Posts")
                .andColumn("privacy")
                .build());
        _log.info("+ Table '{}' has been created (if needed).", "Posts");
    }
}