stock_system/src/utils/data_utils.py

208 lines
7.5 KiB
Python

import os
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from trading.position_calculator import PositionCalculator
from utils.common_utils import get_user_input, get_stock_data, get_qualified_stocks
from typing import Optional
def get_float_input(prompt: str) -> Optional[float]:
    """
    Ask for a float value through the shared ``get_user_input`` helper
    Args:
        prompt (str): Text forwarded to ``get_user_input``
    Returns:
        Optional[float]: Whatever ``get_user_input(prompt, float)`` returns
        (presumably None when no valid number is supplied - confirm against
        the helper in utils.common_utils)
    """
    parsed_value = get_user_input(prompt, float)
    return parsed_value
def _latest_close(data, ticker: str, allow_flat: bool):
    """
    Return the most recent non-null close for ``ticker`` in ``data``, or None
    Args:
        data: Object returned by the download call (expected: DataFrame)
        ticker (str): Ticker whose close column is looked up
        allow_flat (bool): Accept a flat ``'Close'`` column as this ticker's
            close. Only safe when a single ticker was requested, otherwise
            every ticker would be given the same column.
    Returns:
        The last non-null close value, or None when no usable value exists
    """
    if not isinstance(data, pd.DataFrame) or data.empty:
        return None

    columns = data.columns
    if isinstance(columns, pd.MultiIndex):
        # Grouped layout: columns keyed as (ticker, field).
        if (ticker, 'Close') not in columns:
            return None
        closes = data[(ticker, 'Close')]
    elif allow_flat and 'Close' in columns:
        # Flat layout: plain field names, only meaningful for one ticker.
        closes = data['Close']
    else:
        return None

    # Skip trailing NaN rows so a missing last bar is not reported as a price.
    closes = closes.dropna()
    if closes.empty:
        return None
    return closes.iloc[-1]


def get_current_prices(tickers: list) -> dict:
    """
    Get current prices for multiple tickers using yfinance
    Args:
        tickers (list): Ticker symbols to look up
    Returns:
        dict: Mapping of ticker -> latest non-null close price. Tickers with
        no usable close are omitted (a message is printed for each). An empty
        dict is returned for an empty ticker list or when the download fails.
    """
    if not tickers:
        # Nothing to look up; avoid a pointless download call.
        return {}

    # Bound before the try block so the error handler below can always
    # inspect it, even when the download itself raises.
    data = None
    try:
        # Create a space-separated string of tickers
        ticker_str = " ".join(tickers)
        # Get data for all tickers at once
        data = yf.download(ticker_str, period="1d", interval="1m", group_by='ticker')

        single_ticker = len(tickers) == 1
        prices = {}
        for ticker in tickers:
            try:
                price = _latest_close(data, ticker, allow_flat=single_ticker)
            except Exception as e:
                # One bad ticker must not discard the prices already found.
                print(f"Error getting price for {ticker}: {e}")
                continue
            if price is None:
                print(f"No close price found for {ticker}")
            else:
                prices[ticker] = price
        return prices
    except Exception as e:
        print(f"Error fetching current prices: {e}")
        print(f"Data structure received: {type(data)}")
        if isinstance(data, pd.DataFrame):
            print(f"Available columns: {data.columns}")
        return {}
def validate_signal_date(signal_date: datetime) -> datetime:
    """
    Clamp a signal date so that it never lies in the future
    Args:
        signal_date (datetime): Signal date to check against ``datetime.now()``
    Returns:
        datetime: ``signal_date`` unchanged when it is not later than the
        current time, otherwise the current time
    """
    now = datetime.now()
    return now if signal_date > now else signal_date
def print_signal(signal_data: dict, signal_type: str = "🔍") -> None:
    """
    Write a five-line, uniformly formatted summary of one signal to stdout
    Args:
        signal_data (dict): Signal fields; the keys read are ticker,
            stock_type, entry_price, signal_date, shares, position_size,
            stop_loss, target_price, risk_reward_ratio, risk_amount and
            profit_amount
        signal_type (str): Marker printed in front of the ticker (default: 🔍)
    """
    try:
        # Each line is built and printed in turn, so lines preceding a missing
        # key are still emitted before the error message.
        headline = (
            f"\n{signal_type} {signal_data['ticker']} ({signal_data['stock_type']})"
            f" @ ${signal_data['entry_price']:.2f}"
            f" on {signal_data['signal_date'].strftime('%Y-%m-%d %H:%M')}"
        )
        print(headline)

        sizing = (
            f" Size: {signal_data['shares']} shares"
            f" (${signal_data['position_size']:.2f})"
        )
        print(sizing)

        levels = (
            f" Stop: ${signal_data['stop_loss']:.2f} (7%)"
            f" | Target: ${signal_data['target_price']:.2f}"
        )
        print(levels)

        risk = (
            f" Risk/Reward: 1:{signal_data['risk_reward_ratio']:.1f}"
            f" | Risk: ${abs(signal_data['risk_amount']):.2f}"
        )
        print(risk)

        profit = f" Potential Profit: ${signal_data['profit_amount']:.2f}"
        print(profit)
    except KeyError as missing:
        ticker = signal_data.get('ticker', 'Unknown')
        print(f"Error printing signal for {ticker}: Missing key {missing}")
        # List the keys that were supplied to make the gap easy to spot
        print(f"Available keys: {list(signal_data.keys())}")
