feat: add connection pool for managing multiple database connections
This commit is contained in:
parent
125273f22b
commit
a5cef08fd2
@ -1,7 +1,8 @@
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
from queue import Queue
|
||||||
|
from threading import Lock
|
||||||
import clickhouse_connect
|
import clickhouse_connect
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
@ -12,9 +13,14 @@ load_dotenv()
|
|||||||
logging.basicConfig(level=logging.WARNING)
|
logging.basicConfig(level=logging.WARNING)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def create_client():
|
# Connection pool settings
|
||||||
|
MAX_POOL_SIZE = 5
|
||||||
|
connection_pool = Queue(maxsize=MAX_POOL_SIZE)
|
||||||
|
pool_lock = Lock()
|
||||||
|
|
||||||
|
def create_base_client():
|
||||||
"""
|
"""
|
||||||
Create a ClickHouse client without any session management
|
Create a base ClickHouse client without session management
|
||||||
"""
|
"""
|
||||||
clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD")
|
clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD")
|
||||||
if not clickhouse_password:
|
if not clickhouse_password:
|
||||||
@ -42,14 +48,14 @@ def create_client():
|
|||||||
'database': 'stock_db',
|
'database': 'stock_db',
|
||||||
'max_execution_time': 300,
|
'max_execution_time': 300,
|
||||||
'mutations_sync': 0,
|
'mutations_sync': 0,
|
||||||
'use_session_id': 0 # Disable sessions completely
|
'use_session_id': 0
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Test the connection with a simple query
|
# Test the connection
|
||||||
try:
|
try:
|
||||||
client.query('SELECT 1')
|
client.query('SELECT 1')
|
||||||
logger.debug(f"Successfully established connection") # Changed to debug level
|
logger.debug(f"Successfully established connection")
|
||||||
return client
|
return client
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Connection test failed: {str(e)}")
|
logger.error(f"Connection test failed: {str(e)}")
|
||||||
@ -59,7 +65,6 @@ def create_client():
|
|||||||
last_exception = e
|
last_exception = e
|
||||||
error_message = str(e)
|
error_message = str(e)
|
||||||
|
|
||||||
# For any errors, use normal retry logic
|
|
||||||
if attempt < max_retries - 1:
|
if attempt < max_retries - 1:
|
||||||
wait_time = retry_delay * (2 ** attempt)
|
wait_time = retry_delay * (2 ** attempt)
|
||||||
logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}")
|
logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}")
|
||||||
@ -67,6 +72,55 @@ def create_client():
|
|||||||
time.sleep(wait_time)
|
time.sleep(wait_time)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If we've exhausted all retries, log the final error and raise
|
|
||||||
logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}")
|
logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}")
|
||||||
raise last_exception
|
raise last_exception
|
||||||
|
|
||||||
|
def initialize_pool():
|
||||||
|
"""Initialize the connection pool"""
|
||||||
|
while not connection_pool.full():
|
||||||
|
try:
|
||||||
|
client = create_base_client()
|
||||||
|
connection_pool.put(client)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error initializing pool connection: {str(e)}")
|
||||||
|
break
|
||||||
|
|
||||||
|
def get_client():
|
||||||
|
"""
|
||||||
|
Get a client from the pool or create a new one if needed
|
||||||
|
"""
|
||||||
|
# Initialize pool if empty
|
||||||
|
with pool_lock:
|
||||||
|
if connection_pool.empty():
|
||||||
|
initialize_pool()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Try to get a connection from the pool
|
||||||
|
client = connection_pool.get_nowait()
|
||||||
|
|
||||||
|
# Test the connection
|
||||||
|
try:
|
||||||
|
client.query('SELECT 1')
|
||||||
|
return client
|
||||||
|
except Exception:
|
||||||
|
# Connection is dead, create a new one
|
||||||
|
logger.warning("Replacing dead connection in pool")
|
||||||
|
client = create_base_client()
|
||||||
|
return client
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
# Pool is empty or other error, create new connection
|
||||||
|
logger.warning("Creating new connection (pool empty or error)")
|
||||||
|
return create_base_client()
|
||||||
|
|
||||||
|
def return_client(client):
|
||||||
|
"""
|
||||||
|
Return a client to the pool if possible
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if not connection_pool.full():
|
||||||
|
connection_pool.put(client)
|
||||||
|
else:
|
||||||
|
client.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error returning client to pool: {str(e)}")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user