diff --git a/src/screener/t_atr_ema_v2.py b/src/screener/t_atr_ema_v2.py index 36c7da1..9bbdf5a 100644 --- a/src/screener/t_atr_ema_v2.py +++ b/src/screener/t_atr_ema_v2.py @@ -1,235 +1,166 @@ -from screener.user_input import get_interval_choice -import os from datetime import datetime, timedelta import pandas as pd +import os from db.db_connection import create_client from trading.position_calculator import PositionCalculator from screener.t_sunnyband import get_stock_data +from screener.user_input import get_interval_choice from indicators.three_atr_ema import ThreeATREMAIndicator -def run_atr_ema_target_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None): - print(f"\n🔍 Scanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}") +def check_entry_signal(df: pd.DataFrame) -> bool: + """ + Check for entry signal based on Three ATR EMA strategy + + Args: + df (pd.DataFrame): DataFrame with price data + + Returns: + bool: True if entry signal is present, False otherwise + """ + if len(df) < 2: # Need at least 2 bars for comparison + return False + + indicator = ThreeATREMAIndicator() + results = indicator.calculate(df) + + if len(results) < 2: + return False + + # Get latest values + current = df.iloc[-1] + previous = df.iloc[-2] + + # Get indicator values + ema = results['ema'].iloc[-1] + lower_band = results['lower_band'].iloc[-1] + prev_lower_band = results['lower_band'].iloc[-2] + + # Entry conditions from Pine script: + # 1. Price is below EMA + # 2. Previous close was at or below lower band + # 3. Current close is higher than previous close + entry_signal = ( + current['close'] < ema and + previous['close'] <= prev_lower_band and + current['close'] > previous['close'] + ) + + return entry_signal + +def run_atr_ema_scanner_v2(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None: + """ + Scan for stocks meeting Three ATR EMA entry criteria + + Args: + min_price (float): Minimum stock price + max_price (float): Maximum stock price + min_volume (int): Minimum trading volume + portfolio_size (float, optional): Portfolio size for position sizing + """ + print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}") interval = get_interval_choice() - end_date = datetime.now() - start_date = end_date - timedelta(days=90) - # Convert to Unix timestamps (nanoseconds) - start_ts = int(start_date.timestamp() * 1e9) - end_ts = int(end_date.timestamp() * 1e9) - + end_date = datetime.now() + start_date = end_date - timedelta(days=1) + start_ts = int(start_date.timestamp() * 1000000000) + end_ts = int(end_date.timestamp() * 1000000000) + client = create_client() - + try: + # Query to get qualified stocks query = f""" WITH latest_data AS ( SELECT ticker, - sum(volume) AS total_volume, - argMax(open, window_start) AS last_open, - argMax(close, window_start) AS last_close, - argMax(high, window_start) AS last_high, - argMax(low, window_start) AS last_low, - max(window_start) AS last_update, - sum(transactions) AS transaction_count + argMax(close, window_start) as last_close, + sum(volume) as total_volume, + max(window_start) as last_update FROM stock_db.stock_prices WHERE window_start BETWEEN {start_ts} AND {end_ts} - AND toYear(toDateTime(window_start/1000000000)) <= toYear(now()) - AND toYear(toDateTime(window_start/1000000000)) >= (toYear(now()) - 1) GROUP BY ticker HAVING last_close BETWEEN {min_price} AND {max_price} AND total_volume >= {min_volume} ) SELECT ticker, - total_volume, - last_open, last_close, - last_high, - last_low, - last_update, - transaction_count + total_volume, + last_update FROM latest_data ORDER BY ticker """ result = client.query(query) - stocks = result.result_rows - - if not stocks: - print("❌ No stocks found matching criteria.") + qualified_stocks = [(row[0], row[1], row[2], row[3]) for row in result.result_rows] + + if not qualified_stocks: + print("No stocks found matching criteria.") return - print(f"\n✅ Found {len(stocks)} stocks matching criteria") - - # **Correct column order as per ClickHouse output** - columns = ["ticker", "volume", "open", "close", "high", "low", "window_start", "transactions"] - df_stocks = pd.DataFrame(stocks, columns=columns) - - # **Convert timestamps from nanoseconds to readable datetime** - df_stocks["window_start"] = pd.to_datetime(df_stocks["window_start"], unit="ns") - - # Debugging: Check if columns exist - print("\n📊 Data Sample from ClickHouse Query:") - print(df_stocks.head()) - - indicator = ThreeATREMAIndicator() - calculator = PositionCalculator(portfolio_size, risk_percentage=1.0, stop_loss_percentage=7.0) if portfolio_size else None - bullish_signals = [] - - for _, row in df_stocks.iterrows(): - ticker = row["ticker"] - current_price = row["close"] - current_volume = row["volume"] - last_update = row["window_start"] - + print(f"\nFound {len(qualified_stocks)} stocks matching criteria") + + # Initialize position calculator if portfolio size provided + calculator = None + if portfolio_size and portfolio_size > 0: + calculator = PositionCalculator( + account_size=portfolio_size, + risk_percentage=1.0, + stop_loss_percentage=7.0 + ) + + entry_signals = [] + + for ticker, current_price, current_volume, last_update in qualified_stocks: try: - # Get historical data df = get_stock_data(ticker, start_date, end_date, interval) - # Add date validation - if not df.empty: - max_date = df['date'].max() - min_date = df['date'].min() - if max_date.year > end_date.year or min_date.year < (end_date.year - 1): - print(f"⚠️ {ticker}: Invalid date range ({min_date.date()} to {max_date.date()})") - continue - - # Enhanced validation with debugging - if df.empty: - print(f"⚠️ {ticker}: Empty DataFrame") - continue - - # Debug data - print(f"\nProcessing {ticker}") - print(f"Columns present: {df.columns.tolist()}") - print(f"Data types: {df.dtypes}") - print(f"First row: {df.iloc[0]}") - print(f"Last row: {df.iloc[-1]}") - - # Ensure DataFrame has required columns and proper types - required_columns = ['date', 'open', 'high', 'low', 'close', 'volume'] - if not all(col in df.columns for col in required_columns): - missing = set(required_columns) - set(df.columns) - print(f"⚠️ {ticker}: Missing columns: {missing}") + if df.empty or len(df) < 21: # Need at least 21 bars for EMA continue - # Convert columns to numeric and handle any conversion errors - for col in ['open', 'high', 'low', 'close', 'volume']: - try: - df[col] = pd.to_numeric(df[col], errors='coerce') - except Exception as e: - print(f"⚠️ {ticker}: Error converting {col} to numeric: {str(e)}") - print(f"Sample of problematic column: {df[col].head()}") - raise ValueError(f"Data conversion error in {col}") - - # Verify data validity after conversion - if df['close'].isnull().any(): - null_rows = df[df['close'].isnull()] - print(f"⚠️ {ticker}: Contains {len(null_rows)} null values in close price") - print("First few null rows:") - print(null_rows.head()) - continue - - if len(df) < 50: - print(f"⚠️ {ticker}: Insufficient data points ({len(df)})") - continue - - # Make a clean copy for indicator calculation - calc_df = df.copy() - - # Make a clean copy for indicator calculation - calc_df = df.copy() - - # Resample to daily data if we have intraday - if interval != "daily": - calc_df = calc_df.resample('D', on='date').agg({ - 'open': 'first', - 'high': 'max', - 'low': 'min', - 'close': 'last', - 'volume': 'sum' - }).dropna() - - # Ensure minimum data points after resampling - if len(calc_df) < 50: - print(f"⚠️ {ticker}: Insufficient data points after resampling ({len(calc_df)})") - continue - - # Calculate indicator with validated data - try: - results = indicator.calculate(calc_df) - if results.empty: - print(f"⚠️ {ticker}: No valid indicator results") - continue - - # Get the last two rows for signal checking - last_row = results.iloc[-1] - prev_row = results.iloc[-2] - - bullish_entry = ( - last_row["close"] < last_row["ema"] and - prev_row["close"] <= prev_row["lower_band"] and - last_row["close"] > prev_row["close"] - ) - - except Exception as e: - print(f"⚠️ {ticker}: Error in indicator calculation:") - print(f"Error details: {str(e)}") - print(f"Data shape: {calc_df.shape}") - print(f"Sample data:\n{calc_df.head()}") - continue - - if bullish_entry: - entry_price = last_row["close"] - target_1 = entry_price * 1.10 # 10% profit - target_2 = entry_price * 1.20 # 20% profit - - # Trailing stop logic - trail_stop = None - trail_active = False - - if last_row["close"] >= last_row["upper_band"]: - trail_active = True - highest_price = max(results["high"].iloc[-5:]) # Last 5 days - trail_stop = highest_price * 0.98 # 2% below high - - # Position sizing - position = calculator.calculate_position_size(entry_price, target_2) if calculator else None - position_size = position["position_value"] if position else None - - # Save signal + if check_entry_signal(df): signal_data = { - "ticker": ticker, - "entry_price": entry_price, - "target_1": target_1, - "target_2": target_2, - "volume": current_volume, - "last_update": last_update, - "trail_stop": trail_stop, - "position_size": position_size + 'ticker': ticker, + 'price': current_price, + 'volume': current_volume, + 'last_update': datetime.fromtimestamp(last_update/1000000000) } - bullish_signals.append(signal_data) - - # Print result - print(f"\n🟢 {ticker} @ ${entry_price:.2f}") - print(f" 🎯 Target 1: ${target_1:.2f} | Target 2: ${target_2:.2f}") - if trail_active: - print(f" 🚨 Trailing Stop: ${trail_stop:.2f}") - + + if calculator: + position = calculator.calculate_position_size(current_price) + signal_data.update({ + 'shares': position['shares'], + 'position_size': position['position_value'], + 'stop_loss': position['stop_loss'], + 'risk': position['potential_loss'] + }) + + entry_signals.append(signal_data) + + # Print signal information + print(f"\n🔍 {ticker} @ ${current_price:.2f}") + if calculator: + print(f" Size: {signal_data['shares']} shares (${signal_data['position_size']:.2f})") + print(f" Stop: ${signal_data['stop_loss']:.2f} (7%)") + print(f" Risk: ${abs(signal_data['risk']):.2f}") + except Exception as e: - print(f"❌ Error processing {ticker}: {e}") + print(f"Error processing {ticker}: {str(e)}") continue - + # Save results - if bullish_signals: - output_dir = "reports" + if entry_signals: + output_dir = 'reports' os.makedirs(output_dir, exist_ok=True) - output_file = f"{output_dir}/atr_ema_targets_{datetime.now().strftime('%Y%m%d_%H%M')}.csv" - pd.DataFrame(bullish_signals).to_csv(output_file, index=False) - print(f"\n📁 Saved bullish signals to {output_file}") + output_date = datetime.now().strftime("%Y%m%d_%H%M") + output_file = f'{output_dir}/atr_ema_v2_{output_date}.csv' + + df_signals = pd.DataFrame(entry_signals) + df_signals.to_csv(output_file, index=False) + print(f"\nSaved {len(entry_signals)} signals to {output_file}") else: - print("❌ No bullish signals found.") - + print("\nNo entry signals found") + except Exception as e: - print(f"❌ Error during scan: {e}") + print(f"Error during scan: {str(e)}")