stock_system/src/utils/scanner_utils.py

86 lines
3.6 KiB
Python

from datetime import datetime, timedelta
from utils.common_utils import get_user_input, get_stock_data, get_qualified_stocks
from screener.user_input import get_interval_choice, get_date_range
from trading.position_calculator import PositionCalculator
from typing import Optional
def initialize_scanner(min_price: float, max_price: float, min_volume: int,
                       portfolio_size: Optional[float] = None, interval: str = "1d",
                       start_date: Optional[datetime] = None,
                       end_date: Optional[datetime] = None) -> tuple:
    """
    Initialize common scanner components.

    Args:
        min_price (float): Minimum stock price
        max_price (float): Maximum stock price
        min_volume (int): Minimum volume threshold
        portfolio_size (float, optional): Portfolio size for position
            calculations. A position calculator is only created when this
            is a positive number.
        interval (str, optional): Time interval for data (default: "1d").
            It is returned unchanged; this function does not interpret it.
        start_date (datetime, optional): Start date for scanning (required
            despite the ``None`` default)
        end_date (datetime, optional): End date for scanning (required
            despite the ``None`` default)

    Returns:
        tuple: ``(interval, start_date, end_date, qualified_stocks,
        calculator)``. ``calculator`` is ``None`` when no positive
        ``portfolio_size`` was given. When no stock matches the criteria,
        a tuple of five ``None`` values is returned instead.

    Raises:
        ValueError: If ``start_date`` or ``end_date`` is missing.
    """
    # Validate first so an invalid call fails without printing a
    # misleading "Scanning..." banner.
    if not start_date or not end_date:
        raise ValueError("start_date and end_date must be provided")

    print(f"\nScanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}")

    qualified_stocks = get_qualified_stocks(start_date, end_date, min_price, max_price, min_volume)
    if not qualified_stocks:
        print("No stocks found matching criteria.")
        return None, None, None, None, None

    print(f"\nFound {len(qualified_stocks)} stocks matching criteria")

    # Initialize position calculator if portfolio size provided
    calculator = None
    if portfolio_size and portfolio_size > 0:
        # NOTE(review): 1.0 / 7.0 are presumably percentages (1% risk,
        # 7% stop loss) -- confirm against PositionCalculator.
        calculator = PositionCalculator(
            account_size=portfolio_size,
            risk_percentage=1.0,
            stop_loss_percentage=7.0,
        )

    return interval, start_date, end_date, qualified_stocks, calculator
def process_signal_data(ticker: str, signal_data: dict, current_volume: int,
                        last_update: int, stock_type: str,
                        calculator: Optional["PositionCalculator"] = None) -> dict:
    """
    Process and format signal data consistently.

    Args:
        ticker (str): Ticker symbol, copied into the result.
        signal_data (dict): Signal details. Must contain ``'price'``. A
            ``'ha_close'`` key marks a Heikin Ashi signal; otherwise the
            target is read from ``'ema'`` or, failing that,
            ``'upper_band'``. An optional ``'date'`` key supplies the
            signal date.
        current_volume (int): Volume value, copied into the result.
        last_update (int): Timestamp that is divided by 1e9 before being
            converted with ``datetime.fromtimestamp`` (presumably
            nanoseconds since the epoch -- confirm with the data source).
        stock_type (str): Stock type label, copied into the result.
        calculator (PositionCalculator, optional): When provided, its
            ``calculate_position_size(entry_price)`` result is used to add
            position sizing fields.

    Returns:
        dict: Always contains ``ticker``, ``entry_price``, ``target_price``,
        ``volume``, ``signal_date``, ``stock_type`` and ``last_update``.
        With a calculator it also contains ``shares``, ``position_size``,
        ``stop_loss``, ``risk_amount``, ``profit_amount`` and
        ``risk_reward_ratio``. If no target price could be determined,
        ``target_price`` and ``profit_amount`` are ``None`` and
        ``risk_reward_ratio`` is 0.

    Raises:
        KeyError: If ``signal_data`` has no ``'price'`` entry.
    """
    entry_price = signal_data['price']

    # Determine target price based on signal type
    if 'ha_close' in signal_data:  # Heikin Ashi signal
        # Use a 2:1 reward-to-risk ratio for Heikin Ashi
        stop_loss_pct = 0.07  # 7% stop loss
        stop_distance = entry_price * stop_loss_pct
        target_price = entry_price + (stop_distance * 2)  # 2x the stop distance
    else:
        # Handle other signal types (ATR-EMA or Sunny Bands). Fall back to
        # the upper band when 'ema' is absent or explicitly None.
        target_price = signal_data.get('ema')
        if target_price is None:
            target_price = signal_data.get('upper_band')

    entry_data = {
        'ticker': ticker,
        'entry_price': entry_price,
        'target_price': target_price,
        'volume': current_volume,
        'signal_date': signal_data.get('date', datetime.now()),
        'stock_type': stock_type,
        'last_update': datetime.fromtimestamp(last_update / 1_000_000_000),
    }

    if not calculator:
        return entry_data

    position = calculator.calculate_position_size(entry_price)
    shares = position['shares']
    risk_amount = position['potential_loss']

    if target_price is None:
        # No target available: profit cannot be computed. Report it as
        # unknown rather than raising TypeError on None arithmetic.
        potential_profit = None
        risk_reward_ratio = 0
    else:
        potential_profit = (target_price - entry_price) * shares
        # Guard against division by zero when the calculator reports no risk.
        risk_reward_ratio = abs(potential_profit / risk_amount) if risk_amount != 0 else 0

    entry_data.update({
        'shares': shares,
        'position_size': position['position_value'],
        'stop_loss': position['stop_loss'],
        'risk_amount': risk_amount,
        'profit_amount': potential_profit,
        'risk_reward_ratio': risk_reward_ratio,
    })
    return entry_data