stock_system/src/screener/t_atr_ema_v2.py

217 lines
9.1 KiB
Python

from screener.user_input import get_interval_choice
import os
from datetime import datetime, timedelta
import pandas as pd
from db.db_connection import create_client
from trading.position_calculator import PositionCalculator
from screener.t_sunnyband import get_stock_data
from indicators.three_atr_ema import ThreeATREMAIndicator
def run_atr_ema_target_scanner(min_price: float, max_price: float, min_volume: int, portfolio_size: float = None):
print(f"\n🔍 Scanning for stocks ${min_price:.2f}-${max_price:.2f} with min volume {min_volume:,}")
interval = get_interval_choice()
end_date = datetime.now()
start_date = end_date - timedelta(days=90)
# Convert to Unix timestamps (nanoseconds)
start_ts = int(start_date.timestamp() * 1e9)
end_ts = int(end_date.timestamp() * 1e9)
client = create_client()
try:
query = f"""
WITH latest_data AS (
SELECT
ticker,
sum(volume) AS total_volume,
argMax(open, window_start) AS last_open,
argMax(close, window_start) AS last_close,
argMax(high, window_start) AS last_high,
argMax(low, window_start) AS last_low,
max(window_start) AS last_update,
sum(transactions) AS transaction_count
FROM stock_db.stock_prices
WHERE window_start BETWEEN {start_ts} AND {end_ts}
AND toYear(toDateTime(window_start/1000000000)) <= toYear(now())
AND toYear(toDateTime(window_start/1000000000)) >= (toYear(now()) - 1)
GROUP BY ticker
HAVING last_close BETWEEN {min_price} AND {max_price}
AND total_volume >= {min_volume}
)
SELECT
ticker,
total_volume,
last_open,
last_close,
last_high,
last_low,
last_update,
transaction_count
FROM latest_data
ORDER BY ticker
"""
result = client.query(query)
stocks = result.result_rows
if not stocks:
print("❌ No stocks found matching criteria.")
return
print(f"\n✅ Found {len(stocks)} stocks matching criteria")
# **Correct column order as per ClickHouse output**
columns = ["ticker", "volume", "open", "close", "high", "low", "window_start", "transactions"]
df_stocks = pd.DataFrame(stocks, columns=columns)
# **Convert timestamps from nanoseconds to readable datetime**
df_stocks["window_start"] = pd.to_datetime(df_stocks["window_start"], unit="ns")
# Debugging: Check if columns exist
print("\n📊 Data Sample from ClickHouse Query:")
print(df_stocks.head())
indicator = ThreeATREMAIndicator()
calculator = PositionCalculator(portfolio_size, risk_percentage=1.0, stop_loss_percentage=7.0) if portfolio_size else None
bullish_signals = []
for _, row in df_stocks.iterrows():
ticker = row["ticker"]
current_price = row["close"]
current_volume = row["volume"]
last_update = row["window_start"]
try:
# Get historical data
df = get_stock_data(ticker, start_date, end_date, interval)
# Add date validation
if not df.empty:
max_date = df['date'].max()
min_date = df['date'].min()
if max_date.year > end_date.year or min_date.year < (end_date.year - 1):
print(f"⚠️ {ticker}: Invalid date range ({min_date.date()} to {max_date.date()})")
continue
# Enhanced validation with debugging
if df.empty:
print(f"⚠️ {ticker}: Empty DataFrame")
continue
# Debug data
print(f"\nProcessing {ticker}")
print(f"Columns present: {df.columns.tolist()}")
print(f"Data types: {df.dtypes}")
print(f"First row: {df.iloc[0]}")
print(f"Last row: {df.iloc[-1]}")
# Ensure DataFrame has required columns and proper types
required_columns = ['date', 'open', 'high', 'low', 'close', 'volume']
if not all(col in df.columns for col in required_columns):
missing = set(required_columns) - set(df.columns)
print(f"⚠️ {ticker}: Missing columns: {missing}")
continue
# Convert columns to numeric and handle any conversion errors
for col in ['open', 'high', 'low', 'close', 'volume']:
try:
df[col] = pd.to_numeric(df[col], errors='coerce')
except Exception as e:
print(f"⚠️ {ticker}: Error converting {col} to numeric: {str(e)}")
print(f"Sample of problematic column: {df[col].head()}")
raise ValueError(f"Data conversion error in {col}")
# Verify data validity after conversion
if df['close'].isnull().any():
null_rows = df[df['close'].isnull()]
print(f"⚠️ {ticker}: Contains {len(null_rows)} null values in close price")
print("First few null rows:")
print(null_rows.head())
continue
if len(df) < 50:
print(f"⚠️ {ticker}: Insufficient data points ({len(df)})")
continue
# Make a clean copy for indicator calculation
calc_df = df.copy()
# Calculate indicator with validated data
try:
results = indicator.calculate(calc_df)
if results.empty:
print(f"⚠️ {ticker}: No valid indicator results")
continue
except Exception as e:
print(f"⚠️ {ticker}: Error calculating indicator:")
print(f"Error details: {str(e)}")
print(f"Data shape: {calc_df.shape}")
print(f"Sample data:\n{calc_df.head()}")
continue
results = indicator.calculate(df)
last_row = results.iloc[-1]
prev_row = results.iloc[-2]
bullish_entry = (
last_row["close"] < last_row["ema"] and
prev_row["close"] <= prev_row["lower_band"] and
last_row["close"] > prev_row["close"]
)
if bullish_entry:
entry_price = last_row["close"]
target_1 = entry_price * 1.10 # 10% profit
target_2 = entry_price * 1.20 # 20% profit
# Trailing stop logic
trail_stop = None
trail_active = False
if last_row["close"] >= last_row["upper_band"]:
trail_active = True
highest_price = max(results["high"].iloc[-5:]) # Last 5 days
trail_stop = highest_price * 0.98 # 2% below high
# Position sizing
position = calculator.calculate_position_size(entry_price, target_2) if calculator else None
position_size = position["position_value"] if position else None
# Save signal
signal_data = {
"ticker": ticker,
"entry_price": entry_price,
"target_1": target_1,
"target_2": target_2,
"volume": current_volume,
"last_update": last_update,
"trail_stop": trail_stop,
"position_size": position_size
}
bullish_signals.append(signal_data)
# Print result
print(f"\n🟢 {ticker} @ ${entry_price:.2f}")
print(f" 🎯 Target 1: ${target_1:.2f} | Target 2: ${target_2:.2f}")
if trail_active:
print(f" 🚨 Trailing Stop: ${trail_stop:.2f}")
except Exception as e:
print(f"❌ Error processing {ticker}: {e}")
continue
# Save results
if bullish_signals:
output_dir = "reports"
os.makedirs(output_dir, exist_ok=True)
output_file = f"{output_dir}/atr_ema_targets_{datetime.now().strftime('%Y%m%d_%H%M')}.csv"
pd.DataFrame(bullish_signals).to_csv(output_file, index=False)
print(f"\n📁 Saved bullish signals to {output_file}")
else:
print("❌ No bullish signals found.")
except Exception as e:
print(f"❌ Error during scan: {e}")