modificato implementazione, sistemato schema, test funzionanti

sistemato logica e aggiunto test. non funziona ordinamento

all is working. still missing attachments

implemented attachemnts, fixed code, wrote tests and msg generator

developing messages
This commit is contained in:
Alfredo Oliviero 2024-09-12 15:08:39 +02:00
parent 042b719297
commit 9c9c7fa46b
11 changed files with 1732 additions and 586 deletions

View File

@ -1,66 +1,81 @@
-- Table for sent messages
CREATE TABLE IF NOT EXISTS messages_sent (
userid TEXT,
message_id UUID,
user_name TEXT,
addresses LIST<TEXT>,
subject TEXT,
body TEXT,
timestamp TIMESTAMP,
with_attachments BOOLEAN,
read BOOLEAN,
opened BOOLEAN,
deleted BOOLEAN,
PRIMARY KEY ((userid), timestamp, message_id)
)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE TABLE messages_received (
recipient_id text,
message_id uuid,
body text,
deleted boolean,
from_id text,
from_name text,
opened boolean,
read boolean,
subject text,
timestamp timestamp,
with_attachments boolean,
addresses list<text>,
PRIMARY KEY (recipient_id, message_id)
) WITH CLUSTERING ORDER BY (message_id ASC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND memtable = 'default'
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
-- Table for received messages
CREATE TABLE IF NOT EXISTS messages_received (
recipient_id TEXT,
message_id UUID,
userid TEXT,
user_name TEXT,
addresses LIST<TEXT>,
subject TEXT,
body TEXT,
timestamp TIMESTAMP,
with_attachments BOOLEAN,
read BOOLEAN,
opened BOOLEAN,
deleted BOOLEAN,
PRIMARY KEY ((recipient_id), timestamp, message_id)
)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE INDEX message_received_deleted_idx ON messages_received (deleted);
-- Materialized view for not deleted sent messages
CREATE MATERIALIZED VIEW IF NOT EXISTS messages_active_sent AS
SELECT *
FROM messages_sent
WHERE userid IS NOT NULL AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL
PRIMARY KEY (userid, timestamp, message_id)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE INDEX message_received_id_idx ON messages_received (message_id);
-- Materialized view for not deleted received messages
CREATE MATERIALIZED VIEW IF NOT EXISTS messages_active_received AS
SELECT *
FROM messages_received
WHERE recipient_id IS NOT NULL AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL
PRIMARY KEY (recipient_id, timestamp, message_id)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE INDEX message_received_read_idx ON messages_received (read);
-- Materialized view for unread and not deleted sent messages
CREATE MATERIALIZED VIEW IF NOT EXISTS messages_unread_sent AS
SELECT *
FROM messages_sent
WHERE userid IS NOT NULL AND read = false AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL
PRIMARY KEY (userid, timestamp, message_id)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE INDEX message_received_timestamp_idx ON messages_received (timestamp);
-- Materialized view for unread and not deleted received messages
CREATE MATERIALIZED VIEW IF NOT EXISTS messages_unread_received AS
SELECT *
FROM messages_received
WHERE recipient_id IS NOT NULL AND read = false AND deleted = false AND timestamp IS NOT NULL AND message_id IS NOT NULL
PRIMARY KEY (recipient_id, timestamp, message_id)
WITH CLUSTERING ORDER BY (timestamp DESC);
CREATE TABLE messages_sent (
from_id text,
message_id uuid,
body text,
deleted boolean,
from_name text,
opened boolean,
read boolean,
subject text,
timestamp timestamp,
with_attachments boolean,
addresses list<text>,
PRIMARY KEY (from_id, message_id)
) WITH CLUSTERING ORDER BY (message_id ASC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND memtable = 'default'
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
CREATE INDEX message_sent_deleted_idx ON messages_sent (deleted);
CREATE INDEX message_sent_id_idx ON messages_sent (message_id);
CREATE INDEX message_sent_read_idx ON messages_sent (read);
CREATE INDEX message_sent_timestamp_idx ON messages_sent (timestamp);

View File

@ -4,6 +4,8 @@ import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
@ -14,12 +16,8 @@ import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Massimiliano Assante ISTI-CNR
* @author Ahmed Ibrahim ISTI-CNR
@ -39,16 +37,37 @@ public class CassandraClusterConnection {
private static String keyspaceName;
private CqlSession myKeyspaceSession;
public static CassandraClusterConnection setCustomConfig(String config_host, String config_datacenterName,
String config_keyspaceName) throws Exception {
return setCustomConfig(config_host, config_datacenterName, config_keyspaceName, false, false);
}
public static CassandraClusterConnection setCustomConfig(String config_host, String config_datacenterName,
String config_keyspaceName, boolean dropSchema, boolean forceCreateNew) throws Exception {
RunningCluster cluster = RunningCluster.setCustomizedInstance(config_host, config_datacenterName,
config_keyspaceName);
// host = cluster.getHost();
hosts = cluster.getHosts();
datacenterName = cluster.getDatacenterName();
keyspaceName = cluster.getKeyspaceName();
_log.info("set custom config for CassandraClusterConnection. Hosts " + hosts + ", datacenterName"
+ datacenterName + ", CassandraClusterConnection" + keyspaceName);
return new CassandraClusterConnection(dropSchema, forceCreateNew);
}
/**
*
* @param dropSchema set true if you want do drop the current and set up new one
* the connection to cassandra cluster
* the connection to cassandra cluster
*/
protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew) throws Exception {
public CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew) throws Exception {
if (hosts == null || datacenterName == null || keyspaceName == null) {
RunningCluster cluster = RunningCluster.getInstance(null);
//host = cluster.getHost();
// host = cluster.getHost();
hosts = cluster.getHosts();
datacenterName = cluster.getDatacenterName();
keyspaceName = cluster.getKeyspaceName();
@ -63,12 +82,13 @@ public class CassandraClusterConnection {
/**
*
* @param dropSchema set true if you want to drop the current and set up new one
* the connection to cassandra cluster
* the connection to cassandra cluster
*/
protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew, String infrastructureName) throws Exception {
protected CassandraClusterConnection(boolean dropSchema, boolean forceCreateNew, String infrastructureName)
throws Exception {
if (hosts == null || datacenterName == null || keyspaceName == null) {
RunningCluster cluster = RunningCluster.getInstance(infrastructureName);
//host = cluster.getHost();
// host = cluster.getHost();
hosts = cluster.getHosts();
datacenterName = cluster.getDatacenterName();
keyspaceName = cluster.getKeyspaceName();
@ -79,8 +99,8 @@ public class CassandraClusterConnection {
_log.info("CONNECTED! using KeySpace: " + keyspaceName);
}
public CqlSession getKeyspaceSession(){
if (myKeyspaceSession.isClosed()){
public CqlSession getKeyspaceSession() {
if (myKeyspaceSession.isClosed()) {
myKeyspaceSession = connect(keyspaceName);
}
return myKeyspaceSession;
@ -88,15 +108,15 @@ public class CassandraClusterConnection {
/**
* @param dropSchema set true if you want to drop the current and set up new one
* the connection to cassandra cluster
* the connection to cassandra cluster
*/
public void SetUpKeySpaces(boolean dropSchema, boolean forceExecution) {
boolean createNew = false;
boolean found = false;
CqlSession session = connect();
CqlSession session = connect();
Metadata metaData = session.getMetadata();
for (KeyspaceMetadata meta : metaData.getKeyspaces().values()) {
if (meta.getName().toString().equals(keyspaceName)){
if (meta.getName().toString().equals(keyspaceName)) {
found = true;
break;
}
@ -133,7 +153,7 @@ public class CassandraClusterConnection {
/*
*
********************** CASSANDRA KEYSPACE CREATION ***********************
********************** CASSANDRA KEYSPACE CREATION ***********************
*
*/
private static CqlSession connect() {
@ -144,6 +164,7 @@ public class CassandraClusterConnection {
_log.info("[OK] Connected to Cassandra Cluster");
return cqlSession;
}
private static CqlSession connect(String KEYSPACE_NAME) {
CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
@ -155,31 +176,34 @@ public class CassandraClusterConnection {
}
public static void closeSession(CqlSession session) {
if (session != null) session.close();
if (session != null)
session.close();
_log.info("[OK]Session is now closed");
}
public void closeConnection(){
if(!myKeyspaceSession.isClosed()){
try{
public void closeConnection() {
if (!myKeyspaceSession.isClosed()) {
try {
_log.info("Closing connection");
closeSession(myKeyspaceSession);
_log.info("Connection closed!");
}catch(Exception e){
} catch (Exception e) {
_log.error("Unable to close connection", e);
}
}
}
private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder){
private static CqlSessionBuilder configBuilder(CqlSessionBuilder cqlSessionBuilder) {
return cqlSessionBuilder
.withConfigLoader(DriverConfigLoader.programmaticBuilder()
// Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out after PT2S
// Resolves the timeout query 'SELECT * FROM system_schema.tables' timed out
// after PT2S
.withDuration(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(240000))
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(240000))
.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(240000))
.build());
}
private static void createKeyspace(String keyspaceName, int replicationFactor) {
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
@ -195,7 +219,7 @@ public class CassandraClusterConnection {
}
}
private static ResultSet dropKeyspace(){
private static ResultSet dropKeyspace() {
ResultSet toreturn;
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
@ -207,7 +231,68 @@ public class CassandraClusterConnection {
}
return toreturn;
}
private void createTables(){
private void dropMessagesTables(CqlSession cqlSession) {
// try {
cqlSession.execute(
SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_RECEIVED_ACTIVE).ifExists().build());
cqlSession.execute(
SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_RECEIVED_READ).ifExists().build());
cqlSession
.execute(SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_SENT_ACTIVE).ifExists().build());
cqlSession.execute(SchemaBuilder.dropMaterializedView(Schema.TABLE_MESSAGES_SENT_READ).ifExists().build());
// } catch (Exception e) {
// e.printStackTrace();
// }
// try {
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_ID).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_READ).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_DELETED).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_SENT_TIMESTAMP).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_ID).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_READ).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_DELETED).ifExists().build());
cqlSession.execute(SchemaBuilder.dropIndex(Schema.IDX_MESSAGES_RECEIVED_TIMESTAMP).ifExists().build());
// } catch (Exception e) {
// e.printStackTrace();
// }
// try {
cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_SENT).ifExists().build());
// } catch (Exception e) {
// e.printStackTrace();
// }
// try {
cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_RECEIVED).ifExists().build());
// } catch (Exception e) {
// e.printStackTrace();
// }
}
private void createMessagesTables(CqlSession cqlSession) {
// dropMessagesTables(cqlSession);
// try {
createTableMessagesSent(cqlSession);
createTableMessagesReceived(cqlSession);
createTableMessagesAttachments(cqlSession);
// } catch (Exception e) {
// e.printStackTrace();
// }
// materialized views sono disattivate
// createViewMessagesActiveSent(cqlSession);
// createViewMessagesActiveReceived(cqlSession);
// createViewMessagesUnreadReceived(cqlSession);
// createViewMessagesUnreadSent(cqlSession);
}
private void createTables() {
try (CqlSession cqlSession = configBuilder(CqlSession.builder())
.addContactPoints(hosts)
.withLocalDatacenter(datacenterName)
@ -233,16 +318,14 @@ public class CassandraClusterConnection {
createTableNotifications(cqlSession);
createTablePosts(cqlSession);
createTableMessagesSent(cqlSession);
createTableMessagesReceived(cqlSession);
createViewMessagesActiveSent(cqlSession);
createViewMessagesActiveReceived(cqlSession);
createViewMessagesUnreadReceived(cqlSession);
createViewMessagesUnreadSent(cqlSession);
createMessagesTables(cqlSession);
closeSession(cqlSession);
} catch (Exception e) {
e.printStackTrace();
}
}
private void createTableUSERNotificationsPreferences(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserNotificationsPreferences")
.ifNotExists()
@ -254,49 +337,54 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsPreferences");
}
private void createTableUSERNotifications(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserNotifications")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("notid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERNotifications");
}
private void createTableVRETimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("VRETimeline")
.ifNotExists()
.withPartitionKey("vreid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "VRETimeline");
}
private void createTableAppTimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("AppTimeline")
.ifNotExists()
.withPartitionKey("appid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "AppTimeline");
}
private void createTableUSERTimeline(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserTimeline")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("postid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERTimeline");
}
private void createTableHashtaggedPosts(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtaggedPosts")
.ifNotExists()
@ -308,6 +396,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedPosts");
}
private void createTableHashtaggedComments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtaggedComments")
.ifNotExists()
@ -319,6 +408,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "HashtaggedComments");
}
private void createTableHashtagsCounter(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("HashtagsCounter")
.ifNotExists()
@ -330,16 +420,18 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "HashtagsCounter");
}
private void createTableUSERNotificationsUnread(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserUnreadNotifications")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withPartitionKey("timestamp", DataTypes.TIMESTAMP)
.withPartitionKey(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("notid", DataTypes.UUID)
.withCompactStorage()
.build());
_log.info("+ Table '{}' has been created (if needed).", "USERNotificationsUnread");
}
private void createTableUSERLikes(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("UserLikes")
.ifNotExists()
@ -351,6 +443,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "USERLikes");
}
private void createTableVREInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("VREInvites")
.ifNotExists()
@ -362,6 +455,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "VREInvites");
}
private void createTableEMAILInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("EmailInvites")
.ifNotExists()
@ -372,6 +466,7 @@ public class CassandraClusterConnection {
.build());
_log.info("+ Table '{}' has been created (if needed).", "EMAILInvites");
}
private void createTableAttachments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Attachments")
.ifNotExists()
@ -392,6 +487,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "Attachments");
}
private void createTableInvites(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Invites")
.ifNotExists()
@ -401,7 +497,7 @@ public class CassandraClusterConnection {
.withColumn("email", DataTypes.TEXT)
.withColumn("controlcode", DataTypes.TEXT)
.withColumn("status", DataTypes.TEXT)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("senderfullname", DataTypes.TEXT)
.withCompactStorage()
.build());
@ -413,6 +509,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "Invites");
}
private void createTableLikes(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Likes")
.ifNotExists()
@ -421,7 +518,7 @@ public class CassandraClusterConnection {
.withColumn("fullname", DataTypes.TEXT)
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("postid", DataTypes.UUID)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withCompactStorage()
.build());
cqlSession.execute(SchemaBuilder.createIndex("post_likes")
@ -432,6 +529,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "Likes");
}
private void createTableComments(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Comments")
.ifNotExists()
@ -441,7 +539,7 @@ public class CassandraClusterConnection {
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("comment", DataTypes.TEXT)
.withColumn("postid", DataTypes.UUID)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("isedit", DataTypes.BOOLEAN)
.withColumn("lastedittime", DataTypes.TIMESTAMP)
.withCompactStorage()
@ -454,6 +552,7 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "Comments");
}
private void createTableNotifications(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Notifications")
.ifNotExists()
@ -477,6 +576,7 @@ public class CassandraClusterConnection {
.build());
_log.info("+ Table '{}' has been created (if needed).", "Notifications");
}
private void createTablePosts(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("Posts")
.ifNotExists()
@ -487,7 +587,7 @@ public class CassandraClusterConnection {
.withColumn("likesno", DataTypes.BIGINT)
.withColumn("thumbnailurl", DataTypes.TEXT)
.withColumn("linkdescription", DataTypes.TEXT)
.withColumn("timestamp", DataTypes.TIMESTAMP)
.withColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP)
.withColumn("uri", DataTypes.TEXT)
.withColumn("isapplicationpost", DataTypes.BOOLEAN)
.withColumn("entityid", DataTypes.TEXT)
@ -510,169 +610,234 @@ public class CassandraClusterConnection {
_log.info("+ Table '{}' has been created (if needed).", "Posts");
}
// Method to create the messages_sent table
private void createTableMessagesSent(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("messages_sent")
.ifNotExists()
.withPartitionKey("userid", DataTypes.TEXT)
.withClusteringColumn("timestamp", DataTypes.TIMESTAMP)
.withClusteringColumn("message_id", DataTypes.UUID)
.withColumn("user_name", DataTypes.TEXT)
.withColumn("addresses", DataTypes.listOf(DataTypes.TEXT))
.withColumn("subject", DataTypes.TEXT)
.withColumn("body", DataTypes.TEXT)
.withColumn("with_attachments", DataTypes.BOOLEAN)
.withColumn("read", DataTypes.BOOLEAN)
.withColumn("opened", DataTypes.BOOLEAN)
.withColumn("deleted", DataTypes.BOOLEAN)
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
// Method to create the messages_sent table
private void createTableMessagesSent(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_SENT)
.ifNotExists()
.withPartitionKey(Schema.FROM_ID, DataTypes.TEXT) // Partition key on sender ID
.withClusteringColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) // Clustering by timestamp for ordering
.withClusteringColumn(Schema.MESSAGE_ID, DataTypes.UUID) // Clustering by message ID for uniqueness
.withColumn(Schema.FROM_NAME, DataTypes.TEXT) // Sender's name
.withColumn(Schema.RECIPIENTS, DataTypes.listOf(DataTypes.TEXT)) // List of recipients
.withColumn(Schema.SUBJECT, DataTypes.TEXT) // Message subject
.withColumn(Schema.BODY, DataTypes.TEXT) // Message body
.withColumn(Schema.WITH_ATTACH, DataTypes.BOOLEAN) // Boolean flag for attachments
.withColumn(Schema.ISREAD, DataTypes.BOOLEAN) // Boolean flag for read status
.withColumn(Schema.ISOPENED, DataTypes.BOOLEAN) // Boolean flag for opened status
.withColumn(Schema.ISDELETED, DataTypes.BOOLEAN) // Boolean flag for deleted status
.withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) // Descending order by timestamp
.build());
_log.info("+ Table '{}' has been created (if needed).", "messages_sent");
}
// Remove unnecessary indexes: no need for message_id or timestamp indexes
// Retain only the index for deleted and read messages
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_DELETED)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_SENT)
.andColumn(Schema.ISDELETED)
.build());
// Method to create the messages_received table
private void createTableMessagesReceived(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable("messages_received")
.ifNotExists()
.withPartitionKey("recipient_id", DataTypes.TEXT)
.withClusteringColumn("timestamp", DataTypes.TIMESTAMP)
.withClusteringColumn("message_id", DataTypes.UUID)
.withColumn("userid", DataTypes.TEXT)
.withColumn("user_name", DataTypes.TEXT)
.withColumn("addresses", DataTypes.listOf(DataTypes.TEXT))
.withColumn("subject", DataTypes.TEXT)
.withColumn("body", DataTypes.TEXT)
.withColumn("with_attachments", DataTypes.BOOLEAN)
.withColumn("read", DataTypes.BOOLEAN)
.withColumn("opened", DataTypes.BOOLEAN)
.withColumn("deleted", DataTypes.BOOLEAN)
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_READ)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_SENT)
.andColumn(Schema.ISREAD)
.build());
_log.info("+ Table '{}' has been created (if needed).", "messages_received");
}
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_SENT_ID)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_SENT)
.andColumn(Schema.MESSAGE_ID)
.build());
// Method to create the materialized view for non-deleted sent messages
private void createViewMessagesActiveSent(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createMaterializedView("messages_active_sent")
.ifNotExists()
.asSelectFrom("messages_sent")
.column("userid")
.column("timestamp")
.column("message_id")
.column("user_name")
.column("addresses")
.column("subject")
.column("body")
.column("with_attachments")
.column("read")
.column("opened")
.column("deleted")
.whereColumn("userid").isNotNull()
.whereColumn("deleted").isEqualTo(QueryBuilder.literal(false))
.whereColumn("timestamp").isNotNull()
.whereColumn("message_id").isNotNull()
.withPartitionKey("userid")
.withClusteringColumn("timestamp")
.withClusteringColumn("message_id")
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
_log.info("+ Table '{}' has been created (if needed).", "messages_sent");
}
_log.info("+ Materialized View '{}' has been created (if needed).", "messages_active_sent");
}
private void createTableMessagesAttachments(CqlSession cqlSession) {
// Method to create the materialized view for non-deleted received messages
private void createViewMessagesActiveReceived(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createMaterializedView("messages_active_received")
.ifNotExists()
.asSelectFrom("messages_received")
.column("recipient_id")
.column("timestamp")
.column("message_id")
.column("userid")
.column("user_name")
.column("addresses")
.column("subject")
.column("body")
.column("with_attachments")
.column("read")
.column("opened")
.column("deleted")
.whereColumn("recipient_id").isNotNull()
.whereColumn("deleted").isEqualTo(QueryBuilder.literal(false))
.whereColumn("timestamp").isNotNull()
.whereColumn("message_id").isNotNull()
.withPartitionKey("recipient_id")
.withClusteringColumn("timestamp")
.withClusteringColumn("message_id")
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
cqlSession.execute(SchemaBuilder.dropTable(Schema.TABLE_MESSAGES_ATTACHMENTS).ifExists().build());
_log.info("+ Materialized View '{}' has been created (if needed).", "messages_active_received");
}
cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_ATTACHMENTS)
.ifNotExists()
.withPartitionKey(Schema.MESSAGE_ID, DataTypes.UUID) // Partition key on MESSAGE ID
.withClusteringColumn(Schema.ATTACH_ID, DataTypes.UUID) // Clustering by message ID for uniqueness
// Method to create the materialized view for unread sent messages
private void createViewMessagesUnreadSent(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createMaterializedView("messages_unread_sent")
.ifNotExists()
.asSelectFrom("messages_sent")
.column("userid")
.column("timestamp")
.column("message_id")
.column("user_name")
.column("addresses")
.column("subject")
.column("body")
.column("with_attachments")
.column("read")
.column("opened")
.column("deleted")
.whereColumn("userid").isNotNull()
.whereColumn("read").isEqualTo(QueryBuilder.literal(false))
.whereColumn("deleted").isEqualTo(QueryBuilder.literal(false))
.whereColumn("timestamp").isNotNull()
.whereColumn("message_id").isNotNull()
.withPartitionKey("userid")
.withClusteringColumn("timestamp")
.withClusteringColumn("message_id")
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
.withColumn(Schema.URI, DataTypes.TEXT) // File URL
.withColumn(Schema.NAME, DataTypes.TEXT) // File Name
.withColumn(Schema.DESCRIPTION, DataTypes.TEXT) // File Description
.withColumn(Schema.URI_THUMBNAIL, DataTypes.TEXT) // File Thumb URI
.withColumn(Schema.MIME_TYPE, DataTypes.TEXT) // Boolean flag for attachments
.build());
}
_log.info("+ Materialized View '{}' has been created (if needed).", "messages_unread_sent");
}
// Method to create the messages_received table
private void createTableMessagesReceived(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createTable(Schema.TABLE_MESSAGES_RECEIVED)
.ifNotExists()
.withPartitionKey(Schema.RECIPIENT_ID, DataTypes.TEXT) // Partition key on recipient ID
.withClusteringColumn(Schema.TIMESTAMP, DataTypes.TIMESTAMP) // Clustering by timestamp for ordering
.withClusteringColumn(Schema.MESSAGE_ID, DataTypes.UUID) // Clustering by message ID for uniqueness
.withColumn(Schema.FROM_ID, DataTypes.TEXT) // Sender's ID
.withColumn(Schema.FROM_NAME, DataTypes.TEXT) // Sender's name
.withColumn(Schema.RECIPIENTS, DataTypes.listOf(DataTypes.TEXT)) // List of recipients
.withColumn(Schema.SUBJECT, DataTypes.TEXT) // Message subject
.withColumn(Schema.BODY, DataTypes.TEXT) // Message body
.withColumn(Schema.WITH_ATTACH, DataTypes.BOOLEAN) // Boolean flag for attachments
.withColumn(Schema.ISREAD, DataTypes.BOOLEAN) // Boolean flag for read status
.withColumn(Schema.ISOPENED, DataTypes.BOOLEAN) // Boolean flag for opened status
.withColumn(Schema.ISDELETED, DataTypes.BOOLEAN) // Boolean flag for deleted status
.withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC) // Descending order by timestamp
.build());
// Method to create the materialized view for unread received messages
private void createViewMessagesUnreadReceived(CqlSession cqlSession) {
cqlSession.execute(SchemaBuilder.createMaterializedView("messages_unread_received")
.ifNotExists()
.asSelectFrom("messages_received")
.column("recipient_id")
.column("timestamp")
.column("message_id")
.column("userid")
.column("user_name")
.column("addresses")
.column("subject")
.column("body")
.column("with_attachments")
.column("read")
.column("opened")
.column("deleted")
.whereColumn("recipient_id").isNotNull()
.whereColumn("read").isEqualTo(QueryBuilder.literal(false))
.whereColumn("deleted").isEqualTo(QueryBuilder.literal(false))
.whereColumn("timestamp").isNotNull()
.whereColumn("message_id").isNotNull()
.withPartitionKey("recipient_id")
.withClusteringColumn("timestamp")
.withClusteringColumn("message_id")
.withClusteringOrder("timestamp", ClusteringOrder.DESC)
.build());
// Remove unnecessary indexes: no need for message_id or timestamp indexes
// Retain only the index for deleted and read messages
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_DELETED)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_RECEIVED)
.andColumn(Schema.ISDELETED)
.build());
_log.info("+ Materialized View '{}' has been created (if needed).", "messages_unread_received");
}
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_READ)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_RECEIVED)
.andColumn(Schema.ISREAD)
.build());
cqlSession.execute(SchemaBuilder.createIndex(Schema.IDX_MESSAGES_RECEIVED_ID)
.ifNotExists()
.onTable(Schema.TABLE_MESSAGES_RECEIVED)
.andColumn(Schema.MESSAGE_ID)
.build());
_log.info("+ Table '{}' has been created (if needed).", "messages_received");
}
/*
* // Method to create the materialized view for non-deleted sent messages
* private void createViewMessagesActiveSent(CqlSession cqlSession) {
* cqlSession.execute(SchemaBuilder.createMaterializedView(Schema.
* TABLE_MESSAGES_SENT_ACTIVE)
* .ifNotExists()
* .asSelectFrom(Schema.TABLE_MESSAGES_SENT)
* .column(Schema.FROM_ID)
* .column(Schema.TIMESTAMP)
* .column(Schema.MESSAGE_ID)
* .column(Schema.FROM_NAME)
* .column(Schema.RECIPIENTS)
* .column(Schema.SUBJECT)
* .column(Schema.BODY)
* .column(Schema.WITH_ATTACH)
* .column(Schema.ISREAD)
* .column(Schema.ISOPENED)
* .column(Schema.ISDELETED)
* .whereColumn(Schema.FROM_ID).isNotNull()
* .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.TIMESTAMP).isNotNull()
* .whereColumn(Schema.MESSAGE_ID).isNotNull()
* .withPartitionKey(Schema.FROM_ID)
* .withClusteringColumn(Schema.TIMESTAMP)
* .withClusteringColumn(Schema.MESSAGE_ID)
* .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC)
* .build());
*
* _log.info("+ Materialized View '{}' has been created (if needed).",
* "messages_active_sent");
* }
*
* // Method to create the materialized view for non-deleted received messages
* private void createViewMessagesActiveReceived(CqlSession cqlSession) {
* cqlSession.execute(SchemaBuilder.createMaterializedView(Schema.
* TABLE_MESSAGES_RECEIVED_ACTIVE)
* .ifNotExists()
* .asSelectFrom(Schema.TABLE_MESSAGES_RECEIVED)
* .column(Schema.RECIPIENT_ID)
* .column(Schema.TIMESTAMP)
* .column(Schema.MESSAGE_ID)
* .column(Schema.FROM_ID)
* .column(Schema.FROM_NAME)
* .column(Schema.RECIPIENTS)
* .column(Schema.SUBJECT)
* .column(Schema.BODY)
* .column(Schema.WITH_ATTACH)
* .column(Schema.ISREAD)
* .column(Schema.ISOPENED)
* .column(Schema.ISDELETED)
* .whereColumn(Schema.RECIPIENT_ID).isNotNull()
* .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.TIMESTAMP).isNotNull()
* .whereColumn(Schema.MESSAGE_ID).isNotNull()
* .withPartitionKey(Schema.RECIPIENT_ID)
* .withClusteringColumn(Schema.TIMESTAMP)
* .withClusteringColumn(Schema.MESSAGE_ID)
* .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC)
* .build());
*
* _log.info("+ Materialized View '{}' has been created (if needed).",
* "messages_active_received");
* }
*
* // Method to create the materialized view for unread sent messages
* private void createViewMessagesUnreadSent(CqlSession cqlSession) {
* cqlSession.execute(SchemaBuilder.createMaterializedView(Schema.
* TABLE_MESSAGES_SENT_READ)
* .ifNotExists()
* .asSelectFrom(Schema.TABLE_MESSAGES_SENT)
* .column(Schema.FROM_ID)
* .column(Schema.TIMESTAMP)
* .column(Schema.MESSAGE_ID)
* .column(Schema.FROM_NAME)
* .column(Schema.RECIPIENTS)
* .column(Schema.SUBJECT)
* .column(Schema.BODY)
* .column(Schema.WITH_ATTACH)
* .column(Schema.ISREAD)
* .column(Schema.ISOPENED)
* .column(Schema.ISDELETED)
* .whereColumn(Schema.FROM_ID).isNotNull()
* .whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.TIMESTAMP).isNotNull()
* .whereColumn(Schema.MESSAGE_ID).isNotNull()
* .withPartitionKey(Schema.FROM_ID)
* .withClusteringColumn(Schema.TIMESTAMP)
* .withClusteringColumn(Schema.MESSAGE_ID)
* .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC)
* .build());
*
* _log.info("+ Materialized View '{}' has been created (if needed).",
* "messages_unread_sent");
* }
*
* // Method to create the materialized view for unread received messages
* private void createViewMessagesUnreadReceived(CqlSession cqlSession) {
* cqlSession.execute(SchemaBuilder.createMaterializedView(Schema.
* TABLE_MESSAGES_RECEIVED_READ)
* .ifNotExists()
* .asSelectFrom(Schema.TABLE_MESSAGES_RECEIVED)
* .column(Schema.RECIPIENT_ID)
* .column(Schema.TIMESTAMP)
* .column(Schema.MESSAGE_ID)
* .column(Schema.FROM_ID)
* .column(Schema.FROM_NAME)
* .column(Schema.RECIPIENTS)
* .column(Schema.SUBJECT)
* .column(Schema.BODY)
* .column(Schema.WITH_ATTACH)
* .column(Schema.ISREAD)
* .column(Schema.ISOPENED)
* .column(Schema.ISDELETED)
* .whereColumn(Schema.RECIPIENT_ID).isNotNull()
* .whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(false))
* .whereColumn(Schema.TIMESTAMP).isNotNull()
* .whereColumn(Schema.MESSAGE_ID).isNotNull()
* .withPartitionKey(Schema.RECIPIENT_ID)
* .withClusteringColumn(Schema.TIMESTAMP)
* .withClusteringColumn(Schema.MESSAGE_ID)
* .withClusteringOrder(Schema.TIMESTAMP, ClusteringOrder.DESC)
* .build());
*
* _log.info("+ Materialized View '{}' has been created (if needed).",
* "messages_unread_received");
* }
*
*/
}

