stock_system/src/screener/t_sunnyband.py

486 lines
19 KiB
Python

import os
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
from db.db_connection import create_client
from indicators.sunny_bands import SunnyBands
from trading.position_calculator import PositionCalculator
def get_interval_choice() -> str:
    """Ask the user on stdin which bar interval to scan.

    The menu is printed once; the prompt is repeated until one of the
    listed keys is entered exactly.

    Returns:
        "daily", "5min", "15min", "30min" or "1hour".
    """
    menu_lines = (
        "\nSelect Time Interval:",
        "1. Daily",
        "2. 5 minute",
        "3. 15 minute",
        "4. 30 minute",
        "5. 1 hour",
    )
    for menu_line in menu_lines:
        print(menu_line)

    # Menu key typed by the user -> interval identifier handed to callers.
    interval_by_key = {
        "1": "daily",
        "2": "5min",
        "3": "15min",
        "4": "30min",
        "5": "1hour",
    }
    while True:
        picked = interval_by_key.get(input("\nEnter your choice (1-5): "))
        if picked is not None:
            return picked
        print("Invalid choice. Please try again.")
def get_stock_data(ticker: str, start_date: datetime, end_date: datetime, interval: str) -> pd.DataFrame:
    """Fetch OHLCV bars for one ticker from the database.

    Daily bars are read from ``stock_db.stock_prices_daily``. Intraday bars
    are aggregated from ``stock_db.stock_prices`` into 5-minute buckets in
    SQL and, for the 15min/30min/1hour intervals, resampled with pandas.

    Args:
        ticker: Symbol to load.
        start_date: Accepted for interface compatibility but ignored (see
            the note in the body).
        end_date: Accepted for interface compatibility but ignored.
        interval: "daily", "5min", "15min", "30min" or "1hour".

    Returns:
        DataFrame with columns date, open, high, low, close, volume in
        ascending date order. An empty DataFrame is returned when the query
        yields no rows or raises.

    Raises:
        KeyError: if ``interval`` is not one of the supported values.
    """
    client = create_client()

    # NOTE: the caller-supplied range is overridden with a fixed 60-day
    # lookback ending now. The scanner in this file passes a 1-day range and
    # relies on this override to get enough history for the indicator, so the
    # behaviour is kept as-is.
    end_date = datetime.now()
    start_date = end_date - timedelta(days=60)  # 60 days of history

    minutes = None  # stays None for daily bars
    if interval == "daily":
        query = f"""
            SELECT
                date as date,
                open,
                high,
                low,
                close,
                volume
            FROM stock_db.stock_prices_daily
            WHERE ticker = '{ticker}'
            AND date BETWEEN '{start_date.date()}' AND '{end_date.date()}'
            ORDER BY date ASC
        """
    else:
        minutes_map = {
            "5min": 5,
            "15min": 15,
            "30min": 30,
            "1hour": 60
        }
        # Looked up before the query so an unsupported interval fails fast.
        minutes = minutes_map[interval]
        # Build 5-minute bars in SQL; coarser intervals are resampled below.
        # window_start is divided by 1e9 because it is stored in nanoseconds.
        # The bar's open is the open of its earliest row (argMin), mirroring
        # argMax for the close; min(open) would report the lowest open.
        query = f"""
            SELECT
                fromUnixTimestamp(intDiv(window_start/1000000000, 300) * 300) as interval_start,
                argMin(open, window_start) as open,
                max(high) as high,
                min(low) as low,
                argMax(close, window_start) as close,
                sum(volume) as volume
            FROM stock_db.stock_prices
            WHERE ticker = '{ticker}'
            AND window_start/1000000000 BETWEEN
                toUnixTimestamp('{start_date.date()}') AND
                toUnixTimestamp('{end_date.date()}')
            GROUP BY interval_start
            ORDER BY interval_start ASC
        """

    try:
        result = client.query(query)
        if not result.result_rows:
            print(f"No data found for {ticker}")
            return pd.DataFrame()

        df = pd.DataFrame(
            result.result_rows,
            columns=['date', 'open', 'high', 'low', 'close', 'volume']
        )

        if minutes is not None and minutes != 5:
            # 'min' is the supported minute alias; 'T' is deprecated in
            # recent pandas releases.
            df = (
                df.set_index('date')
                .resample(f'{minutes}min')
                .agg({
                    'open': 'first',
                    'high': 'max',
                    'low': 'min',
                    'close': 'last',
                    'volume': 'sum'
                })
                .dropna()  # drop buckets with no trades (nights, weekends)
                .reset_index()
            )
        return df
    except Exception as e:
        # Best-effort: a failure for one ticker must not abort a whole scan.
        print(f"Error fetching data for {ticker}: {str(e)}")
        return pd.DataFrame()
