From 58841c9a087dc9ff9833bd99572521ede08c678c Mon Sep 17 00:00:00 2001 From: "Bobby (aider)" Date: Sat, 8 Feb 2025 17:38:58 -0800 Subject: [PATCH] feat: Enhance ClickHouse client with improved session management and error handling --- src/db/db_connection.py | 62 +++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/src/db/db_connection.py b/src/db/db_connection.py index ea070e4..a3efcff 100644 --- a/src/db/db_connection.py +++ b/src/db/db_connection.py @@ -9,17 +9,20 @@ from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + def create_client(): """ - Create a ClickHouse client with a unique session and retry logic, - using the password loaded from environment variables. + Create a ClickHouse client with enhanced session management and retry logic """ clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD") if not clickhouse_password: raise ValueError("CLICKHOUSE_PASSWORD environment variable not set.") - max_retries = 5 # Increased from 3 to 5 - retry_delay = 1 # Reduced initial delay to 1 second + max_retries = 5 + retry_delay = 1 last_exception = None @@ -36,32 +39,49 @@ def create_client(): secure=True, session_id=session_id, settings={ - 'session_timeout': 60, - 'session_check': 1 - } + 'session_timeout': 3600, # Increased to 1 hour + 'session_check': 1, + 'keep_alive_timeout': 3600 # Added keep-alive timeout + }, + connect_timeout=10, # Added explicit connect timeout + send_receive_timeout=300 # Added send/receive timeout ) - # Test the connection - client.query('SELECT 1') - return client + # Test the connection with a simple query + try: + client.query('SELECT 1') + logger.info(f"Successfully established connection with session {session_id}") + return client + except Exception as e: + if "SESSION_NOT_FOUND" in str(e) or "SESSION_IS_LOCKED" in str(e): + raise # Re-raise these specific errors to trigger retry + logger.error(f"Connection test failed: {str(e)}") + raise except Exception as e: last_exception = e + error_message = str(e) - # Check specifically for session locked error - if "SESSION_IS_LOCKED" in str(e): + # Handle specific error codes + if "SESSION_NOT_FOUND" in error_message: if attempt < max_retries - 1: - # Use exponential backoff for session locks wait_time = retry_delay * (2 ** attempt) - logging.warning(f"Session locked, retrying in {wait_time} seconds...") + logger.warning(f"Session not found, retrying in {wait_time} seconds...") time.sleep(wait_time) continue + elif "SESSION_IS_LOCKED" in error_message: + if attempt < max_retries - 1: + wait_time = retry_delay * (2 ** attempt) + logger.warning(f"Session locked, retrying in {wait_time} seconds...") + time.sleep(wait_time) + continue + else: + # For other errors, use normal retry logic + if attempt < max_retries - 1: + logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}") + time.sleep(retry_delay) + continue - # For other errors, use normal retry logic - elif attempt < max_retries - 1: - logging.warning(f"Connection attempt {attempt + 1} failed: {str(e)}") - time.sleep(retry_delay) - continue - - # If we've exhausted all retries, raise the last exception + # If we've exhausted all retries, log the final error and raise + logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}") raise last_exception