stock_system/src/screener/t_sunnyband.py

315 lines
12 KiB
Python

import os
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
from db.db_connection import create_client
from indicators.sunny_bands import SunnyBands
from trading.position_calculator import PositionCalculator
from screener.user_input import get_interval_choice, get_date_range
from utils.data_utils import get_stock_data, validate_signal_date, print_signal, save_signals_to_csv
def check_entry_signal(df: pd.DataFrame) -> list:
"""
Check for entry signals based on Sunny Bands strategy throughout the date range
Args:
df (pd.DataFrame): DataFrame with price data
Returns:
list: List of tuples (signal, date, signal_data) for each signal found
"""
if len(df) < 2: # Need at least 2 bars for comparison
return []
sunny = SunnyBands()
results = sunny.calculate(df)
if len(results) < 2:
return []
signals = []
# Start from index 1 to compare with previous
for i in range(1, len(df)):
current = df.iloc[i]
current_bands = results.iloc[i]
# Check for bullish signal
if current_bands['bullish_signal']:
signal_data = {
'price': current['close'],
'upper_band': current_bands['upper_band'],
'lower_band': current_bands['lower_band'],
'dma': current_bands['dma']
}
signals.append((True, current['date'], signal_data))
return signals
def get_valid_tickers(min_price: float, max_price: float, min_volume: int, interval: str) -> list:
"""Get tickers that meet the price and volume criteria"""
# Get the most recent trading day
today = datetime.now().date()
yesterday = today - timedelta(days=1)
try:
with create_client() as client:
# First get valid tickers from daily data
daily_query = f"""
SELECT DISTINCT ticker
FROM stock_db.stock_prices_daily
WHERE date = '{yesterday}'
AND close BETWEEN {min_price} AND {max_price}
AND volume >= {min_volume}
ORDER BY ticker ASC
"""
try:
result = client.query(daily_query)
tickers = [row[0] for row in result.result_rows]
print(f"\nFound {len(tickers)} stocks matching price and volume criteria")
if interval != "daily":
# Now verify these tickers have intraday data
# Convert to Unix timestamp in nanoseconds
start_ts = int(datetime.combine(yesterday, datetime.strptime("09:30", "%H:%M").time()).timestamp() * 1000000000)
end_ts = int(datetime.combine(yesterday, datetime.strptime("16:00", "%H:%M").time()).timestamp() * 1000000000)
intraday_query = f"""
SELECT DISTINCT ticker
FROM stock_db.stock_prices
WHERE ticker IN ({','.join([f"'{t}'" for t in tickers])})
AND window_start BETWEEN {start_ts} AND {end_ts}
GROUP BY ticker
HAVING count() >= 50 -- Ensure we have enough data points for the indicator
"""
result = client.query(intraday_query)
tickers = [row[0] for row in result.result_rows]
print(f"Of those, {len(tickers)} have recent intraday data")
return tickers
except Exception as e:
print(f"Error fetching tickers: {str(e)}")
return []
def view_stock_details(ticker: str, interval: str, start_date: datetime, end_date: datetime) -> None:
"""Display detailed data for a single stock"""
print(f"\n📊 Detailed Analysis for {ticker}")
print(f"Interval: {interval}")
print(f"Date Range: {start_date.date()} to {end_date.date()}")
try:
with create_client() as client:
# Construct query
today = datetime.now().date()
start_date = today - timedelta(days=60)
if interval == "daily":
table = "stock_prices_daily"
date_col = "date"
query = f"""
SELECT
{date_col} as date,
open,
high,
low,
close,
volume
FROM stock_db.{table}
WHERE ticker = '{ticker}'
AND {date_col} BETWEEN '{start_date}' AND '{today}'
ORDER BY date ASC
"""
else:
table = "stock_prices"
date_col = "window_start"
minutes_map = {
"5min": 5,
"15min": 15,
"30min": 30,
"1hour": 60
}
minutes = minutes_map[interval]
query = f"""
SELECT
fromUnixTimestamp(intDiv({date_col}, 300) * 300) as interval_start,
min(open) as open,
max(high) as high,
min(low) as low,
argMax(close, {date_col}) as close,
sum(volume) as volume
FROM stock_db.{table}
WHERE ticker = '{ticker}'
AND {date_col} BETWEEN toUnixTimestamp('{start_date}') AND toUnixTimestamp('{today}')
GROUP BY interval_start
ORDER BY interval_start ASC
"""
# Execute query and get results
result = client.query(query)
if not result.result_rows:
print("\nNo data found for this stock")
return
# Convert to DataFrame
df = pd.DataFrame(
result.result_rows,
columns=['date', 'open', 'high', 'low', 'close', 'volume']
)
# Print raw query results
print("\nRaw Query Results:")
print(f"Number of rows returned: {len(result.result_rows)}")
print("\nFirst 5 rows of raw data:")
for i, row in enumerate(result.result_rows[:5]):
print(f"Row {i + 1}: {row}")
print("\nLast 5 rows of raw data:")
for i, row in enumerate(result.result_rows[-5:]):
print(f"Row {len(result.result_rows) - 4 + i}: {row}")
# Calculate SunnyBands
sunny = SunnyBands()
results = sunny.calculate(df)
# Get last values
last_price = df.iloc[-1]
last_bands = results.iloc[-1]
print("\nLatest Values:")
print(f"Date: {last_price['date']}")
print(f"Open: ${last_price['open']:.2f}")
print(f"High: ${last_price['high']:.2f}")
print(f"Low: ${last_price['low']:.2f}")
print(f"Close: ${last_price['close']:.2f}")
print(f"Volume: {last_price['volume']:,}")
print(f"\nSunnyBands Indicators:")
print(f"DMA: ${last_bands['dma']:.2f}")
print(f"Upper Band: ${last_bands['upper_band']:.2f}")
print(f"Lower Band: ${last_bands['lower_band']:.2f}")
print(f"Bullish Signal: {'Yes' if last_bands['bullish_signal'] else 'No'}")
print(f"Bearish Signal: {'Yes' if last_bands['bearish_signal'] else 'No'}")
except Exception as e:
print(f"Error analyzing {ticker}: {str(e)}")
def run_sunny_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None:
print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}")
interval = get_interval_choice()
# Get date range from user input
start_date, end_date = get_date_range()
# First get qualified stocks from database
# Convert dates to Unix timestamp in nanoseconds
end_ts = int(end_date.timestamp() * 1000000000)
start_ts = int(start_date.timestamp() * 1000000000)
try:
with create_client() as client:
# Query to get stocks meeting criteria with their latest data
query = f"""
WITH latest_data AS (
SELECT
ticker,
argMax(close, window_start) as last_close,
sum(volume) as total_volume,
max(window_start) as last_update
FROM stock_db.stock_prices
WHERE window_start BETWEEN {start_ts} AND {end_ts}
AND toDateTime(window_start/1000000000) <= now()
GROUP BY ticker
HAVING last_close BETWEEN {min_price} AND {max_price}
AND total_volume >= {min_volume}
)
SELECT
ticker,
last_close,
total_volume,
last_update
FROM latest_data
ORDER BY ticker
"""
try:
result = client.query(query)
qualified_stocks = [(row[0], row[1], row[2], row[3]) for row in result.result_rows]
qualified_stocks = [(row[0], row[1], row[2], row[3]) for row in result.result_rows]
if not qualified_stocks:
print("No stocks found matching criteria.")
return
print(f"\nFound {len(qualified_stocks)} stocks matching criteria")
# Initialize indicators
sunny = SunnyBands()
calculator = None
if portfolio_size and portfolio_size > 0:
calculator = PositionCalculator(
account_size=portfolio_size,
risk_percentage=1.0,
stop_loss_percentage=7.0 # Explicit 7% stop loss
)
bullish_signals = []
bearish_signals = []
# Process each qualified stock
for ticker, current_price, current_volume, last_update in qualified_stocks:
try:
# Get historical data based on interval
df = get_stock_data(ticker, start_date, end_date, interval)
if df.empty or len(df) < 50: # Need at least 50 bars for the indicator
continue
# Check for signals throughout the date range
signals = check_entry_signal(df)
for signal, signal_date, signal_data in signals:
if calculator:
try:
position = calculator.calculate_position_size(
entry_price=signal_data['price'],
target_price=signal_data['upper_band']
)
if position['shares'] > 0:
entry_data = {
'ticker': ticker,
'entry_price': signal_data['price'],
'target_price': signal_data['upper_band'],
'signal_date': signal_date,
'volume': current_volume,
'last_update': datetime.fromtimestamp(last_update/1000000000),
'shares': position['shares'],
'position_size': position['position_value'],
'stop_loss': signal_data['price'] * 0.93, # 7% stop loss
'risk_amount': position['potential_loss'],
'profit_amount': position['potential_profit'],
'risk_reward_ratio': position['risk_reward_ratio']
}
bullish_signals.append(entry_data)
print_signal(entry_data, "🟢")
except ValueError as e:
print(f"Skipping {ticker} position: {str(e)}")
continue
except Exception as e:
print(f"Error processing {ticker}: {str(e)}")
continue
save_signals_to_csv(bullish_signals, 'sunny')
except Exception as e:
print(f"Error during scan: {str(e)}")