From 869802c6e0191a4cac76fd6f90aee9d2163e7928 Mon Sep 17 00:00:00 2001 From: Alfredo Oliviero Date: Wed, 31 Jul 2024 14:00:24 +0200 Subject: [PATCH] fixing synchronized import. still not working --- scripts/setup.sh | 116 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 9 deletions(-) diff --git a/scripts/setup.sh b/scripts/setup.sh index 1b443fd..8d1185d 100755 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -10,20 +10,75 @@ log "RUNNING SETUP" KEYSPACE="dev_keyspace_1" DUMP_DIR="/dump" # Ensure DUMP_DIR is defined -SNAPSHOT_DIR="$DUMP_DIR/snapshot" # Ensure DUMP_DIR is defined SCHEMA_PATH="$DUMP_DIR/schema/${KEYSPACE}_schema.cql" # Ensure DUMP_DIR is defined -DUMP_TAG="dev_keyspace_1" - -# Determine the IP address of the current node +CASSANDRA_SEEDS="cassandra1,cassandra2,cassandra3" +STATUS_DIR="/var/log/cassandra" IP_ADDRESS=$(hostname -I | awk '{print $1}') -# Log the IP address for debugging -log "Node IP Address: $IP_ADDRESS" +# Initialize SEEDS array +SEEDS=(${CASSANDRA_SEEDS//,/ }) + +# Function to wait for all nodes to be in the 'UN' state +wait_for_all_nodes_up() { + SEEDS=(${CASSANDRA_SEEDS//,/ }) + while true; do + all_up=true + for seed in "${SEEDS[@]}"; do + NODE_STATUS=$(nodetool status -r) + if ! echo "$NODE_STATUS" | grep -E "^UN.*$seed" > /dev/null; then + if [ "$seed" = "$CASSANDRA_RPC_ADDRESS" ]; then + NODE_STATUS=$(nodetool status) + if ! echo "$NODE_STATUS" | grep -E "^UN.*$(hostname -I | awk '{print $1}')" > /dev/null; then + log "Node $seed (self) is not up yet..." + all_up=false + break + fi + else + log "Node $seed is not up yet..." + all_up=false + break + fi + fi + done + if [ "$all_up" = true ]; then + log "All nodes are up." + break + else + sleep 5 + fi + done +} + +# Function to wait for schema agreement across all nodes +wait_for_schema_agreement() { + while true; do + if cqlsh $IP_ADDRESS -e "DESCRIBE KEYSPACE $KEYSPACE;" > /dev/null 2>&1; then + if nodetool describecluster | grep -q "Schema versions:"; then + SCHEMA_COUNT=$(nodetool describecluster | grep -A 1 "Schema versions:" | wc -l) + if [ "$SCHEMA_COUNT" -eq 2 ]; then + log "Schema agreement reached." + break + else + log "Waiting for schema agreement..." + fi + fi + else + log "Waiting for keyspace $KEYSPACE to be available..." + fi + sleep 5 + done +} + +log "setup KEYSPACE: $KEYSPACE" +log "setup DUMP_DIR: $DUMP_DIR" +log "setup SCHEMA_PATH: $SCHEMA_PATH" +log "setup CASSANDRA_SEEDS: $CASSANDRA_SEEDS" +log "setup STATUS_DIR: $STATUS_DIR" # Wait for cassandra1 to be ready if this is not the primary node if [ "$PRIMARY_NODE" != "true" ]; then - log "Waiting for cassandra1 to be ready..." - /wait-for-it.sh cassandra1:9042 -t 60 -- log "cassandra1 is ready" + log "Waiting for cassandra1 to be ready..." + /wait-for-it.sh cassandra1:9042 -t 60 -- log "cassandra1 is ready" fi # Start Cassandra in the background @@ -32,12 +87,55 @@ cassandra -R & # Wait for Cassandra to be ready log "Waiting for Cassandra to start..." until cqlsh $IP_ADDRESS -e "SHOW HOST" > /dev/null 2>&1; do - sleep 2 + sleep 2 done # Log the value of PRIMARY_NODE for debugging log "PRIMARY_NODE is set to: $PRIMARY_NODE" +# Step 1: Wait for all nodes to be up and ready +log "Waiting for all nodes to be up and ready..." +wait_for_all_nodes_up + +# Step 2: Create keyspace and schema on the primary node +if [ "$PRIMARY_NODE" = "true" ]; then + log "Checking if keyspace $KEYSPACE exists..." + if ! cqlsh $IP_ADDRESS -e "DESCRIBE KEYSPACE $KEYSPACE;" > /dev/null 2>&1; then + log "Keyspace $KEYSPACE does not exist. Creating keyspace and tables..." + cqlsh $IP_ADDRESS -f "$SCHEMA_PATH" + else + log "Keyspace $KEYSPACE already exists. Ensuring tables exist..." + fi + + # Signal to secondary nodes that schema creation is complete + touch $STATUS_DIR/schema_created +fi + +# Step 3: Wait for schema to be created and agreed upon across all nodes +log "Waiting for schema agreement across all nodes..." +wait_for_schema_agreement + +# Step 4: Import data using sstableloader if not previously imported +if [ "$PRIMARY_NODE" = "true" ]; then + log "Importing snapshots using sstableloader..." + for TABLE_DIR in $(ls $DUMP_DIR); do + TABLE_NAME=$(basename $TABLE_DIR) # Extract table name from directory name + log "Importing table: $TABLE_NAME from directory: $DUMP_DIR/$TABLE_DIR" + sstableloader -d "$CASSANDRA_SEEDS" -v -k "$KEYSPACE" "$DUMP_DIR/$TABLE_DIR" + cqlsh $IP_ADDRESS -k "$KEYSPACE" -e "select count(*) from $TABLE_NAME;" >&2 + done + + # Signal to secondary nodes that import is complete + touch $STATUS_DIR/import_complete +else + # Wait for import completion signal from primary node + log "Waiting for import completion signal from primary node..." + while [ ! -f "$STATUS_DIR/import_complete" ]; do + sleep 5 + done +fi + +log "FINISHED IMPORT" # Keep the container running tail -f /dev/null