import logging
import os
import time
import uuid

import clickhouse_connect
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_client():
    """Create a ClickHouse client, retrying on connection and session errors.

    The password is read from the ``CLICKHOUSE_PASSWORD`` environment
    variable. Up to five attempts are made; each attempt uses a freshly
    generated UUID as the session id and is verified with ``SELECT 1``
    before the client is handed back to the caller.

    Retry policy:
        * Errors whose text contains ``SESSION_NOT_FOUND`` or
          ``SESSION_IS_LOCKED`` back off exponentially
          (1, 2, 4, 8 seconds).
        * Any other error waits a fixed 1 second between attempts.

    A client whose verification query fails is closed before the next
    attempt so that failed attempts do not leak connections.

    Returns:
        The connected client returned by ``clickhouse_connect.get_client``.

    Raises:
        ValueError: If ``CLICKHOUSE_PASSWORD`` is unset or empty.
        Exception: The error from the final attempt, re-raised unchanged
            once all retries are exhausted.
    """
    clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD")
    if not clickhouse_password:
        raise ValueError("CLICKHOUSE_PASSWORD environment variable not set.")

    max_retries = 5
    retry_delay = 1

    for attempt in range(max_retries):
        client = None
        try:
            # Generate a new UUID for each attempt so a retry never reuses
            # a session the server reported as missing or locked.
            session_id = str(uuid.uuid4())

            client = clickhouse_connect.get_client(
                host="clickhouse.abellana.work",
                port=443,
                username="default",
                password=clickhouse_password,
                secure=True,
                session_id=session_id,
                settings={
                    'session_timeout': 3600,  # 1 hour
                    'session_check': 1,
                },
                connect_timeout=10,
                send_receive_timeout=300,
            )

            # Test the connection with a simple query
            try:
                client.query('SELECT 1')
            except Exception as e:
                # Session errors are re-raised quietly because the outer
                # handler logs them as retry warnings; anything else is
                # logged here first.
                test_error = str(e)
                if not (
                    "SESSION_NOT_FOUND" in test_error
                    or "SESSION_IS_LOCKED" in test_error
                ):
                    logger.error(f"Connection test failed: {test_error}")
                raise
            else:
                logger.info(f"Successfully established connection with session {session_id}")
                return client

        except Exception as e:
            error_message = str(e)

            # The client may have been created before the failure (e.g. the
            # verification query failed). Close it so each failed attempt
            # does not leave a connection pool behind.
            if client is not None:
                try:
                    client.close()
                except Exception:
                    # Best-effort cleanup; never mask the original error.
                    logger.debug(
                        "Ignoring error while closing failed client",
                        exc_info=True,
                    )

            # If we've exhausted all retries, log the final error and raise
            if attempt >= max_retries - 1:
                logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}")
                raise

            # Handle specific error codes
            if "SESSION_NOT_FOUND" in error_message:
                wait_time = retry_delay * (2 ** attempt)
                logger.warning(f"Session not found, retrying in {wait_time} seconds...")
            elif "SESSION_IS_LOCKED" in error_message:
                wait_time = retry_delay * (2 ** attempt)
                logger.warning(f"Session locked, retrying in {wait_time} seconds...")
            else:
                # For other errors, use normal retry logic
                wait_time = retry_delay
                logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}")

            time.sleep(wait_time)