from screener.user_input import get_interval_choice, get_date_range import os from datetime import datetime, timedelta import pandas as pd from db.db_connection import create_client from trading.position_calculator import PositionCalculator from utils.data_utils import get_stock_data, validate_signal_date, print_signal, save_signals_to_csv from indicators.three_atr_ema import ThreeATREMAIndicator def check_entry_signal(df: pd.DataFrame) -> list: """ Check for entry signals based on Three ATR EMA strategy throughout the date range Args: df (pd.DataFrame): DataFrame with price data Returns: list: List of tuples (signal, date, signal_data) for each signal found """ if len(df) < 2: # Need at least 2 bars for comparison return [] indicator = ThreeATREMAIndicator() results = indicator.calculate(df) if len(results) < 2: return [] signals = [] # Start from index 1 to compare with previous for i in range(1, len(df)): current = df.iloc[i] previous = df.iloc[i-1] # Get indicator values ema = results['ema'].iloc[i] lower_band = results['lower_band'].iloc[i] prev_lower_band = results['lower_band'].iloc[i-1] # Entry conditions: # 1. Previous close was below the lower band # 2. Current close is at or above the lower band # 3. Price is moving up (current close > previous close) # 4. Price is still below EMA (maintaining downtrend context) entry_signal = ( previous['close'] < prev_lower_band and current['close'] >= lower_band and current['close'] > previous['close'] and current['close'] < ema ) if entry_signal: signal_data = { 'price': current['close'], 'ema': ema, 'lower_band': lower_band } signals.append((True, current['date'], signal_data)) return signals def run_atr_ema_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None: print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}") # Get time interval interval = get_interval_choice() start_date, end_date = get_date_range() start_ts = int(start_date.timestamp() * 1000000000) end_ts = int(end_date.timestamp() * 1000000000) try: with create_client() as client: query = f""" WITH filtered_data AS ( SELECT ticker, window_start, close, volume, toDate(toDateTime(window_start/1000000000)) as trade_date FROM stock_db.stock_prices WHERE window_start BETWEEN {start_ts} AND {end_ts} AND toDateTime(window_start/1000000000) <= now() ), daily_data AS ( SELECT ticker, trade_date, argMax(close, window_start) as daily_close, sum(volume) as daily_volume FROM filtered_data GROUP BY ticker, trade_date HAVING daily_close BETWEEN {min_price} AND {max_price} AND daily_volume >= {min_volume} ), latest_data AS ( SELECT ticker, argMax(daily_close, trade_date) as last_close, sum(daily_volume) as total_volume, max(trade_date) as last_update FROM daily_data GROUP BY ticker ) SELECT ticker, last_close, total_volume, last_update FROM latest_data ORDER BY ticker """ result = client.query(query) qualified_stocks = [(row[0], row[1], row[2], row[3]) for row in result.result_rows] if not qualified_stocks: print("No stocks found matching criteria.") return print(f"\nFound {len(qualified_stocks)} stocks matching criteria") # Initialize indicators indicator = ThreeATREMAIndicator() calculator = None if portfolio_size and portfolio_size > 0: calculator = PositionCalculator( account_size=portfolio_size, risk_percentage=1.0, stop_loss_percentage=7.0 # Explicitly set 7% stop ) bullish_signals = [] for ticker, current_price, current_volume, last_update in qualified_stocks: try: # Get historical data based on interval df = get_stock_data(ticker, start_date, end_date, interval) if df.empty or len(df) < 50: # Need at least 50 bars for the indicator continue results = indicator.calculate(df) # Check for signals throughout the date range signals = check_entry_signal(df) for signal, signal_date, signal_data in signals: entry_data = { 'ticker': ticker, 'entry_price': signal_data['price'], 'target_price': signal_data['ema'], 'volume': current_volume, 'signal_date': signal_date, 'last_update': datetime.fromtimestamp(last_update/1000000000) } if calculator: position = calculator.calculate_position_size(entry_data['entry_price']) potential_profit = (entry_data['target_price'] - entry_data['entry_price']) * position['shares'] entry_data.update({ 'shares': position['shares'], 'position_size': position['position_value'], 'stop_loss': position['stop_loss'], 'risk_amount': position['potential_loss'], 'profit_amount': potential_profit, 'risk_reward_ratio': abs(potential_profit / position['potential_loss']) if position['potential_loss'] != 0 else 0 }) bullish_signals.append(entry_data) print_signal(entry_data, "🟢") except Exception as e: print(f"Error processing {ticker}: {str(e)}") continue save_signals_to_csv(bullish_signals, 'atr_ema') except Exception as e: print(f"Error during scan: {str(e)}")