View File

@ -62,7 +62,12 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.update.Update;
/**
* @author Massimiliano Assante ISTI-CNR
@ -87,6 +92,10 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
return conn;
}
public CqlSession getSession() {
return this.getConnection().getKeyspaceSession();
}
/**
* use this constructor carefully from test classes
*
@ -4547,160 +4556,293 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
return toReturn;
}
protected List<MessageSent> getSentMessagesByUser(String userId, String messageId, Date timestamp, Integer limit,
Boolean include_deleted, CqlSession session) {
/// MESSAGES /
if (session == null)
/**
* {@inheritDoc}
*
* @throws NotificationTypeNotFoundException
*/
public List<Notification> getAllNotifications(Date timestamp, Integer limit, CqlSession session)
throws NotificationTypeNotFoundException {
if (session == null) {
session = conn.getKeyspaceSession();
List<MessageSent> messages = new ArrayList<>();
String query = "SELECT message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted "
+ "FROM messages_received WHERE userid = ? ";
ArrayList<Object> params = new ArrayList<>();
params.add(userId);
if (messageId != null) {
query += " AND messageId = ? ";
params.add(messageId);
}
// Build the select query from the appropriate table (active or full sent
// messages)
Select select = QueryBuilder.selectFrom(Schema.NOTIFICATIONS)
.all();
if (limit != null) {
select = select.limit(limit);
}
// Add a condition to get messages with a timestamp less than the provided one
if (timestamp != null) {
query += " AND timestamp <= ? ";
params.add(timestamp);
select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant()));
select = select.allowFiltering();
}
ArrayList<Notification> toReturn = new ArrayList<Notification>();
// Converte la query in SimpleStatement
SimpleStatement statement = select.build();
if (!include_deleted) {
query += " AND deleted = false ";
}
if (limit != null && limit > 0) {
query += " LIMIT ? ";
params.add(limit);
}
SimpleStatement statement = SimpleStatement.builder(query)
.addPositionalValues(params)
.build();
// Visualizza la query CQL generata
// System.out.println("Generated CQL Query: " + statement.getQuery());
// Execute the query and get the result set
ResultSet resultSet = session.execute(statement);
for (Row row : resultSet) {
UUID message_id = row.getUuid("message_id");
String userName = row.getString("user_name");
List<String> addresses = row.getList("addresses", String.class);
String subject = row.getString("subject");
String body = row.getString("body");
Date msgTimestamp = Date.from(row.getInstant("timestamp"));
boolean withAttachments = row.getBoolean("with_attachments");
boolean isRead = row.getBoolean("read");
boolean isOpened = row.getBoolean("opened");
boolean isDeleted = row.getBoolean("deleted");
for (Row record : resultSet) {
toReturn.add(readNotificationFromRow(record));
}
return toReturn;
}
@Override
public List<MessageSent> getSentMessagesBySender(String fromId, String messageId, Date timestamp, Integer limit,
Boolean filter_deleted, Boolean filter_read, CqlSession session) {
if (fromId == null) {
throw new NullArgumentException("fromId cannot be null");
}
if (session == null) {
session = conn.getKeyspaceSession();
}
// Build the select query from the appropriate table (active or full sent
// messages)
Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_SENT)
.all()
.whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(fromId));
if (limit != null) {
select = select.limit(limit);
}
// Add a condition on messageId if provided
if (messageId != null) {
select = select.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId)));
}
// Add a condition to get messages with a timestamp less than the provided one
if (timestamp != null) {
select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant()));
}
// Add a condition to filter on deleted if filter_deleted is not null
if (filter_deleted != null) {
select = select.whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(filter_deleted));
}
// Add a condition to filter on read messages if filter_read is not null
if (filter_read != null) {
select = select.whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(filter_read));
}
// if (messageId == null) {
// // Add ordering by timestamp in descending order
// select = select.orderBy(Schema.TIMESTAMP, ClusteringOrder.DESC);
// }
if (filter_deleted != null || filter_read != null) {
select = select.allowFiltering();
}
// Converte la query in SimpleStatement
SimpleStatement statement = select.build();
// Visualizza la query CQL generata
// System.out.println("Generated CQL Query: " + statement.getQuery());
// Execute the query and get the result set
ResultSet resultSet = session.execute(statement);
// List to store the MessageSent objects
List<MessageSent> messagesSent = new ArrayList<>();
// Iterate through the result set and create MessageSent objects
for (Row row : resultSet) {
// Use the constructor of MessageSent to initialize the object
MessageSent message = new MessageSent(
message_id.toString(),
userId,
userName,
addresses,
subject,
body,
msgTimestamp,
withAttachments,
isRead,
isOpened,
isDeleted);
row.getUuid(Schema.MESSAGE_ID).toString(), // message ID
row.getString(Schema.FROM_ID), // from ID (user ID)
row.getString(Schema.FROM_NAME), // user name
row.getList(Schema.RECIPIENTS, String.class), // list of recipients
row.getString(Schema.SUBJECT), // subject of the message
row.getString(Schema.BODY), // body of the message
Date.from(row.getInstant(Schema.TIMESTAMP)), // timestamp of the message
row.getBoolean(Schema.WITH_ATTACH), // whether the message has attachments
row.getBoolean(Schema.ISREAD), // whether the message is read
row.getBoolean(Schema.ISOPENED), // whether the message is opened
row.getBoolean(Schema.ISDELETED) // whether the message is deleted
);
messages.add(message);
// Add the message to the list
messagesSent.add(message);
}
return messages;
// Return the list of messages
return messagesSent;
}
protected List<MessageReceived> getReceivedMessagesByUser(String recipientId, String messageId, Date timestamp,
Integer limit, Boolean include_deleted, CqlSession session) {
@Override
public List<MessageReceived> getReceivedMessagesByRecipient(String recipientId, String messageId, Date timestamp,
Integer limit, Boolean filter_deleted, Boolean filter_read, CqlSession session) {
if (recipientId == null) {
throw new NullArgumentException("recipientId cannot be null");
}
if (session == null) {
session = conn.getKeyspaceSession();
}
// Build the select query from the appropriate table (active or full received
// messages)
Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_RECEIVED)
.all()
.whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(recipientId));
if (limit != null) {
select = select.limit(limit);
}
// Add a condition on messageId if provided
if (messageId != null) {
select = select.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId)));
}
// Add a condition to get messages with a timestamp less than the provided one
if (timestamp != null) {
select = select.whereColumn(Schema.TIMESTAMP).isLessThan(QueryBuilder.literal(timestamp.toInstant()));
}
// Add a condition to filter on deleted if filter_deleted is not null
if (filter_deleted != null) {
select = select.whereColumn(Schema.ISDELETED).isEqualTo(QueryBuilder.literal(filter_deleted));
}
// Add a condition to filter on read messages if filter_read is not null
if (filter_read != null) {
select = select.whereColumn(Schema.ISREAD).isEqualTo(QueryBuilder.literal(filter_read));
}
// if (messageId == null) {
// // Add ordering by timestamp in descending order
// select = select.orderBy(Schema.TIMESTAMP, ClusteringOrder.DESC);
// }
if (filter_deleted != null || filter_read != null) {
select = select.allowFiltering();
}
// Converte la query in SimpleStatement
SimpleStatement statement = select.build();
// Visualizza la query CQL generata
// System.out.println("Generated CQL Query: " + statement.getQuery());
// Execute the query and get the result set
ResultSet resultSet = session.execute(statement);
// List to store the MessageReceived objects
List<MessageReceived> messagesReceived = new ArrayList<>();
// Iterate through the result set and create MessageReceived objects
for (Row row : resultSet) {
// Use the constructor of MessageReceived to initialize the object
MessageReceived message = new MessageReceived(
row.getUuid(Schema.MESSAGE_ID).toString(), // message ID
row.getString(Schema.RECIPIENT_ID), // recipient ID
row.getString(Schema.FROM_ID), // sender ID (user ID)
row.getString(Schema.FROM_NAME), // sender name
row.getList(Schema.RECIPIENTS, String.class), // list of recipients (addresses)
row.getString(Schema.SUBJECT), // subject of the message
row.getString(Schema.BODY), // body of the message
Date.from(row.getInstant(Schema.TIMESTAMP)), // timestamp of the message
row.getBoolean(Schema.WITH_ATTACH), // whether the message has attachments
row.getBoolean(Schema.ISREAD), // whether the message is read
row.getBoolean(Schema.ISOPENED), // whether the message is opened
row.getBoolean(Schema.ISDELETED) // whether the message is deleted
);
// Add the message to the list
messagesReceived.add(message);
}
// Return the list of messages
return messagesReceived;
}
protected List<Attachment> retrieveAttachmentsMessage(String messageId, String attachId, CqlSession session) {
// Inserting data
if (session == null)
session = conn.getKeyspaceSession();
// an entry in the Attachment CF
List<MessageReceived> messages = new ArrayList();
// Build the select query from the appropriate table (active or full received
// messages)
Select select = QueryBuilder.selectFrom(Schema.TABLE_MESSAGES_ATTACHMENTS)
.all()
.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId)));
String query = "SELECT message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted "
+ "FROM messages_received WHERE recipientid = ? ";
// Add a condition on attachName if provided
if (attachId != null) {
select = select.whereColumn(Schema.ATTACH_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(attachId)));
ArrayList<Object> params = new ArrayList<>();
params.add(recipientId);
if (messageId != null) {
query += " AND messageId = ? ";
params.add(messageId);
select = select.allowFiltering();
}
if (timestamp != null) {
query += " AND timestamp <= ? ";
params.add(timestamp);
}
if (!include_deleted) {
query += " AND deleted = false ";
}
if (limit != null && limit > 0) {
query += " LIMIT ? ";
params.add(limit);
}
SimpleStatement statement = SimpleStatement.builder(query)
.addPositionalValues(params)
.build();
// Converte la query in SimpleStatement
SimpleStatement statement = select.build();
ResultSet resultSet = session.execute(statement);
List<Attachment> attachments = new ArrayList<>();
// Iterate through the result set and create MessageReceived objects
for (Row row : resultSet) {
UUID message_id = row.getUuid("message_id");
String senderId = row.getString("userid");
String userName = row.getString("user_name");
List<String> addresses = row.getList("addresses", String.class);
String subject = row.getString("subject");
String body = row.getString("body");
Date msgTimestamp = Date.from(row.getInstant("timestamp"));
boolean withAttachments = row.getBoolean("with_attachments");
boolean isRead = row.getBoolean("read");
boolean isOpened = row.getBoolean("opened");
boolean isDeleted = row.getBoolean("deleted");
// Use the constructor of MessageReceived to initialize the object
Attachment attach = new Attachment(
// String id,
// String uri,
// String name,
// String description,
// String thumbnailURL,
// String mimeType
MessageReceived message = new MessageReceived(
message_id.toString(),
recipientId,
senderId,
userName,
addresses,
subject,
body,
msgTimestamp,
withAttachments,
isRead,
isOpened,
isDeleted);
// row.getUuid(Schema.MESSAGE_ID).toString(), // message ID
row.getUuid(Schema.ATTACH_ID).toString(), // message ID
row.getString(Schema.URI), // file uri
row.getString(Schema.NAME), // file name
row.getString(Schema.DESCRIPTION), // file description
row.getString(Schema.URI_THUMBNAIL), // file uri
row.getString(Schema.MIME_TYPE) // mime type
);
messages.add(message);
// Add the message to the list
attachments.add(attach);
}
return messages;
// Return the list of messages
return attachments;
}
protected MessageSent getSentMessageById(String userId, String messageId, Boolean include_deleted,
protected MessageSent getSentMessageById(String fromId, String messageId, Boolean filter_deleted,
Boolean filter_read,
CqlSession session) {
List<MessageSent> messages = getSentMessagesByUser(userId, messageId, null, 1, include_deleted, session);
List<MessageSent> messages = getSentMessagesBySender(fromId, messageId, null, 1, filter_deleted, null, session);
if (messages.size() > 0)
return messages.get(0);
else
return null;
}
public MessageReceived getReceivedMessageById(String recipientId, String messageId, Boolean include_deleted,
public MessageReceived getReceivedMessageById(String recipientId, String messageId, Boolean filter_deleted,
Boolean filter_read,
CqlSession session) {
List<MessageReceived> messages = getReceivedMessagesByUser(recipientId, messageId, null, 1, include_deleted,
session);
List<MessageReceived> messages = getReceivedMessagesByRecipient(recipientId, messageId, null, 1, filter_deleted,
filter_read, session);
if (messages.size() > 0)
return messages.get(0);
else
@ -4714,7 +4856,7 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
* @param session
* @return
*/
protected boolean checkAndDeleteAttachmenForDeletedMessages(Message message, CqlSession session) {
protected boolean deleteAttachmentForDeletedMessages(Message message, CqlSession session) {
if (!message.isWith_attachments()) {
return false;
@ -4726,14 +4868,15 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
// search for not deleted message_sent. if we found it, we cannot delete
// attachments
MessageSent messageSent = getSentMessageById(message.getUserid(), message.getId(), false, session);
MessageSent messageSent = getSentMessageById(message.getUserid(), message.getId(), false, null, session);
if (messageSent != null) {
return false;
}
// search for not deleted message_received. if we found any, we cannot delete
// attachments
List<MessageReceived> messagesReceived = getReceivedMessagesByUser(null, message.getId(), null, 1, false,
List<MessageReceived> messagesReceived = getReceivedMessagesByRecipient(null, message.getId(), null, 1, false,
null,
session);
if (messagesReceived.size() > 0) {
return false;
@ -4744,27 +4887,73 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
return false;
}
protected MessageSent deleteMessageSent(String userId, String messageId, Boolean checkAttachmentDeletion,
@Override
public Message deleteMessage(String messageId) {
CqlSession session = conn.getKeyspaceSession();
MessageSent message = getSentMessageById(null, messageId, false, null, session);
for (String recipientId : message.getAddresses()) {
deleteMessageReceived(recipientId, messageId, false, session);
}
deleteMessageSent(message.getUserid(), messageId, true, session);
return getSentMessageById(null, messageId, null, null, session);
}
protected MessageSent deleteMessageSent(String fromId, String messageId, Boolean checkAttachmentDeletion,
CqlSession session) {
if (session == null) {
session = conn.getKeyspaceSession();
}
SimpleStatement updateStatement = SimpleStatement.builder(
"UPDATE messages_sent SET deleted = true WHERE userid = ? AND message_id = ?")
.addPositionalValues(userId, UUID.fromString(messageId))
.build();
session.execute(updateStatement);
MessageSent message = getSentMessageById(userId, messageId, true, session);
if (checkAttachmentDeletion && message.isWith_attachments()) {
checkAndDeleteAttachmenForDeletedMessages(message, session);
// Recupera il messaggio per ottenere il timestamp necessario per l'update
MessageSent sentMessage = getSentMessageById(fromId, messageId);
if (sentMessage == null) {
throw new IllegalArgumentException(
"Message not found with fromId: " + fromId + " and messageId: " + messageId);
}
return message;
return deleteMessageSent(sentMessage, checkAttachmentDeletion, session);
}
protected MessageSent deleteMessageSent(MessageSent sentMessage, Boolean checkAttachmentDeletion,
CqlSession session) {
if (session == null) {
session = conn.getKeyspaceSession();
}
// Recupera il messaggio per ottenere il timestamp necessario per l'update
if (sentMessage == null) {
throw new IllegalArgumentException(
"Message not found");
}
// Costruisci la query di aggiornamento usando il QueryBuilder
Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_SENT)
.setColumn(Schema.ISDELETED, QueryBuilder.literal(true)) // Imposta il campo 'read' al valore fornito
.whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(sentMessage.getUserid())) // Filtro per
// 'from_id'
.whereColumn(Schema.TIMESTAMP).isEqualTo(QueryBuilder.literal(sentMessage.getTimestamp().toInstant()))
.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(sentMessage.getUUID())); // Filtro
// per
// 'message_id'
SimpleStatement statement = update.build();
String updateString = statement.getQuery();
// Esegui la query di aggiornamento
session.execute(statement);
// Retrieve the message after marking it as deleted
sentMessage = getSentMessageById(sentMessage.getUserid(), sentMessage.getId(), true, null, session);
// Check if attachment deletion is required and the message has attachments
if (checkAttachmentDeletion && sentMessage.isWith_attachments()) {
deleteAttachmentForDeletedMessages(sentMessage, session);
}
// Return the deleted message
return sentMessage;
}
protected MessageReceived deleteMessageReceived(String recipientId, String messageId,
@ -4772,76 +4961,103 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
if (session == null) {
session = conn.getKeyspaceSession();
}
SimpleStatement updateStatement = SimpleStatement.builder(
"UPDATE messages_received SET deleted = true WHERE recipientid = ? AND message_id = ?")
.addPositionalValues(recipientId, UUID.fromString(messageId))
.build();
session.execute(updateStatement);
MessageReceived message = getReceivedMessageById(recipientId, messageId, true, session);
if (checkAttachmentDeletion && message.isWith_attachments()) {
checkAndDeleteAttachmenForDeletedMessages(message, session);
// Recupera il messaggio per ottenere il timestamp necessario per l'update
MessageReceived receivedMessage = getReceivedMessageById(recipientId, messageId);
if (receivedMessage == null) {
throw new IllegalArgumentException(
"Received Message not found with fromId: " + recipientId + " and messageId: " + messageId);
}
return message;
return deleteMessageReceived(receivedMessage, checkAttachmentDeletion, session);
}
protected MessageSent saveSentMessage(Message message, CqlSession session) {
MessageSent messageSent = new MessageSent(message, false, false, false);
protected MessageReceived deleteMessageReceived(MessageReceived receivedMessage, Boolean checkAttachmentDeletion,
CqlSession session) {
if (session == null) {
session = conn.getKeyspaceSession();
}
// Recupera il messaggio per ottenere il timestamp necessario per l'update
if (receivedMessage == null) {
throw new IllegalArgumentException(
"Message not found");
}
SimpleStatement statement = SimpleStatement.builder(
"INSERT INTO messages_sent (message_id, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.addPositionalValues(
messageSent.getId(),
messageSent.getUserid(),
messageSent.getUser_name(),
messageSent.getAddresses(),
messageSent.getSubject(),
messageSent.getBody(),
messageSent.getTimestamp(),
messageSent.isWith_attachments(),
messageSent.isRead(),
messageSent.isOpened(),
messageSent.isDeleted())
.build();
// Costruisci la query di aggiornamento usando il QueryBuilder
Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_RECEIVED)
.setColumn(Schema.ISDELETED, QueryBuilder.literal(true)) // Imposta il campo 'read' al valore fornito
.whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(receivedMessage.getRecipientid())) // Filtro
// per
// 'from_id'
.whereColumn(Schema.TIMESTAMP)
.isEqualTo(QueryBuilder.literal(receivedMessage.getTimestamp().toInstant()))
.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(receivedMessage.getUUID())); // Filtro
// per
// 'message_id'
SimpleStatement statement = update.build();
String updateString = statement.getQuery();
// Esegui la query di aggiornamento
session.execute(statement);
_log.debug("Wrote sent message with id " + messageSent.getId());
// Retrieve the message after marking it as deleted
receivedMessage = getReceivedMessageById(receivedMessage.getRecipientid(), receivedMessage.getId(), true, null,
session);
return messageSent;
// Check if attachment deletion is required and the message has attachments
if (checkAttachmentDeletion && receivedMessage.isWith_attachments()) {
deleteAttachmentForDeletedMessages(receivedMessage, session);
}
// Return the deleted message
return receivedMessage;
}
protected MessageReceived saveReceivedMessage(Message message, String recipientid, CqlSession session) {
MessageReceived messageReceived = new MessageReceived(message, recipientid, false, false, false);
protected boolean saveSentMessage(Message message, CqlSession session) {
// MessageSent messageSent = new MessageSent(message, false, false, false);
SimpleStatement statement = SimpleStatement.builder(
"INSERT INTO messages_received (message_id, recipientid, userid, user_name, addresses, subject, body, timestamp, with_attachments, read, opened, deleted) "
+
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.addPositionalValues(
messageReceived.getId(),
messageReceived.getRecipientid(),
messageReceived.getUserid(),
messageReceived.getUser_name(),
messageReceived.getAddresses(),
messageReceived.getSubject(),
messageReceived.getBody(),
messageReceived.getTimestamp(),
messageReceived.isWith_attachments(),
messageReceived.isRead(),
messageReceived.isOpened(),
messageReceived.isDeleted())
.build();
RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_SENT)
.value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(message.getId())))
.value(Schema.FROM_ID, QueryBuilder.literal(message.getUserid()))
.value(Schema.FROM_NAME, QueryBuilder.literal(message.getUser_name()))
.value(Schema.RECIPIENTS, QueryBuilder.literal(message.getAddresses()))
.value(Schema.SUBJECT, QueryBuilder.literal(message.getSubject()))
.value(Schema.BODY, QueryBuilder.literal(message.getBody()))
.value(Schema.TIMESTAMP, QueryBuilder.literal(message.getTimestamp().toInstant()))
.value(Schema.ISREAD, QueryBuilder.literal(false))
.value(Schema.ISOPENED, QueryBuilder.literal(false))
.value(Schema.ISDELETED, QueryBuilder.literal(false));
session.execute(statement);
session.execute(insert.build());
_log.debug("Wrote sent message with id " + messageReceived.getId());
_log.debug("Wrote sent message with id " + message.getId());
return messageReceived;
return true;
}
protected boolean saveReceivedMessage(Message message, String recipientId, CqlSession session) {
// MessageReceived messageReceived = new MessageReceived(message, recipientId,
// false, false, false);
RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_RECEIVED)
.value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(message.getId())))
.value(Schema.RECIPIENT_ID, QueryBuilder.literal(recipientId))
.value(Schema.FROM_ID, QueryBuilder.literal(message.getUserid()))
.value(Schema.FROM_NAME, QueryBuilder.literal(message.getUser_name()))
.value(Schema.RECIPIENTS, QueryBuilder.literal(message.getAddresses()))
.value(Schema.SUBJECT, QueryBuilder.literal(message.getSubject()))
.value(Schema.BODY, QueryBuilder.literal(message.getBody()))
.value(Schema.TIMESTAMP, QueryBuilder.literal(message.getTimestamp().toInstant()))
.value(Schema.ISREAD, QueryBuilder.literal(false))
.value(Schema.ISOPENED, QueryBuilder.literal(false))
.value(Schema.ISDELETED, QueryBuilder.literal(false));
session.execute(insert.build());
_log.debug("Wrote sent message with id " + message.getId());
return true;
}
protected boolean saveAttachmentMessageEntry(String messageKey, Attachment toSave, CqlSession session) {
@ -4849,24 +5065,20 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
if (session == null)
session = conn.getKeyspaceSession();
// an entry in the Attachment CF
try {
List<BoundStatement> boundStatements = insertIntoAttachments(session, toSave, "message_" + messageKey);
BatchStatement writeBatch = getBatch().addAll(boundStatements);
// boundStatements.forEach(stmt->writeBatch.add(stmt));
ResultSet res = session.execute(writeBatch);
_log.debug(res.getExecutionInfos().toString());
_log.debug("" + res.wasApplied());
/*
* session.execute(createNewaAttachEntry(session).bind(
* UUID.fromString(toSave.getId()),
* UUID.fromString(feedId),
* toSave.getUri(),
* toSave.getName(),
* toSave.getDescription(),
* toSave.getThumbnailURL(),
* toSave.getMimeType()
* ));
*/
RegularInsert insert = QueryBuilder.insertInto(Schema.TABLE_MESSAGES_ATTACHMENTS)
.value(Schema.MESSAGE_ID, QueryBuilder.literal(UUID.fromString(messageKey)))
.value(Schema.ATTACH_ID, QueryBuilder.literal(UUID.fromString(toSave.getId())))
.value(Schema.URI, QueryBuilder.literal(toSave.getUri()))
.value(Schema.NAME, QueryBuilder.literal(toSave.getName()))
.value(Schema.DESCRIPTION, QueryBuilder.literal(toSave.getDescription()))
.value(Schema.URI_THUMBNAIL, QueryBuilder.literal(toSave.getThumbnailURL()))
.value(Schema.MIME_TYPE, QueryBuilder.literal(toSave.getMimeType()));
session.execute(insert.build());
_log.debug("Wrote attachment " + toSave.getName() + " for message with id " + messageKey);
} catch (Exception e) {
@ -4876,78 +5088,114 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
return true;
}
protected MessageSent saveNewMessage(Message message, List<Attachment> attachments, CqlSession session) {
protected boolean saveNewMessage(Message message, List<Attachment> attachments, CqlSession session)
throws IllegalArgumentException {
if (message.getId() == null)
message.setId(Message.generateUUID()); // Generate UUID for the message
boolean has_attachments = attachments != null && !attachments.isEmpty();
message.setWith_attachments(has_attachments);
if (message.getTimestamp() == null) {
message.setTimestamp(new Date());
}
if (message == null) {
throw new IllegalArgumentException("message cannot be null or empty");
}
if (message.getUserid() == null) {
throw new IllegalArgumentException("message from cannot be null");
}
if (message.getId() == null) {
throw new IllegalArgumentException("message id cannot be null");
}
if (message.getAddresses() == null || message.getAddresses().isEmpty()) {
throw new IllegalArgumentException("Recipient addresses cannot be null or empty");
}
if (session == null)
session = conn.getKeyspaceSession();
MessageSent sentMessage = saveSentMessage(message, session);
if (sentMessage == null) {
boolean sent = saveSentMessage(message, session);
if (!sent) {
_log.error("Sent Message failed to be saved: from " + message.getUserid());
return null;
return false;
}
ArrayList<MessageReceived> messagesReceived = new ArrayList<>();
for (String recipientId : message.getAddresses()) {
MessageReceived received = saveReceivedMessage(message, recipientId, session);
boolean received = saveReceivedMessage(message, recipientId, session);
if (received == null) {
if (!received) {
_log.error("Received message failed to be saved: from " + message.getUserid());
// TODO: Manage error
}
messagesReceived.add(received);
// messagesReceived.add(received);
}
_log.debug("Message " + message.getId() + "has been saved");
String postkey = message.getKey();
for (Attachment attachment : attachments) {
boolean attachSaveResult = saveAttachmentMessageEntry(postkey, attachment, session);
if (!attachSaveResult)
_log.error("Some of the attachments failed to be saved: " + attachment.getName());
else {
_log.debug("Attachment " + attachment.getName() + "has been saved for message " + message.getId());
if (attachments != null) {
for (Attachment attachment : attachments) {
boolean attachSaveResult = saveAttachmentMessageEntry(postkey, attachment, session);
if (!attachSaveResult)
_log.error("Some of the attachments failed to be saved: " + attachment.getName());
else {
_log.debug("Attachment " + attachment.getName() + "has been saved for message " + message.getId());
}
}
}
return sentMessage;
return true;
}
@Override
public MessageSent sendMessage(String fromId, List<String> addresses, String subject, String body,
List<Attachment> attachments) {
public Message sendMessage(String fromId, List<String> addresses, String subject, String body,
List<Attachment> attachments) throws IllegalArgumentException {
String messageId = UUID.randomUUID().toString(); // Generate UUID for the message
boolean has_attachments = attachments != null && !attachments.isEmpty();
Message message = new Message(messageId, fromId, null, addresses, subject, body, new Date(), has_attachments);
MessageSent sent = saveNewMessage(message, attachments, null);
if (sent == null)
return null;
return sent;
boolean saved = saveNewMessage(message, attachments, null);
return message;
}
public Message sendMessage(Message message,
List<Attachment> attachments, CqlSession session) throws IllegalArgumentException {
boolean saved = saveNewMessage(message, attachments, session);
return message;
}
@Override
public List<MessageSent> getSentMessagesByUser(String userId, Date timestamp, Integer limit) {
return getSentMessagesByUser(userId, null, timestamp, limit, false, null);
public List<MessageSent> getSentMessagesBySender(String userId, Date timestamp, Integer limit) {
return getSentMessagesBySender(userId, null, timestamp, limit, false, null, null);
}
@Override
public List<MessageReceived> getReceivedMessagesByUser(String recipientId, Date timestamp, Integer limit) {
return getReceivedMessagesByUser(recipientId, null, timestamp, limit, false, null);
public List<MessageReceived> getReceivedMessagesByRecipient(String recipientId, Date timestamp, Integer limit) {
return getReceivedMessagesByRecipient(recipientId, null, timestamp, limit, false, null, null);
}
@Override
public MessageSent getSentMessageById(String userId, String messageId) {
return getSentMessageById(userId, messageId, false, null);
public MessageSent getSentMessageById(String fromId, String messageId) {
return getSentMessageById(fromId, messageId, false, null, null);
}
@Override
public MessageReceived getReceivedMessageById(String recipientId, String messageId) {
return getReceivedMessageById(recipientId, messageId, false, null);
return getReceivedMessageById(recipientId, messageId, false, null, null);
}
@Override
public MessageSent deleteMessageSent(String userId, String messageId) {
return deleteMessageSent(userId, messageId, true, null);
public MessageSent deleteMessageSent(String fromId, String messageId) {
return deleteMessageSent(fromId, messageId, true, null);
}
@Override
@ -4956,57 +5204,84 @@ public final class DBCassandraAstyanaxImpl implements DatabookStore {
}
@Override
public MessageSent setSentMessageRead(String userId, String messageId, boolean set_read) {
public MessageSent setSentMessageRead(String fromId, String messageId, boolean set_read) {
CqlSession session = conn.getKeyspaceSession();
SimpleStatement updateStatement = SimpleStatement.builder(
"UPDATE messages_sent SET read = ? WHERE userid = ? AND message_id = ?")
.addPositionalValues(set_read, userId, UUID.fromString(messageId))
.build();
// Recupera il messaggio per ottenere il timestamp necessario per l'update
MessageSent sentMessage = getSentMessageById(fromId, messageId);
session.execute(updateStatement);
if (sentMessage == null) {
throw new IllegalArgumentException(
"Message not found with fromId: " + fromId + " and messageId: " + messageId);
}
return getSentMessageById(userId, messageId, true, session);
// Costruisci la query di aggiornamento usando il QueryBuilder
Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_SENT)
.setColumn(Schema.ISREAD, QueryBuilder.literal(set_read)) // Imposta il campo 'read' al valore fornito
.whereColumn(Schema.FROM_ID).isEqualTo(QueryBuilder.literal(fromId)) // Filtro per 'from_id'
.whereColumn(Schema.TIMESTAMP).isEqualTo(QueryBuilder.literal(sentMessage.getTimestamp().toInstant()))
.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId))); // Filtro
// per
// 'message_id'
SimpleStatement statement = update.build();
String updateString = statement.getQuery();
// Esegui la query di aggiornamento
session.execute(statement);
// Ritorna il messaggio aggiornato
return getSentMessageById(fromId, messageId, null, null, session);
}
@Override
public MessageReceived setReceivedMessageRead(String recipientId, String messageId, boolean set_read) {
CqlSession session = conn.getKeyspaceSession();
MessageReceived receivedMessage = getReceivedMessageById(recipientId, messageId);
SimpleStatement updateStatement = SimpleStatement.builder(
"UPDATE messages_received SET read = ? WHERE recipientid = ? AND message_id = ?")
.addPositionalValues(set_read, recipientId, UUID.fromString(messageId))
.build();
// Build the update query using QueryBuilder
Update update = QueryBuilder.update(Schema.TABLE_MESSAGES_RECEIVED)
.setColumn(Schema.ISREAD, QueryBuilder.literal(set_read)) // Set the 'read' column to the provided value
.whereColumn(Schema.RECIPIENT_ID).isEqualTo(QueryBuilder.literal(recipientId))
.whereColumn(Schema.TIMESTAMP)
.isEqualTo(QueryBuilder.literal(receivedMessage.getTimestamp().toInstant()))
.whereColumn(Schema.MESSAGE_ID).isEqualTo(QueryBuilder.literal(UUID.fromString(messageId)));
session.execute(updateStatement);
// Execute the update query
session.execute(update.build());
return getReceivedMessageById(recipientId, messageId, true, session);
// Return the updated message
return getReceivedMessageById(recipientId, messageId);
// Return the updated message
// return getReceivedMessageById(recipientId, messageId, null, null, session);
}
@Override
public Attachment getMessageAttachmentById(String messageid) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'getMessageAttachmentById'");
}
public Attachment getMessageAttachmentById(String messageid, String attachid) {
@Override
public Message getMessageById(String messageId) {
return getSentMessageById(null, messageId);
}
@Override
public Message deleteMessage(String messageId) {
CqlSession session = conn.getKeyspaceSession();
MessageSent message = getSentMessageById(null, messageId, true, session);
for (String recipientId : message.getAddresses()) {
deleteMessageReceived(recipientId, messageId, false, session);
if (attachid == null) {
throw new NullArgumentException("the timeInMillis must be before today");
}
List<Attachment> results = retrieveAttachmentsMessage(messageid, attachid, null);
if (results.size() > 1) {
throw new IllegalArgumentException("more than 1 attachment retrieved");
}
deleteMessageSent(message.getUserid(), messageId, true, session);
getSentMessageById(null, messageId);
return getSentMessageById(null, messageId, true, session);
if (results.size() > 0) {
return results.get(0);
}
return null;
}
@Override
public List<Attachment> getMessageAttachmentsById(String messageId) {
return retrieveAttachmentsMessage(messageId, null, null);
}
// @Override
// public Message getMessageById(String messageId) {
// return getSentMessageById(null, messageId);
// }
}

View File

@ -1,9 +1,6 @@
package org.gcube.portal.databook.server;
import java.util.UUID;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class DatabookCassandraTest {

View File

@ -679,23 +679,30 @@ public interface DatabookStore {
*/
void closeConnection();
public MessageSent sendMessage(String fromId, List<String> addresses, String subject, String body, List<Attachment> attachments);
public Message sendMessage(String fromId, List<String> addresses, String subject, String body, List<Attachment> attachments);
public List<MessageSent> getSentMessagesByUser(String fromId, Date timestamp, Integer limit);
public List<MessageReceived> getReceivedMessagesByUser(String recipientId, Date timestamp, Integer limit);
public List<MessageSent> getSentMessagesBySender(String fromId, Date timestamp, Integer limit);
public List<MessageReceived> getReceivedMessagesByRecipient(String recipientId, Date timestamp, Integer limit);
public Message getMessageById(String messageId);
public MessageSent getSentMessageById(String messageId, String fromId);
// public Message getMessageById(String messageId);
public MessageSent getSentMessageById(String fromId, String messageId);
public MessageReceived getReceivedMessageById(String messageId, String recipientId);
public Message deleteMessage(String messageId);
public MessageSent deleteMessageSent(String messageId, String fromId);
public MessageSent deleteMessageSent(String fromId, String messageId);
public MessageReceived deleteMessageReceived(String messageId, String recipientId);
public MessageSent setSentMessageRead(String messageId, String fromId, boolean set_read);
public MessageReceived setReceivedMessageRead(String messageId, String recipientId, boolean set_read);
public MessageSent setSentMessageRead(String fromId, String messageId, boolean set_read);
public MessageReceived setReceivedMessageRead(String recipientId, String messageId, boolean set_read);
public Attachment getMessageAttachmentById(String messageId);
public Attachment getMessageAttachmentById(String messageId, String filename);
public List<Attachment> getMessageAttachmentsById(String messageId);
public List<MessageSent> getSentMessagesBySender(String fromId, String messageId, Date timestamp, Integer limit,
Boolean filter_deleted, Boolean filter_read, CqlSession session);
public List<MessageReceived> getReceivedMessagesByRecipient(String recipientId, String messageId, Date timestamp, Integer limit,
Boolean filter_deleted, Boolean filter_read, CqlSession session);
// public MessageSent saveSentMessage(Message message, CqlSession session);
// public MessageReceived saveReceivedMessage(Message message, String recipientId, CqlSession session);

View File

@ -58,6 +58,22 @@ public class RunningCluster implements Serializable {
private static final String DEFAULT_CONFIGURATION = "/org/gcube/portal/databook/server/resources/databook.properties";
private static RunningCluster singleton;
public static synchronized RunningCluster setCustomizedInstance(String host, String datacenterName,
String keyspaceName) {
singleton = new RunningCluster(host, datacenterName, keyspaceName);
String[] params = {
singleton.host, singleton.datacenterName, singleton.keyspaceName
};
_log.info(
"socialdb will use custom configuration host:{}, datacenter:{}, keyspace: {}",
params);
return singleton;
}
/**
* Host
*/
@ -158,6 +174,10 @@ public class RunningCluster implements Serializable {
}
} catch (Exception e) {
e.printStackTrace();
_log.error("Error getting configuration from IS: {} - {}", e.getMessage());
_log.error("Error {}", e);
// throw e;
}
/*

View File

@ -1,5 +1,7 @@
package org.gcube.portal.databook.server;
import com.datastax.oss.driver.api.core.CqlIdentifier;
/**
* @author Massimiliano Assante ISTI-CNR
* @author Ahmed Ibrahim ISTI-CNR
@ -73,4 +75,51 @@ public class Schema {
public static final String COMMENTS_NO = "commentsno"; //big int
public static final String LINK_TITLE = "linktitle"; //text
// messages
public static final String TABLE_MESSAGES_SENT = "messages_sent";
public static final String TABLE_MESSAGES_RECEIVED = "messages_received";
public static final String TABLE_MESSAGES_ATTACHMENTS = "messages_attachments";
public static final String IDX_MESSAGES_SENT_ID = "message_sent_id_idx";
public static final String IDX_MESSAGES_SENT_READ = "message_sent_read_idx";
public static final String IDX_MESSAGES_SENT_DELETED = "message_sent_deleted_idx";
public static final String IDX_MESSAGES_SENT_TIMESTAMP = "message_sent_timestamp_idx";
public static final String IDX_MESSAGES_RECEIVED_ID = "message_received_id_idx";
public static final String IDX_MESSAGES_RECEIVED_READ = "message_received_read_idx";
public static final String IDX_MESSAGES_RECEIVED_DELETED = "message_received_deleted_idx";
public static final String IDX_MESSAGES_RECEIVED_TIMESTAMP = "message_received_timestamp_idx";
public static final String TABLE_MESSAGES_SENT_ACTIVE = "messages_active_sent";
public static final String TABLE_MESSAGES_RECEIVED_ACTIVE = "messages_active_received";
public static final String TABLE_MESSAGES_SENT_READ = "messages_unread_sent";
public static final String TABLE_MESSAGES_RECEIVED_READ = "messages_unread_received";
// public static final String URI = "uri"; //text
// public static final String URI_THUMBNAIL = "urithumbnail"; //text
// public static final String NAME = "name"; //text
// public static final String DESCRIPTION = "description"; //text
// public static final String MIME_TYPE = "mimetype"; //text
public static final String FROM_ID = "from_id"; //text
public static final String FROM_NAME = "from_name"; //text
// public static final String TIMESTAMP = "timestamp"; //text
public static final String MESSAGE_ID = "message_id"; //text
public static final String RECIPIENT_ID = "recipient_id"; //text
public static final String RECIPIENTS = "addresses"; //text
// public static final String RECIPIENTS = "recipients"; //text
public static final String SUBJECT = "subject"; //text
public static final String BODY = "body"; //text
public static final String WITH_ATTACH = "with_attachments"; //text
public static final String ISREAD = "read"; //text
public static final String ISOPENED = "opened"; //text
public static final String ISDELETED = "deleted"; //text
}

View File

@ -1,53 +0,0 @@
package org.gcube.portal.databook.server;
import org.gcube.portal.databook.shared.*;
import org.gcube.portal.databook.shared.ex.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @author Massimiliano Assante ISTI-CNR
* @author Ahmed Ibrahim ISTI-CNR
*
* @version 2.0.0 October 2023
*
*/
public class Tester {
private static DBCassandraAstyanaxImpl store;
private static Logger LOGGER = LoggerFactory.getLogger(Tester.class);
public Tester() {
store = new DBCassandraAstyanaxImpl("gcube"); //set to true if you want to drop the KeySpace and recreate it
}
public static void main(String[] args) throws ColumnNameNotFoundException, PrivacyLevelTypeNotFoundException, FeedIDNotFoundException, FeedTypeNotFoundException {
Tester test = new Tester();
//test.getComment();
test.testFunc();
System.exit(0);
}
public void testFunc() throws ColumnNameNotFoundException, PrivacyLevelTypeNotFoundException, FeedIDNotFoundException, FeedTypeNotFoundException {
String postIdToUpdate = "047c601d-2291-4974-9224-d6732b1fbe26";
Post read = store.readPost(postIdToUpdate);
List<Comment> readC = store.getAllCommentByPost("047c601d-2291-4974-9224-d6732b1fbe26");
System.out.println(read);
readC.forEach(c -> System.out.println(c.getText()));
}
public void getComment(){
String uuid = "820969b2-4632-4197-9fd6-5aafab781faa";
Comment c;
try {
c = store.readCommentById(uuid);
System.out.println(c);
} catch (CommentIDNotFoundException e) {
// TODO Auto-generated catch block
System.err.println(e.toString());
}
}
}

View File

@ -3,6 +3,7 @@ package org.gcube.portal.databook.shared;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
import java.util.UUID;
/**
*
@ -57,6 +58,10 @@ public class Message implements Serializable, Comparable<Message> {
// protected Date creation_time;
protected Date timestamp;
public UUID getUUID() {
return UUID.fromString(getId());
}
public String getId() {
return id;
}
@ -130,6 +135,10 @@ public class Message implements Serializable, Comparable<Message> {
super();
}
public static String generateUUID(){
UUID uuid = UUID.randomUUID();
return uuid.toString();
}
public Message(String id, String userid, String user_name, List<String> addresses, String subject, String body,
Date timestamp, boolean with_attachments ) {
@ -139,7 +148,7 @@ public class Message implements Serializable, Comparable<Message> {
this.user_name = user_name;
this.addresses = addresses;
this.subject = subject;
this.body = subject;
this.body = body;
this.timestamp = timestamp;
this.with_attachments = with_attachments;
}

View File

@ -69,7 +69,7 @@ public class BaseDbTest {
assertNotNull(store);
}
public CassandraClusterConnection getConnection() throws Exception {
return new CassandraClusterConnection(false, null);
return new CassandraClusterConnection(false, false, null);
}
@Test

View File

@ -0,0 +1,662 @@
package org.gcube.portal.databook.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.gcube.portal.databook.shared.Attachment;
import org.gcube.portal.databook.shared.Message;
import org.gcube.portal.databook.shared.MessageReceived;
import org.gcube.portal.databook.shared.MessageSent;
import org.gcube.portal.databook.shared.Notification;
import org.gcube.portal.databook.shared.ex.NotificationTypeNotFoundException;
import org.junit.Before;
import org.junit.Test;
import com.datastax.oss.driver.api.core.CqlSession;
public class MessagesTest extends BaseDbTest{
private CassandraClusterConnection connection;
private DBCassandraAstyanaxImpl store;
private String testFrom;
private String recipient;
private String recipient2;
private String subject;
private String body;
private List<String> recipients;
@Before
public void setUp() throws Exception {
// connection = DbTest.getConnection();
testFrom = "alfredo.oliviero@isti.cnr.it";
recipient = "massimiliano.assante@isti.cnr.it";
recipient2 = "andrea.rossi@isti.cnr.it";
subject = "test subject";
body = "test body";
recipients = Arrays.asList(recipient, recipient2);
}
@Test
public void empty() {
System.out.print("Testing works");
return;
}
@Test
public void testCreateMessage() throws Exception {
Message message = store.sendMessage(testFrom, recipients, subject, body, null);
assertNotNull("Message should not be null after creation", message);
assertEquals("User ID does not match", testFrom, message.getUserid());
assertEquals("Recipient does not match", recipient, message.getAddresses().get(0));
assertEquals("Subject does not match", subject, message.getSubject());
assertEquals("Body does not match", body, message.getBody());
MessageSent messageSent = store.getSentMessageById(testFrom, message.getId());
assertNotNull("Sent message should not be null", messageSent);
assertEquals("User ID of sent message does not match", testFrom, messageSent.getUserid());
assertEquals("Recipient of sent message does not match", recipient, messageSent.getAddresses().get(0));
assertEquals("Subject of sent message does not match", subject, messageSent.getSubject());
assertEquals("Body of sent message does not match", body, messageSent.getBody());
MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId());
assertNotNull("Received message should not be null", messageReceived);
assertEquals("User ID of received message does not match", testFrom, messageReceived.getUserid());
assertEquals("Recipient of received message does not match", recipient, messageReceived.getRecipientid());
assertEquals("Subject of received message does not match", subject, messageReceived.getSubject());
assertEquals("Body of received message does not match", body, messageReceived.getBody());
MessageReceived messageReceived2 = store.getReceivedMessageById(recipient2, message.getId());
assertNotNull("Received2 message should not be null", messageReceived2);
assertEquals("User ID of received2 message does not match", testFrom, messageReceived2.getUserid());
assertEquals("Recipient of received2 message does not match", recipient2, messageReceived2.getRecipientid());
assertEquals("Subject of received2 message does not match", subject, messageReceived2.getSubject());
assertEquals("Body of received2 message does not match", body, messageReceived2.getBody());
}
@Test
public void testMessagesReceivedByRecipient() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testMessagesReceivedByRecipient", body, null);
List<MessageReceived> receiveds = store.getReceivedMessagesByRecipient(recipient, null, null);
MessageReceived received = receiveds.get(0);
assertNotNull(received);
}
@Test
public void testUnreadMessagesReceivedByRecipient() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testUnreadMessagesReceivedByRecipient", body, null);
List<MessageReceived> receiveds = store.getReceivedMessagesByRecipient(recipient, null, null, 1, false, false,
null);
MessageReceived received = receiveds.get(0);
assertFalse(received.isRead());
assertNotNull(received);
}
@Test
public void testMessageReceivedById() throws Exception {
// String msg_id = "d78e096e-df88-4783-9e8e-32036158f54b";
Message message = store.sendMessage(testFrom, recipients, "testMessageReceivedById", body, null);
String msg_id = message.getId();
MessageReceived received = store.getReceivedMessageById(recipient, msg_id);
assertNotNull(received);
assertEquals(received.getId(), msg_id);
}
@Test
public void testSetSentMessageReadUnread() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testSetSentMessageReadUnread", body, null);
assertNotNull(message);
String recipient = recipients.get(0);
String recipient1 = recipients.get(1);
MessageSent messageSent = store.getSentMessageById(testFrom, message.getId());
MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId());
MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageSent);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
MessageSent sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), true);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(sentMessageUpdate);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertTrue(messageSent.isRead());
assertTrue(sentMessageUpdate.isRead());
assertFalse(messageReceived.isRead());
assertFalse(messageReceived1.isRead());
sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), false);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(sentMessageUpdate);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertFalse(messageSent.isRead());
assertFalse(sentMessageUpdate.isRead());
assertFalse(messageReceived.isRead());
assertFalse(messageReceived1.isRead());
sentMessageUpdate = store.setSentMessageRead(testFrom, message.getId(), true);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(sentMessageUpdate);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertTrue(messageSent.isRead());
assertTrue(sentMessageUpdate.isRead());
assertFalse(messageReceived.isRead());
assertFalse(messageReceived1.isRead());
}
@Test
public void testSetReceivedMessageReadUnread() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testSetReceivedMessageReadUnread", body, null);
assertNotNull(message);
String recipient = recipients.get(0);
String recipient1 = recipients.get(1);
MessageSent messageSent = store.getSentMessageById(testFrom, message.getId());
MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId());
MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageSent);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
// set read for recipient
MessageReceived messageUpdated = store.setReceivedMessageRead(recipient, message.getId(), true);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageUpdated);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertTrue(messageReceived.isRead());
assertTrue(messageUpdated.isRead());
assertFalse(messageSent.isRead());
assertFalse(messageReceived1.isRead());
// set not read for recipient
messageUpdated = store.setReceivedMessageRead(recipient, message.getId(), false);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageUpdated);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertFalse(messageReceived.isRead());
assertFalse(messageUpdated.isRead());
assertFalse(messageSent.isRead());
assertFalse(messageReceived1.isRead());
// set read for recipient2
messageUpdated = store.setReceivedMessageRead(recipient2, message.getId(), true);
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageUpdated);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertTrue(messageReceived1.isRead());
assertTrue(messageUpdated.isRead());
assertFalse(messageSent.isRead());
assertFalse(messageReceived.isRead());
}
@Test
public void testDeleteSentMessage() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testDeleteSentMessage", body, null);
assertNotNull(message);
String recipient = recipients.get(0);
String recipient1 = recipients.get(1);
MessageSent messageSent = store.getSentMessageById(testFrom, message.getId());
MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId());
MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageSent);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertFalse(messageSent.isDeleted());
assertFalse(messageReceived.isDeleted());
assertFalse(messageReceived.isDeleted());
MessageSent deletedMessage = store.deleteMessageSent(testFrom, message.getId());
assertNotNull(deletedMessage);
assertTrue(deletedMessage.isDeleted());
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNull(messageSent);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
}
@Test
public void testDeleteReceivedMessage() throws Exception {
Message message = store.sendMessage(testFrom, recipients, "testDeleteReceivedMessage", body, null);
assertNotNull(message);
String recipient = recipients.get(0);
String recipient1 = recipients.get(1);
MessageSent messageSent = store.getSentMessageById(testFrom, message.getId());
MessageReceived messageReceived = store.getReceivedMessageById(recipient, message.getId());
MessageReceived messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageSent);
assertNotNull(messageReceived);
assertNotNull(messageReceived1);
assertFalse(messageSent.isDeleted());
assertFalse(messageReceived.isDeleted());
assertFalse(messageReceived.isDeleted());
MessageReceived deletedMessage = store.deleteMessageReceived(recipient, message.getId());
assertNotNull(deletedMessage);
assertTrue(deletedMessage.isDeleted());
messageSent = store.getSentMessageById(testFrom, message.getId());
messageReceived = store.getReceivedMessageById(recipient, message.getId());
messageReceived1 = store.getReceivedMessageById(recipient1, message.getId());
assertNotNull(messageSent);
assertNull(messageReceived);
assertNotNull(messageReceived1);
}
@Test(expected = IllegalArgumentException.class)
public void testGetInvalidIdMessage() {
store.getSentMessageById(testFrom, "not-valid-id");
}
@Test
public void testGetNonExistentMessage() {
MessageSent messageSent = store.getSentMessageById(testFrom, "ffffaaaa-9d90-4cc8-a634-bf158c7cc068");
assertNull("Expected null for non-existent message", messageSent);
}
@Test
public void testGetNonExistentFrom() {
MessageSent messageSent = store.getSentMessageById("aaaa", "ffffaaaa-9d90-4cc8-a634-bf158c7cc068");
assertNull("Expected null for non-existent message", messageSent);
}
// @Test
// public void testSaveNewMessageWithNullMessage() {
// store.saveNewMessage(null, null, null); // Message is null
// assertEquals("message cannot be null or empty", exception.getMessage());
// }
@Test(expected = IllegalArgumentException.class)
public void testSaveNewMessageWithNullUserId() {
store.sendMessage(null, recipients, subject, body, null); // User ID is null
}
@Test(expected = IllegalArgumentException.class)
public void testSaveNewMessageWithNullAddresses() {
store.sendMessage(testFrom, null, subject, body, null); // Addresses are null
}
@Test(expected = IllegalArgumentException.class)
public void testSaveNewMessageWithEmptyAddresses() {
store.sendMessage(testFrom, Arrays.asList(), subject, body, null); // Addresses are empty
}
@Test
public void testGetSentMessagesOnlyNonDeleted() throws Exception {
// Invia un messaggio normale
Message message1 = store.sendMessage(testFrom, recipients, subject, body, null);
// Invia un altro messaggio che verrà cancellato
Message message2 = store.sendMessage(testFrom, recipients, subject + " deleted", body, null);
// Cancella il secondo messaggio
store.deleteMessageSent(testFrom, message2.getId());
// Recupera tutti i messaggi inviati dall'utente
List<MessageSent> sentMessages = store.getSentMessagesBySender(testFrom, null, 10);
// Verifica che non contenga il messaggio cancellato
assertNotNull("Sent messages list should not be null", sentMessages);
assertFalse("Sent messages list should not contain deleted messages",
sentMessages.stream().anyMatch(m -> m.getId().equals(message2.getId())));
// Verifica che il messaggio non cancellato sia ancora presente
assertTrue("Sent messages list should contain non-deleted message",
sentMessages.stream().anyMatch(m -> m.getId().equals(message1.getId())));
}
@Test
public void testSentMessagesOrderedByTimestampDescending() throws Exception {
// Invia tre messaggi con timestamp differenti
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
Thread.sleep(1000); // Attendi un altro secondo
Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null);
// Recupera tutti i messaggi inviati dall'utente
List<MessageSent> sentMessages = store.getSentMessagesBySender(testFrom, null, 10);
// Verifica che la lista non sia nulla e contenga almeno tre messaggi
assertNotNull("Sent messages list should not be null", sentMessages);
assertTrue("Sent messages list should contain at least 3 messages", sentMessages.size() >= 3);
// Verifica che i messaggi siano ordinati in modo decrescente in base al
// timestamp
Date timestamp1 = sentMessages.get(0).getTimestamp();
Date timestamp2 = sentMessages.get(1).getTimestamp();
Date timestamp3 = sentMessages.get(2).getTimestamp();
assertTrue("First message should be newer than second", timestamp1.after(timestamp2));
assertTrue("Second message should be newer than third", timestamp2.after(timestamp3));
}
@Test
public void testReceivedMessagesOrderedByTimestampDescending() throws Exception {
// Invia tre messaggi con timestamp differenti
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
Thread.sleep(1000); // Attendi un altro secondo
Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null);
// Recupera tutti i messaggi ricevuti dal destinatario
List<MessageReceived> receivedMessages = store.getReceivedMessagesByRecipient(recipient, null, 10);
// Verifica che la lista non sia nulla e contenga almeno tre messaggi
assertNotNull("Received messages list should not be null", receivedMessages);
assertTrue("Received messages list should contain at least 3 messages", receivedMessages.size() >= 3);
// Verifica che i messaggi siano ordinati in modo decrescente in base al
// timestamp
Date timestamp1 = receivedMessages.get(0).getTimestamp();
Date timestamp2 = receivedMessages.get(1).getTimestamp();
Date timestamp3 = receivedMessages.get(2).getTimestamp();
assertTrue("First message should be newer than second", timestamp1.after(timestamp2));
assertTrue("Second message should be newer than third", timestamp2.after(timestamp3));
}
@Test
public void testUnreadMessagesOnlyReturned() throws Exception {
// Invia un messaggio normale
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
// Invia un altro messaggio che verrà marcato come letto
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
store.setReceivedMessageRead(recipient, message2.getId(), true); // Marca come letto
// Recupera solo i messaggi non letti
List<MessageReceived> unreadMessages = store.getReceivedMessagesByRecipient(recipient, null, null, 10, false,
false, null);
assertNotNull("Unread messages list should not be null", unreadMessages);
assertTrue("Unread messages list should contain only unread messages",
unreadMessages.stream().allMatch(m -> !m.isRead()));
}
@Test
public void testMessagesWithLimitAndOrder() throws Exception {
// Invia tre messaggi
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
Thread.sleep(1000); // Attendi 1 secondo
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
Thread.sleep(1000);
Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null);
// Recupera i primi due messaggi inviati, ordinati in modo decrescente per
// timestamp
List<MessageSent> sentMessages = store.getSentMessagesBySender(testFrom, null, 2);
assertNotNull("Sent messages list should not be null", sentMessages);
assertEquals("Sent messages list should contain 2 messages", 2, sentMessages.size());
// Verifica che i messaggi siano ordinati per timestamp decrescente
Date timestamp1 = sentMessages.get(0).getTimestamp();
Date timestamp2 = sentMessages.get(1).getTimestamp();
assertTrue("First message should be newer than second", timestamp1.after(timestamp2));
}
@Test
public void testRetrieveMessagesWithLimit() throws Exception {
int count = 4;
for (int i = 0; i < count; i++) {
store.sendMessage(testFrom, recipients, subject + " " + i, body, null);
}
;
// Recupera solo 2 messaggi
List<MessageSent> sentMessages = store.getSentMessagesBySender(testFrom, null, 2);
assertNotNull("Sent messages list should not be null", sentMessages);
assertEquals("Only 2 messages should be returned", 2, sentMessages.size());
}
@Test
public void testGetSentMessagesBySenderWithTimestampFilter() throws Exception {
// Invia tre messaggi con timestamp differenti
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
Thread.sleep(1000); // Attendi un altro secondo
Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null);
// Usa il timestamp del secondo messaggio per il test
Date filterTimestamp = message2.getTimestamp();
// Recupera tutti i messaggi con timestamp <= filterTimestamp
List<MessageSent> sentMessages = store.getSentMessagesBySender(testFrom, filterTimestamp, 10);
// Verifica che la lista non sia nulla
assertNotNull("Sent messages list should not be null", sentMessages);
// Verifica che i messaggi abbiano timestamp <= filterTimestamp
for (MessageSent message : sentMessages) {
assertTrue("Message timestamp should be less than or equal to the filter timestamp",
message.getTimestamp().compareTo(filterTimestamp) <= 0);
}
// Verifica che il terzo messaggio (con timestamp maggiore) non sia nella lista
assertFalse("Message with timestamp greater than filterTimestamp should not be returned",
sentMessages.stream().anyMatch(m -> m.getId().equals(message3.getId())));
}
@Test
public void testGetReceivedMessagesByRecipientWithTimestampFilter() throws Exception {
// Invia tre messaggi con timestamp differenti
Message message1 = store.sendMessage(testFrom, recipients, subject + "1", body, null);
Thread.sleep(1000); // Attendi 1 secondo per garantire un timestamp diverso
Message message2 = store.sendMessage(testFrom, recipients, subject + "2", body, null);
Thread.sleep(1000); // Attendi un altro secondo
Message message3 = store.sendMessage(testFrom, recipients, subject + "3", body, null);
// Usa il timestamp del secondo messaggio per il test
Date filterTimestamp = message2.getTimestamp();
// Recupera tutti i messaggi ricevuti con timestamp <= filterTimestamp
List<MessageReceived> receivedMessages = store.getReceivedMessagesByRecipient(recipient, filterTimestamp, 10);
// Verifica che la lista non sia nulla
assertNotNull("Received messages list should not be null", receivedMessages);
// Verifica che i messaggi abbiano timestamp <= filterTimestamp
for (MessageReceived message : receivedMessages) {
assertTrue("Message timestamp should be less than or equal to the filter timestamp",
message.getTimestamp().compareTo(filterTimestamp) <= 0);
}
// Verifica che il terzo messaggio (con timestamp maggiore) non sia nella lista
assertFalse("Message with timestamp greater than filterTimestamp should not be returned",
receivedMessages.stream().anyMatch(m -> m.getId().equals(message3.getId())));
}
@Test
public void testGetEmptyAttachmentByMessageId() throws Exception {
Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, null);
List<Attachment> attachs = store.getMessageAttachmentsById(msg.getId());
assertEquals("attachs list should be empty", attachs.size(), 0);
}
@Test
public void testGetSingleAttachmentByMessageId() throws Exception {
Attachment attach = new Attachment();
String attachId = UUID.randomUUID().toString(); // Generate UUID for the message
String filename = "file name";
String description = "file description";
String uri = "uri";
String thumb_url = "thumb url";
attach.setId(attachId);
attach.setName(filename);
attach.setDescription(description);
attach.setUri(uri);
attach.setThumbnailURL(thumb_url);
List<Attachment> attachments = Arrays.asList(attach);
Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, attachments);
List<Attachment> attachs = store.getMessageAttachmentsById(msg.getId());
assertEquals("attachs list should have 1 element", 1, attachs.size());
Attachment obtained_attach = attachs.get(0);
assertEquals("attachs list should have 1 element", attach.getId(), obtained_attach.getId());
checkEqualAttachments(attach, obtained_attach);
}
@Test
public void testMultiSingleAttachmentByMessageId() throws Exception {
String filename = "file name";
String description = "file description";
String uri = "uri";
String thumb_url = "thumb url";
List<Attachment> attachments = new ArrayList<Attachment>();
int count = 10;
for (int i = 0; i < count; i++) {
Attachment attach = new Attachment();
String attachId = UUID.randomUUID().toString(); // Generate UUID for the message
attach.setId(attachId);
attach.setName(filename + " " + i);
attach.setDescription(description + " " + i);
attach.setUri(uri + " " + i);
attach.setThumbnailURL(thumb_url + " " + i);
attachments.add(attach);
}
Message msg = store.sendMessage(testFrom, recipients, "testGetAttachmentByMessageId", body, attachments);
String msg_id = msg.getId();
List<Attachment> attachs_by_message_id = store.getMessageAttachmentsById(msg_id);
assertEquals("attachs list should have element", count, attachs_by_message_id.size());
for (Attachment attach : attachments) {
String attach_id = attach.getId();
Attachment by_message_id = attachs_by_message_id.stream().filter(c -> c.getId().equals(attach_id))
.findFirst().orElse(null);
assertNotNull("cannot obtain attach ", attach_id + "by msg id " + msg_id);
checkEqualAttachments(attach, by_message_id);
Attachment by_attach_id = store.getMessageAttachmentById(msg_id, attach_id);
assertNotNull("cannot obtain attach by id", attach_id);
checkEqualAttachments(attach, by_attach_id);
}
}
private boolean checkEqualAttachments(Attachment attach, Attachment obtained_attach) {
assertEquals("wrong attachId", attach.getId(), obtained_attach.getId());
assertEquals("wrong filename", attach.getName(), obtained_attach.getName());
assertEquals("wrong description", attach.getDescription(), obtained_attach.getDescription());
assertEquals("wrong uri", attach.getUri(), obtained_attach.getUri());
assertEquals("wrong thumb uri", attach.getThumbnailURL(), obtained_attach.getThumbnailURL());
return true;
}
@Test
public void populate_messages() throws NotificationTypeNotFoundException {
CqlSession session = store.getSession();
Date timestamp = null;
// Integer limit = 100000;
Integer limit = 100;
List<String> destinatari = Arrays.asList("alfredo.oliviero", "massimiliano.assante", "andrea.rossi");
List<Notification> notifications = store.getAllNotifications(timestamp, limit, session);
for (Notification n : notifications) {
// protected MessageUser sender;
String sender = n.getSenderid();
Message m = new Message(Message.generateUUID(), n.getSenderid(), n.getSenderFullName(), destinatari,
n.getSubjectid().toString(), n.getDescription(), n.getTime(), true);
Attachment attach = new Attachment(Message.generateUUID(), n.getUri(), m.getId(), n.getDescription(),
n.getSenderThumbnail(), "image/png");
store.sendMessage(m, Arrays.asList(attach), session);
if (!destinatari.contains(sender))
destinatari.set(0, sender);
System.out.println("created message " + m);
}
}
}