def get_valid_tickers(min_price: float, max_price: float, min_volume: int, interval: str) -> list:
    """Return tickers whose last session meets the price and volume filters.

    The filter runs against ``stock_db.stock_prices_daily``. For intraday
    intervals the result is narrowed to tickers with at least 50 rows in
    ``stock_db.stock_prices`` between 09:30 and 16:00 of that session.

    Args:
        min_price: Lowest acceptable daily close.
        max_price: Highest acceptable daily close.
        min_volume: Lowest acceptable daily volume.
        interval: "daily" skips the intraday check; any other value runs it.

    Returns:
        List of ticker symbols; empty when nothing matches or a query fails.
    """
    client = create_client()

    # Most recent weekday strictly before today. Plain "yesterday" falls on a
    # weekend when run on Sunday or Monday and would match no rows.
    # Exchange holidays are not accounted for.
    session_day = datetime.now().date() - timedelta(days=1)
    while session_day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        session_day -= timedelta(days=1)

    # First get valid tickers from daily data
    daily_query = f"""
        SELECT DISTINCT ticker
        FROM stock_db.stock_prices_daily
        WHERE date = '{session_day}'
        AND close BETWEEN {min_price} AND {max_price}
        AND volume >= {min_volume}
        ORDER BY ticker ASC
    """
    try:
        result = client.query(daily_query)
        tickers = [row[0] for row in result.result_rows]
        print(f"\nFound {len(tickers)} stocks matching price and volume criteria")

        # Skip the intraday check when nothing matched: an empty "IN ()"
        # list is a SQL syntax error and would be reported as a fetch failure.
        if interval != "daily" and tickers:
            # Session bounds as Unix nanoseconds. Integer arithmetic keeps
            # them exact; float multiplication is not exact near 1.7e18 and
            # could push the lower bound past the 09:30 bar.
            # NOTE(review): naive datetimes are interpreted in the machine's
            # local timezone - confirm that matches how window_start is stored.
            session_open = datetime(session_day.year, session_day.month, session_day.day, 9, 30)
            session_close = datetime(session_day.year, session_day.month, session_day.day, 16, 0)
            start_ts = int(session_open.timestamp()) * 1_000_000_000
            end_ts = int(session_close.timestamp()) * 1_000_000_000

            ticker_list = ','.join(f"'{t}'" for t in tickers)
            intraday_query = f"""
                SELECT DISTINCT ticker
                FROM stock_db.stock_prices
                WHERE ticker IN ({ticker_list})
                AND window_start BETWEEN {start_ts} AND {end_ts}
                GROUP BY ticker
                HAVING count() >= 50 -- Ensure we have enough data points for the indicator
            """
            result = client.query(intraday_query)
            tickers = [row[0] for row in result.result_rows]
            print(f"Of those, {len(tickers)} have recent intraday data")
        return tickers
    except Exception as e:
        print(f"Error fetching tickers: {str(e)}")
        return []
