import os
from datetime import datetime, timedelta

import pandas as pd

from db.db_connection import create_client


def validate_signal_date(signal_date: datetime) -> datetime:
    """
    Validate and adjust signal date if needed.

    Args:
        signal_date (datetime): Signal date to validate

    Returns:
        datetime: Valid signal date (not in the future)
    """
    current_date = datetime.now()
    if signal_date > current_date:
        return current_date
    return signal_date


def print_signal(signal_data: dict, signal_type: str = "🔍") -> None:
    """
    Print standardized signal output.

    Args:
        signal_data (dict): Dictionary containing signal information
        signal_type (str): Emoji indicator for signal type (default: 🔍)
    """
    try:
        print(f"\n{signal_type} {signal_data['ticker']} @ ${signal_data['entry_price']:.2f} on {signal_data['signal_date'].strftime('%Y-%m-%d %H:%M')}")
        print(f"   Size: {signal_data['shares']} shares (${signal_data['position_size']:.2f})")
        print(f"   Stop: ${signal_data['stop_loss']:.2f} (7%) | Target: ${signal_data['target_price']:.2f}")
        print(f"   Risk/Reward: 1:{signal_data['risk_reward_ratio']:.1f} | Risk: ${abs(signal_data['risk_amount']):.2f}")
        print(f"   Potential Profit: ${signal_data['profit_amount']:.2f}")
    except KeyError as e:
        print(f"Error printing signal for {signal_data.get('ticker', 'Unknown')}: Missing key {e}")
        # Print available keys for debugging
        print(f"Available keys: {list(signal_data.keys())}")


def get_qualified_stocks(start_date: datetime, end_date: datetime, min_price: float, max_price: float, min_volume: int) -> list:
    """
    Get qualified stocks based on price and volume criteria within a date range.

    Args:
        start_date (datetime): Start date for data fetch
        end_date (datetime): End date for data fetch
        min_price (float): Minimum stock price
        max_price (float): Maximum stock price
        min_volume (int): Minimum trading volume

    Returns:
        list: List of tuples (ticker, price, volume, last_update)
    """
    try:
        # window_start is stored in nanoseconds since the epoch
        start_ts = int(start_date.timestamp() * 1000000000)
        end_ts = int(end_date.timestamp() * 1000000000)

        with create_client() as client:
            query = f"""
                WITH filtered_data AS (
                    SELECT
                        ticker,
                        window_start,
                        close,
                        volume,
                        toDateTime(window_start/1000000000) as trade_date
                    FROM stock_db.stock_prices
                    WHERE window_start BETWEEN {start_ts} AND {end_ts}
                        AND toDateTime(window_start/1000000000) <= now()
                ),
                daily_data AS (
                    SELECT
                        ticker,
                        toDate(trade_date) as date,
                        argMax(close, window_start) as daily_close,
                        sum(volume) as daily_volume
                    FROM filtered_data
                    GROUP BY ticker, toDate(trade_date)
                    HAVING daily_close BETWEEN {min_price} AND {max_price}
                        AND daily_volume >= {min_volume}
                ),
                latest_data AS (
                    SELECT
                        ticker,
                        argMax(daily_close, date) as last_close,
                        sum(daily_volume) as total_volume,
                        max(toUnixTimestamp(date)) as last_update
                    FROM daily_data
                    GROUP BY ticker
                )
                SELECT
                    ticker,
                    last_close,
                    total_volume,
                    last_update
                FROM latest_data
                ORDER BY ticker
            """

            result = client.query(query)
            qualified_stocks = [(row[0], row[1], row[2], row[3]) for row in result.result_rows]

        return qualified_stocks

    except Exception as e:
        print(f"Error getting qualified stocks: {str(e)}")
        return []


def save_signals_to_csv(signals: list, scanner_name: str) -> None:
    """
    Save signals to a CSV file with standardized format and naming.

    Args:
        signals (list): List of signal dictionaries
        scanner_name (str): Name of the scanner for file naming
    """
    if not signals:
        print("\nNo signals found")
        return

    output_dir = 'reports'
    os.makedirs(output_dir, exist_ok=True)
    output_date = datetime.now().strftime("%Y%m%d_%H%M")
    output_file = f'{output_dir}/{scanner_name}_{output_date}.csv'

    df_signals = pd.DataFrame(signals)
    df_signals.to_csv(output_file, index=False)
    print(f"\nSaved {len(signals)} signals to {output_file}")


def get_stock_data(ticker: str, start_date: datetime, end_date: datetime, interval: str) -> pd.DataFrame:
    """
    Fetch and resample stock data based on the chosen interval.

    Args:
        ticker (str): Stock ticker symbol
        start_date (datetime): Start date for data fetch
        end_date (datetime): End date for data fetch
        interval (str): Time interval for data ('daily', '5min', '15min', '30min', '1hour')

    Returns:
        pd.DataFrame: Resampled DataFrame with OHLCV data
    """
    try:
        with create_client() as client:
            # Expand the query window backwards so there is enough history for
            # calculations, but keep the caller's start_date for the final filter.
            query_start = start_date - timedelta(days=90)

            # Base query to get raw data at the finest granularity
            query = f"""
                SELECT
                    toDateTime(window_start/1000000000) as date,
                    open,
                    high,
                    low,
                    close,
                    volume
                FROM stock_db.stock_prices
                WHERE ticker = '{ticker}'
                    AND window_start BETWEEN
                        {int(query_start.timestamp() * 1e9)} AND
                        {int(end_date.timestamp() * 1e9)}
                    AND toDateTime(window_start/1000000000) <= now()
                ORDER BY date ASC
            """

            result = client.query(query)

            if not result.result_rows:
                return pd.DataFrame()

            # Create base DataFrame
            df = pd.DataFrame(
                result.result_rows,
                columns=['date', 'open', 'high', 'low', 'close', 'volume']
            )

            # Convert numeric columns
            numeric_columns = ['open', 'high', 'low', 'close', 'volume']
            for col in numeric_columns:
                df[col] = pd.to_numeric(df[col], errors='coerce')

            # Convert date column and set it as the index for resampling
            df['date'] = pd.to_datetime(df['date'])
            df.set_index('date', inplace=True)

            # Map the requested interval to a pandas resampling rule (default: daily).
            # Note: pandas >= 2.2 prefers 'min'/'h' over the 'T'/'H' aliases used here.
            rules = {
                'daily': '1D',
                '5min': '5T',
                '15min': '15T',
                '30min': '30T',
                '1hour': '1H',
            }
            rule = rules.get(interval, '1D')

            resampled = df.resample(rule).agg({
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last',
                'volume': 'sum'
            }).dropna()

            # Reset index to get date back as a column
            resampled.reset_index(inplace=True)

            # Trim the expanded history back to the requested date range
            mask = (resampled['date'] >= start_date) & (resampled['date'] <= end_date)
            resampled = resampled.loc[mask]

            # Handle null values
            if resampled['close'].isnull().any():
                print(f"Warning: Found null values in close prices for {ticker}")
                resampled = resampled.dropna(subset=['close'])

            if resampled.empty:
                return pd.DataFrame()

            return resampled

    except Exception as e:
        print(f"Error fetching {ticker} data: {str(e)}")
        return pd.DataFrame()
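

if __name__ == "__main__":
    # Minimal usage sketch of the helpers above. The price/volume criteria,
    # scanner name, and signal sizing below are illustrative assumptions, not
    # values from a real scanner; running it also requires the ClickHouse
    # connection provided by db.db_connection.create_client().
    end = datetime.now()
    start = end - timedelta(days=30)

    candidates = get_qualified_stocks(start, end, min_price=5.0, max_price=100.0, min_volume=500_000)
    print(f"{len(candidates)} qualified tickers")

    signals = []
    for ticker, last_close, total_volume, last_update in candidates[:3]:
        df = get_stock_data(ticker, start, end, interval='daily')
        if df.empty:
            continue

        # Hypothetical signal dict using the keys print_signal() expects,
        # with a 7% stop to match the label printed by print_signal().
        entry = float(df['close'].iloc[-1])
        shares = 100
        stop_loss = entry * 0.93
        target_price = entry * 1.15
        risk_amount = (entry - stop_loss) * shares
        profit_amount = (target_price - entry) * shares
        signal = {
            'ticker': ticker,
            'entry_price': entry,
            'signal_date': validate_signal_date(df['date'].iloc[-1].to_pydatetime()),
            'shares': shares,
            'position_size': entry * shares,
            'stop_loss': stop_loss,
            'target_price': target_price,
            'risk_reward_ratio': profit_amount / risk_amount if risk_amount else 0.0,
            'risk_amount': risk_amount,
            'profit_amount': profit_amount,
        }
        print_signal(signal, "📈")
        signals.append(signal)

    save_signals_to_csv(signals, scanner_name='example_scanner')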