From 9c9c7fa46b6f85a6a4fa099f59329fbe498b669f Mon Sep 17 00:00:00 2001 From: Alfredo Oliviero Date: Thu, 12 Sep 2024 15:08:39 +0200 Subject: [PATCH] modificato implementazione, sistemato schema, test funzionanti sistemato logica e aggiunto test. non funziona ordinamento all is working. still missing attachments implemented attachemnts, fixed code, wrote tests and msg generator developing messages --- messages.cql | 137 +-- .../server/CassandraClusterConnection.java | 549 +++++++----- .../server/DBCassandraAstyanaxImpl.java | 807 ++++++++++++------ .../server/DatabookCassandraTest.java | 3 - .../portal/databook/server/DatabookStore.java | 25 +- .../databook/server/RunningCluster.java | 20 + .../gcube/portal/databook/server/Schema.java | 49 ++ .../gcube/portal/databook/server/Tester.java | 53 -- .../gcube/portal/databook/shared/Message.java | 11 +- .../portal/databook/server/BaseDbTest.java | 2 +- .../portal/databook/server/MessagesTest.java | 662 ++++++++++++++ 11 files changed, 1732 insertions(+), 586 deletions(-) delete mode 100644 src/main/java/org/gcube/portal/databook/server/Tester.java create mode 100644 src/test/java/org/gcube/portal/databook/server/MessagesTest.java diff --git a/messages.cql b/messages.cql index a6d699b..5c5d0f1 100644 --- a/messages.cql +++ b/messages.cql @@ -1,66 +1,81 @@ --- Table for sent messages -CREATE TABLE IF NOT EXISTS messages_sent ( - userid TEXT, - message_id UUID, - user_name TEXT, - addresses LIST, - subject TEXT, - body TEXT, - timestamp TIMESTAMP, - with_attachments BOOLEAN, - read BOOLEAN, - opened BOOLEAN, - deleted BOOLEAN, - PRIMARY KEY ((userid), timestamp, message_id) -) -WITH CLUSTERING ORDER BY (timestamp DESC); +CREATE TABLE messages_received ( + recipient_id text, + message_id uuid, + body text, + deleted boolean, + from_id text, + from_name text, + opened boolean, + read boolean, + subject text, + timestamp timestamp, + with_attachments boolean, + addresses list, + PRIMARY KEY (recipient_id, message_id) +) WITH CLUSTERING ORDER BY (message_id ASC) + AND additional_write_policy = '99p' + AND bloom_filter_fp_chance = 0.01 + AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} + AND cdc = false + AND comment = '' + AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} + AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} + AND memtable = 'default' + AND crc_check_chance = 1.0 + AND default_time_to_live = 0 + AND extensions = {} + AND gc_grace_seconds = 864000 + AND max_index_interval = 2048 + AND memtable_flush_period_in_ms = 0 + AND min_index_interval = 128 + AND read_repair = 'BLOCKING' + AND speculative_retry = '99p'; --- Table for received messages -CREATE TABLE IF NOT EXISTS messages_received ( - recipient_id TEXT, - message_id UUID, - userid TEXT, - user_name TEXT, - addresses LIST, - subject TEXT, - body TEXT, - timestamp TIMESTAMP, - with_attachments BOOLEAN, - read BOOLEAN, - opened BOOLEAN, - deleted BOOLEAN, - PRIMARY KEY ((recipient_id), timestamp, message_id) -) -WITH CLUSTERING ORDER BY (timestamp DESC); +CREATE INDEX message_received_deleted_idx ON messages_received (deleted); --- Materialized view for not deleted sent messages -CREATE MATERIALIZED VIEW IF NOT EXISTS messages_active_sent AS - SELECT * - FROM messages_sent - WHERE userid IS NOT NULL AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL - PRIMARY KEY (userid, timestamp, message_id) - WITH CLUSTERING ORDER BY (timestamp DESC); +CREATE INDEX message_received_id_idx ON messages_received (message_id); --- Materialized view for not deleted received messages -CREATE MATERIALIZED VIEW IF NOT EXISTS messages_active_received AS - SELECT * - FROM messages_received - WHERE recipient_id IS NOT NULL AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL - PRIMARY KEY (recipient_id, timestamp, message_id) - WITH CLUSTERING ORDER BY (timestamp DESC); +CREATE INDEX message_received_read_idx ON messages_received (read); --- Materialized view for unread and not deleted sent messages -CREATE MATERIALIZED VIEW IF NOT EXISTS messages_unread_sent AS - SELECT * - FROM messages_sent - WHERE userid IS NOT NULL AND read = false AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL - PRIMARY KEY (userid, timestamp, message_id) - WITH CLUSTERING ORDER BY (timestamp DESC); +CREATE INDEX message_received_timestamp_idx ON messages_received (timestamp); --- Materialized view for unread and not deleted received messages -CREATE MATERIALIZED VIEW IF NOT EXISTS messages_unread_received AS - SELECT * - FROM messages_received - WHERE recipient_id IS NOT NULL AND read = false AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL - PRIMARY KEY (recipient_id, timestamp, message_id) - WITH CLUSTERING ORDER BY (timestamp DESC); + +CREATE TABLE messages_sent ( + from_id text, + message_id uuid, + body text, + deleted boolean, + from_name text, + opened boolean, + read boolean, + subject text, + timestamp timestamp, + with_attachments boolean, + addresses list, + PRIMARY KEY (from_id, message_id) +) WITH CLUSTERING ORDER BY (message_id ASC) + AND additional_write_policy = '99p' + AND bloom_filter_fp_chance = 0.01 + AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'} + AND cdc = false + AND comment = '' + AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'} + AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} + AND memtable = 'default' + AND crc_check_chance = 1.0 + AND default_time_to_live = 0 + AND extensions = {} + AND gc_grace_seconds = 864000 + AND max_index_interval = 2048 + AND memtable_flush_period_in_ms = 0 + AND min_index_interval = 128 + AND read_repair = 'BLOCKING' + AND speculative_retry = '99p'; + +CREATE INDEX message_sent_deleted_idx ON messages_sent (deleted); + +CREATE INDEX message_sent_id_idx ON messages_sent (message_id); + +CREATE INDEX message_sent_read_idx ON messages_sent (read); + +CREATE INDEX message_sent_timestamp_idx ON messages_sent (timestamp); \ No newline at end of file diff --git a/src/main/java/org/gcube/portal/databook/server/CassandraClusterConnection.java b/src/main/java/org/gcube/portal/databook/server/CassandraClusterConnection.java index f283e23..99b518f 100644 --- a/src/main/java/org/gcube/portal/databook/server/CassandraClusterConnection.java +++ b/src/main/java/org/gcube/portal/databook/server/CassandraClusterConnection.java @@ -4,6 +4,8 @@ import java.net.InetSocketAddress; import java.time.Duration; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; @@ -14,12 +16,8 @@ import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.type.DataTypes; -import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.datastax.oss.driver.api.querybuilder.SchemaBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * @author Massimiliano Assante ISTI-CNR * @author Ahmed Ibrahim ISTI-CNR @@ -39,16 +37,37 @@ public class CassandraClusterConnection { private static String keyspaceName; private CqlSession myKeyspaceSession; + public static CassandraClusterConnection setCustomConfig(String config_host, String config_datacenterName, + String config_keyspaceName) throws Exception { + return setCustomConfig(config_host, config_datacenterName, config_keyspaceName, false, false); + } + + public static CassandraClusterConnection setCustomConfig(String config_host, String config_datacenterName, + String config_keyspaceName, boolean dropSchema, boolean forceCreateNew) throws Exception { + RunningCluster cluster = RunningCluster.setCustomizedInstance(config_host, config_datacenterName, + config_keyspaceName); + + // host = cluster.getHost(); + hosts = cluster.getHosts(); + datacenterName = cluster.getDatacenterName(); + keyspaceName = cluster.getKeyspaceName(); + + _log.info("set custom config for CassandraClusterConnection. Hosts " + hosts + ", datacenterName" + + datacenterName + ", CassandraClusterConnection" + keyspaceName); + + return new CassandraClusterConnection(dropSchema, forceCreateNew); + } + /** * * @param dropSchema set true if you want do drop the current and set up new one - * the connection to cassandra cluster + * the connection to cassandra cluster */ - protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew) throws Exception { + public CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew) throws Exception { if (hosts == null || datacenterName == null || keyspaceName == null) { RunningCluster cluster = RunningCluster.getInstance(null); - //host = cluster.getHost(); + // host = cluster.getHost(); hosts = cluster.getHosts(); datacenterName = cluster.getDatacenterName(); keyspaceName = cluster.getKeyspaceName(); @@ -63,12 +82,13 @@ public class CassandraClusterConnection { /** * * @param dropSchema set true if you want to drop the current and set up new one - * the connection to cassandra cluster + * the connection to cassandra cluster */ - protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew, String infrastructureName) throws Exception { + protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew, String infrastructureName) + throws Exception { if (hosts == null || datacenterName == null || keyspaceName == null) { RunningCluster cluster = RunningCluster.getInstance(infrastructureName); - //host = cluster.getHost(); + // host = cluster.getHost(); hosts = cluster.getHosts(); datacenterName = cluster.getDatacenterName(); keyspaceName = cluster.getKeyspaceName(); @@ -79,8 +99,8 @@ public class CassandraClusterConnection { _log.info("CONNECTED! using KeySpace: " + keyspaceName); } - public CqlSession getKeyspaceSession(){ - if (myKeyspaceSession.isClosed()){ + public CqlSession getKeyspaceSession() { + if (myKeyspaceSession.isClosed()) { myKeyspaceSession = connect(keyspaceName); } return myKeyspaceSession; @@ -88,15 +108,15 @@ public class CassandraClusterConnection { /** * @param dropSchema set true if you want to drop the current and set up new one - * the connection to cassandra cluster + * the connection to cassandra cluster */ public void SetUpKeySpaces(boolean dropSchema, boolean forceExecution) { boolean createNew = false; boolean found = false; - CqlSession session = connect(); + CqlSession session = connect(); Metadata metaData = session.getMetadata(); for (KeyspaceMetadata meta : metaData.getKeyspaces().values()) { - if (meta.getName().toString().equals(keyspaceName)){ + if (meta.getName().toString().equals(keyspaceName)) { found = true; break; } @@ -133,7 +153,7 @@ public class CassandraClusterConnection { /* * - ********************** CASSANDRA KEYSPACE CREATION *********************** + ********************** CASSANDRA KEYSPACE CREATION *********************** * */ private static CqlSession connect() { @@ -144,6 +164,7 @@ public class CassandraClusterConnection { _log.info("[OK] Connected to Cassandra Cluster"); return cqlSession; } + private static CqlSession connect(String KEYSPACE_NAME) { CqlSession cqlSession = configBuilder(CqlSession.builder()) .addContactPoints(hosts) @@ -155,31 +176,34 @@ public class CassandraClusterConnection { } public static void closeSession(CqlSession session) { - if (session != null) session.close(); + if (session != null) + session.close(); _log.info("[OK]Session is now closed"); } - public void closeConnection(){ - if(!myKeyspaceSession.isClosed()){ - try{ + public void closeConnection() { + if (!myKeyspaceSession.isClosed()) { + try { _log.info("Closing connection"); closeSession(myKeyspaceSession); _log.info("Connection closed!"); - }catch(Exception e){ + } catch (Exception e) { _log.error("Unable to close connection", e); } } } - private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder){ + private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder) { return cqlSessionBuilder .withConfigLoader(DriverConfigLoader.programmaticBuilder() - // Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out after PT2S + // Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out + // after PT2S .withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(240000)) .withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(240000)) .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(240000)) .build()); } + private static void createKeyspace(String keyspaceName, int replicationFactor) { try (CqlSession cqlSession = configBuilder(CqlSession.builder()) .addContactPoints(hosts) @@ -195,7 +219,7 @@ public class CassandraClusterConnection { } } - private static ResultSet dropKeyspace(){ + private static ResultSet dropKeyspace() { ResultSet toreturn; try (CqlSession cqlSession = configBuilder(CqlSession.builder()) .addContactPoints(hosts) @@ -207,7 +231,68 @@ public class CassandraClusterConnection { } return toreturn; } - private void createTables(){ + + private void dropMessagesTables(CqlSession cqlSession) { + // try { + cqlSession.execute( + SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_RECEIVED_ACTIVE).ifExists().build()); + cqlSession.execute( + SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_RECEIVED_READ).ifExists().build()); + cqlSession + .execute(SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_SENT_ACTIVE).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_SENT_READ).ifExists().build()); + // } catch (Exception e) { + // e.printStackTrace(); + // } + + // try { + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_ID).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_READ).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_DELETED).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_TIMESTAMP).ifExists().build()); + + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_ID).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_READ).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_DELETED).ifExists().build()); + cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_TIMESTAMP).ifExists().build()); + // } catch (Exception e) { + // e.printStackTrace(); + // } + + // try { + cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_SENT).ifExists().build()); + // } catch (Exception e) { + // e.printStackTrace(); + // } + + // try { + cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_RECEIVED).ifExists().build()); + // } catch (Exception e) { + // e.printStackTrace(); + // } + } + + private void createMessagesTables(CqlSession cqlSession) { + + // dropMessagesTables(cqlSession); + + // try { + createTableMessagesSent(cqlSession); + createTableMessagesReceived(cqlSession); + createTableMessagesAttachments(cqlSession); + + // } catch (Exception e) { + // e.printStackTrace(); + // } + + // materialized views sono disattivate + // createViewMessagesActiveSent(cqlSession); + // createViewMessagesActiveReceived(cqlSession); + // createViewMessagesUnreadReceived(cqlSession); + // createViewMessagesUnreadSent(cqlSession); + } + + private void createTables() { try (CqlSession cqlSession = configBuilder(CqlSession.builder()) .addContactPoints(hosts) .withLocalDatacenter(datacenterName) @@ -233,16 +318,14 @@ public class CassandraClusterConnection { createTableNotifications(cqlSession); createTablePosts(cqlSession); - createTableMessagesSent(cqlSession); - createTableMessagesReceived(cqlSession); - createViewMessagesActiveSent(cqlSession); - createViewMessagesActiveReceived(cqlSession); - createViewMessagesUnreadReceived(cqlSession); - createViewMessagesUnreadSent(cqlSession); + createMessagesTables(cqlSession); closeSession(cqlSession); + } catch (Exception e) { + e.printStackTrace(); } } + private void createTableUSERNotificationsPreferences(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("UserNotificationsPreferences") .ifNotExists() @@ -254,49 +337,54 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "USERNotificationsPreferences"); } + private void createTableUSERNotifications(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("UserNotifications") .ifNotExists() .withPartitionKey("userid", DataTypes.TEXT) - .withPartitionKey("timestamp", DataTypes.TIMESTAMP) + .withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("notid", DataTypes.UUID) .withCompactStorage() .build()); _log.info("+ Table '{}' has been created (if needed).", "USERNotifications"); } + private void createTableVRETimeline(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("VRETimeline") .ifNotExists() .withPartitionKey("vreid", DataTypes.TEXT) - .withPartitionKey("timestamp", DataTypes.TIMESTAMP) + .withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("postid", DataTypes.UUID) .withCompactStorage() .build()); _log.info("+ Table '{}' has been created (if needed).", "VRETimeline"); } + private void createTableAppTimeline(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("AppTimeline") .ifNotExists() .withPartitionKey("appid", DataTypes.TEXT) - .withPartitionKey("timestamp", DataTypes.TIMESTAMP) + .withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("postid", DataTypes.UUID) .withCompactStorage() .build()); _log.info("+ Table '{}' has been created (if needed).", "AppTimeline"); } + private void createTableUSERTimeline(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("UserTimeline") .ifNotExists() .withPartitionKey("userid", DataTypes.TEXT) - .withPartitionKey("timestamp", DataTypes.TIMESTAMP) + .withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("postid", DataTypes.UUID) .withCompactStorage() .build()); _log.info("+ Table '{}' has been created (if needed).", "USERTimeline"); } + private void createTableHashtaggedPosts(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("HashtaggedPosts") .ifNotExists() @@ -308,6 +396,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "HashtaggedPosts"); } + private void createTableHashtaggedComments(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("HashtaggedComments") .ifNotExists() @@ -319,6 +408,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "HashtaggedComments"); } + private void createTableHashtagsCounter(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("HashtagsCounter") .ifNotExists() @@ -330,16 +420,18 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "HashtagsCounter"); } + private void createTableUSERNotificationsUnread(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("UserUnreadNotifications") .ifNotExists() .withPartitionKey("userid", DataTypes.TEXT) - .withPartitionKey("timestamp", DataTypes.TIMESTAMP) + .withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("notid", DataTypes.UUID) .withCompactStorage() .build()); _log.info("+ Table '{}' has been created (if needed).", "USERNotificationsUnread"); } + private void createTableUSERLikes(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("UserLikes") .ifNotExists() @@ -351,6 +443,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "USERLikes"); } + private void createTableVREInvites(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("VREInvites") .ifNotExists() @@ -362,6 +455,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "VREInvites"); } + private void createTableEMAILInvites(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("EmailInvites") .ifNotExists() @@ -372,6 +466,7 @@ public class CassandraClusterConnection { .build()); _log.info("+ Table '{}' has been created (if needed).", "EMAILInvites"); } + private void createTableAttachments(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Attachments") .ifNotExists() @@ -392,6 +487,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "Attachments"); } + private void createTableInvites(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Invites") .ifNotExists() @@ -401,7 +497,7 @@ public class CassandraClusterConnection { .withColumn("email", DataTypes.TEXT) .withColumn("controlcode", DataTypes.TEXT) .withColumn("status", DataTypes.TEXT) - .withColumn("timestamp", DataTypes.TIMESTAMP) + .withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("senderfullname", DataTypes.TEXT) .withCompactStorage() .build()); @@ -413,6 +509,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "Invites"); } + private void createTableLikes(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Likes") .ifNotExists() @@ -421,7 +518,7 @@ public class CassandraClusterConnection { .withColumn("fullname", DataTypes.TEXT) .withColumn("thumbnailurl", DataTypes.TEXT) .withColumn("postid", DataTypes.UUID) - .withColumn("timestamp", DataTypes.TIMESTAMP) + .withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withCompactStorage() .build()); cqlSession.execute(SchemaBuilder.createIndex("post_likes") @@ -432,6 +529,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "Likes"); } + private void createTableComments(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Comments") .ifNotExists() @@ -441,7 +539,7 @@ public class CassandraClusterConnection { .withColumn("thumbnailurl", DataTypes.TEXT) .withColumn("comment", DataTypes.TEXT) .withColumn("postid", DataTypes.UUID) - .withColumn("timestamp", DataTypes.TIMESTAMP) + .withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("isedit", DataTypes.BOOLEAN) .withColumn("lastedittime", DataTypes.TIMESTAMP) .withCompactStorage() @@ -454,6 +552,7 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "Comments"); } + private void createTableNotifications(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Notifications") .ifNotExists() @@ -477,6 +576,7 @@ public class CassandraClusterConnection { .build()); _log.info("+ Table '{}' has been created (if needed).", "Notifications"); } + private void createTablePosts(CqlSession cqlSession) { cqlSession.execute(SchemaBuilder.createTable("Posts") .ifNotExists() @@ -487,7 +587,7 @@ public class CassandraClusterConnection { .withColumn("likesno", DataTypes.BIGINT) .withColumn("thumbnailurl", DataTypes.TEXT) .withColumn("linkdescription", DataTypes.TEXT) - .withColumn("timestamp", DataTypes.TIMESTAMP) + .withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) .withColumn("uri", DataTypes.TEXT) .withColumn("isapplicationpost", DataTypes.BOOLEAN) .withColumn("entityid", DataTypes.TEXT) @@ -510,169 +610,234 @@ public class CassandraClusterConnection { _log.info("+ Table '{}' has been created (if needed).", "Posts"); } - // Method to create the messages_sent table - private void createTableMessagesSent(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createTable("messages_sent") - .ifNotExists() - .withPartitionKey("userid", DataTypes.TEXT) - .withClusteringColumn("timestamp", DataTypes.TIMESTAMP) - .withClusteringColumn("message_id", DataTypes.UUID) - .withColumn("user_name", DataTypes.TEXT) - .withColumn("addresses", DataTypes.listOf(DataTypes.TEXT)) - .withColumn("subject", DataTypes.TEXT) - .withColumn("body", DataTypes.TEXT) - .withColumn("with_attachments", DataTypes.BOOLEAN) - .withColumn("read", DataTypes.BOOLEAN) - .withColumn("opened", DataTypes.BOOLEAN) - .withColumn("deleted", DataTypes.BOOLEAN) - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + // Method to create the messages_sent table + private void createTableMessagesSent(CqlSession cqlSession) { + cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_SENT) + .ifNotExists() + .withPartitionKey(Schema.FROM_ID, DataTypes.TEXT) // Partition key on sender ID + .withClusteringColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) // Clustering by timestamp for ordering + .withClusteringColumn(Schema.MESSAGE_ID, DataTypes.UUID) // Clustering by message ID for uniqueness + .withColumn(Schema.FROM_NAME, DataTypes.TEXT) // Sender's name + .withColumn(Schema.RECIPIENTS, DataTypes.listOf(DataTypes.TEXT)) // List of recipients + .withColumn(Schema.SUBJECT, DataTypes.TEXT) // Message subject + .withColumn(Schema.BODY, DataTypes.TEXT) // Message body + .withColumn(Schema.WITH_ATTACH, DataTypes.BOOLEAN) // Boolean flag for attachments + .withColumn(Schema.ISREAD, DataTypes.BOOLEAN) // Boolean flag for read status + .withColumn(Schema.ISOPENED, DataTypes.BOOLEAN) // Boolean flag for opened status + .withColumn(Schema.ISDELETED, DataTypes.BOOLEAN) // Boolean flag for deleted status + .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) // Descending order by timestamp + .build()); - _log.info("+ Table '{}' has been created (if needed).", "messages_sent"); - } + // Remove unnecessary indexes: no need for message_id or timestamp indexes + // Retain only the index for deleted and read messages + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_DELETED) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_SENT) + .andColumn(Schema.ISDELETED) + .build()); - // Method to create the messages_received table - private void createTableMessagesReceived(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createTable("messages_received") - .ifNotExists() - .withPartitionKey("recipient_id", DataTypes.TEXT) - .withClusteringColumn("timestamp", DataTypes.TIMESTAMP) - .withClusteringColumn("message_id", DataTypes.UUID) - .withColumn("userid", DataTypes.TEXT) - .withColumn("user_name", DataTypes.TEXT) - .withColumn("addresses", DataTypes.listOf(DataTypes.TEXT)) - .withColumn("subject", DataTypes.TEXT) - .withColumn("body", DataTypes.TEXT) - .withColumn("with_attachments", DataTypes.BOOLEAN) - .withColumn("read", DataTypes.BOOLEAN) - .withColumn("opened", DataTypes.BOOLEAN) - .withColumn("deleted", DataTypes.BOOLEAN) - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_READ) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_SENT) + .andColumn(Schema.ISREAD) + .build()); - _log.info("+ Table '{}' has been created (if needed).", "messages_received"); - } + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_ID) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_SENT) + .andColumn(Schema.MESSAGE_ID) + .build()); - // Method to create the materialized view for non-deleted sent messages - private void createViewMessagesActiveSent(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createMaterializedView("messages_active_sent") - .ifNotExists() - .asSelectFrom("messages_sent") - .column("userid") - .column("timestamp") - .column("message_id") - .column("user_name") - .column("addresses") - .column("subject") - .column("body") - .column("with_attachments") - .column("read") - .column("opened") - .column("deleted") - .whereColumn("userid").isNotNull() - .whereColumn("deleted").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("timestamp").isNotNull() - .whereColumn("message_id").isNotNull() - .withPartitionKey("userid") - .withClusteringColumn("timestamp") - .withClusteringColumn("message_id") - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + _log.info("+ Table '{}' has been created (if needed).", "messages_sent"); + } - _log.info("+ Materialized View '{}' has been created (if needed).", "messages_active_sent"); - } + private void createTableMessagesAttachments(CqlSession cqlSession) { - // Method to create the materialized view for non-deleted received messages - private void createViewMessagesActiveReceived(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createMaterializedView("messages_active_received") - .ifNotExists() - .asSelectFrom("messages_received") - .column("recipient_id") - .column("timestamp") - .column("message_id") - .column("userid") - .column("user_name") - .column("addresses") - .column("subject") - .column("body") - .column("with_attachments") - .column("read") - .column("opened") - .column("deleted") - .whereColumn("recipient_id").isNotNull() - .whereColumn("deleted").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("timestamp").isNotNull() - .whereColumn("message_id").isNotNull() - .withPartitionKey("recipient_id") - .withClusteringColumn("timestamp") - .withClusteringColumn("message_id") - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_ATTACHMENTS).ifExists().build()); - _log.info("+ Materialized View '{}' has been created (if needed).", "messages_active_received"); - } + cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_ATTACHMENTS) + .ifNotExists() + .withPartitionKey(Schema.MESSAGE_ID, DataTypes.UUID) // Partition key on MESSAGE ID + .withClusteringColumn(Schema.ATTACH_ID, DataTypes.UUID) // Clustering by message ID for uniqueness - // Method to create the materialized view for unread sent messages - private void createViewMessagesUnreadSent(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createMaterializedView("messages_unread_sent") - .ifNotExists() - .asSelectFrom("messages_sent") - .column("userid") - .column("timestamp") - .column("message_id") - .column("user_name") - .column("addresses") - .column("subject") - .column("body") - .column("with_attachments") - .column("read") - .column("opened") - .column("deleted") - .whereColumn("userid").isNotNull() - .whereColumn("read").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("deleted").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("timestamp").isNotNull() - .whereColumn("message_id").isNotNull() - .withPartitionKey("userid") - .withClusteringColumn("timestamp") - .withClusteringColumn("message_id") - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + .withColumn(Schema.URI, DataTypes.TEXT) // File URL + .withColumn(Schema.NAME, DataTypes.TEXT) // File Name + .withColumn(Schema.DESCRIPTION, DataTypes.TEXT) // File Description + .withColumn(Schema.URI_THUMBNAIL, DataTypes.TEXT) // File Thumb URI + .withColumn(Schema.MIME_TYPE, DataTypes.TEXT) // Boolean flag for attachments + .build()); + } - _log.info("+ Materialized View '{}' has been created (if needed).", "messages_unread_sent"); - } + // Method to create the messages_received table + private void createTableMessagesReceived(CqlSession cqlSession) { + cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_RECEIVED) + .ifNotExists() + .withPartitionKey(Schema.RECIPIENT_ID, DataTypes.TEXT) // Partition key on recipient ID + .withClusteringColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) // Clustering by timestamp for ordering + .withClusteringColumn(Schema.MESSAGE_ID, DataTypes.UUID) // Clustering by message ID for uniqueness + .withColumn(Schema.FROM_ID, DataTypes.TEXT) // Sender's ID + .withColumn(Schema.FROM_NAME, DataTypes.TEXT) // Sender's name + .withColumn(Schema.RECIPIENTS, DataTypes.listOf(DataTypes.TEXT)) // List of recipients + .withColumn(Schema.SUBJECT, DataTypes.TEXT) // Message subject + .withColumn(Schema.BODY, DataTypes.TEXT) // Message body + .withColumn(Schema.WITH_ATTACH, DataTypes.BOOLEAN) // Boolean flag for attachments + .withColumn(Schema.ISREAD, DataTypes.BOOLEAN) // Boolean flag for read status + .withColumn(Schema.ISOPENED, DataTypes.BOOLEAN) // Boolean flag for opened status + .withColumn(Schema.ISDELETED, DataTypes.BOOLEAN) // Boolean flag for deleted status + .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) // Descending order by timestamp + .build()); - // Method to create the materialized view for unread received messages - private void createViewMessagesUnreadReceived(CqlSession cqlSession) { - cqlSession.execute(SchemaBuilder.createMaterializedView("messages_unread_received") - .ifNotExists() - .asSelectFrom("messages_received") - .column("recipient_id") - .column("timestamp") - .column("message_id") - .column("userid") - .column("user_name") - .column("addresses") - .column("subject") - .column("body") - .column("with_attachments") - .column("read") - .column("opened") - .column("deleted") - .whereColumn("recipient_id").isNotNull() - .whereColumn("read").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("deleted").isEqualTo(QueryBuilder.literal(false)) - .whereColumn("timestamp").isNotNull() - .whereColumn("message_id").isNotNull() - .withPartitionKey("recipient_id") - .withClusteringColumn("timestamp") - .withClusteringColumn("message_id") - .withClusteringOrder("timestamp", ClusteringOrder.DESC) - .build()); + // Remove unnecessary indexes: no need for message_id or timestamp indexes + // Retain only the index for deleted and read messages + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_DELETED) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_RECEIVED) + .andColumn(Schema.ISDELETED) + .build()); - _log.info("+ Materialized View '{}' has been created (if needed).", "messages_unread_received"); - } + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_READ) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_RECEIVED) + .andColumn(Schema.ISREAD) + .build()); - + cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_ID) + .ifNotExists() + .onTable(Schema.TABLE_MESSAGES_RECEIVED) + .andColumn(Schema.MESSAGE_ID) + .build()); + _log.info("+ Table '{}' has been created (if needed).", "messages_received"); + } + + /* + * // Method to create the materialized view for non-deleted sent messages + * private void createViewMessagesActiveSent(CqlSession cqlSession) { + * cqlSession.execute(SchemaBuilder.createMaterializedView(Schema. + * TABLE_MESSAGES_SENT_ACTIVE) + * .ifNotExists() + * .asSelectFrom(Schema.TABLE_MESSAGES_SENT) + * .column(Schema.FROM_ID) + * .column(Schema.TIMESTAMP) + * .column(Schema.MESSAGE_ID) + * .column(Schema.FROM_NAME) + * .column(Schema.RECIPIENTS) + * .column(Schema.SUBJECT) + * .column(Schema.BODY) + * .column(Schema.WITH_ATTACH) + * .column(Schema.ISREAD) + * .column(Schema.ISOPENED) + * .column(Schema.ISDELETED) + * .whereColumn(Schema.FROM_ID).isNotNull() + * .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.TIMESTAMP).isNotNull() + * .whereColumn(Schema.MESSAGE_ID).isNotNull() + * .withPartitionKey(Schema.FROM_ID) + * .withClusteringColumn(Schema.TIMESTAMP) + * .withClusteringColumn(Schema.MESSAGE_ID) + * .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) + * .build()); + * + * _log.info("+ Materialized View '{}' has been created (if needed).", + * "messages_active_sent"); + * } + * + * // Method to create the materialized view for non-deleted received messages + * private void createViewMessagesActiveReceived(CqlSession cqlSession) { + * cqlSession.execute(SchemaBuilder.createMaterializedView(Schema. + * TABLE_MESSAGES_RECEIVED_ACTIVE) + * .ifNotExists() + * .asSelectFrom(Schema.TABLE_MESSAGES_RECEIVED) + * .column(Schema.RECIPIENT_ID) + * .column(Schema.TIMESTAMP) + * .column(Schema.MESSAGE_ID) + * .column(Schema.FROM_ID) + * .column(Schema.FROM_NAME) + * .column(Schema.RECIPIENTS) + * .column(Schema.SUBJECT) + * .column(Schema.BODY) + * .column(Schema.WITH_ATTACH) + * .column(Schema.ISREAD) + * .column(Schema.ISOPENED) + * .column(Schema.ISDELETED) + * .whereColumn(Schema.RECIPIENT_ID).isNotNull() + * .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.TIMESTAMP).isNotNull() + * .whereColumn(Schema.MESSAGE_ID).isNotNull() + * .withPartitionKey(Schema.RECIPIENT_ID) + * .withClusteringColumn(Schema.TIMESTAMP) + * .withClusteringColumn(Schema.MESSAGE_ID) + * .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) + * .build()); + * + * _log.info("+ Materialized View '{}' has been created (if needed).", + * "messages_active_received"); + * } + * + * // Method to create the materialized view for unread sent messages + * private void createViewMessagesUnreadSent(CqlSession cqlSession) { + * cqlSession.execute(SchemaBuilder.createMaterializedView(Schema. + * TABLE_MESSAGES_SENT_READ) + * .ifNotExists() + * .asSelectFrom(Schema.TABLE_MESSAGES_SENT) + * .column(Schema.FROM_ID) + * .column(Schema.TIMESTAMP) + * .column(Schema.MESSAGE_ID) + * .column(Schema.FROM_NAME) + * .column(Schema.RECIPIENTS) + * .column(Schema.SUBJECT) + * .column(Schema.BODY) + * .column(Schema.WITH_ATTACH) + * .column(Schema.ISREAD) + * .column(Schema.ISOPENED) + * .column(Schema.ISDELETED) + * .whereColumn(Schema.FROM_ID).isNotNull() + * .whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.TIMESTAMP).isNotNull() + * .whereColumn(Schema.MESSAGE_ID).isNotNull() + * .withPartitionKey(Schema.FROM_ID) + * .withClusteringColumn(Schema.TIMESTAMP) + * .withClusteringColumn(Schema.MESSAGE_ID) + * .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) + * .build()); + * + * _log.info("+ Materialized View '{}' has been created (if needed).", + * "messages_unread_sent"); + * } + * + * // Method to create the materialized view for unread received messages + * private void createViewMessagesUnreadReceived(CqlSession cqlSession) { + * cqlSession.execute(SchemaBuilder.createMaterializedView(Schema. + * TABLE_MESSAGES_RECEIVED_READ) + * .ifNotExists() + * .asSelectFrom(Schema.TABLE_MESSAGES_RECEIVED) + * .column(Schema.RECIPIENT_ID) + * .column(Schema.TIMESTAMP) + * .column(Schema.MESSAGE_ID) + * .column(Schema.FROM_ID) + * .column(Schema.FROM_NAME) + * .column(Schema.RECIPIENTS) + * .column(Schema.SUBJECT) + * .column(Schema.BODY) + * .column(Schema.WITH_ATTACH) + * .column(Schema.ISREAD) + * .column(Schema.ISOPENED) + * .column(Schema.ISDELETED) + * .whereColumn(Schema.RECIPIENT_ID).isNotNull() + * .whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false)) + * .whereColumn(Schema.TIMESTAMP).isNotNull() + * .whereColumn(Schema.MESSAGE_ID).isNotNull() + * .withPartitionKey(Schema.RECIPIENT_ID) + * .withClusteringColumn(Schema.TIMESTAMP) + * .withClusteringColumn(Schema.MESSAGE_ID) + * .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) + * .build()); + * + * _log.info("+ Materialized View '{}' has been created (if needed).", + * "messages_unread_received"); + * } + * + */ } \ No newline at end of file diff --git a/src/main/java/org/gcube/portal/databook/server/DBCassandraAstyanaxImpl.java b/src/main/java/org/gcube/portal/databook/server/DBCassandraAstyanaxImpl.java index 37c8baf..ee2c225 100644 --- a/src/main/java/org/gcube/portal/databook/server/DBCassandraAstyanaxImpl.java +++ b/src/main/java/org/gcube/portal/databook/server/DBCassandraAstyanaxImpl.java @@ -62,7 +62,12 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; +import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; +import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert; +import com.datastax.oss.driver.api.querybuilder.select.Select; +import com.datastax.oss.driver.api.querybuilder.update.Update; /** * @author Massimiliano Assante ISTI-CNR @@ -87,6 +92,10 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { return conn; } + public CqlSession getSession() { + return this.getConnection().getKeyspaceSession(); + } + /** * use this constructor carefully from test classes * @@ -4547,160 +4556,293 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { return toReturn; } - protected List getSentMessagesByUser(String userId, String messageId, Date timestamp, Integer limit, - Boolean include_deleted, CqlSession session) { + /// MESSAGES / - if (session == null) + /** + * {@inheritDoc} + * + * @throws NotificationTypeNotFoundException + */ + public List getAllNotifications(Date timestamp, Integer limit, CqlSession session) + throws NotificationTypeNotFoundException { + if (session == null) { session = conn.getKeyspaceSession(); - - List messages = new ArrayList<>(); - - String query = "SELECT message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted " - + "FROM messages_received WHERE userid = ? "; - - ArrayList params = new ArrayList<>(); - params.add(userId); - - if (messageId != null) { - query += " AND messageId = ? "; - params.add(messageId); } + // Build the select query from the appropriate table (active or full sent + // messages) + Select select = QueryBuilder.selectFrom(Schema.NOTIFICATIONS) + .all(); + + if (limit != null) { + select = select.limit(limit); + } + // Add a condition to get messages with a timestamp less than the provided one if (timestamp != null) { - query += " AND timestamp <= ? "; - params.add(timestamp); + select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant())); + select = select.allowFiltering(); } + ArrayList toReturn = new ArrayList(); + // Converte la query in SimpleStatement + SimpleStatement statement = select.build(); - if (!include_deleted) { - query += " AND deleted = false "; - } - - if (limit != null && limit > 0) { - query += " LIMIT ? "; - - params.add(limit); - } - - SimpleStatement statement = SimpleStatement.builder(query) - .addPositionalValues(params) - .build(); + // Visualizza la query CQL generata + // System.out.println("Generated CQL Query: " + statement.getQuery()); + // Execute the query and get the result set ResultSet resultSet = session.execute(statement); - for (Row row : resultSet) { - UUID message_id = row.getUuid("message_id"); - String userName = row.getString("user_name"); - List addresses = row.getList("addresses", String.class); - String subject = row.getString("subject"); - String body = row.getString("body"); - Date msgTimestamp = Date.from(row.getInstant("timestamp")); - boolean withAttachments = row.getBoolean("with_attachments"); - boolean isRead = row.getBoolean("read"); - boolean isOpened = row.getBoolean("opened"); - boolean isDeleted = row.getBoolean("deleted"); + for (Row record : resultSet) { + toReturn.add(readNotificationFromRow(record)); + } + return toReturn; + } + + @Override + public List getSentMessagesBySender(String fromId, String messageId, Date timestamp, Integer limit, + Boolean filter_deleted, Boolean filter_read, CqlSession session) { + + if (fromId == null) { + throw new NullArgumentException("fromId cannot be null"); + } + + if (session == null) { + session = conn.getKeyspaceSession(); + } + + // Build the select query from the appropriate table (active or full sent + // messages) + Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_SENT) + .all() + .whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(fromId)); + + if (limit != null) { + select = select.limit(limit); + } + + // Add a condition on messageId if provided + if (messageId != null) { + select = select.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); + } + + // Add a condition to get messages with a timestamp less than the provided one + if (timestamp != null) { + select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant())); + } + + // Add a condition to filter on deleted if filter_deleted is not null + if (filter_deleted != null) { + select = select.whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(filter_deleted)); + } + + // Add a condition to filter on read messages if filter_read is not null + if (filter_read != null) { + select = select.whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(filter_read)); + } + + // if (messageId == null) { + // // Add ordering by timestamp in descending order + // select = select.orderBy(Schema.TIMESTAMP, ClusteringOrder.DESC); + // } + + if (filter_deleted != null || filter_read != null) { + select = select.allowFiltering(); + } + + // Converte la query in SimpleStatement + SimpleStatement statement = select.build(); + + // Visualizza la query CQL generata + // System.out.println("Generated CQL Query: " + statement.getQuery()); + + // Execute the query and get the result set + ResultSet resultSet = session.execute(statement); + + // List to store the MessageSent objects + List messagesSent = new ArrayList<>(); + + // Iterate through the result set and create MessageSent objects + for (Row row : resultSet) { + // Use the constructor of MessageSent to initialize the object MessageSent message = new MessageSent( - message_id.toString(), - userId, - userName, - addresses, - subject, - body, - msgTimestamp, - withAttachments, - isRead, - isOpened, - isDeleted); + row.getUuid(Schema.MESSAGE_ID).toString(), // message ID + row.getString(Schema.FROM_ID), // from ID (user ID) + row.getString(Schema.FROM_NAME), // user name + row.getList(Schema.RECIPIENTS, String.class), // list of recipients + row.getString(Schema.SUBJECT), // subject of the message + row.getString(Schema.BODY), // body of the message + Date.from(row.getInstant(Schema.TIMESTAMP)), // timestamp of the message + row.getBoolean(Schema.WITH_ATTACH), // whether the message has attachments + row.getBoolean(Schema.ISREAD), // whether the message is read + row.getBoolean(Schema.ISOPENED), // whether the message is opened + row.getBoolean(Schema.ISDELETED) // whether the message is deleted + ); - messages.add(message); + // Add the message to the list + messagesSent.add(message); } - return messages; + // Return the list of messages + return messagesSent; } - protected List getReceivedMessagesByUser(String recipientId, String messageId, Date timestamp, - Integer limit, Boolean include_deleted, CqlSession session) { + @Override + public List getReceivedMessagesByRecipient(String recipientId, String messageId, Date timestamp, + Integer limit, Boolean filter_deleted, Boolean filter_read, CqlSession session) { + + if (recipientId == null) { + throw new NullArgumentException("recipientId cannot be null"); + } + + if (session == null) { + session = conn.getKeyspaceSession(); + } + + // Build the select query from the appropriate table (active or full received + // messages) + Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_RECEIVED) + .all() + .whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(recipientId)); + + if (limit != null) { + select = select.limit(limit); + } + + // Add a condition on messageId if provided + if (messageId != null) { + select = select.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); + } + + // Add a condition to get messages with a timestamp less than the provided one + if (timestamp != null) { + select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant())); + } + + // Add a condition to filter on deleted if filter_deleted is not null + if (filter_deleted != null) { + select = select.whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(filter_deleted)); + } + + // Add a condition to filter on read messages if filter_read is not null + if (filter_read != null) { + select = select.whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(filter_read)); + } + + // if (messageId == null) { + // // Add ordering by timestamp in descending order + // select = select.orderBy(Schema.TIMESTAMP, ClusteringOrder.DESC); + // } + + if (filter_deleted != null || filter_read != null) { + select = select.allowFiltering(); + } + + // Converte la query in SimpleStatement + SimpleStatement statement = select.build(); + + // Visualizza la query CQL generata + // System.out.println("Generated CQL Query: " + statement.getQuery()); + + // Execute the query and get the result set + ResultSet resultSet = session.execute(statement); + + // List to store the MessageReceived objects + List messagesReceived = new ArrayList<>(); + + // Iterate through the result set and create MessageReceived objects + for (Row row : resultSet) { + // Use the constructor of MessageReceived to initialize the object + MessageReceived message = new MessageReceived( + row.getUuid(Schema.MESSAGE_ID).toString(), // message ID + row.getString(Schema.RECIPIENT_ID), // recipient ID + row.getString(Schema.FROM_ID), // sender ID (user ID) + row.getString(Schema.FROM_NAME), // sender name + row.getList(Schema.RECIPIENTS, String.class), // list of recipients (addresses) + row.getString(Schema.SUBJECT), // subject of the message + row.getString(Schema.BODY), // body of the message + Date.from(row.getInstant(Schema.TIMESTAMP)), // timestamp of the message + row.getBoolean(Schema.WITH_ATTACH), // whether the message has attachments + row.getBoolean(Schema.ISREAD), // whether the message is read + row.getBoolean(Schema.ISOPENED), // whether the message is opened + row.getBoolean(Schema.ISDELETED) // whether the message is deleted + ); + + // Add the message to the list + messagesReceived.add(message); + } + + // Return the list of messages + return messagesReceived; + } + + protected List retrieveAttachmentsMessage(String messageId, String attachId, CqlSession session) { + // Inserting data if (session == null) session = conn.getKeyspaceSession(); + // an entry in the Attachment CF - List messages = new ArrayList(); + // Build the select query from the appropriate table (active or full received + // messages) + Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_ATTACHMENTS) + .all() + .whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); - String query = "SELECT message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted " - + "FROM messages_received WHERE recipientid = ? "; + // Add a condition on attachName if provided + if (attachId != null) { + select = select.whereColumn(Schema.ATTACH_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(attachId))); - ArrayList params = new ArrayList<>(); - params.add(recipientId); - - if (messageId != null) { - query += " AND messageId = ? "; - params.add(messageId); + select = select.allowFiltering(); } - if (timestamp != null) { - query += " AND timestamp <= ? "; - params.add(timestamp); - } - - if (!include_deleted) { - query += " AND deleted = false "; - } - - if (limit != null && limit > 0) { - query += " LIMIT ? "; - - params.add(limit); - } - - SimpleStatement statement = SimpleStatement.builder(query) - .addPositionalValues(params) - .build(); + // Converte la query in SimpleStatement + SimpleStatement statement = select.build(); ResultSet resultSet = session.execute(statement); + List attachments = new ArrayList<>(); + + // Iterate through the result set and create MessageReceived objects for (Row row : resultSet) { - UUID message_id = row.getUuid("message_id"); - String senderId = row.getString("userid"); - String userName = row.getString("user_name"); - List addresses = row.getList("addresses", String.class); - String subject = row.getString("subject"); - String body = row.getString("body"); - Date msgTimestamp = Date.from(row.getInstant("timestamp")); - boolean withAttachments = row.getBoolean("with_attachments"); - boolean isRead = row.getBoolean("read"); - boolean isOpened = row.getBoolean("opened"); - boolean isDeleted = row.getBoolean("deleted"); + // Use the constructor of MessageReceived to initialize the object + Attachment attach = new Attachment( + // String id, + // String uri, + // String name, + // String description, + // String thumbnailURL, + // String mimeType - MessageReceived message = new MessageReceived( - message_id.toString(), - recipientId, - senderId, - userName, - addresses, - subject, - body, - msgTimestamp, - withAttachments, - isRead, - isOpened, - isDeleted); + // row.getUuid(Schema.MESSAGE_ID).toString(), // message ID + row.getUuid(Schema.ATTACH_ID).toString(), // message ID + row.getString(Schema.URI), // file uri + row.getString(Schema.NAME), // file name + row.getString(Schema.DESCRIPTION), // file description + row.getString(Schema.URI_THUMBNAIL), // file uri + row.getString(Schema.MIME_TYPE) // mime type + ); - messages.add(message); + // Add the message to the list + attachments.add(attach); } - return messages; + // Return the list of messages + return attachments; } - protected MessageSent getSentMessageById(String userId, String messageId, Boolean include_deleted, + protected MessageSent getSentMessageById(String fromId, String messageId, Boolean filter_deleted, + Boolean filter_read, CqlSession session) { - List messages = getSentMessagesByUser(userId, messageId, null, 1, include_deleted, session); + List messages = getSentMessagesBySender(fromId, messageId, null, 1, filter_deleted, null, session); if (messages.size() > 0) return messages.get(0); else return null; } - public MessageReceived getReceivedMessageById(String recipientId, String messageId, Boolean include_deleted, + public MessageReceived getReceivedMessageById(String recipientId, String messageId, Boolean filter_deleted, + Boolean filter_read, CqlSession session) { - List messages = getReceivedMessagesByUser(recipientId, messageId, null, 1, include_deleted, - session); + List messages = getReceivedMessagesByRecipient(recipientId, messageId, null, 1, filter_deleted, + filter_read, session); if (messages.size() > 0) return messages.get(0); else @@ -4714,7 +4856,7 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { * @param session * @return */ - protected boolean checkAndDeleteAttachmenForDeletedMessages(Message message, CqlSession session) { + protected boolean deleteAttachmentForDeletedMessages(Message message, CqlSession session) { if (!message.isWith_attachments()) { return false; @@ -4726,14 +4868,15 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { // search for not deleted message_sent. if we found it, we cannot delete // attachments - MessageSent messageSent = getSentMessageById(message.getUserid(), message.getId(), false, session); + MessageSent messageSent = getSentMessageById(message.getUserid(), message.getId(), false, null, session); if (messageSent != null) { return false; } // search for not deleted message_received. if we found any, we cannot delete // attachments - List messagesReceived = getReceivedMessagesByUser(null, message.getId(), null, 1, false, + List messagesReceived = getReceivedMessagesByRecipient(null, message.getId(), null, 1, false, + null, session); if (messagesReceived.size() > 0) { return false; @@ -4744,27 +4887,73 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { return false; } - protected MessageSent deleteMessageSent(String userId, String messageId, Boolean checkAttachmentDeletion, + @Override + public Message deleteMessage(String messageId) { + CqlSession session = conn.getKeyspaceSession(); + + MessageSent message = getSentMessageById(null, messageId, false, null, session); + for (String recipientId : message.getAddresses()) { + deleteMessageReceived(recipientId, messageId, false, session); + } + + deleteMessageSent(message.getUserid(), messageId, true, session); + return getSentMessageById(null, messageId, null, null, session); + } + + protected MessageSent deleteMessageSent(String fromId, String messageId, Boolean checkAttachmentDeletion, CqlSession session) { if (session == null) { session = conn.getKeyspaceSession(); } - - SimpleStatement updateStatement = SimpleStatement.builder( - "UPDATE messages_sent SET deleted = true WHERE userid = ? AND message_id = ?") - .addPositionalValues(userId, UUID.fromString(messageId)) - .build(); - - session.execute(updateStatement); - - MessageSent message = getSentMessageById(userId, messageId, true, session); - - if (checkAttachmentDeletion && message.isWith_attachments()) { - checkAndDeleteAttachmenForDeletedMessages(message, session); + // Recupera il messaggio per ottenere il timestamp necessario per l'update + MessageSent sentMessage = getSentMessageById(fromId, messageId); + if (sentMessage == null) { + throw new IllegalArgumentException( + "Message not found with fromId: " + fromId + " and messageId: " + messageId); } - return message; + return deleteMessageSent(sentMessage, checkAttachmentDeletion, session); + } + + protected MessageSent deleteMessageSent(MessageSent sentMessage, Boolean checkAttachmentDeletion, + CqlSession session) { + + if (session == null) { + session = conn.getKeyspaceSession(); + } + // Recupera il messaggio per ottenere il timestamp necessario per l'update + if (sentMessage == null) { + throw new IllegalArgumentException( + "Message not found"); + } + + // Costruisci la query di aggiornamento usando il QueryBuilder + Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_SENT) + .setColumn(Schema.ISDELETED, QueryBuilder.literal(true)) // Imposta il campo 'read' al valore fornito + .whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(sentMessage.getUserid())) // Filtro per + // 'from_id' + .whereColumn(Schema.TIMESTAMP).isEqualTo(QueryBuilder.literal(sentMessage.getTimestamp().toInstant())) + .whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(sentMessage.getUUID())); // Filtro + // per + // 'message_id' + + SimpleStatement statement = update.build(); + String updateString = statement.getQuery(); + + // Esegui la query di aggiornamento + session.execute(statement); + + // Retrieve the message after marking it as deleted + sentMessage = getSentMessageById(sentMessage.getUserid(), sentMessage.getId(), true, null, session); + + // Check if attachment deletion is required and the message has attachments + if (checkAttachmentDeletion && sentMessage.isWith_attachments()) { + deleteAttachmentForDeletedMessages(sentMessage, session); + } + + // Return the deleted message + return sentMessage; } protected MessageReceived deleteMessageReceived(String recipientId, String messageId, @@ -4772,76 +4961,103 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { if (session == null) { session = conn.getKeyspaceSession(); } - - SimpleStatement updateStatement = SimpleStatement.builder( - "UPDATE messages_received SET deleted = true WHERE recipientid = ? AND message_id = ?") - .addPositionalValues(recipientId, UUID.fromString(messageId)) - .build(); - - session.execute(updateStatement); - - MessageReceived message = getReceivedMessageById(recipientId, messageId, true, session); - - if (checkAttachmentDeletion && message.isWith_attachments()) { - checkAndDeleteAttachmenForDeletedMessages(message, session); + // Recupera il messaggio per ottenere il timestamp necessario per l'update + MessageReceived receivedMessage = getReceivedMessageById(recipientId, messageId); + if (receivedMessage == null) { + throw new IllegalArgumentException( + "Received Message not found with fromId: " + recipientId + " and messageId: " + messageId); } - return message; + + return deleteMessageReceived(receivedMessage, checkAttachmentDeletion, session); + } - protected MessageSent saveSentMessage(Message message, CqlSession session) { - MessageSent messageSent = new MessageSent(message, false, false, false); + protected MessageReceived deleteMessageReceived(MessageReceived receivedMessage, Boolean checkAttachmentDeletion, + CqlSession session) { + if (session == null) { + session = conn.getKeyspaceSession(); + } + // Recupera il messaggio per ottenere il timestamp necessario per l'update + if (receivedMessage == null) { + throw new IllegalArgumentException( + "Message not found"); + } - SimpleStatement statement = SimpleStatement.builder( - "INSERT INTO messages_sent (message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted) " - + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - .addPositionalValues( - messageSent.getId(), - messageSent.getUserid(), - messageSent.getUser_name(), - messageSent.getAddresses(), - messageSent.getSubject(), - messageSent.getBody(), - messageSent.getTimestamp(), - messageSent.isWith_attachments(), - messageSent.isRead(), - messageSent.isOpened(), - messageSent.isDeleted()) - .build(); + // Costruisci la query di aggiornamento usando il QueryBuilder + Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_RECEIVED) + .setColumn(Schema.ISDELETED, QueryBuilder.literal(true)) // Imposta il campo 'read' al valore fornito + .whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(receivedMessage.getRecipientid())) // Filtro + // per + // 'from_id' + .whereColumn(Schema.TIMESTAMP) + .isEqualTo(QueryBuilder.literal(receivedMessage.getTimestamp().toInstant())) + .whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(receivedMessage.getUUID())); // Filtro + // per + // 'message_id' + SimpleStatement statement = update.build(); + String updateString = statement.getQuery(); + + // Esegui la query di aggiornamento session.execute(statement); - _log.debug("Wrote sent message with id " + messageSent.getId()); + // Retrieve the message after marking it as deleted + receivedMessage = getReceivedMessageById(receivedMessage.getRecipientid(), receivedMessage.getId(), true, null, + session); - return messageSent; + // Check if attachment deletion is required and the message has attachments + if (checkAttachmentDeletion && receivedMessage.isWith_attachments()) { + deleteAttachmentForDeletedMessages(receivedMessage, session); + } + + // Return the deleted message + return receivedMessage; } - protected MessageReceived saveReceivedMessage(Message message, String recipientid, CqlSession session) { - MessageReceived messageReceived = new MessageReceived(message, recipientid, false, false, false); + protected boolean saveSentMessage(Message message, CqlSession session) { + // MessageSent messageSent = new MessageSent(message, false, false, false); - SimpleStatement statement = SimpleStatement.builder( - "INSERT INTO messages_received (message_id, recipientid, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted) " - + - "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") - .addPositionalValues( - messageReceived.getId(), - messageReceived.getRecipientid(), - messageReceived.getUserid(), - messageReceived.getUser_name(), - messageReceived.getAddresses(), - messageReceived.getSubject(), - messageReceived.getBody(), - messageReceived.getTimestamp(), - messageReceived.isWith_attachments(), - messageReceived.isRead(), - messageReceived.isOpened(), - messageReceived.isDeleted()) - .build(); + RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_SENT) + .value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(message.getId()))) + .value(Schema.FROM_ID, QueryBuilder.literal(message.getUserid())) + .value(Schema.FROM_NAME, QueryBuilder.literal(message.getUser_name())) + .value(Schema.RECIPIENTS, QueryBuilder.literal(message.getAddresses())) + .value(Schema.SUBJECT, QueryBuilder.literal(message.getSubject())) + .value(Schema.BODY, QueryBuilder.literal(message.getBody())) + .value(Schema.TIMESTAMP, QueryBuilder.literal(message.getTimestamp().toInstant())) + .value(Schema.ISREAD, QueryBuilder.literal(false)) + .value(Schema.ISOPENED, QueryBuilder.literal(false)) + .value(Schema.ISDELETED, QueryBuilder.literal(false)); - session.execute(statement); + session.execute(insert.build()); - _log.debug("Wrote sent message with id " + messageReceived.getId()); + _log.debug("Wrote sent message with id " + message.getId()); - return messageReceived; + return true; + } + + protected boolean saveReceivedMessage(Message message, String recipientId, CqlSession session) { + // MessageReceived messageReceived = new MessageReceived(message, recipientId, + // false, false, false); + + RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_RECEIVED) + .value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(message.getId()))) + .value(Schema.RECIPIENT_ID, QueryBuilder.literal(recipientId)) + .value(Schema.FROM_ID, QueryBuilder.literal(message.getUserid())) + .value(Schema.FROM_NAME, QueryBuilder.literal(message.getUser_name())) + .value(Schema.RECIPIENTS, QueryBuilder.literal(message.getAddresses())) + .value(Schema.SUBJECT, QueryBuilder.literal(message.getSubject())) + .value(Schema.BODY, QueryBuilder.literal(message.getBody())) + .value(Schema.TIMESTAMP, QueryBuilder.literal(message.getTimestamp().toInstant())) + .value(Schema.ISREAD, QueryBuilder.literal(false)) + .value(Schema.ISOPENED, QueryBuilder.literal(false)) + .value(Schema.ISDELETED, QueryBuilder.literal(false)); + + session.execute(insert.build()); + + _log.debug("Wrote sent message with id " + message.getId()); + + return true; } protected boolean saveAttachmentMessageEntry(String messageKey, Attachment toSave, CqlSession session) { @@ -4849,24 +5065,20 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { if (session == null) session = conn.getKeyspaceSession(); // an entry in the Attachment CF + try { - List boundStatements = insertIntoAttachments(session, toSave, "message_" + messageKey); - BatchStatement writeBatch = getBatch().addAll(boundStatements); - // boundStatements.forEach(stmt->writeBatch.add(stmt)); - ResultSet res = session.execute(writeBatch); - _log.debug(res.getExecutionInfos().toString()); - _log.debug("" + res.wasApplied()); - /* - * session.execute(createNewaAttachEntry(session).bind( - * UUID.fromString(toSave.getId()), - * UUID.fromString(feedId), - * toSave.getUri(), - * toSave.getName(), - * toSave.getDescription(), - * toSave.getThumbnailURL(), - * toSave.getMimeType() - * )); - */ + RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_ATTACHMENTS) + .value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(messageKey))) + .value(Schema.ATTACH_ID, QueryBuilder.literal(UUID.fromString(toSave.getId()))) + .value(Schema.URI, QueryBuilder.literal(toSave.getUri())) + .value(Schema.NAME, QueryBuilder.literal(toSave.getName())) + .value(Schema.DESCRIPTION, QueryBuilder.literal(toSave.getDescription())) + .value(Schema.URI_THUMBNAIL, QueryBuilder.literal(toSave.getThumbnailURL())) + .value(Schema.MIME_TYPE, QueryBuilder.literal(toSave.getMimeType())); + + session.execute(insert.build()); + + _log.debug("Wrote attachment " + toSave.getName() + " for message with id " + messageKey); } catch (Exception e) { @@ -4876,78 +5088,114 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { return true; } - protected MessageSent saveNewMessage(Message message, List attachments, CqlSession session) { + protected boolean saveNewMessage(Message message, List attachments, CqlSession session) + throws IllegalArgumentException { + + if (message.getId() == null) + message.setId(Message.generateUUID()); // Generate UUID for the message + + boolean has_attachments = attachments != null && !attachments.isEmpty(); + message.setWith_attachments(has_attachments); + if (message.getTimestamp() == null) { + message.setTimestamp(new Date()); + } + + if (message == null) { + throw new IllegalArgumentException("message cannot be null or empty"); + } + + if (message.getUserid() == null) { + throw new IllegalArgumentException("message from cannot be null"); + } + + if (message.getId() == null) { + throw new IllegalArgumentException("message id cannot be null"); + } + + if (message.getAddresses() == null || message.getAddresses().isEmpty()) { + throw new IllegalArgumentException("Recipient addresses cannot be null or empty"); + } + if (session == null) session = conn.getKeyspaceSession(); - MessageSent sentMessage = saveSentMessage(message, session); - if (sentMessage == null) { + boolean sent = saveSentMessage(message, session); + if (!sent) { _log.error("Sent Message failed to be saved: from " + message.getUserid()); - return null; + return false; } ArrayList messagesReceived = new ArrayList<>(); for (String recipientId : message.getAddresses()) { - MessageReceived received = saveReceivedMessage(message, recipientId, session); + boolean received = saveReceivedMessage(message, recipientId, session); - if (received == null) { + if (!received) { _log.error("Received message failed to be saved: from " + message.getUserid()); // TODO: Manage error } - messagesReceived.add(received); + // messagesReceived.add(received); } _log.debug("Message " + message.getId() + "has been saved"); String postkey = message.getKey(); - for (Attachment attachment : attachments) { - boolean attachSaveResult = saveAttachmentMessageEntry(postkey, attachment, session); - if (!attachSaveResult) - _log.error("Some of the attachments failed to be saved: " + attachment.getName()); - else { - _log.debug("Attachment " + attachment.getName() + "has been saved for message " + message.getId()); + if (attachments != null) { + for (Attachment attachment : attachments) { + boolean attachSaveResult = saveAttachmentMessageEntry(postkey, attachment, session); + if (!attachSaveResult) + _log.error("Some of the attachments failed to be saved: " + attachment.getName()); + else { + _log.debug("Attachment " + attachment.getName() + "has been saved for message " + message.getId()); + } } } - return sentMessage; + return true; } @Override - public MessageSent sendMessage(String fromId, List addresses, String subject, String body, - List attachments) { + public Message sendMessage(String fromId, List addresses, String subject, String body, + List attachments) throws IllegalArgumentException { String messageId = UUID.randomUUID().toString(); // Generate UUID for the message boolean has_attachments = attachments != null && !attachments.isEmpty(); Message message = new Message(messageId, fromId, null, addresses, subject, body, new Date(), has_attachments); - MessageSent sent = saveNewMessage(message, attachments, null); - if (sent == null) - return null; - return sent; + boolean saved = saveNewMessage(message, attachments, null); + + return message; + } + + public Message sendMessage(Message message, + List attachments, CqlSession session) throws IllegalArgumentException { + + boolean saved = saveNewMessage(message, attachments, session); + + return message; } @Override - public List getSentMessagesByUser(String userId, Date timestamp, Integer limit) { - return getSentMessagesByUser(userId, null, timestamp, limit, false, null); + public List getSentMessagesBySender(String userId, Date timestamp, Integer limit) { + return getSentMessagesBySender(userId, null, timestamp, limit, false, null, null); } @Override - public List getReceivedMessagesByUser(String recipientId, Date timestamp, Integer limit) { - return getReceivedMessagesByUser(recipientId, null, timestamp, limit, false, null); + public List getReceivedMessagesByRecipient(String recipientId, Date timestamp, Integer limit) { + return getReceivedMessagesByRecipient(recipientId, null, timestamp, limit, false, null, null); } @Override - public MessageSent getSentMessageById(String userId, String messageId) { - return getSentMessageById(userId, messageId, false, null); + public MessageSent getSentMessageById(String fromId, String messageId) { + return getSentMessageById(fromId, messageId, false, null, null); } @Override public MessageReceived getReceivedMessageById(String recipientId, String messageId) { - return getReceivedMessageById(recipientId, messageId, false, null); + return getReceivedMessageById(recipientId, messageId, false, null, null); } @Override - public MessageSent deleteMessageSent(String userId, String messageId) { - return deleteMessageSent(userId, messageId, true, null); + public MessageSent deleteMessageSent(String fromId, String messageId) { + return deleteMessageSent(fromId, messageId, true, null); } @Override @@ -4956,57 +5204,84 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore { } @Override - public MessageSent setSentMessageRead(String userId, String messageId, boolean set_read) { + public MessageSent setSentMessageRead(String fromId, String messageId, boolean set_read) { CqlSession session = conn.getKeyspaceSession(); - SimpleStatement updateStatement = SimpleStatement.builder( - "UPDATE messages_sent SET read = ? WHERE userid = ? AND message_id = ?") - .addPositionalValues(set_read, userId, UUID.fromString(messageId)) - .build(); + // Recupera il messaggio per ottenere il timestamp necessario per l'update + MessageSent sentMessage = getSentMessageById(fromId, messageId); - session.execute(updateStatement); + if (sentMessage == null) { + throw new IllegalArgumentException( + "Message not found with fromId: " + fromId + " and messageId: " + messageId); + } - return getSentMessageById(userId, messageId, true, session); + // Costruisci la query di aggiornamento usando il QueryBuilder + Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_SENT) + .setColumn(Schema.ISREAD, QueryBuilder.literal(set_read)) // Imposta il campo 'read' al valore fornito + .whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(fromId)) // Filtro per 'from_id' + .whereColumn(Schema.TIMESTAMP).isEqualTo(QueryBuilder.literal(sentMessage.getTimestamp().toInstant())) + .whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); // Filtro + // per + // 'message_id' + + SimpleStatement statement = update.build(); + String updateString = statement.getQuery(); + + // Esegui la query di aggiornamento + session.execute(statement); + + // Ritorna il messaggio aggiornato + return getSentMessageById(fromId, messageId, null, null, session); } @Override public MessageReceived setReceivedMessageRead(String recipientId, String messageId, boolean set_read) { CqlSession session = conn.getKeyspaceSession(); + MessageReceived receivedMessage = getReceivedMessageById(recipientId, messageId); - SimpleStatement updateStatement = SimpleStatement.builder( - "UPDATE messages_received SET read = ? WHERE recipientid = ? AND message_id = ?") - .addPositionalValues(set_read, recipientId, UUID.fromString(messageId)) - .build(); + // Build the update query using QueryBuilder + Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_RECEIVED) + .setColumn(Schema.ISREAD, QueryBuilder.literal(set_read)) // Set the 'read' column to the provided value + .whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(recipientId)) + .whereColumn(Schema.TIMESTAMP) + .isEqualTo(QueryBuilder.literal(receivedMessage.getTimestamp().toInstant())) + .whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); - session.execute(updateStatement); + // Execute the update query + session.execute(update.build()); - return getReceivedMessageById(recipientId, messageId, true, session); + // Return the updated message + return getReceivedMessageById(recipientId, messageId); + + // Return the updated message + // return getReceivedMessageById(recipientId, messageId, null, null, session); } @Override - public Attachment getMessageAttachmentById(String messageid) { - // TODO Auto-generated method stub - throw new UnsupportedOperationException("Unimplemented method 'getMessageAttachmentById'"); - } + public Attachment getMessageAttachmentById(String messageid, String attachid) { - @Override - public Message getMessageById(String messageId) { - return getSentMessageById(null, messageId); - } - - @Override - public Message deleteMessage(String messageId) { - CqlSession session = conn.getKeyspaceSession(); - - MessageSent message = getSentMessageById(null, messageId, true, session); - for (String recipientId : message.getAddresses()) { - deleteMessageReceived(recipientId, messageId, false, session); + if (attachid == null) { + throw new NullArgumentException("the timeInMillis must be before today"); + } + List results = retrieveAttachmentsMessage(messageid, attachid, null); + if (results.size() > 1) { + throw new IllegalArgumentException("more than 1 attachment retrieved"); } - deleteMessageSent(message.getUserid(), messageId, true, session); - getSentMessageById(null, messageId); - - return getSentMessageById(null, messageId, true, session); + if (results.size() > 0) { + return results.get(0); + } + return null; } + @Override + public List getMessageAttachmentsById(String messageId) { + return retrieveAttachmentsMessage(messageId, null, null); + } + + // @Override + // public Message getMessageById(String messageId) { + // return getSentMessageById(null, messageId); + // } + } diff --git a/src/main/java/org/gcube/portal/databook/server/DatabookCassandraTest.java b/src/main/java/org/gcube/portal/databook/server/DatabookCassandraTest.java index 8d02836..0edc714 100644 --- a/src/main/java/org/gcube/portal/databook/server/DatabookCassandraTest.java +++ b/src/main/java/org/gcube/portal/databook/server/DatabookCassandraTest.java @@ -1,9 +1,6 @@ package org.gcube.portal.databook.server; -import java.util.UUID; - import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Test; public class DatabookCassandraTest { diff --git a/src/main/java/org/gcube/portal/databook/server/DatabookStore.java b/src/main/java/org/gcube/portal/databook/server/DatabookStore.java index 11e42ae..28aebca 100644 --- a/src/main/java/org/gcube/portal/databook/server/DatabookStore.java +++ b/src/main/java/org/gcube/portal/databook/server/DatabookStore.java @@ -679,23 +679,30 @@ public interface DatabookStore { */ void closeConnection(); - public MessageSent sendMessage(String fromId, List addresses, String subject, String body, List attachments); + public Message sendMessage(String fromId, List addresses, String subject, String body, List attachments); - public List getSentMessagesByUser(String fromId, Date timestamp, Integer limit); - public List getReceivedMessagesByUser(String recipientId, Date timestamp, Integer limit); + public List getSentMessagesBySender(String fromId, Date timestamp, Integer limit); + public List getReceivedMessagesByRecipient(String recipientId, Date timestamp, Integer limit); - public Message getMessageById(String messageId); - public MessageSent getSentMessageById(String messageId, String fromId); + // public Message getMessageById(String messageId); + public MessageSent getSentMessageById(String fromId, String messageId); public MessageReceived getReceivedMessageById(String messageId, String recipientId); public Message deleteMessage(String messageId); - public MessageSent deleteMessageSent(String messageId, String fromId); + public MessageSent deleteMessageSent(String fromId, String messageId); public MessageReceived deleteMessageReceived(String messageId, String recipientId); - public MessageSent setSentMessageRead(String messageId, String fromId, boolean set_read); - public MessageReceived setReceivedMessageRead(String messageId, String recipientId, boolean set_read); + public MessageSent setSentMessageRead(String fromId, String messageId, boolean set_read); + public MessageReceived setReceivedMessageRead(String recipientId, String messageId, boolean set_read); - public Attachment getMessageAttachmentById(String messageId); + public Attachment getMessageAttachmentById(String messageId, String filename); + public List getMessageAttachmentsById(String messageId); + + public List getSentMessagesBySender(String fromId, String messageId, Date timestamp, Integer limit, + Boolean filter_deleted, Boolean filter_read, CqlSession session); + + public List getReceivedMessagesByRecipient(String recipientId, String messageId, Date timestamp, Integer limit, + Boolean filter_deleted, Boolean filter_read, CqlSession session); // public MessageSent saveSentMessage(Message message, CqlSession session); // public MessageReceived saveReceivedMessage(Message message, String recipientId, CqlSession session); diff --git a/src/main/java/org/gcube/portal/databook/server/RunningCluster.java b/src/main/java/org/gcube/portal/databook/server/RunningCluster.java index 5dfd9b3..c753533 100644 --- a/src/main/java/org/gcube/portal/databook/server/RunningCluster.java +++ b/src/main/java/org/gcube/portal/databook/server/RunningCluster.java @@ -58,6 +58,22 @@ public class RunningCluster implements Serializable { private static final String DEFAULT_CONFIGURATION = "/org/gcube/portal/databook/server/resources/databook.properties"; private static RunningCluster singleton; + + public static synchronized RunningCluster setCustomizedInstance(String host, String datacenterName, + String keyspaceName) { + singleton = new RunningCluster(host, datacenterName, keyspaceName); + + String[] params = { + singleton.host, singleton.datacenterName, singleton.keyspaceName + }; + + _log.info( + "socialdb will use custom configuration host:{}, datacenter:{}, keyspace: {}", + params); + + return singleton; + } + /** * Host */ @@ -158,6 +174,10 @@ public class RunningCluster implements Serializable { } } catch (Exception e) { e.printStackTrace(); + + _log.error("Error getting configuration from IS: {} - {}", e.getMessage()); + _log.error("Error {}", e); + // throw e; } /* diff --git a/src/main/java/org/gcube/portal/databook/server/Schema.java b/src/main/java/org/gcube/portal/databook/server/Schema.java index f787c2d..a4e75d2 100644 --- a/src/main/java/org/gcube/portal/databook/server/Schema.java +++ b/src/main/java/org/gcube/portal/databook/server/Schema.java @@ -1,5 +1,7 @@ package org.gcube.portal.databook.server; +import com.datastax.oss.driver.api.core.CqlIdentifier; + /** * @author Massimiliano Assante ISTI-CNR * @author Ahmed Ibrahim ISTI-CNR @@ -73,4 +75,51 @@ public class Schema { public static final String COMMENTS_NO = "commentsno"; //big int public static final String LINK_TITLE = "linktitle"; //text + // messages + + + public static final String TABLE_MESSAGES_SENT = "messages_sent"; + public static final String TABLE_MESSAGES_RECEIVED = "messages_received"; + public static final String TABLE_MESSAGES_ATTACHMENTS = "messages_attachments"; + + public static final String IDX_MESSAGES_SENT_ID = "message_sent_id_idx"; + public static final String IDX_MESSAGES_SENT_READ = "message_sent_read_idx"; + public static final String IDX_MESSAGES_SENT_DELETED = "message_sent_deleted_idx"; + public static final String IDX_MESSAGES_SENT_TIMESTAMP = "message_sent_timestamp_idx"; + + + public static final String IDX_MESSAGES_RECEIVED_ID = "message_received_id_idx"; + public static final String IDX_MESSAGES_RECEIVED_READ = "message_received_read_idx"; + public static final String IDX_MESSAGES_RECEIVED_DELETED = "message_received_deleted_idx"; + public static final String IDX_MESSAGES_RECEIVED_TIMESTAMP = "message_received_timestamp_idx"; + + + + public static final String TABLE_MESSAGES_SENT_ACTIVE = "messages_active_sent"; + public static final String TABLE_MESSAGES_RECEIVED_ACTIVE = "messages_active_received"; + public static final String TABLE_MESSAGES_SENT_READ = "messages_unread_sent"; + public static final String TABLE_MESSAGES_RECEIVED_READ = "messages_unread_received"; + + + + // public static final String URI = "uri"; //text + // public static final String URI_THUMBNAIL = "urithumbnail"; //text + // public static final String NAME = "name"; //text + // public static final String DESCRIPTION = "description"; //text + // public static final String MIME_TYPE = "mimetype"; //text + + + public static final String FROM_ID = "from_id"; //text + public static final String FROM_NAME = "from_name"; //text + // public static final String TIMESTAMP = "timestamp"; //text + public static final String MESSAGE_ID = "message_id"; //text + public static final String RECIPIENT_ID = "recipient_id"; //text + public static final String RECIPIENTS = "addresses"; //text + // public static final String RECIPIENTS = "recipients"; //text + public static final String SUBJECT = "subject"; //text + public static final String BODY = "body"; //text + public static final String WITH_ATTACH = "with_attachments"; //text + public static final String ISREAD = "read"; //text + public static final String ISOPENED = "opened"; //text + public static final String ISDELETED = "deleted"; //text } \ No newline at end of file diff --git a/src/main/java/org/gcube/portal/databook/server/Tester.java b/src/main/java/org/gcube/portal/databook/server/Tester.java deleted file mode 100644 index 87182d4..0000000 --- a/src/main/java/org/gcube/portal/databook/server/Tester.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.gcube.portal.databook.server; - -import org.gcube.portal.databook.shared.*; -import org.gcube.portal.databook.shared.ex.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * @author Massimiliano Assante ISTI-CNR - * @author Ahmed Ibrahim ISTI-CNR - * - * @version 2.0.0 October 2023 - * - */ -public class Tester { - private static DBCassandraAstyanaxImpl store; - private static Logger LOGGER = LoggerFactory.getLogger(Tester.class); - - public Tester() { - store = new DBCassandraAstyanaxImpl("gcube"); //set to true if you want to drop the KeySpace and recreate it - - } - - public static void main(String[] args) throws ColumnNameNotFoundException, PrivacyLevelTypeNotFoundException, FeedIDNotFoundException, FeedTypeNotFoundException { - Tester test = new Tester(); - //test.getComment(); - test.testFunc(); - System.exit(0); - - } - public void testFunc() throws ColumnNameNotFoundException, PrivacyLevelTypeNotFoundException, FeedIDNotFoundException, FeedTypeNotFoundException { - String postIdToUpdate = "047c601d-2291-4974-9224-d6732b1fbe26"; - Post read = store.readPost(postIdToUpdate); - - List readC = store.getAllCommentByPost("047c601d-2291-4974-9224-d6732b1fbe26"); - System.out.println(read); - readC.forEach(c -> System.out.println(c.getText())); - } - public void getComment(){ - String uuid = "820969b2-4632-4197-9fd6-5aafab781faa"; - - Comment c; - try { - c = store.readCommentById(uuid); - System.out.println(c); - } catch (CommentIDNotFoundException e) { - // TODO Auto-generated catch block - System.err.println(e.toString()); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/gcube/portal/databook/shared/Message.java b/src/main/java/org/gcube/portal/databook/shared/Message.java index 6ae1c07..63505ec 100644 --- a/src/main/java/org/gcube/portal/databook/shared/Message.java +++ b/src/main/java/org/gcube/portal/databook/shared/Message.java @@ -3,6 +3,7 @@ package org.gcube.portal.databook.shared; import java.io.Serializable; import java.util.Date; import java.util.List; +import java.util.UUID; /** * @@ -57,6 +58,10 @@ public class Message implements Serializable, Comparable { // protected Date creation_time; protected Date timestamp; + public UUID getUUID() { + return UUID.fromString(getId()); + } + public String getId() { return id; } @@ -130,6 +135,10 @@ public class Message implements Serializable, Comparable { super(); } + public static String generateUUID(){ + UUID uuid = UUID.randomUUID(); + return uuid.toString(); + } public Message(String id, String userid, String user_name, List addresses, String subject, String body, Date timestamp, boolean with_attachments ) { @@ -139,7 +148,7 @@ public class Message implements Serializable, Comparable { this.user_name = user_name; this.addresses = addresses; this.subject = subject; - this.body = subject; + this.body = body; this.timestamp = timestamp; this.with_attachments = with_attachments; } diff --git a/src/test/java/org/gcube/portal/databook/server/BaseDbTest.java b/src/test/java/org/gcube/portal/databook/server/BaseDbTest.java index 0f3cade..03e0616 100644 --- a/src/test/java/org/gcube/portal/databook/server/BaseDbTest.java +++ b/src/test/java/org/gcube/portal/databook/server/BaseDbTest.java @@ -69,7 +69,7 @@ public class BaseDbTest { assertNotNull(store); } public CassandraClusterConnection getConnection() throws Exception { - return new CassandraClusterConnection(false, null); + return new CassandraClusterConnection(false, false, null); } @Test diff --git a/src/test/java/org/gcube/portal/databook/server/MessagesTest.java b/src/test/java/org/gcube/portal/databook/server/MessagesTest.java new file mode 100644 index 0000000..e2a62e2 --- /dev/null +++ b/src/test/java/org/gcube/portal/databook/server/MessagesTest.java @@ -0,0 +1,662 @@ +package org.gcube.portal.databook.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +import org.gcube.portal.databook.shared.Attachment; +import org.gcube.portal.databook.shared.Message; +import org.gcube.portal.databook.shared.MessageReceived; +import org.gcube.portal.databook.shared.MessageSent; +import org.gcube.portal.databook.shared.Notification; +import org.gcube.portal.databook.shared.ex.NotificationTypeNotFoundException; +import org.junit.Before; +import org.junit.Test; + +import com.datastax.oss.driver.api.core.CqlSession; + +public class MessagesTest extends BaseDbTest{ + + private CassandraClusterConnection connection; + private DBCassandraAstyanaxImpl store; + private String testFrom; + private String recipient; + private String recipient2; + private String subject; + private String body; + private List recipients; + + @Before + public void setUp() throws Exception { + // connection = DbTest.getConnection(); + testFrom = "alfredo.oliviero@isti.cnr.it"; + recipient = "massimiliano.assante@isti.cnr.it"; + recipient2 = "andrea.rossi@isti.cnr.it"; + subject = "test subject"; + body = "test body"; + recipients = Arrays.asList(recipient, recipient2); + } + + @Test + public void empty() { + System.out.print("Testing works"); + return; + } + + @Test + public void testCreateMessage() throws Exception { + Message message = store.sendMessage(testFrom, recipients, subject, body, null); + + assertNotNull("Message should not be null after creation", message); + assertEquals("User ID does not match", testFrom, message.getUserid()); + assertEquals("Recipient does not match", recipient, message.getAddresses().get(0)); + assertEquals("Subject does not match", subject, message.getSubject()); + assertEquals("Body does not match", body, message.getBody()); + + MessageSent messageSent = store.getSentMessageById(testFrom, message.getId()); + + assertNotNull("Sent message should not be null", messageSent); + assertEquals("User ID of sent message does not match", testFrom, messageSent.getUserid()); + assertEquals("Recipient of sent message does not match", recipient, messageSent.getAddresses().get(0)); + assertEquals("Subject of sent message does not match", subject, messageSent.getSubject()); + assertEquals("Body of sent message does not match", body, messageSent.getBody()); + + MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId()); + + assertNotNull("Received message should not be null", messageReceived); + assertEquals("User ID of received message does not match", testFrom, messageReceived.getUserid()); + assertEquals("Recipient of received message does not match", recipient, messageReceived.getRecipientid()); + assertEquals("Subject of received message does not match", subject, messageReceived.getSubject()); + assertEquals("Body of received message does not match", body, messageReceived.getBody()); + + MessageReceived messageReceived2 = store.getReceivedMessageById(recipient2, message.getId()); + + assertNotNull("Received2 message should not be null", messageReceived2); + assertEquals("User ID of received2 message does not match", testFrom, messageReceived2.getUserid()); + assertEquals("Recipient of received2 message does not match", recipient2, messageReceived2.getRecipientid()); + assertEquals("Subject of received2 message does not match", subject, messageReceived2.getSubject()); + assertEquals("Body of received2 message does not match", body, messageReceived2.getBody()); + } + + @Test + public void testMessagesReceivedByRecipient() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testMessagesReceivedByRecipient", body, null); + + List receiveds = store.getReceivedMessagesByRecipient(recipient, null, null); + MessageReceived received = receiveds.get(0); + assertNotNull(received); + } + + @Test + public void testUnreadMessagesReceivedByRecipient() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testUnreadMessagesReceivedByRecipient", body, null); + + List receiveds = store.getReceivedMessagesByRecipient(recipient, null, null, 1, false, false, + null); + MessageReceived received = receiveds.get(0); + assertFalse(received.isRead()); + assertNotNull(received); + } + + @Test + public void testMessageReceivedById() throws Exception { + // String msg_id = "d78e096e-df88-4783-9e8e-32036158f54b"; + Message message = store.sendMessage(testFrom, recipients, "testMessageReceivedById", body, null); + String msg_id = message.getId(); + + MessageReceived received = store.getReceivedMessageById(recipient, msg_id); + + assertNotNull(received); + assertEquals(received.getId(), msg_id); + } + + @Test + public void testSetSentMessageReadUnread() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testSetSentMessageReadUnread", body, null); + assertNotNull(message); + + String recipient = recipients.get(0); + String recipient1 = recipients.get(1); + + MessageSent messageSent = store.getSentMessageById(testFrom, message.getId()); + MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId()); + MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageSent); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + MessageSent sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), true); + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(sentMessageUpdate); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertTrue(messageSent.isRead()); + assertTrue(sentMessageUpdate.isRead()); + + assertFalse(messageReceived.isRead()); + assertFalse(messageReceived1.isRead()); + + sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), false); + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(sentMessageUpdate); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertFalse(messageSent.isRead()); + assertFalse(sentMessageUpdate.isRead()); + + assertFalse(messageReceived.isRead()); + assertFalse(messageReceived1.isRead()); + + sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), true); + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(sentMessageUpdate); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertTrue(messageSent.isRead()); + assertTrue(sentMessageUpdate.isRead()); + + assertFalse(messageReceived.isRead()); + assertFalse(messageReceived1.isRead()); + + } + + @Test + public void testSetReceivedMessageReadUnread() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testSetReceivedMessageReadUnread", body, null); + assertNotNull(message); + + String recipient = recipients.get(0); + String recipient1 = recipients.get(1); + + MessageSent messageSent = store.getSentMessageById(testFrom, message.getId()); + MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId()); + MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageSent); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + // set read for recipient + MessageReceived messageUpdated = store.setReceivedMessageRead(recipient, message.getId(), true); + + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageUpdated); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertTrue(messageReceived.isRead()); + assertTrue(messageUpdated.isRead()); + + assertFalse(messageSent.isRead()); + assertFalse(messageReceived1.isRead()); + + // set not read for recipient + messageUpdated = store.setReceivedMessageRead(recipient, message.getId(), false); + + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageUpdated); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertFalse(messageReceived.isRead()); + assertFalse(messageUpdated.isRead()); + assertFalse(messageSent.isRead()); + assertFalse(messageReceived1.isRead()); + + // set read for recipient2 + messageUpdated = store.setReceivedMessageRead(recipient2, message.getId(), true); + + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageUpdated); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertTrue(messageReceived1.isRead()); + assertTrue(messageUpdated.isRead()); + + assertFalse(messageSent.isRead()); + assertFalse(messageReceived.isRead()); + + } + + @Test + public void testDeleteSentMessage() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testDeleteSentMessage", body, null); + assertNotNull(message); + + String recipient = recipients.get(0); + String recipient1 = recipients.get(1); + + MessageSent messageSent = store.getSentMessageById(testFrom, message.getId()); + MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId()); + MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageSent); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertFalse(messageSent.isDeleted()); + assertFalse(messageReceived.isDeleted()); + assertFalse(messageReceived.isDeleted()); + + MessageSent deletedMessage = store.deleteMessageSent(testFrom, message.getId()); + + assertNotNull(deletedMessage); + assertTrue(deletedMessage.isDeleted()); + + messageSent = store.getSentMessageById(testFrom, message.getId()); + + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNull(messageSent); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + } + + @Test + public void testDeleteReceivedMessage() throws Exception { + Message message = store.sendMessage(testFrom, recipients, "testDeleteReceivedMessage", body, null); + assertNotNull(message); + + String recipient = recipients.get(0); + String recipient1 = recipients.get(1); + + MessageSent messageSent = store.getSentMessageById(testFrom, message.getId()); + MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId()); + MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageSent); + assertNotNull(messageReceived); + assertNotNull(messageReceived1); + + assertFalse(messageSent.isDeleted()); + assertFalse(messageReceived.isDeleted()); + assertFalse(messageReceived.isDeleted()); + + MessageReceived deletedMessage = store.deleteMessageReceived(recipient, message.getId()); + + assertNotNull(deletedMessage); + assertTrue(deletedMessage.isDeleted()); + + messageSent = store.getSentMessageById(testFrom, message.getId()); + messageReceived = store.getReceivedMessageById(recipient, message.getId()); + messageReceived1 = store.getReceivedMessageById(recipient1, message.getId()); + + assertNotNull(messageSent); + assertNull(messageReceived); + assertNotNull(messageReceived1); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetInvalidIdMessage() { + store.getSentMessageById(testFrom, "not-valid-id"); + } + + @Test + public void testGetNonExistentMessage() { + MessageSent messageSent = store.getSentMessageById(testFrom, "ffffaaaa-9d90-4cc8-a634-bf158c7cc068"); + assertNull("Expected null for non-existent message", messageSent); + } + + @Test + public void testGetNonExistentFrom() { + MessageSent messageSent = store.getSentMessageById("aaaa", "ffffaaaa-9d90-4cc8-a634-bf158c7cc068"); + assertNull("Expected null for non-existent message", messageSent); + } + + // @Test + // public void testSaveNewMessageWithNullMessage() { + // store.saveNewMessage(null, null, null); // Message is null + // assertEquals("message cannot be null or empty", exception.getMessage()); + // } + + @Test(expected = IllegalArgumentException.class) + public void testSaveNewMessageWithNullUserId() { + store.sendMessage(null, recipients, subject, body, null); // User ID is null + } + + @Test(expected = IllegalArgumentException.class) + public void testSaveNewMessageWithNullAddresses() { + store.sendMessage(testFrom, null, subject, body, null); // Addresses are null + } + + @Test(expected = IllegalArgumentException.class) + public void testSaveNewMessageWithEmptyAddresses() { + store.sendMessage(testFrom, Arrays.asList(), subject, body, null); // Addresses are empty + } + + @Test + public void testGetSentMessagesOnlyNonDeleted() throws Exception { + // Invia un messaggio normale + Message message1 = store.sendMessage(testFrom, recipients, subject, body, null); + + // Invia un altro messaggio che verrĂ  cancellato + Message message2 = store.sendMessage(testFrom, recipients, subject + " deleted", body, null); + + // Cancella il secondo messaggio + store.deleteMessageSent(testFrom, message2.getId()); + + // Recupera tutti i messaggi inviati dall'utente + List sentMessages = store.getSentMessagesBySender(testFrom, null, 10); + + // Verifica che non contenga il messaggio cancellato + assertNotNull("Sent messages list should not be null", sentMessages); + assertFalse("Sent messages list should not contain deleted messages", + sentMessages.stream().anyMatch(m -> m.getId().equals(message2.getId()))); + + // Verifica che il messaggio non cancellato sia ancora presente + assertTrue("Sent messages list should contain non-deleted message", + sentMessages.stream().anyMatch(m -> m.getId().equals(message1.getId()))); + } + + @Test + public void testSentMessagesOrderedByTimestampDescending() throws Exception { + // Invia tre messaggi con timestamp differenti + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + Thread.sleep(1000); // Attendi un altro secondo + Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null); + + // Recupera tutti i messaggi inviati dall'utente + List sentMessages = store.getSentMessagesBySender(testFrom, null, 10); + + // Verifica che la lista non sia nulla e contenga almeno tre messaggi + assertNotNull("Sent messages list should not be null", sentMessages); + assertTrue("Sent messages list should contain at least 3 messages", sentMessages.size() >= 3); + + // Verifica che i messaggi siano ordinati in modo decrescente in base al + + // timestamp + Date timestamp1 = sentMessages.get(0).getTimestamp(); + Date timestamp2 = sentMessages.get(1).getTimestamp(); + Date timestamp3 = sentMessages.get(2).getTimestamp(); + + assertTrue("First message should be newer than second", timestamp1.after(timestamp2)); + assertTrue("Second message should be newer than third", timestamp2.after(timestamp3)); + } + + @Test + public void testReceivedMessagesOrderedByTimestampDescending() throws Exception { + // Invia tre messaggi con timestamp differenti + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + Thread.sleep(1000); // Attendi un altro secondo + Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null); + + // Recupera tutti i messaggi ricevuti dal destinatario + List receivedMessages = store.getReceivedMessagesByRecipient(recipient, null, 10); + + // Verifica che la lista non sia nulla e contenga almeno tre messaggi + assertNotNull("Received messages list should not be null", receivedMessages); + assertTrue("Received messages list should contain at least 3 messages", receivedMessages.size() >= 3); + + // Verifica che i messaggi siano ordinati in modo decrescente in base al + // timestamp + Date timestamp1 = receivedMessages.get(0).getTimestamp(); + Date timestamp2 = receivedMessages.get(1).getTimestamp(); + Date timestamp3 = receivedMessages.get(2).getTimestamp(); + + assertTrue("First message should be newer than second", timestamp1.after(timestamp2)); + assertTrue("Second message should be newer than third", timestamp2.after(timestamp3)); + } + + @Test + public void testUnreadMessagesOnlyReturned() throws Exception { + // Invia un messaggio normale + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + + // Invia un altro messaggio che verrĂ  marcato come letto + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + store.setReceivedMessageRead(recipient, message2.getId(), true); // Marca come letto + + // Recupera solo i messaggi non letti + List unreadMessages = store.getReceivedMessagesByRecipient(recipient, null, null, 10, false, + false, null); + + assertNotNull("Unread messages list should not be null", unreadMessages); + assertTrue("Unread messages list should contain only unread messages", + unreadMessages.stream().allMatch(m -> !m.isRead())); + } + + @Test + public void testMessagesWithLimitAndOrder() throws Exception { + // Invia tre messaggi + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + Thread.sleep(1000); // Attendi 1 secondo + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + Thread.sleep(1000); + Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null); + + // Recupera i primi due messaggi inviati, ordinati in modo decrescente per + // timestamp + List sentMessages = store.getSentMessagesBySender(testFrom, null, 2); + + assertNotNull("Sent messages list should not be null", sentMessages); + assertEquals("Sent messages list should contain 2 messages", 2, sentMessages.size()); + + // Verifica che i messaggi siano ordinati per timestamp decrescente + Date timestamp1 = sentMessages.get(0).getTimestamp(); + Date timestamp2 = sentMessages.get(1).getTimestamp(); + assertTrue("First message should be newer than second", timestamp1.after(timestamp2)); + } + + @Test + public void testRetrieveMessagesWithLimit() throws Exception { + int count = 4; + for (int i = 0; i < count; i++) { + store.sendMessage(testFrom, recipients, subject + " " + i, body, null); + } + ; + + // Recupera solo 2 messaggi + List sentMessages = store.getSentMessagesBySender(testFrom, null, 2); + assertNotNull("Sent messages list should not be null", sentMessages); + assertEquals("Only 2 messages should be returned", 2, sentMessages.size()); + } + + @Test + public void testGetSentMessagesBySenderWithTimestampFilter() throws Exception { + // Invia tre messaggi con timestamp differenti + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + Thread.sleep(1000); // Attendi un altro secondo + Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null); + + // Usa il timestamp del secondo messaggio per il test + Date filterTimestamp = message2.getTimestamp(); + + // Recupera tutti i messaggi con timestamp <= filterTimestamp + List sentMessages = store.getSentMessagesBySender(testFrom, filterTimestamp, 10); + + // Verifica che la lista non sia nulla + assertNotNull("Sent messages list should not be null", sentMessages); + + // Verifica che i messaggi abbiano timestamp <= filterTimestamp + for (MessageSent message : sentMessages) { + assertTrue("Message timestamp should be less than or equal to the filter timestamp", + message.getTimestamp().compareTo(filterTimestamp) <= 0); + } + + // Verifica che il terzo messaggio (con timestamp maggiore) non sia nella lista + assertFalse("Message with timestamp greater than filterTimestamp should not be returned", + sentMessages.stream().anyMatch(m -> m.getId().equals(message3.getId()))); + } + + @Test + public void testGetReceivedMessagesByRecipientWithTimestampFilter() throws Exception { + // Invia tre messaggi con timestamp differenti + Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null); + Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso + Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null); + Thread.sleep(1000); // Attendi un altro secondo + Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null); + + // Usa il timestamp del secondo messaggio per il test + Date filterTimestamp = message2.getTimestamp(); + + // Recupera tutti i messaggi ricevuti con timestamp <= filterTimestamp + List receivedMessages = store.getReceivedMessagesByRecipient(recipient, filterTimestamp, 10); + + // Verifica che la lista non sia nulla + assertNotNull("Received messages list should not be null", receivedMessages); + + // Verifica che i messaggi abbiano timestamp <= filterTimestamp + for (MessageReceived message : receivedMessages) { + assertTrue("Message timestamp should be less than or equal to the filter timestamp", + message.getTimestamp().compareTo(filterTimestamp) <= 0); + } + + // Verifica che il terzo messaggio (con timestamp maggiore) non sia nella lista + assertFalse("Message with timestamp greater than filterTimestamp should not be returned", + receivedMessages.stream().anyMatch(m -> m.getId().equals(message3.getId()))); + } + + @Test + public void testGetEmptyAttachmentByMessageId() throws Exception { + Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, null); + List attachs = store.getMessageAttachmentsById(msg.getId()); + assertEquals("attachs list should be empty", attachs.size(), 0); + } + + @Test + public void testGetSingleAttachmentByMessageId() throws Exception { + Attachment attach = new Attachment(); + String attachId = UUID.randomUUID().toString(); // Generate UUID for the message + String filename = "file name"; + String description = "file description"; + String uri = "uri"; + String thumb_url = "thumb url"; + + attach.setId(attachId); + attach.setName(filename); + attach.setDescription(description); + attach.setUri(uri); + attach.setThumbnailURL(thumb_url); + + List attachments = Arrays.asList(attach); + Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, attachments); + + List attachs = store.getMessageAttachmentsById(msg.getId()); + assertEquals("attachs list should have 1 element", 1, attachs.size()); + + Attachment obtained_attach = attachs.get(0); + + assertEquals("attachs list should have 1 element", attach.getId(), obtained_attach.getId()); + checkEqualAttachments(attach, obtained_attach); + } + + @Test + public void testMultiSingleAttachmentByMessageId() throws Exception { + + String filename = "file name"; + String description = "file description"; + String uri = "uri"; + String thumb_url = "thumb url"; + + List attachments = new ArrayList(); + + int count = 10; + + for (int i = 0; i < count; i++) { + Attachment attach = new Attachment(); + String attachId = UUID.randomUUID().toString(); // Generate UUID for the message + attach.setId(attachId); + attach.setName(filename + " " + i); + attach.setDescription(description + " " + i); + attach.setUri(uri + " " + i); + attach.setThumbnailURL(thumb_url + " " + i); + + attachments.add(attach); + } + + Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, attachments); + + String msg_id = msg.getId(); + List attachs_by_message_id = store.getMessageAttachmentsById(msg_id); + assertEquals("attachs list should have element", count, attachs_by_message_id.size()); + + for (Attachment attach : attachments) { + String attach_id = attach.getId(); + + Attachment by_message_id = attachs_by_message_id.stream().filter(c -> c.getId().equals(attach_id)) + .findFirst().orElse(null); + + assertNotNull("cannot obtain attach ", attach_id + "by msg id " + msg_id); + checkEqualAttachments(attach, by_message_id); + + Attachment by_attach_id = store.getMessageAttachmentById(msg_id, attach_id); + assertNotNull("cannot obtain attach by id", attach_id); + checkEqualAttachments(attach, by_attach_id); + } + } + + private boolean checkEqualAttachments(Attachment attach, Attachment obtained_attach) { + assertEquals("wrong attachId", attach.getId(), obtained_attach.getId()); + assertEquals("wrong filename", attach.getName(), obtained_attach.getName()); + assertEquals("wrong description", attach.getDescription(), obtained_attach.getDescription()); + assertEquals("wrong uri", attach.getUri(), obtained_attach.getUri()); + assertEquals("wrong thumb uri", attach.getThumbnailURL(), obtained_attach.getThumbnailURL()); + + return true; + } + + @Test + public void populate_messages() throws NotificationTypeNotFoundException { + CqlSession session = store.getSession(); + Date timestamp = null; + // Integer limit = 100000; + Integer limit = 100; + + List destinatari = Arrays.asList("alfredo.oliviero", "massimiliano.assante", "andrea.rossi"); + List notifications = store.getAllNotifications(timestamp, limit, session); + + for (Notification n : notifications) { + // protected MessageUser sender; + String sender = n.getSenderid(); + Message m = new Message(Message.generateUUID(), n.getSenderid(), n.getSenderFullName(), destinatari, + n.getSubjectid().toString(), n.getDescription(), n.getTime(), true); + Attachment attach = new Attachment(Message.generateUUID(), n.getUri(), m.getId(), n.getDescription(), + n.getSenderThumbnail(), "image/png"); + store.sendMessage(m, Arrays.asList(attach), session); + if (!destinatari.contains(sender)) + destinatari.set(0, sender); + System.out.println("created message " + m); + } + } + +}