def view_stock_details(ticker: str, interval: str, start_date: datetime, end_date: datetime) -> None:
    """Print the query, raw rows, latest bar and SunnyBands values for a ticker.

    Args:
        ticker: Symbol to analyse.
        interval: "daily", "5min", "15min", "30min" or "1hour".
        start_date: Used only for the "Date Range" line that is printed.
        end_date: Used only for the "Date Range" line that is printed.

    The data itself always covers the 60 days ending today, regardless of
    the dates passed in. Any failure is reported on stdout instead of being
    raised.
    """
    print(f"\n📊 Detailed Analysis for {ticker}")
    print(f"Interval: {interval}")
    print(f"Date Range: {start_date.date()} to {end_date.date()}")
    try:
        client = create_client()
        today = datetime.now().date()
        first_day = today - timedelta(days=60)

        minutes = None  # stays None for daily bars
        if interval == "daily":
            query = f"""
                SELECT
                    date as date,
                    open,
                    high,
                    low,
                    close,
                    volume
                FROM stock_db.stock_prices_daily
                WHERE ticker = '{ticker}'
                AND date BETWEEN '{first_day}' AND '{today}'
                ORDER BY date ASC
            """
        else:
            minutes_map = {
                "5min": 5,
                "15min": 15,
                "30min": 30,
                "1hour": 60
            }
            minutes = minutes_map[interval]
            # window_start is stored in nanoseconds (the other queries in this
            # file divide by 1e9 / compare against nanosecond bounds), so it
            # must be scaled to seconds before bucketing and before comparing
            # with toUnixTimestamp(); otherwise the range never matches.
            # argMin gives the open of the earliest row in each bucket.
            query = f"""
                SELECT
                    fromUnixTimestamp(intDiv(window_start/1000000000, 300) * 300) as interval_start,
                    argMin(open, window_start) as open,
                    max(high) as high,
                    min(low) as low,
                    argMax(close, window_start) as close,
                    sum(volume) as volume
                FROM stock_db.stock_prices
                WHERE ticker = '{ticker}'
                AND window_start/1000000000 BETWEEN
                    toUnixTimestamp('{first_day}') AND
                    toUnixTimestamp('{today}')
                GROUP BY interval_start
                ORDER BY interval_start ASC
            """

        # Print the actual query being executed
        print("\nExecuting Query:")
        print(query)

        result = client.query(query)
        rows = result.result_rows
        if not rows:
            print("\nNo data found for this stock")
            return

        df = pd.DataFrame(
            rows,
            columns=['date', 'open', 'high', 'low', 'close', 'volume']
        )

        # Print raw query results
        print("\nRaw Query Results:")
        print(f"Number of rows returned: {len(rows)}")
        print("\nFirst 5 rows of raw data:")
        for row_no, row in enumerate(rows[:5], start=1):
            print(f"Row {row_no}: {row}")
        print("\nLast 5 rows of raw data:")
        # Clamp at 0 so the numbering stays 1-based with fewer than 5 rows.
        tail_start = max(len(rows) - 5, 0)
        for row_no, row in enumerate(rows[tail_start:], start=tail_start + 1):
            print(f"Row {row_no}: {row}")

        if minutes is not None and minutes != 5:
            # Resample the 5-minute bars to the selected interval, the same
            # way get_stock_data does, so the indicator values shown here are
            # computed on the same bars the scanner used.
            df = (
                df.set_index('date')
                .resample(f'{minutes}min')
                .agg({
                    'open': 'first',
                    'high': 'max',
                    'low': 'min',
                    'close': 'last',
                    'volume': 'sum'
                })
                .dropna()
                .reset_index()
            )

        # Calculate SunnyBands
        sunny = SunnyBands()
        results = sunny.calculate(df)

        # Get last values
        last_price = df.iloc[-1]
        last_bands = results.iloc[-1]
        print("\nLatest Values:")
        print(f"Date: {last_price['date']}")
        print(f"Open: ${last_price['open']:.2f}")
        print(f"High: ${last_price['high']:.2f}")
        print(f"Low: ${last_price['low']:.2f}")
        print(f"Close: ${last_price['close']:.2f}")
        print(f"Volume: {last_price['volume']:,}")
        print(f"\nSunnyBands Indicators:")
        print(f"DMA: ${last_bands['dma']:.2f}")
        print(f"Upper Band: ${last_bands['upper_band']:.2f}")
        print(f"Lower Band: ${last_bands['lower_band']:.2f}")
        print(f"Bullish Signal: {'Yes' if last_bands['bullish_signal'] else 'No'}")
        print(f"Bearish Signal: {'Yes' if last_bands['bearish_signal'] else 'No'}")
    except Exception as e:
        # Interactive display helper: report and return rather than crash.
        print(f"Error analyzing {ticker}: {str(e)}")
def _save_signals_csv(signals: list, kind: str, output_date: str) -> str:
    """Write signals to reports/sunny_<kind>_<output_date>.csv; return the path."""
    # Create reports directory if it doesn't exist
    os.makedirs('reports', exist_ok=True)
    path = f'reports/sunny_{kind}_{output_date}.csv'
    pd.DataFrame(signals).to_csv(path, index=False)
    return path


