refactor: Standardize scanner implementations using utility functions

This commit is contained in:
Bobby (aider) 2025-02-09 12:18:40 -08:00
parent f7ab8d6513
commit 572ab3a3e1
2 changed files with 34 additions and 123 deletions

View File

@ -1,10 +1,8 @@
from screener.user_input import get_interval_choice, get_date_range
import os
from datetime import datetime, timedelta
import pandas as pd
from db.db_connection import create_client
from trading.position_calculator import PositionCalculator
from utils.data_utils import get_stock_data, validate_signal_date, print_signal, save_signals_to_csv, get_qualified_stocks
from utils.data_utils import (
get_stock_data, validate_signal_date, print_signal,
save_signals_to_csv, get_qualified_stocks,
initialize_scanner, process_signal_data
)
from indicators.three_atr_ema import ThreeATREMAIndicator
def check_entry_signal(df: pd.DataFrame) -> list:
@ -61,74 +59,34 @@ def check_entry_signal(df: pd.DataFrame) -> list:
return signals
def run_atr_ema_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None:
print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}")
# Get time interval
interval = get_interval_choice()
start_date, end_date = get_date_range()
start_ts = int(start_date.timestamp() * 1000000000)
end_ts = int(end_date.timestamp() * 1000000000)
try:
qualified_stocks = get_qualified_stocks(start_date, end_date, min_price, max_price, min_volume)
# Initialize scanner components
interval, start_date, end_date, qualified_stocks, calculator = initialize_scanner(
min_price, max_price, min_volume, portfolio_size
)
if not qualified_stocks:
print("No stocks found matching criteria.")
return
print(f"\nFound {len(qualified_stocks)} stocks matching criteria")
# Initialize indicators
indicator = ThreeATREMAIndicator()
calculator = None
if portfolio_size and portfolio_size > 0:
calculator = PositionCalculator(
account_size=portfolio_size,
risk_percentage=1.0,
stop_loss_percentage=7.0 # Explicitly set 7% stop
)
bullish_signals = []
for ticker, current_price, current_volume, last_update, stock_type in qualified_stocks:
try:
# Get historical data based on interval
df = get_stock_data(ticker, start_date, end_date, interval)
if df.empty or len(df) < 50: # Need at least 50 bars for the indicator
if df.empty or len(df) < 21: # Need at least 21 bars for EMA
continue
results = indicator.calculate(df)
# Check for signals throughout the date range
signals = check_entry_signal(df)
for signal, signal_date, signal_data in signals:
entry_data = {
'ticker': ticker,
'entry_price': signal_data['price'],
'target_price': signal_data['ema'],
'volume': current_volume,
'signal_date': signal_date,
'stock_type': stock_type, # Add stock type
'last_update': datetime.fromtimestamp(last_update/1000000000)
}
if calculator:
position = calculator.calculate_position_size(entry_data['entry_price'])
potential_profit = (entry_data['target_price'] - entry_data['entry_price']) * position['shares']
entry_data.update({
'shares': position['shares'],
'position_size': position['position_value'],
'stop_loss': position['stop_loss'],
'risk_amount': position['potential_loss'],
'profit_amount': potential_profit,
'risk_reward_ratio': abs(potential_profit / position['potential_loss']) if position['potential_loss'] != 0 else 0
})
signal_data['date'] = signal_date
entry_data = process_signal_data(
ticker, signal_data, current_volume,
last_update, stock_type, calculator
)
bullish_signals.append(entry_data)
print_signal(entry_data, "🟢")
except Exception as e:
print(f"Error processing {ticker}: {str(e)}")
continue

View File

@ -1,8 +1,8 @@
import os
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
from db.db_connection import create_client
from utils.data_utils import (
get_stock_data, validate_signal_date, print_signal,
save_signals_to_csv, get_qualified_stocks,
initialize_scanner, process_signal_data
)
from indicators.sunny_bands import SunnyBands
from trading.position_calculator import PositionCalculator
from screener.user_input import get_interval_choice, get_date_range
@ -200,86 +200,39 @@ def view_stock_details(ticker: str, interval: str, start_date: datetime, end_dat
print(f"Error analyzing {ticker}: {str(e)}")
def run_sunny_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None) -> None:
print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}")
interval = get_interval_choice()
# Get date range from user input
start_date, end_date = get_date_range()
# First get qualified stocks from database
# Convert dates to Unix timestamp in nanoseconds
end_ts = int(end_date.timestamp() * 1000000000)
start_ts = int(start_date.timestamp() * 1000000000)
try:
qualified_stocks = get_qualified_stocks(start_date, end_date, min_price, max_price, min_volume)
# Initialize scanner components
interval, start_date, end_date, qualified_stocks, calculator = initialize_scanner(
min_price, max_price, min_volume, portfolio_size
)
if not qualified_stocks:
print("No stocks found matching criteria.")
return
print(f"\nFound {len(qualified_stocks)} stocks matching criteria")
# Initialize indicators
sunny = SunnyBands()
calculator = None
if portfolio_size and portfolio_size > 0:
calculator = PositionCalculator(
account_size=portfolio_size,
risk_percentage=1.0,
stop_loss_percentage=7.0 # Explicit 7% stop loss
)
bullish_signals = []
bearish_signals = []
# Process each qualified stock
for ticker, current_price, current_volume, last_update, stock_type in qualified_stocks:
try:
# Get historical data based on interval
df = get_stock_data(ticker, start_date, end_date, interval)
if df.empty or len(df) < 50: # Need at least 50 bars for the indicator
continue
# Check for signals throughout the date range
signals = check_entry_signal(df)
for signal, signal_date, signal_data in signals:
if calculator:
try:
position = calculator.calculate_position_size(
entry_price=signal_data['price'],
target_price=signal_data['upper_band']
)
if position['shares'] > 0:
entry_data = {
'ticker': ticker,
'entry_price': signal_data['price'],
'target_price': signal_data['upper_band'],
'signal_date': signal_date,
'volume': current_volume,
'stock_type': stock_type,
'last_update': datetime.fromtimestamp(last_update/1000000000),
'shares': position['shares'],
'position_size': position['position_value'],
'stop_loss': signal_data['price'] * 0.93, # 7% stop loss
'risk_amount': position['potential_loss'],
'profit_amount': position['potential_profit'],
'risk_reward_ratio': position['risk_reward_ratio']
}
bullish_signals.append(entry_data)
print_signal(entry_data, "🟢")
except ValueError as e:
print(f"Skipping {ticker} position: {str(e)}")
continue
signal_data['date'] = signal_date
entry_data = process_signal_data(
ticker, signal_data, current_volume,
last_update, stock_type, calculator
)
bullish_signals.append(entry_data)
print_signal(entry_data, "🟢")
except Exception as e:
print(f"Error processing {ticker}: {str(e)}")
continue
save_signals_to_csv(bullish_signals, 'sunny')
except Exception as e:
print(f"Error during scan: {str(e)}")