feat: Enhance ClickHouse client with improved session management and error handling
This commit is contained in:
parent
1c23444e5b
commit
58841c9a08
@ -9,17 +9,20 @@ from dotenv import load_dotenv
|
|||||||
# Load environment variables from .env file
|
# Load environment variables from .env file
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def create_client():
|
def create_client():
|
||||||
"""
|
"""
|
||||||
Create a ClickHouse client with a unique session and retry logic,
|
Create a ClickHouse client with enhanced session management and retry logic
|
||||||
using the password loaded from environment variables.
|
|
||||||
"""
|
"""
|
||||||
clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD")
|
clickhouse_password = os.getenv("CLICKHOUSE_PASSWORD")
|
||||||
if not clickhouse_password:
|
if not clickhouse_password:
|
||||||
raise ValueError("CLICKHOUSE_PASSWORD environment variable not set.")
|
raise ValueError("CLICKHOUSE_PASSWORD environment variable not set.")
|
||||||
|
|
||||||
max_retries = 5 # Increased from 3 to 5
|
max_retries = 5
|
||||||
retry_delay = 1 # Reduced initial delay to 1 second
|
retry_delay = 1
|
||||||
|
|
||||||
last_exception = None
|
last_exception = None
|
||||||
|
|
||||||
@ -36,32 +39,49 @@ def create_client():
|
|||||||
secure=True,
|
secure=True,
|
||||||
session_id=session_id,
|
session_id=session_id,
|
||||||
settings={
|
settings={
|
||||||
'session_timeout': 60,
|
'session_timeout': 3600, # Increased to 1 hour
|
||||||
'session_check': 1
|
'session_check': 1,
|
||||||
}
|
'keep_alive_timeout': 3600 # Added keep-alive timeout
|
||||||
|
},
|
||||||
|
connect_timeout=10, # Added explicit connect timeout
|
||||||
|
send_receive_timeout=300 # Added send/receive timeout
|
||||||
)
|
)
|
||||||
|
|
||||||
# Test the connection
|
# Test the connection with a simple query
|
||||||
|
try:
|
||||||
client.query('SELECT 1')
|
client.query('SELECT 1')
|
||||||
|
logger.info(f"Successfully established connection with session {session_id}")
|
||||||
return client
|
return client
|
||||||
|
except Exception as e:
|
||||||
|
if "SESSION_NOT_FOUND" in str(e) or "SESSION_IS_LOCKED" in str(e):
|
||||||
|
raise # Re-raise these specific errors to trigger retry
|
||||||
|
logger.error(f"Connection test failed: {str(e)}")
|
||||||
|
raise
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
last_exception = e
|
last_exception = e
|
||||||
|
error_message = str(e)
|
||||||
|
|
||||||
# Check specifically for session locked error
|
# Handle specific error codes
|
||||||
if "SESSION_IS_LOCKED" in str(e):
|
if "SESSION_NOT_FOUND" in error_message:
|
||||||
if attempt < max_retries - 1:
|
if attempt < max_retries - 1:
|
||||||
# Use exponential backoff for session locks
|
|
||||||
wait_time = retry_delay * (2 ** attempt)
|
wait_time = retry_delay * (2 ** attempt)
|
||||||
logging.warning(f"Session locked, retrying in {wait_time} seconds...")
|
logger.warning(f"Session not found, retrying in {wait_time} seconds...")
|
||||||
time.sleep(wait_time)
|
time.sleep(wait_time)
|
||||||
continue
|
continue
|
||||||
|
elif "SESSION_IS_LOCKED" in error_message:
|
||||||
|
if attempt < max_retries - 1:
|
||||||
|
wait_time = retry_delay * (2 ** attempt)
|
||||||
|
logger.warning(f"Session locked, retrying in {wait_time} seconds...")
|
||||||
|
time.sleep(wait_time)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
# For other errors, use normal retry logic
|
# For other errors, use normal retry logic
|
||||||
elif attempt < max_retries - 1:
|
if attempt < max_retries - 1:
|
||||||
logging.warning(f"Connection attempt {attempt + 1} failed: {str(e)}")
|
logger.warning(f"Connection attempt {attempt + 1} failed: {error_message}")
|
||||||
time.sleep(retry_delay)
|
time.sleep(retry_delay)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# If we've exhausted all retries, raise the last exception
|
# If we've exhausted all retries, log the final error and raise
|
||||||
|
logger.error(f"Failed to establish connection after {max_retries} attempts: {error_message}")
|
||||||
raise last_exception
|
raise last_exception
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user