From 572ab3a3e1f03a4a57a9efabcccf9887e18b76c5 Mon Sep 17 00:00:00 2001 From: "Bobby (aider)" Date: Sun, 9 Feb 2025 12:18:40 -0800 Subject: [PATCH] refactor: Standardize scanner implementations using utility functions --- src/screener/t_atr_ema.py | 74 +++++++-------------------------- src/screener/t_sunnyband.py | 83 ++++++++----------------------------- 2 files changed, 34 insertions(+), 123 deletions(-) diff --git a/src/screener/t_atr_ema.py b/src/screener/t_atr_ema.py index 6b1919e..e8531da 100644 --- a/src/screener/t_atr_ema.py +++ b/src/screener/t_atr_ema.py @@ -1,10 +1,8 @@ -from screener.user_input import get_interval_choice, get_date_range -import os -from datetime import datetime, timedelta -import pandas as pd -from db.db_connection import create_client -from trading.position_calculator import PositionCalculator -from utils.data_utils import get_stock_data, validate_signal_date, print_signal, save_signals_to_csv, get_qualified_stocks +from utils.data_utils import ( + get_stock_data, validate_signal_date, print_signal, + save_signals_to_csv, get_qualified_stocks, + initialize_scanner, process_signal_data +) from indicators.three_atr_ema import ThreeATREMAIndicator def check_entry_signal(df: pd.DataFrame) -> list: @@ -61,74 +59,34 @@ def check_entry_signal(df: pd.DataFrame) -> list: return signals def run_atr_ema_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None: - print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}") - - # Get time interval - interval = get_interval_choice() - - start_date, end_date = get_date_range() - start_ts = int(start_date.timestamp() * 1000000000) - end_ts = int(end_date.timestamp() * 1000000000) - try: - qualified_stocks = get_qualified_stocks(start_date, end_date, min_price, max_price, min_volume) + # Initialize scanner components + interval, start_date, end_date, qualified_stocks, calculator = initialize_scanner( + min_price, max_price, min_volume, portfolio_size + ) if not qualified_stocks: - print("No stocks found matching criteria.") return - - print(f"\nFound {len(qualified_stocks)} stocks matching criteria") - - # Initialize indicators - indicator = ThreeATREMAIndicator() - calculator = None - if portfolio_size and portfolio_size > 0: - calculator = PositionCalculator( - account_size=portfolio_size, - risk_percentage=1.0, - stop_loss_percentage=7.0 # Explicitly set 7% stop - ) bullish_signals = [] for ticker, current_price, current_volume, last_update, stock_type in qualified_stocks: try: - # Get historical data based on interval df = get_stock_data(ticker, start_date, end_date, interval) - if df.empty or len(df) < 50: # Need at least 50 bars for the indicator + if df.empty or len(df) < 21: # Need at least 21 bars for EMA continue - results = indicator.calculate(df) - - # Check for signals throughout the date range signals = check_entry_signal(df) for signal, signal_date, signal_data in signals: - entry_data = { - 'ticker': ticker, - 'entry_price': signal_data['price'], - 'target_price': signal_data['ema'], - 'volume': current_volume, - 'signal_date': signal_date, - 'stock_type': stock_type, # Add stock type - 'last_update': datetime.fromtimestamp(last_update/1000000000) - } - - if calculator: - position = calculator.calculate_position_size(entry_data['entry_price']) - potential_profit = (entry_data['target_price'] - entry_data['entry_price']) * position['shares'] - entry_data.update({ - 'shares': position['shares'], - 'position_size': position['position_value'], - 'stop_loss': position['stop_loss'], - 'risk_amount': position['potential_loss'], - 'profit_amount': potential_profit, - 'risk_reward_ratio': abs(potential_profit / position['potential_loss']) if position['potential_loss'] != 0 else 0 - }) - + signal_data['date'] = signal_date + entry_data = process_signal_data( + ticker, signal_data, current_volume, + last_update, stock_type, calculator + ) bullish_signals.append(entry_data) print_signal(entry_data, "🟢") - + except Exception as e: print(f"Error processing {ticker}: {str(e)}") continue diff --git a/src/screener/t_sunnyband.py b/src/screener/t_sunnyband.py index 7007a84..990237a 100644 --- a/src/screener/t_sunnyband.py +++ b/src/screener/t_sunnyband.py @@ -1,8 +1,8 @@ -import os -import numpy as np -from datetime import datetime, timedelta -import pandas as pd -from db.db_connection import create_client +from utils.data_utils import ( + get_stock_data, validate_signal_date, print_signal, + save_signals_to_csv, get_qualified_stocks, + initialize_scanner, process_signal_data +) from indicators.sunny_bands import SunnyBands from trading.position_calculator import PositionCalculator from screener.user_input import get_interval_choice, get_date_range @@ -200,86 +200,39 @@ def view_stock_details(ticker: str, interval: str, start_date: datetime, end_dat print(f"Error analyzing {ticker}: {str(e)}") def run_sunny_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None: - print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}") - - interval = get_interval_choice() - - # Get date range from user input - start_date, end_date = get_date_range() - - # First get qualified stocks from database - # Convert dates to Unix timestamp in nanoseconds - end_ts = int(end_date.timestamp() * 1000000000) - start_ts = int(start_date.timestamp() * 1000000000) - try: - qualified_stocks = get_qualified_stocks(start_date, end_date, min_price, max_price, min_volume) + # Initialize scanner components + interval, start_date, end_date, qualified_stocks, calculator = initialize_scanner( + min_price, max_price, min_volume, portfolio_size + ) if not qualified_stocks: - print("No stocks found matching criteria.") return - - print(f"\nFound {len(qualified_stocks)} stocks matching criteria") - - # Initialize indicators - sunny = SunnyBands() - calculator = None - if portfolio_size and portfolio_size > 0: - calculator = PositionCalculator( - account_size=portfolio_size, - risk_percentage=1.0, - stop_loss_percentage=7.0 # Explicit 7% stop loss - ) - + bullish_signals = [] - bearish_signals = [] - # Process each qualified stock for ticker, current_price, current_volume, last_update, stock_type in qualified_stocks: try: - # Get historical data based on interval df = get_stock_data(ticker, start_date, end_date, interval) if df.empty or len(df) < 50: # Need at least 50 bars for the indicator continue - # Check for signals throughout the date range signals = check_entry_signal(df) for signal, signal_date, signal_data in signals: - if calculator: - try: - position = calculator.calculate_position_size( - entry_price=signal_data['price'], - target_price=signal_data['upper_band'] - ) - if position['shares'] > 0: - entry_data = { - 'ticker': ticker, - 'entry_price': signal_data['price'], - 'target_price': signal_data['upper_band'], - 'signal_date': signal_date, - 'volume': current_volume, - 'stock_type': stock_type, - 'last_update': datetime.fromtimestamp(last_update/1000000000), - 'shares': position['shares'], - 'position_size': position['position_value'], - 'stop_loss': signal_data['price'] * 0.93, # 7% stop loss - 'risk_amount': position['potential_loss'], - 'profit_amount': position['potential_profit'], - 'risk_reward_ratio': position['risk_reward_ratio'] - } - bullish_signals.append(entry_data) - print_signal(entry_data, "🟢") - - except ValueError as e: - print(f"Skipping {ticker} position: {str(e)}") - continue + signal_data['date'] = signal_date + entry_data = process_signal_data( + ticker, signal_data, current_volume, + last_update, stock_type, calculator + ) + bullish_signals.append(entry_data) + print_signal(entry_data, "🟢") except Exception as e: print(f"Error processing {ticker}: {str(e)}") continue save_signals_to_csv(bullish_signals, 'sunny') - + except Exception as e: print(f"Error during scan: {str(e)}")