def _print_position_details(signal: dict) -> None:
    """Print the position-sizing summary stored on a bullish signal."""
    # Convert numpy float64 to regular float and round to 2 decimal places
    position_value = round(float(signal['position_value']), 2)
    stop_loss = round(float(signal['stop_loss']), 2)
    potential_loss = round(float(signal['potential_loss']), 2)
    potential_profit = round(float(signal['potential_profit']), 2)
    risk_reward = round(float(signal['risk_reward_ratio']), 2)
    target_price = round(float(signal['upper_band']), 2)

    # Percentages are relative to the position value; a zero-value position
    # would otherwise raise ZeroDivisionError after the whole scan has run.
    if position_value:
        profit_percentage = round((potential_profit / position_value) * 100, 1)
        loss_percentage = round((abs(potential_loss) / position_value) * 100, 1)
    else:
        profit_percentage = 0.0
        loss_percentage = 0.0

    print("\nPosition Details:")
    print(f"Shares: {signal['shares']:,}")
    print(f"Position Size: ${position_value:,.2f}")
    print(f"Entry Price: ${signal['price']:.2f}")
    print(f"Stop Loss: ${stop_loss:.2f} (-6%)")
    print(f"Target Price: ${target_price:.2f}")
    print(f"Risk Amount: ${abs(potential_loss):,.2f} ({loss_percentage:.1f}%)")
    print(f"Potential Profit: ${potential_profit:,.2f} ({profit_percentage:.1f}%)")
    print(f"Risk/Reward Ratio: {risk_reward:.2f}")


