fixing synchronized import. still not working

This commit is contained in:
Alfredo Oliviero 2024-07-31 14:00:24 +02:00
parent c5c34c0697
commit 869802c6e0
1 changed files with 107 additions and 9 deletions

View File

@ -10,20 +10,75 @@ log "RUNNING SETUP"
KEYSPACE="dev_keyspace_1" KEYSPACE="dev_keyspace_1"
DUMP_DIR="/dump" # Ensure DUMP_DIR is defined DUMP_DIR="/dump" # Ensure DUMP_DIR is defined
SNAPSHOT_DIR="$DUMP_DIR/snapshot" # Ensure DUMP_DIR is defined
SCHEMA_PATH="$DUMP_DIR/schema/${KEYSPACE}_schema.cql" # Ensure DUMP_DIR is defined SCHEMA_PATH="$DUMP_DIR/schema/${KEYSPACE}_schema.cql" # Ensure DUMP_DIR is defined
DUMP_TAG="dev_keyspace_1" CASSANDRA_SEEDS="cassandra1,cassandra2,cassandra3"
STATUS_DIR="/var/log/cassandra"
# Determine the IP address of the current node
IP_ADDRESS=$(hostname -I | awk '{print $1}') IP_ADDRESS=$(hostname -I | awk '{print $1}')
# Log the IP address for debugging # Initialize SEEDS array
log "Node IP Address: $IP_ADDRESS" SEEDS=(${CASSANDRA_SEEDS//,/ })
# Function to wait for all nodes to be in the 'UN' state
wait_for_all_nodes_up() {
SEEDS=(${CASSANDRA_SEEDS//,/ })
while true; do
all_up=true
for seed in "${SEEDS[@]}"; do
NODE_STATUS=$(nodetool status -r)
if ! echo "$NODE_STATUS" | grep -E "^UN.*$seed" > /dev/null; then
if [ "$seed" = "$CASSANDRA_RPC_ADDRESS" ]; then
NODE_STATUS=$(nodetool status)
if ! echo "$NODE_STATUS" | grep -E "^UN.*$(hostname -I | awk '{print $1}')" > /dev/null; then
log "Node $seed (self) is not up yet..."
all_up=false
break
fi
else
log "Node $seed is not up yet..."
all_up=false
break
fi
fi
done
if [ "$all_up" = true ]; then
log "All nodes are up."
break
else
sleep 5
fi
done
}
# Function to wait for schema agreement across all nodes
wait_for_schema_agreement() {
while true; do
if cqlsh $IP_ADDRESS -e "DESCRIBE KEYSPACE $KEYSPACE;" > /dev/null 2>&1; then
if nodetool describecluster | grep -q "Schema versions:"; then
SCHEMA_COUNT=$(nodetool describecluster | grep -A 1 "Schema versions:" | wc -l)
if [ "$SCHEMA_COUNT" -eq 2 ]; then
log "Schema agreement reached."
break
else
log "Waiting for schema agreement..."
fi
fi
else
log "Waiting for keyspace $KEYSPACE to be available..."
fi
sleep 5
done
}
log "setup KEYSPACE: $KEYSPACE"
log "setup DUMP_DIR: $DUMP_DIR"
log "setup SCHEMA_PATH: $SCHEMA_PATH"
log "setup CASSANDRA_SEEDS: $CASSANDRA_SEEDS"
log "setup STATUS_DIR: $STATUS_DIR"
# Wait for cassandra1 to be ready if this is not the primary node # Wait for cassandra1 to be ready if this is not the primary node
if [ "$PRIMARY_NODE" != "true" ]; then if [ "$PRIMARY_NODE" != "true" ]; then
log "Waiting for cassandra1 to be ready..." log "Waiting for cassandra1 to be ready..."
/wait-for-it.sh cassandra1:9042 -t 60 -- log "cassandra1 is ready" /wait-for-it.sh cassandra1:9042 -t 60 -- log "cassandra1 is ready"
fi fi
# Start Cassandra in the background # Start Cassandra in the background
@ -32,12 +87,55 @@ cassandra -R &
# Wait for Cassandra to be ready # Wait for Cassandra to be ready
log "Waiting for Cassandra to start..." log "Waiting for Cassandra to start..."
until cqlsh $IP_ADDRESS -e "SHOW HOST" > /dev/null 2>&1; do until cqlsh $IP_ADDRESS -e "SHOW HOST" > /dev/null 2>&1; do
sleep 2 sleep 2
done done
# Log the value of PRIMARY_NODE for debugging # Log the value of PRIMARY_NODE for debugging
log "PRIMARY_NODE is set to: $PRIMARY_NODE" log "PRIMARY_NODE is set to: $PRIMARY_NODE"
# Step 1: Wait for all nodes to be up and ready
log "Waiting for all nodes to be up and ready..."
wait_for_all_nodes_up
# Step 2: Create keyspace and schema on the primary node
if [ "$PRIMARY_NODE" = "true" ]; then
log "Checking if keyspace $KEYSPACE exists..."
if ! cqlsh $IP_ADDRESS -e "DESCRIBE KEYSPACE $KEYSPACE;" > /dev/null 2>&1; then
log "Keyspace $KEYSPACE does not exist. Creating keyspace and tables..."
cqlsh $IP_ADDRESS -f "$SCHEMA_PATH"
else
log "Keyspace $KEYSPACE already exists. Ensuring tables exist..."
fi
# Signal to secondary nodes that schema creation is complete
touch $STATUS_DIR/schema_created
fi
# Step 3: Wait for schema to be created and agreed upon across all nodes
log "Waiting for schema agreement across all nodes..."
wait_for_schema_agreement
# Step 4: Import data using sstableloader if not previously imported
if [ "$PRIMARY_NODE" = "true" ]; then
log "Importing snapshots using sstableloader..."
for TABLE_DIR in $(ls $DUMP_DIR); do
TABLE_NAME=$(basename $TABLE_DIR) # Extract table name from directory name
log "Importing table: $TABLE_NAME from directory: $DUMP_DIR/$TABLE_DIR"
sstableloader -d "$CASSANDRA_SEEDS" -v -k "$KEYSPACE" "$DUMP_DIR/$TABLE_DIR"
cqlsh $IP_ADDRESS -k "$KEYSPACE" -e "select count(*) from $TABLE_NAME;" >&2
done
# Signal to secondary nodes that import is complete
touch $STATUS_DIR/import_complete
else
# Wait for import completion signal from primary node
log "Waiting for import completion signal from primary node..."
while [ ! -f "$STATUS_DIR/import_complete" ]; do
sleep 5
done
fi
log "FINISHED IMPORT"
# Keep the container running # Keep the container running
tail -f /dev/null tail -f /dev/null