diff --git a/src/db/db_connection.py b/src/db/db_connection.py index cfeb7e7..167c306 100644 --- a/src/db/db_connection.py +++ b/src/db/db_connection.py @@ -1,7 +1,8 @@ import logging import os import time - +from queue import Queue +from threading import Lock import clickhouse_connect from dotenv import load_dotenv @@ -12,9 +13,14 @@ load_dotenv() logging.basicConfig(level=logging.WARNING) logger = logging.getLogger(__name__) -def create_client(): +# Connection pool settings +MAX_POOL_SIZE = 5 +connection_pool = Queue(maxsize=MAX_POOL_SIZE) +pool_lock = Lock() + +def create_base_client(): """ - Create a ClickHouse client without any session management + Create a base ClickHouse client without session management """ clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD") if not clickhouse_password: @@ -42,14 +48,14 @@ def create_client(): 'database': 'stock_db', 'max_execution_time': 300, 'mutations_sync': 0, - 'use_session_id': 0 # Disable sessions completely + 'use_session_id': 0 } ) - # Test the connection with a simple query + # Test the connection try: client.query('SELECT 1') - logger.debug(f"Successfully established connection") # Changed to debug level + logger.debug(f"Successfully established connection") return client except Exception as e: logger.error(f"Connection test failed: {str(e)}") @@ -59,7 +65,6 @@ def create_client(): last_exception = e error_message = str(e) - # For any errors, use normal retry logic if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}") @@ -67,6 +72,55 @@ def create_client(): time.sleep(wait_time) continue - # If we've exhausted all retries, log the final error and raise logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}") raise last_exception + +def initialize_pool(): + """Initialize the connection pool""" + while not connection_pool.full(): + try: + client = create_base_client() + connection_pool.put(client) + except Exception as e: + logger.error(f"Error initializing pool connection: {str(e)}") + break + +def get_client(): + """ + Get a client from the pool or create a new one if needed + """ + # Initialize pool if empty + with pool_lock: + if connection_pool.empty(): + initialize_pool() + + try: + # Try to get a connection from the pool + client = connection_pool.get_nowait() + + # Test the connection + try: + client.query('SELECT 1') + return client + except Exception: + # Connection is dead, create a new one + logger.warning("Replacing dead connection in pool") + client = create_base_client() + return client + + except Exception: + # Pool is empty or other error, create new connection + logger.warning("Creating new connection (pool empty or error)") + return create_base_client() + +def return_client(client): + """ + Return a client to the pool if possible + """ + try: + if not connection_pool.full(): + connection_pool.put(client) + else: + client.close() + except Exception as e: + logger.warning(f"Error returning client to pool: {str(e)}")