def run_sunny_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None:
    """Run the SunnyBand scanner interactively and save the results.

    Asks for a bar interval, selects tickers matching the price/volume
    filters, evaluates SunnyBands on each one and records tickers whose
    latest bar carries a bullish or bearish signal. Signals are written to
    reports/sunny_bullish_<YYYYMMDD>.csv and reports/sunny_bearish_<YYYYMMDD>.csv
    and printed; the user can then request a detailed view of any flagged
    ticker.

    Args:
        min_price: Lowest acceptable price for a candidate.
        max_price: Highest acceptable price for a candidate.
        min_volume: Lowest acceptable volume for a candidate.
        portfolio_size: When given and positive, position sizing is computed
            for each bullish signal with PositionCalculator.
    """
    print(f"\nInitializing scan for stocks between ${min_price:.2f} and ${max_price:.2f}")
    print(f"Minimum volume: {min_volume:,}")

    # Get user's preferred interval
    interval = get_interval_choice()

    # 60 days of history ending now; the indicator needs at least 50 bars.
    end_date = datetime.now()
    lookback_start = end_date - timedelta(days=60)
    print(f"\nAnalyzing data from {lookback_start.date()} to {end_date.date()}")
    print(f"Looking for signals in the last trading day")

    # Get valid tickers
    print("\nFetching qualified stocks...")
    tickers = get_valid_tickers(min_price, max_price, min_volume, interval)
    if not tickers:
        print("No stocks found matching your criteria.")
        return
    print(f"\nFound {len(tickers)} stocks to scan")
    print("Looking for SunnyBand crossovers...")
    print("This may take a few minutes...")

    bullish_signals = []
    bearish_signals = []
    errors = []

    # Initialize SunnyBands indicator and position calculator
    sunny = SunnyBands()
    calculator = None
    if portfolio_size and portfolio_size > 0:
        calculator = PositionCalculator(account_size=portfolio_size)
        print(f"\nInitialized position calculator with portfolio size: ${portfolio_size:,.2f}")

    total = len(tickers)
    for processed, ticker in enumerate(tickers, start=1):
        if processed % 10 == 0:  # Show progress every 10 stocks
            print(f"Progress: {processed}/{total} stocks processed ({(processed/total)*100:.1f}%)")
        try:
            # The full lookback window is requested so the call stays correct
            # regardless of how the range arguments are treated downstream.
            df = get_stock_data(ticker, lookback_start, end_date, interval)
            if len(df) < 50:  # Need enough data for the indicator
                continue

            results = sunny.calculate(df)
            last_day = df.iloc[-1]

            if results['bullish_signal'].iloc[-1]:
                print("\nDebug: Processing bullish signal")  # Debug line
                signal_data = {
                    'ticker': ticker,
                    'price': last_day['close'],
                    'volume': last_day['volume'],
                    'date': last_day['date'],
                    'dma': results['dma'].iloc[-1],
                    'lower_band': results['lower_band'].iloc[-1],
                    'upper_band': results['upper_band'].iloc[-1]
                }
                # Add position sizing if calculator exists
                if calculator:
                    print(f"Debug: Calculator exists, calculating position for price: ${last_day['close']:.2f}")  # Debug line
                    try:
                        entry_price = last_day['close']
                        upper_band = results['upper_band'].iloc[-1]
                        print(f"Debug: Entry: ${entry_price:.2f}, Upper Band: ${upper_band:.2f}")  # Debug line
                        position = calculator.calculate_position_size(
                            entry_price=entry_price,
                            target_price=upper_band
                        )
                        # Format debug position output with rounded values
                        debug_position = {k: round(float(v), 2) if isinstance(v, (float, np.float64)) else v
                                          for k, v in position.items()}
                        print(f"Debug: Position calculation result: {debug_position}")  # Debug line
                        signal_data.update({
                            'shares': position['shares'],
                            'position_value': position['position_value'],
                            'stop_loss': position['stop_loss'],
                            'potential_profit': position['potential_profit'],
                            'potential_loss': position['potential_loss'],
                            'risk_reward_ratio': position['risk_reward_ratio']
                        })
                    except ValueError as e:
                        # The signal is still recorded, just without sizing.
                        print(f"Position sizing error for {ticker}: {str(e)}")
                bullish_signals.append(signal_data)
                print(f"🟢 Bullish Signal: {ticker} at ${last_day['close']:.2f}")
            elif results['bearish_signal'].iloc[-1]:
                bearish_signals.append({
                    'ticker': ticker,
                    'price': last_day['close'],
                    'volume': last_day['volume'],
                    'date': last_day['date'],
                    'dma': results['dma'].iloc[-1],
                    'upper_band': results['upper_band'].iloc[-1]
                })
                print(f"🔴 Bearish Signal: {ticker} at ${last_day['close']:.2f}")
        except Exception as e:
            # One bad ticker must not abort the scan; collect and report later.
            errors.append(f"{ticker}: {str(e)}")
            continue

    # Save and display results
    output_date = datetime.now().strftime("%Y%m%d")
    print(f"\nScan Complete! Processed {total} stocks.")

    if errors:
        print(f"\nEncountered {len(errors)} errors during scan:")
        for error in errors[:5]:  # Show first 5 errors
            print(error)
        if len(errors) > 5:
            print(f"...and {len(errors) - 5} more errors")

    if bullish_signals:
        print(f"\n🟢 Found {len(bullish_signals)} Bullish Signals:")
        bullish_file = _save_signals_csv(bullish_signals, 'bullish', output_date)
        print(f"Saved to {bullish_file}")
        for signal in bullish_signals:
            print(f"\n{signal['ticker']}:")
            print(f"Entry Price: ${signal['price']:.2f}")
            print(f"Volume: {signal['volume']:,}")
            print(f"Target (Upper Band): ${signal['upper_band']:.2f}")
            # 'shares' is only present when position sizing succeeded.
            if 'shares' in signal:
                _print_position_details(signal)

    if bearish_signals:
        print(f"\n🔴 Found {len(bearish_signals)} Bearish Signals:")
        bearish_file = _save_signals_csv(bearish_signals, 'bearish', output_date)
        print(f"Saved to {bearish_file}")
        for signal in bearish_signals:
            print(f"\n{signal['ticker']}:")
            print(f"Price: ${signal['price']:.2f}")
            print(f"Volume: {signal['volume']:,}")
            print(f"DMA: ${signal['dma']:.2f}")
            print(f"Upper Band: ${signal['upper_band']:.2f}")

    if not bullish_signals and not bearish_signals:
        print("\nNo signals found for today.")
        return

    # Only tickers that produced a signal can be inspected in detail.
    flagged = {signal['ticker'] for signal in bullish_signals + bearish_signals}
    while True:
        view_choice = input(
            "\nWould you like to view detailed data for a stock? (Enter ticker or 'n' to exit): "
        ).strip().upper()
        if view_choice == 'N':
            break
        if view_choice in flagged:
            view_stock_details(view_choice, interval, lookback_start, end_date)
        else:
            print(f"Ticker {view_choice} not found in signals list.")