stock_system/src/utils/data_utils.py

173 lines
6.2 KiB
Python

import pandas as pd
from datetime import datetime, timedelta
from db.db_connection import create_client
def validate_signal_date(signal_date: datetime) -> datetime:
"""
Validate and adjust signal date if needed
Args:
signal_date (datetime): Signal date to validate
Returns:
datetime: Valid signal date (not in future)
"""
current_date = datetime.now()
if signal_date > current_date:
return current_date
return signal_date
def print_signal(signal_data: dict, signal_type: str = "🔍") -> None:
"""
Print standardized signal output
Args:
signal_data (dict): Dictionary containing signal information
signal_type (str): Emoji indicator for signal type (default: 🔍)
"""
try:
print(f"\n{signal_type} {signal_data['ticker']} @ ${signal_data['entry_price']:.2f} on {signal_data['signal_date'].strftime('%Y-%m-%d %H:%M')}")
print(f" Size: {signal_data['shares']} shares (${signal_data['position_size']:.2f})")
print(f" Stop: ${signal_data['stop_loss']:.2f} (7%) | Target: ${signal_data['target_price']:.2f}")
print(f" Risk/Reward: 1:{signal_data['risk_reward_ratio']:.1f} | Risk: ${abs(signal_data['risk_amount']):.2f}")
print(f" Potential Profit: ${signal_data['profit_amount']:.2f}")
except KeyError as e:
print(f"Error printing signal: Missing key {e}")
def save_signals_to_csv(signals: list, scanner_name: str) -> None:
"""
Save signals to CSV file with standardized format and naming
Args:
signals (list): List of signal dictionaries
scanner_name (str): Name of the scanner for file naming
"""
if not signals:
print("\nNo signals found")
return
output_dir = 'reports'
os.makedirs(output_dir, exist_ok=True)
output_date = datetime.now().strftime("%Y%m%d_%H%M")
output_file = f'{output_dir}/{scanner_name}_{output_date}.csv'
df_signals = pd.DataFrame(signals)
df_signals.to_csv(output_file, index=False)
print(f"\nSaved {len(signals)} signals to {output_file}")
def get_stock_data(ticker: str, start_date: datetime, end_date: datetime, interval: str) -> pd.DataFrame:
"""
Fetch stock data from the database with enhanced fallback logic
Args:
ticker (str): Stock ticker symbol
start_date (datetime): Start date for data fetch
end_date (datetime): End date for data fetch
interval (str): Time interval for data ('daily', '5min', etc.)
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
try:
client = create_client()
# Expand window to 90 days for more data robustness
start_date = start_date - timedelta(days=90)
# First try primary data source
if interval == "daily":
table = "stock_prices_daily"
else:
table = "stock_prices"
# Unified query format
query = f"""
SELECT
toDateTime(window_start/1000000000) as date,
open,
high,
low,
close,
volume
FROM stock_db.stock_prices
WHERE ticker = '{ticker}'
AND window_start BETWEEN
{int(start_date.timestamp() * 1e9)} AND
{int(end_date.timestamp() * 1e9)}
AND toDateTime(window_start/1000000000) <= now()
ORDER BY date ASC
"""
result = client.query(query)
# Fallback to intraday data if needed
if not result.result_rows and interval == "daily":
print(f"⚠️ No daily data for {ticker}, resampling from intraday data")
intraday_query = f"""
SELECT
toDateTime(window_start/1000000000) as date,
first_value(open) AS open,
max(high) AS high,
min(low) AS low,
last_value(close) AS close,
sum(volume) AS volume
FROM stock_db.stock_prices
WHERE ticker = '{ticker}'
AND window_start BETWEEN
{int(start_date.timestamp() * 1e9)} AND
{int(end_date.timestamp() * 1e9)}
AND toYear(toDateTime(window_start/1000000000)) <= toYear(now())
AND toYear(toDateTime(window_start/1000000000)) >= (toYear(now()) - 1)
GROUP BY date
ORDER BY date ASC
"""
result = client.query(intraday_query)
# Fallback to different intervals if still empty
if not result.result_rows:
print(f"⚠️ No {interval} data for {ticker}, trying weekly")
weekly_query = f"""
SELECT
toStartOfWeek(window_start) AS date,
first_value(open) AS open,
max(high) AS high,
min(low) AS low,
last_value(close) AS close,
sum(volume) AS volume
FROM stock_db.stock_prices
WHERE ticker = '{ticker}'
GROUP BY date
ORDER BY date ASC
"""
result = client.query(weekly_query)
if not result.result_rows:
return pd.DataFrame()
df = pd.DataFrame(
result.result_rows,
columns=['date', 'open', 'high', 'low', 'close', 'volume']
)
# Convert numeric columns
numeric_columns = ['open', 'high', 'low', 'close', 'volume']
for col in numeric_columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Handle null values
if df['close'].isnull().any():
print(f"Warning: Found null values in close prices")
df = df.dropna(subset=['close'])
if df.empty or 'close' not in df.columns:
return pd.DataFrame()
if df['date'].dtype == object:
df['date'] = pd.to_datetime(df['date'])
return df
except Exception as e:
print(f"Error fetching {ticker} data: {str(e)}")
return pd.DataFrame()