def save_signals_to_csv(signals: list, scanner_name: str) -> None:
    """
    Write the collected signals to a timestamped CSV file under ``reports/``
    Args:
        signals (list): Signal dictionaries, one per CSV row
        scanner_name (str): Scanner name used as the file-name prefix
    The file is named ``reports/<scanner_name>_<YYYYmmdd_HHMM>.csv``. When
    ``signals`` is empty a notice is printed and no file is created.
    """
    if not signals:
        print("\nNo signals found")
        return

    report_dir = 'reports'
    os.makedirs(report_dir, exist_ok=True)

    # Minute-resolution timestamp keeps successive runs in separate files
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    csv_path = f'{report_dir}/{scanner_name}_{timestamp}.csv'

    pd.DataFrame(signals).to_csv(csv_path, index=False)
    print(f"\nSaved {len(signals)} signals to {csv_path}")
def process_signal_data(ticker: str, signal_data: dict, current_volume: int,
                        last_update: int, stock_type: str, calculator: PositionCalculator = None) -> dict:
    """
    Build the standard signal record from raw scanner output
    Args:
        ticker (str): Stock ticker
        signal_data (dict): Raw signal data; must contain 'price', may
            contain 'ema' or 'upper_band' (used as target) and 'date'
        current_volume (int): Current trading volume
        last_update (int): Last update timestamp; divided by 1e9 before being
            passed to ``datetime.fromtimestamp``, i.e. read as epoch nanoseconds
        stock_type (str): Stock type/label
        calculator (PositionCalculator, optional): When given, position
            sizing and risk/reward fields are added to the record
    Returns:
        dict: Processed signal data
    """
    entry_price = signal_data['price']
    # 'ema' wins when present; 'upper_band' is the fallback (ATR vs Sunny)
    target_price = signal_data.get('ema', signal_data.get('upper_band'))
    signal_date = signal_data.get('date', datetime.now())
    updated_at = datetime.fromtimestamp(last_update / 1000000000)

    # Insertion order is kept identical because it decides column order
    # when records are later turned into a DataFrame.
    record = {}
    record['ticker'] = ticker
    record['entry_price'] = entry_price
    record['target_price'] = target_price
    record['volume'] = current_volume
    record['signal_date'] = signal_date
    record['stock_type'] = stock_type
    record['last_update'] = updated_at

    if not calculator:
        return record

    position = calculator.calculate_position_size(entry_price)
    shares = position['shares']
    potential_profit = (target_price - entry_price) * shares
    potential_loss = position['potential_loss']

    # Guard the ratio against a zero loss instead of dividing by it
    if potential_loss != 0:
        risk_reward = abs(potential_profit / potential_loss)
    else:
        risk_reward = 0

    record['shares'] = shares
    record['position_size'] = position['position_value']
    record['stop_loss'] = position['stop_loss']
    record['risk_amount'] = potential_loss
    record['profit_amount'] = potential_profit
    record['risk_reward_ratio'] = risk_reward
    return record
result = client.query(query)
if not result.result_rows:
return pd.DataFrame()
# Create base DataFrame
df = pd.DataFrame(
result.result_rows,
columns=['date', 'open', 'high', 'low', 'close', 'volume']
)
# Convert numeric columns
numeric_columns = ['open', 'high', 'low', 'close', 'volume']
for col in numeric_columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
# Convert date column
df['date'] = pd.to_datetime(df['date'])
# Set date as index for resampling
df.set_index('date', inplace=True)
# Resample based on interval
if interval == 'daily':
rule = '1D'
elif interval == '5min':
rule = '5T'
elif interval == '15min':
rule = '15T'
elif interval == '30min':
rule = '30T'
elif interval == '1hour':
rule = '1H'
else:
rule = '1D' # Default to daily
resampled = df.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
# Reset index to get date as column
resampled.reset_index(inplace=True)
# Filter to requested date range
mask = (resampled['date'] >= start_date + timedelta(days=89)) & (resampled['date'] <= end_date)
resampled = resampled.loc[mask]
# Handle null values
if resampled['close'].isnull().any():
print(f"Warning: Found null values in close prices")
resampled = resampled.dropna(subset=['close'])
if resampled.empty or 'close' not in resampled.columns:
return pd.DataFrame()
return resampled
except Exception as e:
print(f"Error fetching {ticker} data: {str(e)}")
return pd.DataFrame()