Features: ML

This commit is contained in:
Bobby 2025-03-10 08:38:58 -07:00
parent 8dbf51107f
commit b7aa844a24
7 changed files with 2160 additions and 263 deletions

View File

@ -30,3 +30,11 @@ pandas-ta
# Technical Analysis
ta
ta-lib
# AI and Machine Learning
tensorflow>=2.10.0
scikit-learn>=1.0.0
matplotlib>=3.5.0
# Add this to your requirements.txt
pandas_market_calendars>=3.0.0

94
src/app.py Normal file
View File

@ -0,0 +1,94 @@
import streamlit as st
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import pytz
import time
import os
import logging
# Import page modules
from pages.dashboard.dashboard_page import dashboard_page
from pages.screener.screener_page import screener_page
from pages.screener.technical_scanner_page import technical_scanner_page
from pages.journal.trading_journal_page import trading_journal_page
from pages.trading.trading_system_page import trading_system_page
from pages.analysis.monte_carlo_page import monte_carlo_page
from pages.analysis.ai_forecast_page import ai_forecast_page
from pages.backtesting.backtesting_page import backtesting_page
from pages.settings.settings_page import settings_page
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Set page config
st.set_page_config(
page_title="Trading Dashboard",
page_icon="📈",
layout="wide",
initial_sidebar_state="expanded"
)
# Define page mapping
pages = {
"Dashboard": dashboard_page,
"Stock Screener": {
"Basic Screener": screener_page,
"Technical Scanner": technical_scanner_page
},
"Trading Journal": trading_journal_page,
"Trading System": trading_system_page,
"Analysis": {
"Monte Carlo Simulation": monte_carlo_page,
"AI Stock Forecast": ai_forecast_page
},
"Backtesting": backtesting_page,
"Settings": settings_page
}
def main():
# Sidebar navigation
st.sidebar.title("Navigation")
# Handle nested pages
selected_page = None
selected_subpage = None
# First level navigation
main_page = st.sidebar.radio(
"Select Page",
options=list(pages.keys())
)
# Check if the selected page has subpages
if isinstance(pages[main_page], dict):
# Second level navigation
selected_subpage = st.sidebar.radio(
f"Select {main_page} Option",
options=list(pages[main_page].keys())
)
selected_page = pages[main_page][selected_subpage]
else:
selected_page = pages[main_page]
# Add a separator
st.sidebar.markdown("---")
# Add app info in sidebar
st.sidebar.info(
"""
**Trading Dashboard App**
Version 1.0
A comprehensive trading toolkit for analysis,
journaling, and decision making.
"""
)
# Display the selected page
selected_page()
if __name__ == "__main__":
main()

View File

@ -4,6 +4,7 @@ import time
import clickhouse_connect
from dotenv import load_dotenv
from contextlib import contextmanager
from datetime import datetime
# Load environment variables from .env file
load_dotenv()
@ -52,3 +53,75 @@ def create_client():
client.close()
except Exception as e:
logger.warning(f"Error closing connection: {str(e)}")
def get_current_prices(tickers):
"""Fetch current prices for the given tickers from ClickHouse using window_start."""
try:
with create_client() as client:
# Get the current time
now = datetime.now()
print(f"Current datetime: {now}")
# First check if there's any data for today
today_check_query = """
SELECT count() as count
FROM stock_db.stock_prices
WHERE window_start >= toUnixTimestamp(today()) * 1000000000
"""
today_result = client.query(today_check_query)
today_count = today_result.result_rows[0][0]
print(f"Number of records for today: {today_count}")
# If we have data for today, use it
if today_count > 0:
print("Using today's data")
query = f"""
SELECT ticker,
argMax(close, window_start) as close,
max(window_start) as last_update
FROM stock_db.stock_prices
WHERE ticker IN ({','.join([f"'{t}'" for t in tickers])})
AND window_start >= toUnixTimestamp(today()) * 1000000000
GROUP BY ticker
"""
else:
# Otherwise get the most recent data
print("No data for today, using most recent data")
query = f"""
SELECT ticker,
argMax(close, window_start) as close,
max(window_start) as last_update
FROM stock_db.stock_prices
WHERE ticker IN ({','.join([f"'{t}'" for t in tickers])})
GROUP BY ticker
"""
print(f"Executing query: {query}")
result = client.query(query)
# Process results with timestamp information
prices = {}
timestamps = {}
for row in result.result_rows:
ticker = row[0]
price = row[1]
timestamp = row[2]
prices[ticker] = price
timestamps[ticker] = timestamp
# Convert timestamps to readable format
readable_times = {}
for ticker, ts in timestamps.items():
try:
dt = datetime.fromtimestamp(ts / 1e9) # Convert from nanoseconds
readable_times[ticker] = dt.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
readable_times[ticker] = f"Error: {str(e)}"
print(f"Retrieved prices: {prices}")
print(f"Last updated: {readable_times}")
return prices
except Exception as e:
print(f"Error fetching current prices: {str(e)}")
return {}

View File

@ -0,0 +1,657 @@
import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime, timedelta
from utils.common_utils import get_stock_data
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import matplotlib.pyplot as plt
import io
import base64
import tempfile
import os
from pandas.tseries.offsets import BDay
import pandas_market_calendars as mcal
class AIForecaster:
def __init__(self, data: pd.DataFrame, forecast_days: int, lookback_window: int = 60):
"""
Initialize AI Forecaster
Args:
data (pd.DataFrame): Historical price data
forecast_days (int): Number of days to forecast
lookback_window (int): Number of past days to use for prediction
"""
# Make a copy and ensure column names are standardized
self.data = data.copy()
# Standardize column names (convert to lowercase first, then capitalize)
self.data.columns = [col.lower() for col in self.data.columns]
# Map common column names to standard format
column_mapping = {
'open': 'Open',
'high': 'High',
'low': 'Low',
'close': 'Close',
'volume': 'Volume',
'adj close': 'Adj Close',
'adj_close': 'Adj Close'
}
# Rename columns
self.data.rename(columns={k: v for k, v in column_mapping.items() if k in self.data.columns}, inplace=True)
# Ensure data is sorted by date
if 'date' in self.data.columns:
self.data.rename(columns={'date': 'Date'}, inplace=True)
self.data = self.data.sort_values('Date')
self.forecast_days = forecast_days
self.lookback_window = lookback_window
# Check if Close column exists
if 'Close' not in self.data.columns:
raise ValueError(f"Required column 'Close' not found. Available columns: {list(self.data.columns)}")
self.last_price = self.data['Close'].iloc[-1]
self.scaler = MinMaxScaler(feature_range=(0, 1))
# Features to use for prediction
self.features = ['Open', 'High', 'Low', 'Close', 'Volume']
self.available_features = [f for f in self.features if f in self.data.columns]
# Add technical indicators
self.add_technical_indicators()
def add_technical_indicators(self):
"""Add technical indicators to the dataset"""
# Moving averages
self.data['MA5'] = self.data['Close'].rolling(window=5).mean()
self.data['MA20'] = self.data['Close'].rolling(window=20).mean()
# Relative Strength Index (RSI)
delta = self.data['Close'].diff()
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = -delta.where(delta < 0, 0).rolling(window=14).mean()
rs = gain / loss
self.data['RSI'] = 100 - (100 / (1 + rs))
# MACD
self.data['EMA12'] = self.data['Close'].ewm(span=12, adjust=False).mean()
self.data['EMA26'] = self.data['Close'].ewm(span=26, adjust=False).mean()
self.data['MACD'] = self.data['EMA12'] - self.data['EMA26']
self.data['Signal'] = self.data['MACD'].ewm(span=9, adjust=False).mean()
# Bollinger Bands
self.data['BB_Middle'] = self.data['Close'].rolling(window=20).mean()
self.data['BB_Std'] = self.data['Close'].rolling(window=20).std()
self.data['BB_Upper'] = self.data['BB_Middle'] + 2 * self.data['BB_Std']
self.data['BB_Lower'] = self.data['BB_Middle'] - 2 * self.data['BB_Std']
# Drop NaN values
self.data = self.data.dropna()
# Add additional features to available features
additional_features = ['MA5', 'MA20', 'RSI', 'MACD', 'Signal']
self.available_features.extend(additional_features)
def prepare_data(self):
"""Prepare data for LSTM model"""
# Select features
dataset = self.data[self.available_features].values
# Scale the data
scaled_data = self.scaler.fit_transform(dataset)
# Create sequences for LSTM
X, y = [], []
for i in range(self.lookback_window, len(scaled_data)):
X.append(scaled_data[i-self.lookback_window:i])
y.append(scaled_data[i, self.available_features.index('Close')])
X, y = np.array(X), np.array(y)
# Split data into train and test sets (80% train, 20% test)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
return X_train, y_train, X_test, y_test, scaled_data
def build_model(self, X_train):
"""Build and compile LSTM model"""
model = Sequential()
# Input layer
model.add(LSTM(units=50, return_sequences=True,
input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))
# Hidden layers
model.add(LSTM(units=50, return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(units=50))
model.add(Dropout(0.2))
# Output layer
model.add(Dense(units=1))
# Compile model
model.compile(optimizer='adam', loss='mean_squared_error')
return model
def train_model(self, verbose=1):
"""Train the LSTM model and make predictions"""
try:
# Prepare data
X_train, y_train, X_test, y_test, scaled_data = self.prepare_data()
# Build model
model = self.build_model(X_train)
# Train model with a callback to prevent training from hanging
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
history = model.fit(
X_train, y_train,
epochs=50,
batch_size=32,
validation_data=(X_test, y_test),
verbose=verbose,
callbacks=[early_stopping]
)
# Make predictions on test data
y_pred = model.predict(X_test)
# Prepare for inverse scaling
pred_copy = np.repeat(y_pred, len(self.available_features)).reshape(-1, len(self.available_features))
# Only the Close price column needs to be replaced
close_idx = self.available_features.index('Close')
# Create a dummy array with the same shape as scaled_data[-len(y_test):]
dummy = scaled_data[-len(y_test):].copy()
# Replace only the Close column with predictions
dummy[:, close_idx] = y_pred.flatten()
# Inverse transform
y_pred_actual = self.scaler.inverse_transform(dummy)[:, close_idx]
y_test_actual = self.scaler.inverse_transform(scaled_data[-len(y_test):])[:, close_idx]
# Calculate metrics
mse = mean_squared_error(y_test_actual, y_pred_actual)
mae = mean_absolute_error(y_test_actual, y_pred_actual)
rmse = np.sqrt(mse)
r2 = r2_score(y_test_actual, y_pred_actual)
metrics = {
'mse': mse,
'mae': mae,
'rmse': rmse,
'r2': r2
}
# Forecast future prices
last_sequence = scaled_data[-self.lookback_window:]
future_predictions = []
# Create a copy for forecasting
current_sequence = last_sequence.copy()
def get_trading_days(start_date, num_days):
"""Get future trading days using NYSE calendar"""
nyse = mcal.get_calendar('NYSE')
end_date = start_date + timedelta(days=num_days * 2) # Look ahead enough to find required trading days
schedule = nyse.schedule(start_date=start_date, end_date=end_date)
trading_days = mcal.date_range(schedule, frequency='1D')
return trading_days[:num_days]
# Get the last date from the data
last_date = self.data.index[-1] if isinstance(self.data.index, pd.DatetimeIndex) else pd.to_datetime(self.data['Date'].iloc[-1])
# Generate future trading days
forecast_dates = get_trading_days(last_date + timedelta(days=1), self.forecast_days)
for _ in range(self.forecast_days):
# Reshape for prediction
current_batch = current_sequence.reshape(1, self.lookback_window, len(self.available_features))
# Predict next value
next_pred = model.predict(current_batch)[0][0]
# Create a dummy row with the last known values
dummy_row = current_sequence[-1].copy()
# Update the Close price with our prediction
dummy_row[close_idx] = next_pred
# Add to predictions
future_predictions.append(dummy_row)
# Update sequence by removing first row and adding the new prediction
current_sequence = np.vstack([current_sequence[1:], dummy_row])
# Convert predictions to actual values
future_predictions = np.array(future_predictions)
future_prices = self.scaler.inverse_transform(future_predictions)[:, close_idx]
# Create forecast DataFrame
forecast_df = pd.DataFrame({
'Date': forecast_dates,
'Predicted_Close': future_prices
})
# Create historical predictions for plotting
historical_predictions = model.predict(np.array(X_test))
# Prepare for inverse scaling (same as before)
hist_dummy = scaled_data[-len(y_test):].copy()
hist_dummy[:, close_idx] = historical_predictions.flatten()
historical_actual = self.scaler.inverse_transform(hist_dummy)[:, close_idx]
# Get dates for historical predictions
if isinstance(self.data.index, pd.DatetimeIndex):
historical_dates = self.data.index[-len(y_test):]
else:
historical_dates = pd.to_datetime(self.data['Date'].iloc[-len(y_test):])
# Create historical predictions DataFrame
historical_df = pd.DataFrame({
'Date': historical_dates,
'Actual_Close': y_test_actual,
'Predicted_Close': historical_actual
})
return model, forecast_df, historical_df, metrics, history
except Exception as e:
raise Exception(f"Error during model training: {str(e)}")
def plot_forecast(self, forecast_df, historical_df, metrics):
"""Create an interactive plot of forecast results"""
fig = make_subplots(
rows=2, cols=1,
subplot_titles=('Price Forecast', 'Prediction Error'),
vertical_spacing=0.15,
row_heights=[0.7, 0.3]
)
# Plot historical actual prices
fig.add_trace(
go.Scatter(
x=historical_df['Date'],
y=historical_df['Actual_Close'],
name='Actual Price',
line=dict(color='blue', width=2)
),
row=1, col=1
)
# Plot historical predicted prices
fig.add_trace(
go.Scatter(
x=historical_df['Date'],
y=historical_df['Predicted_Close'],
name='Model Fit',
line=dict(color='green', width=2)
),
row=1, col=1
)
# Plot future predictions
fig.add_trace(
go.Scatter(
x=forecast_df['Date'],
y=forecast_df['Predicted_Close'],
name='Price Forecast',
line=dict(color='red', width=2)
),
row=1, col=1
)
# Add confidence intervals (simple approach: ±RMSE)
rmse = metrics['rmse']
fig.add_trace(
go.Scatter(
x=forecast_df['Date'],
y=forecast_df['Predicted_Close'] + rmse,
name='Upper Bound',
line=dict(color='rgba(255,0,0,0.3)', dash='dash'),
showlegend=False
),
row=1, col=1
)
fig.add_trace(
go.Scatter(
x=forecast_df['Date'],
y=forecast_df['Predicted_Close'] - rmse,
name='Lower Bound',
line=dict(color='rgba(255,0,0,0.3)', dash='dash'),
fill='tonexty',
showlegend=False
),
row=1, col=1
)
# Plot prediction error
error = historical_df['Actual_Close'] - historical_df['Predicted_Close']
fig.add_trace(
go.Bar(
x=historical_df['Date'],
y=error,
name='Prediction Error',
marker_color='orange'
),
row=2, col=1
)
# Add zero line for error
fig.add_trace(
go.Scatter(
x=[historical_df['Date'].min(), historical_df['Date'].max()],
y=[0, 0],
mode='lines',
line=dict(color='white', dash='dash'),
showlegend=False
),
row=2, col=1
)
# Update layout
fig.update_layout(
title='AI Stock Price Forecast',
showlegend=True,
height=800,
template='plotly_dark',
plot_bgcolor='rgba(0,0,0,0)',
paper_bgcolor='rgba(0,0,0,0)'
)
# Update axes
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
return fig
def plot_training_history(self, history):
"""Plot training and validation loss"""
plt.figure(figsize=(10, 6))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(loc='upper right')
plt.grid(True)
# Convert plot to image
buf = io.BytesIO()
plt.savefig(buf, format='png', bbox_inches='tight')
buf.seek(0)
img_str = base64.b64encode(buf.read()).decode('utf-8')
plt.close()
return img_str
def ai_forecast_page():
st.title("AI Stock Price Forecasting")
# Input parameters
col1, col2 = st.columns(2)
with col1:
ticker = st.text_input("Enter Ticker Symbol", value="AAPL").upper()
start_date = st.date_input(
"Start Date (for historical data)",
value=datetime.now() - timedelta(days=365)
)
end_date = st.date_input("End Date", value=datetime.now())
with col2:
forecast_days = st.number_input(
"Forecast Horizon (Days)",
min_value=5,
max_value=90,
value=30
)
lookback_window = st.number_input(
"Lookback Window (Days)",
min_value=10,
max_value=120,
value=60,
help="Number of past days used to predict the next day"
)
# Advanced options
with st.expander("Advanced Options"):
model_type = st.selectbox(
"Model Type",
options=["LSTM", "GRU", "Bidirectional LSTM"],
index=0,
help="Type of neural network to use"
)
training_epochs = st.slider(
"Training Epochs",
min_value=10,
max_value=200,
value=50,
help="Number of training iterations"
)
batch_size = st.selectbox(
"Batch Size",
options=[8, 16, 32, 64, 128],
index=2,
help="Number of samples processed before model update"
)
if st.button("Generate Forecast"):
# Create a progress bar
progress_bar = st.progress(0)
status_text = st.empty()
with st.spinner('Training AI model and generating forecast...'):
try:
# Update status
status_text.text("Loading historical data...")
progress_bar.progress(10)
# Get historical data
df = get_stock_data(
ticker,
datetime.combine(start_date, datetime.min.time()),
datetime.combine(end_date, datetime.min.time()),
'1d' # Daily data
)
if df.empty:
st.error("No data available for the selected period")
return
# Filter out non-trading days (weekends and holidays)
nyse = mcal.get_calendar('NYSE')
schedule = nyse.schedule(start_date=start_date, end_date=end_date)
trading_days = mcal.date_range(schedule, frequency='1D')
trading_days_dates = [d.date() for d in trading_days]
# Ensure the dataframe has a datetime index
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
elif 'Date' in df.columns:
df['Date'] = pd.to_datetime(df['Date'])
df = df.set_index('Date')
# Keep only trading days
df = df[df.index.map(lambda x: x.date() in trading_days_dates)]
# Update status
status_text.text("Preparing data and adding technical indicators...")
progress_bar.progress(30)
# Add debug information
with st.expander("Debug Information"):
st.write("DataFrame columns:", df.columns.tolist())
st.write("DataFrame head:", df.head())
st.write("DataFrame shape:", df.shape)
# Initialize forecaster
forecaster = AIForecaster(df, forecast_days, lookback_window)
# Update status
status_text.text("Training neural network model (this may take a few minutes)...")
progress_bar.progress(50)
# Train model and get predictions
model, forecast_df, historical_df, metrics, history = forecaster.train_model()
# Update status
status_text.text("Generating forecast visualization...")
progress_bar.progress(80)
# Display results
col1, col2 = st.columns([3, 1])
with col1:
# Plot forecast
fig = forecaster.plot_forecast(forecast_df, historical_df, metrics)
st.plotly_chart(fig, use_container_width=True)
with col2:
st.subheader("Forecast Metrics")
# Price metrics
st.write("##### Current & Forecast")
# Handle different column name formats
close_col = 'close' if 'close' in df.columns else 'Close'
current_price = df[close_col].iloc[-1]
forecast_price = forecast_df['Predicted_Close'].iloc[-1]
price_change = ((forecast_price - current_price) / current_price) * 100
st.metric("Current Price", f"${current_price:.2f}")
st.metric(
f"Forecast ({forecast_days} days)",
f"${forecast_price:.2f}",
f"{price_change:.1f}%",
delta_color="normal" if price_change >= 0 else "inverse"
)
# Model accuracy metrics
st.write("##### Model Accuracy")
st.metric("RMSE", f"${metrics['rmse']:.2f}")
st.metric("MAE", f"${metrics['mae']:.2f}")
st.metric("R² Score", f"{metrics['r2']:.3f}")
# Show training history
st.subheader("Model Training History")
history_img = forecaster.plot_training_history(history)
st.image(f"data:image/png;base64,{history_img}", use_container_width=True)
# Show forecast table
st.subheader("Detailed Forecast")
forecast_df['Date'] = forecast_df['Date'].dt.strftime('%Y-%m-%d')
forecast_df['Predicted_Close'] = forecast_df['Predicted_Close'].round(2)
forecast_df.columns = ['Date', 'Predicted Price']
# Calculate daily changes
forecast_df['Daily Change %'] = [0] + [
((forecast_df['Predicted Price'].iloc[i] - forecast_df['Predicted Price'].iloc[i-1]) /
forecast_df['Predicted Price'].iloc[i-1] * 100).round(2)
for i in range(1, len(forecast_df))
]
# Calculate cumulative change from current price
forecast_df['Cumulative Change %'] = [
((price - current_price) / current_price * 100).round(2)
for price in forecast_df['Predicted Price']
]
st.dataframe(forecast_df, use_container_width=True)
# Add download buttons
col1, col2 = st.columns(2)
with col1:
csv = forecast_df.to_csv(index=False)
st.download_button(
label="Download Forecast Data",
data=csv,
file_name=f'ai_forecast_{ticker}_{datetime.now().strftime("%Y%m%d")}.csv',
mime='text/csv'
)
with col2:
# Combine historical and forecast for full dataset
historical_df['Date'] = historical_df['Date'].dt.strftime('%Y-%m-%d')
full_data = pd.concat([
historical_df[['Date', 'Actual_Close', 'Predicted_Close']],
forecast_df[['Date', 'Predicted Price']].rename(
columns={'Predicted Price': 'Predicted_Close'}
)
])
full_data['Actual_Close'] = full_data['Actual_Close'].round(2)
full_data['Predicted_Close'] = full_data['Predicted_Close'].round(2)
csv_full = full_data.to_csv(index=False)
st.download_button(
label="Download Complete Dataset",
data=csv_full,
file_name=f'ai_forecast_full_{ticker}_{datetime.now().strftime("%Y%m%d")}.csv',
mime='text/csv'
)
# After displaying all the results, add a button to save the model
if 'model' in locals():
try:
# Create a temporary file to save the model
with tempfile.NamedTemporaryFile(suffix='.h5', delete=False) as tmp:
temp_path = tmp.name
# Save the model to the temporary file
model.save(temp_path)
# Read the saved model file
with open(temp_path, 'rb') as f:
model_data = f.read()
# Clean up the temporary file
if os.path.exists(temp_path):
os.remove(temp_path)
# Provide download button
st.download_button(
label="Download Trained Model",
data=model_data,
file_name=f'ai_model_{ticker}_{datetime.now().strftime("%Y%m%d")}.h5',
mime='application/octet-stream'
)
except Exception as e:
st.warning(f"Could not save model: {str(e)}")
st.info("You can still use the forecast results even though the model couldn't be saved.")
# Complete progress
progress_bar.progress(100)
status_text.text("Forecast complete!")
except Exception as e:
st.error(f"Error during forecast: {str(e)}")
import traceback
st.code(traceback.format_exc())
if __name__ == "__main__":
ai_forecast_page()

View File

@ -2,6 +2,7 @@ import streamlit as st
import plotly.graph_objects as go
from datetime import datetime, timedelta
import pytz
from db.db_connection import create_client
from trading.journal import (
create_trades_table, get_open_trades, get_trade_history,
add_trade, update_trade, delete_trade, TradeEntry,
@ -9,6 +10,78 @@ from trading.journal import (
get_position_summary, get_latest_portfolio_value, update_portfolio_value
)
def fetch_current_prices(tickers):
"""Fetch current prices directly from ClickHouse."""
try:
with create_client() as client:
# Get the current time
now = datetime.now()
print(f"Current datetime: {now}")
# First check if there's any data for today
today_check_query = """
SELECT count() as count
FROM stock_db.stock_prices
WHERE window_start >= toUnixTimestamp(today()) * 1000000000
"""
today_result = client.query(today_check_query)
today_count = today_result.result_rows[0][0]
print(f"Number of records for today: {today_count}")
# If we have data for today, use it
if today_count > 0:
print("Using today's data")
query = f"""
SELECT ticker,
argMax(close, window_start) as close,
max(window_start) as last_update
FROM stock_db.stock_prices
WHERE ticker IN ({','.join([f"'{t}'" for t in tickers])})
AND window_start >= toUnixTimestamp(today()) * 1000000000
GROUP BY ticker
"""
else:
# Otherwise get the most recent data
print("No data for today, using most recent data")
query = f"""
SELECT ticker,
argMax(close, window_start) as close,
max(window_start) as last_update
FROM stock_db.stock_prices
WHERE ticker IN ({','.join([f"'{t}'" for t in tickers])})
GROUP BY ticker
"""
print(f"Executing query: {query}")
result = client.query(query)
# Process results with timestamp information
prices = {}
timestamps = {}
for row in result.result_rows:
ticker = row[0]
price = row[1]
timestamp = row[2]
prices[ticker] = price
timestamps[ticker] = timestamp
# Convert timestamps to readable format
readable_times = {}
for ticker, ts in timestamps.items():
try:
dt = datetime.fromtimestamp(ts / 1e9) # Convert from nanoseconds
readable_times[ticker] = dt.strftime('%Y-%m-%d %H:%M:%S')
except Exception as e:
readable_times[ticker] = f"Error: {str(e)}"
print(f"Retrieved prices: {prices}")
print(f"Last updated: {readable_times}")
return prices, readable_times
except Exception as e:
print(f"Error fetching current prices: {str(e)}")
return {}, {}
def calculate_weekly_metrics(trades) -> dict:
"""Calculate weekly performance metrics"""
pacific_tz = pytz.timezone('US/Pacific')
@ -203,10 +276,448 @@ def plot_trade_history(trades):
return fig
def calculate_period_metrics(trades, period_days=7) -> dict:
"""Calculate performance metrics for a specific time period"""
pacific_tz = pytz.timezone('US/Pacific')
now = datetime.now(pacific_tz)
# Create period_start without timezone first, then localize it
period_start = (now - timedelta(days=period_days)).replace(
hour=0, minute=0, second=0, microsecond=0
).astimezone(pacific_tz)
period_pl = 0
period_trades = []
total_investment = 0
# Get latest portfolio value
latest_portfolio = get_latest_portfolio_value()
portfolio_value = latest_portfolio['total_value'] if latest_portfolio else 0
for trade in trades:
# Get the trade date and ensure it's timezone aware
trade_date = trade.get('exit_date', trade.get('entry_date'))
if trade_date:
# Convert to Pacific timezone if needed
if trade_date.tzinfo is None:
trade_date = pacific_tz.localize(trade_date)
else:
trade_date = trade_date.astimezone(pacific_tz)
# For sells/exits that happened in this period
if (trade.get('direction') == 'sell' or trade.get('exit_price')) and trade_date and trade_date >= period_start:
price = float(trade.get('exit_price') or trade.get('entry_price'))
shares = float(trade['shares'])
position_id = trade['position_id']
# Find matching buy order
buy_trades = [t for t in trades
if t['position_id'] == position_id and
(t.get('direction', '').lower() != 'sell' and not t.get('exit_price'))]
if buy_trades:
# Calculate average entry price
total_cost = sum(float(t['entry_price']) * float(t['shares']) for t in buy_trades)
total_shares = sum(float(t['shares']) for t in buy_trades)
avg_entry_price = total_cost / total_shares if total_shares > 0 else 0
# Calculate trade P/L
trade_pl = (price - avg_entry_price) * shares
period_pl += trade_pl
period_trades.append(trade)
# Add to total investment for percentage calculation
investment = avg_entry_price * shares
total_investment += investment
# Calculate percentage gain/loss against total investment
investment_percentage = (period_pl / total_investment * 100) if total_investment > 0 else 0
# Calculate percentage gain/loss against portfolio value
portfolio_percentage = (period_pl / portfolio_value * 100) if portfolio_value > 0 else 0
return {
'period_pl': period_pl,
'trade_count': len(period_trades),
'trades': period_trades,
'investment_percentage': investment_percentage,
'portfolio_percentage': portfolio_percentage,
'total_investment': total_investment,
'portfolio_value': portfolio_value
}
def plot_performance_by_period(trades, period_type="weekly"):
"""Create a performance chart showing P/L by period"""
if not trades:
return None
# Define period settings
period_settings = {
"weekly": {"days": 7, "format": "%Y-W%U", "label": "Week"},
"monthly": {"days": 30, "format": "%Y-%m", "label": "Month"},
"quarterly": {"days": 90, "format": "%Y-Q%q", "label": "Quarter"},
"yearly": {"days": 365, "format": "%Y", "label": "Year"}
}
setting = period_settings.get(period_type, period_settings["weekly"])
# Sort trades by date
trades.sort(key=lambda x: x.get('exit_date') or x.get('entry_date'))
# Group trades by period
periods = {}
for trade in trades:
if trade.get('direction') == 'sell' or trade.get('exit_price'):
date = trade.get('exit_date') or trade.get('entry_date')
if date:
# Format the date according to period type
if period_type == "quarterly":
# Custom handling for quarters
quarter = (date.month - 1) // 3 + 1
period_key = f"{date.year}-Q{quarter}"
else:
period_key = date.strftime(setting["format"])
if period_key not in periods:
periods[period_key] = []
periods[period_key].append(trade)
# Calculate P/L for each period
period_data = []
for period_key, period_trades in periods.items():
pl = 0
total_investment = 0
for trade in period_trades:
price = float(trade.get('exit_price') or trade.get('entry_price'))
shares = float(trade['shares'])
position_id = trade['position_id']
# Find matching buy order
buy_trades = [t for t in trades
if t['position_id'] == position_id and
(t.get('direction', '').lower() != 'sell' and not t.get('exit_price'))]
if buy_trades:
# Calculate average entry price
total_cost = sum(float(t['entry_price']) * float(t['shares']) for t in buy_trades)
total_shares = sum(float(t['shares']) for t in buy_trades)
avg_entry_price = total_cost / total_shares if total_shares > 0 else 0
# Calculate trade P/L
trade_pl = (price - avg_entry_price) * shares
pl += trade_pl
# Add to total investment for percentage calculation
investment = avg_entry_price * shares
total_investment += investment
# Calculate percentage gain/loss
percentage = (pl / total_investment * 100) if total_investment > 0 else 0
period_data.append({
"period": period_key,
"pl": pl,
"percentage": percentage
})
# Sort by period
period_data.sort(key=lambda x: x["period"])
# Create figure with two y-axes
fig = go.Figure()
# Add bars for each period
periods = [d["period"] for d in period_data]
pls = [d["pl"] for d in period_data]
percentages = [d["percentage"] for d in period_data]
colors = ["green" if pl >= 0 else "red" for pl in pls]
# Add P/L bars
fig.add_trace(
go.Bar(
x=periods,
y=pls,
marker_color=colors,
text=[f"${pl:.2f}" for pl in pls],
textposition="auto",
name="P/L ($)",
hovertemplate=f"{setting['label']}: %{{x}}<br>P/L: $%{{y:.2f}}<extra></extra>"
)
)
# Add percentage line
fig.add_trace(
go.Scatter(
x=periods,
y=percentages,
mode='lines+markers',
name="Return (%)",
line=dict(color='blue'),
yaxis="y2",
hovertemplate=f"{setting['label']}: %{{x}}<br>Return: %{{y:.2f}}%<extra></extra>"
)
)
# Set up the layout with two y-axes
fig.update_layout(
title=f"P/L by {setting['label']}",
xaxis_title=setting['label'],
yaxis_title="P/L ($)",
yaxis2=dict(
title="Return (%)",
overlaying="y",
side="right",
showgrid=False
),
hovermode="x unified",
legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
)
)
return fig
def update_cash_balance_for_trade(trade, is_new=True):
"""
Update cash balance when a trade is added or updated.
Args:
trade: The trade object (can be dict or TradeEntry)
is_new: Whether this is a new trade (True) or an update (False)
"""
try:
# Get current portfolio value
latest_portfolio = get_latest_portfolio_value()
if not latest_portfolio:
st.error("Could not retrieve current portfolio value")
return
current_cash = latest_portfolio['cash_balance']
total_value = latest_portfolio['total_value']
# Handle both dict-like objects and TradeEntry objects
if hasattr(trade, '__dict__'):
# It's a TradeEntry object
shares = float(trade.shares) if trade.shares is not None else 0
# Handle None entry_price
if trade.entry_price is None:
print(f"Warning: Entry price is None for trade {trade.__dict__}")
return # Skip cash update if entry price is None
price = float(trade.entry_price)
direction = trade.direction.lower() if hasattr(trade, 'direction') and trade.direction else 'buy'
else:
# It's a dictionary
shares = float(trade['shares']) if trade.get('shares') is not None else 0
# Handle None entry_price
if trade.get('entry_price') is None:
print(f"Warning: Entry price is None for trade {trade}")
return # Skip cash update if entry price is None
price = float(trade['entry_price'])
direction = trade.get('direction', 'buy').lower()
# Skip if shares or price is zero
if shares <= 0 or price <= 0:
print(f"Warning: Invalid shares ({shares}) or price ({price}). Skipping cash update.")
return
trade_value = shares * price
# For buy orders, subtract from cash
if direction == 'buy':
new_cash = current_cash - trade_value
# For sell orders, add to cash
elif direction == 'sell':
new_cash = current_cash + trade_value
else:
# No change for other operations
return
# Calculate new total value (cash change doesn't affect total)
new_total = total_value
# Update portfolio value
update_portfolio_value(new_total, new_cash)
print(f"Cash balance updated: {current_cash:.2f} -> {new_cash:.2f} ({direction} {shares} shares at ${price:.2f})")
except Exception as e:
print(f"Error updating cash balance: {str(e)}")
# Don't show error to user, just log it
# st.error(f"Error updating cash balance: {str(e)}")
def calculate_advanced_metrics(trades):
"""Calculate advanced trading metrics like holding time, biggest winner/loser, etc."""
if not trades:
return {}
# Initialize metrics
total_trades = 0
winning_trades = 0
losing_trades = 0
total_profit = 0
total_loss = 0
biggest_winner = {"profit": 0, "ticker": "", "position_id": ""}
biggest_loser = {"loss": 0, "ticker": "", "position_id": ""}
total_holding_time = timedelta(0)
# Process closed trades
closed_trades = []
for trade in trades:
# Only consider trades with exit prices (closed trades)
if trade.get('exit_price') or trade.get('direction', '').lower() == 'sell':
position_id = trade['position_id']
ticker = trade['ticker']
shares = float(trade['shares'])
exit_price = float(trade.get('exit_price') or trade['entry_price'])
exit_date = trade.get('exit_date') or trade['entry_date']
# Find matching buy order
buy_trades = [t for t in trades
if t['position_id'] == position_id and
(t.get('direction', '').lower() != 'sell' and not t.get('exit_price'))]
if buy_trades:
# Calculate average entry price
total_cost = sum(float(t['entry_price']) * float(t['shares']) for t in buy_trades)
total_shares = sum(float(t['shares']) for t in buy_trades)
avg_entry_price = total_cost / total_shares if total_shares > 0 else 0
# Get earliest entry date
entry_dates = [t['entry_date'] for t in buy_trades]
earliest_entry = min(entry_dates) if entry_dates else None
if earliest_entry and exit_date:
# Calculate holding time
holding_time = exit_date - earliest_entry
# Calculate P/L
trade_pl = (exit_price - avg_entry_price) * shares
# Add to closed trades
closed_trades.append({
"ticker": ticker,
"position_id": position_id,
"entry_date": earliest_entry,
"exit_date": exit_date,
"holding_time": holding_time,
"pl": trade_pl,
"shares": shares,
"entry_price": avg_entry_price,
"exit_price": exit_price
})
# Update metrics
total_trades += 1
total_holding_time += holding_time
if trade_pl > 0:
winning_trades += 1
total_profit += trade_pl
if trade_pl > biggest_winner["profit"]:
biggest_winner = {
"profit": trade_pl,
"ticker": ticker,
"position_id": position_id,
"percent": (exit_price - avg_entry_price) / avg_entry_price * 100,
"entry_price": avg_entry_price,
"exit_price": exit_price,
"shares": shares,
"holding_time": holding_time
}
else:
losing_trades += 1
total_loss += abs(trade_pl)
if abs(trade_pl) > biggest_loser["loss"]:
biggest_loser = {
"loss": abs(trade_pl),
"ticker": ticker,
"position_id": position_id,
"percent": (avg_entry_price - exit_price) / avg_entry_price * 100,
"entry_price": avg_entry_price,
"exit_price": exit_price,
"shares": shares,
"holding_time": holding_time
}
# Calculate averages
win_rate = (winning_trades / total_trades * 100) if total_trades > 0 else 0
avg_profit = (total_profit / winning_trades) if winning_trades > 0 else 0
avg_loss = (total_loss / losing_trades) if losing_trades > 0 else 0
profit_factor = (total_profit / total_loss) if total_loss > 0 else float('inf')
avg_holding_time = (total_holding_time / total_trades) if total_trades > 0 else timedelta(0)
# Format holding time for display
avg_holding_days = avg_holding_time.total_seconds() / (60 * 60 * 24)
avg_holding_hours = avg_holding_time.total_seconds() / (60 * 60)
if avg_holding_days >= 1:
avg_holding_str = f"{avg_holding_days:.1f} days"
else:
avg_holding_str = f"{avg_holding_hours:.1f} hours"
return {
"total_trades": total_trades,
"winning_trades": winning_trades,
"losing_trades": losing_trades,
"win_rate": win_rate,
"avg_profit": avg_profit,
"avg_loss": avg_loss,
"profit_factor": profit_factor,
"biggest_winner": biggest_winner,
"biggest_loser": biggest_loser,
"avg_holding_time": avg_holding_time,
"avg_holding_str": avg_holding_str,
"closed_trades": closed_trades
}
def trading_journal_page():
st.header("Trading Journal")
print("\n=== Starting Trading Journal Page ===") # Debug
# Direct database test at the beginning
print("\n=== DIRECT DATABASE TEST ===")
try:
with create_client() as client:
now = datetime.now()
print(f"Current datetime: {now}")
# Test query with LIMIT 25
test_query = """
SELECT count() as count
FROM stock_db.stock_prices
WHERE window_start >= toUnixTimestamp(today()) * 1000000000
LIMIT 25
"""
print(f"Executing test query: {test_query}")
result = client.query(test_query)
count = result.result_rows[0][0]
print(f"Number of records for today: {count}")
# Get sample data (limited to 5 records)
if count > 0:
sample_query = """
SELECT ticker, window_start, close
FROM stock_db.stock_prices
WHERE window_start >= toUnixTimestamp(today()) * 1000000000
LIMIT 5
"""
print(f"Executing sample query: {sample_query}")
sample_result = client.query(sample_query)
print("Sample data:")
for row in sample_result.result_rows:
print(f" {row}")
except Exception as e:
print(f"Error in direct database test: {str(e)}")
print("=== END DIRECT DATABASE TEST ===\n")
# Tabs for different journal functions
tab1, tab2, tab3, tab4 = st.tabs(["Open Positions", "Add Trade", "Update Trade", "Trade History"])
@ -215,81 +726,91 @@ def trading_journal_page():
print("\n--- Fetching Open Positions ---") # Debug
open_trades = get_open_trades()
print(f"Retrieved {len(open_trades) if open_trades else 0} open trades") # Debug
print(f"Open trades {open_trades}") # Debug
open_summary = get_open_trades_summary()
print(f"Retrieved {len(open_summary) if open_summary else 0} position summaries") # Debug
print(f"Open summary {open_summary}") # Debug
# Get latest portfolio value
latest_portfolio = get_latest_portfolio_value()
cash_balance = latest_portfolio['cash_balance'] if latest_portfolio else 0
# Get trade history for weekly metrics
trade_history = get_trade_history()
weekly_metrics = calculate_weekly_metrics(trade_history) if trade_history else {'weekly_pl': 0, 'weekly_trade_count': 0}
# Portfolio overview section - always show this
st.subheader("Portfolio Overview")
col1, col2 = st.columns(2)
# Calculate total invested value and paper P/L
total_invested_value = 0
total_paper_pl = 0
# Get current prices if there are open positions
if open_summary:
# Get current prices and latest portfolio value
unique_tickers = list(set(summary['ticker'] for summary in open_summary))
print(f"Fetching prices for tickers: {unique_tickers}") # Debug
current_prices = get_current_prices(unique_tickers)
print(f"Retrieved current prices: {current_prices}") # Debug
latest_portfolio = get_latest_portfolio_value()
total_portfolio_value = latest_portfolio['total_value'] if latest_portfolio else 0
cash_balance = latest_portfolio['cash_balance'] if latest_portfolio else 0
print("\n=== DEBUGGING CURRENT PRICES ===")
print(f"About to fetch prices for tickers: {unique_tickers}")
# Get trade history for weekly metrics
trade_history = get_trade_history()
weekly_metrics = calculate_weekly_metrics(trade_history) if trade_history else {'weekly_pl': 0, 'weekly_trade_count': 0}
# Portfolio overview section
st.subheader("Portfolio Overview")
col1, col2 = st.columns(2)
# Get cash balance
latest_portfolio = get_latest_portfolio_value()
cash_balance = latest_portfolio['cash_balance'] if latest_portfolio else 0
# Calculate total invested value and paper P/L
total_invested_value = 0
total_paper_pl = 0
if open_summary:
for summary in open_summary:
ticker = summary['ticker']
current_price = current_prices.get(ticker, 0)
shares = float(summary['total_shares'])
avg_entry = float(summary['avg_entry_price'])
# Calculate position value using current market price
position_value = current_price * shares if current_price > 0 else avg_entry * shares
total_invested_value += position_value
total_paper_pl += (current_price - avg_entry) * shares if current_price > 0 else 0
# Calculate total portfolio value (cash + invested value)
total_portfolio_value = cash_balance + total_invested_value
with col1:
# Add input for cash balance
new_cash_balance = st.number_input(
"Cash Balance ($)",
value=cash_balance,
step=100.0,
format="%.2f"
)
try:
current_prices, last_updated = fetch_current_prices(unique_tickers)
print(f"Successfully retrieved prices: {current_prices}")
print(f"Last updated: {last_updated}")
if new_cash_balance != cash_balance:
# Update cash balance if changed
update_portfolio_value(total_portfolio_value, new_cash_balance)
st.rerun()
st.metric("Total Portfolio Value",
f"${total_portfolio_value:,.2f}")
st.write(f"Cash: ${cash_balance:,.2f}")
st.write(f"Positions: ${total_invested_value:,.2f}")
with col2:
# Weekly metrics
weekly_pl_pct = (weekly_metrics['weekly_pl'] / total_portfolio_value * 100) if total_portfolio_value > 0 else 0
st.metric("Week P/L",
f"${weekly_metrics['weekly_pl']:,.2f}",
f"{weekly_pl_pct:.2f}% | {weekly_metrics['weekly_trade_count']} trades")
# Add a note in the UI about when prices were last updated
if current_prices:
st.info(f"Prices last updated: {', '.join([f'{ticker}: {time}' for ticker, time in last_updated.items()])}")
except Exception as e:
print(f"Error fetching current prices: {str(e)}")
current_prices = {}
total_paper_pl = 0
invested_value = 0
print("=== END DEBUGGING CURRENT PRICES ===\n")
# Calculate position values
for summary in open_summary:
ticker = summary['ticker']
current_price = current_prices.get(ticker, 0)
print(f"Processing summary for {ticker}: Current Price = {current_price}")
shares = float(summary['total_shares'])
avg_entry = float(summary['avg_entry_price'])
# Calculate position value using current market price
position_value = current_price * shares if current_price > 0 else avg_entry * shares
total_invested_value += position_value
total_paper_pl += (current_price - avg_entry) * shares if current_price > 0 else 0
# Calculate total portfolio value (cash + invested value)
total_portfolio_value = cash_balance + total_invested_value
with col1:
# Add input for cash balance
new_cash_balance = st.number_input(
"Cash Balance ($)",
value=cash_balance,
step=100.0,
format="%.2f"
)
if new_cash_balance != cash_balance:
# Update cash balance if changed
update_portfolio_value(total_portfolio_value, new_cash_balance)
st.rerun()
st.metric("Total Portfolio Value",
f"${total_portfolio_value:,.2f}")
st.write(f"Cash: ${cash_balance:,.2f}")
st.write(f"Positions: ${total_invested_value:,.2f}")
with col2:
# Weekly metrics
weekly_pl_pct = (weekly_metrics['weekly_pl'] / total_portfolio_value * 100) if total_portfolio_value > 0 else 0
st.metric("Week P/L",
f"${weekly_metrics['weekly_pl']:,.2f}",
f"{weekly_pl_pct:.2f}% | {weekly_metrics['weekly_trade_count']} trades")
# Display open positions if any
if open_summary:
st.subheader("Open Positions")
for summary in open_summary:
print(f"\nProcessing summary for {summary['ticker']}") # Debug
with st.expander(f"{summary['ticker']} Summary"):
@ -326,7 +847,6 @@ def trading_journal_page():
f"{pl_percentage:.2f}%")
total_paper_pl += paper_pl
invested_value += current_value
with col3:
if current_price:
@ -334,24 +854,8 @@ def trading_journal_page():
f"{metrics['allocation']:.1f}%")
st.metric("P/L Impact on Portfolio",
f"{metrics['pl_impact']:.2f}%")
# Update portfolio summary section
if invested_value > 0:
st.markdown("---")
st.subheader("Portfolio Summary")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Invested Value",
f"${invested_value:.2f}",
f"{(invested_value/total_portfolio_value)*100:.1f}% of Portfolio")
with col2:
st.metric("Total P/L",
f"${total_paper_pl:.2f}",
f"{(total_paper_pl/total_portfolio_value)*100:.2f}% of Portfolio")
with col3:
st.metric("Cash Allocation",
f"${cash_balance:.2f}",
f"{(cash_balance/total_portfolio_value)*100:.1f}% of Portfolio")
else:
st.info("No open positions")
with tab2:
st.subheader("Add New Trade")
@ -476,8 +980,13 @@ def trading_journal_page():
print(f"Trade object created: {trade.__dict__}") # Debug
# Add the trade to the database
add_trade(trade)
print("Trade added successfully") # Debug
# Update cash balance
update_cash_balance_for_trade(trade)
st.success("Trade added successfully!")
st.query_params(rerun=True)
except Exception as e:
@ -542,6 +1051,10 @@ def trading_journal_page():
entry_datetime = datetime.combine(entry_date, entry_time)
entry_datetime = pytz.timezone('US/Pacific').localize(entry_datetime)
# Get original trade details for cash adjustment
original_shares = trade['shares']
original_price = float(trade['entry_price']) if trade['entry_price'] is not None else 0.0
updates = {
'entry_date': entry_datetime,
'shares': new_shares,
@ -553,9 +1066,28 @@ def trading_journal_page():
'notes': new_notes
}
# If closing a position, add exit price and date
if new_exit is not None and new_exit > 0:
updates['exit_price'] = new_exit
updates['exit_date'] = entry_datetime
# Create a sell trade to update cash balance
sell_trade = TradeEntry(
ticker=trade['ticker'],
entry_date=entry_datetime,
shares=new_shares,
entry_price=new_exit, # Use exit price as the entry price for the sell trade
direction='sell',
position_id=trade['position_id']
)
# Update cash balance for the sell
update_cash_balance_for_trade(sell_trade)
print("\n--- Processing Trade Update ---") # Debug
print(f"Update data being sent: {updates}") # Debug
# Update the trade in the database
update_trade(trade_id, updates)
print("Trade update completed") # Debug
@ -577,21 +1109,70 @@ def trading_journal_page():
history = get_trade_history()
if history:
# Add time period selection
period_options = {
"1 week": 7,
"2 weeks": 14,
"1 month": 30,
"3 months": 90,
"6 months": 180,
"12 months": 365
}
col1, col2 = st.columns(2)
with col1:
selected_period = st.selectbox(
"Performance Period",
options=list(period_options.keys()),
index=2 # Default to 1 month
)
with col2:
period_view = st.selectbox(
"View By",
options=["weekly", "monthly", "quarterly", "yearly"],
index=0 # Default to weekly
)
# Calculate metrics for selected period
period_days = period_options[selected_period]
period_metrics = calculate_period_metrics(history, period_days)
# Display period metrics
st.subheader(f"Performance ({selected_period})")
metric_col1, metric_col2 = st.columns(2)
with metric_col1:
st.metric("Period P/L", f"${period_metrics['period_pl']:.2f}")
st.metric("Trades Closed", f"{period_metrics['trade_count']}")
with metric_col2:
st.metric("Return on Investment", f"{period_metrics['investment_percentage']:.2f}%",
delta_color="normal" if period_metrics['investment_percentage'] >= 0 else "inverse")
st.metric("Return on Portfolio", f"{period_metrics['portfolio_percentage']:.2f}%",
delta_color="normal" if period_metrics['portfolio_percentage'] >= 0 else "inverse")
# Add period performance chart
period_fig = plot_performance_by_period(history, period_view)
if period_fig:
st.plotly_chart(period_fig, use_container_width=True)
# Add cumulative P/L chart
st.subheader("Cumulative P/L")
fig = plot_trade_history(history)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Display trades grouped by position
st.subheader("Positions")
# Group trades by position_id
positions = {}
for trade in history:
if trade['position_id'] not in positions:
positions[trade['position_id']] = []
positions[trade['position_id']].append(trade)
# Add P/L chart
fig = plot_trade_history(history)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Display trades grouped by position
# Sort trades chronologically
for position_id, trades in positions.items():
# Sort trades chronologically
trades.sort(key=lambda x: x['entry_date'])
first_trade = trades[0]
@ -661,5 +1242,61 @@ def trading_journal_page():
if trade.get('exit_reason'):
st.text(f"Reason: {trade['exit_reason']}")
st.markdown("---")
# Add advanced metrics calculation
advanced_metrics = calculate_advanced_metrics(history)
# Display advanced metrics
st.subheader("Trading Statistics")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Win Rate", f"{advanced_metrics.get('win_rate', 0):.1f}%")
st.metric("Profit Factor", f"{advanced_metrics.get('profit_factor', 0):.2f}")
with col2:
st.metric("Avg Holding Time", advanced_metrics.get('avg_holding_str', 'N/A'))
st.metric("Total Trades", f"{advanced_metrics.get('total_trades', 0)}")
with col3:
st.metric("Avg Win", f"${advanced_metrics.get('avg_profit', 0):.2f}")
st.metric("Avg Loss", f"${advanced_metrics.get('avg_loss', 0):.2f}")
# Display biggest winner and loser
if advanced_metrics.get('biggest_winner', {}).get('profit', 0) > 0:
st.subheader("Biggest Winner")
winner = advanced_metrics['biggest_winner']
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Ticker", winner['ticker'])
st.metric("Profit", f"${winner['profit']:.2f}")
with col2:
st.metric("Return", f"{winner['percent']:.2f}%")
st.metric("Shares", f"{int(winner['shares']):,}")
with col3:
holding_days = winner['holding_time'].total_seconds() / (60 * 60 * 24)
holding_hours = winner['holding_time'].total_seconds() / (60 * 60)
holding_str = f"{holding_days:.1f} days" if holding_days >= 1 else f"{holding_hours:.1f} hours"
st.metric("Entry", f"${winner['entry_price']:.2f}")
st.metric("Exit", f"${winner['exit_price']:.2f}")
st.metric("Holding Time", holding_str)
if advanced_metrics.get('biggest_loser', {}).get('loss', 0) > 0:
st.subheader("Biggest Loser")
loser = advanced_metrics['biggest_loser']
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Ticker", loser['ticker'])
st.metric("Loss", f"${loser['loss']:.2f}")
with col2:
st.metric("Return", f"-{loser['percent']:.2f}%")
st.metric("Shares", f"{int(loser['shares']):,}")
with col3:
holding_days = loser['holding_time'].total_seconds() / (60 * 60 * 24)
holding_hours = loser['holding_time'].total_seconds() / (60 * 60)
holding_str = f"{holding_days:.1f} days" if holding_days >= 1 else f"{holding_hours:.1f} hours"
st.metric("Entry", f"${loser['entry_price']:.2f}")
st.metric("Exit", f"${loser['exit_price']:.2f}")
st.metric("Holding Time", holding_str)
else:
st.info("No trade history found")

View File

@ -25,192 +25,615 @@ def trading_system_page():
st.header("Trading System")
# Create tabs
tab1, tab2 = st.tabs(["Position Calculator", "Watch Lists"])
tab1, tab2, tab3 = st.tabs(["Position Calculator", "Prop Firm Calculator", "Watch Lists"])
# Tab 1: Position Calculator
with tab1:
st.subheader("Position Calculator")
# Get latest portfolio value and open trades for total portfolio calculation
portfolio_data = get_latest_portfolio_value()
cash_balance = portfolio_data['cash_balance'] if portfolio_data else 0
# Calculate total portfolio value including open positions
open_summary = get_open_trades_summary()
total_position_value = 0
total_paper_pl = 0
if open_summary:
# Get current prices for all open positions
unique_tickers = list(set(summary['ticker'] for summary in open_summary))
current_prices = get_current_prices(unique_tickers)
# Calculate total invested value and paper P/L
for summary in open_summary:
ticker = summary['ticker']
current_price = current_prices.get(ticker, 0)
shares = summary['total_shares']
avg_entry = summary['avg_entry_price']
# Get latest portfolio value and open trades for total portfolio calculation
portfolio_data = get_latest_portfolio_value()
cash_balance = portfolio_data['cash_balance'] if portfolio_data else 0
# Calculate total portfolio value including open positions
open_summary = get_open_trades_summary()
total_position_value = 0
total_paper_pl = 0
if open_summary:
# Get current prices for all open positions
unique_tickers = list(set(summary['ticker'] for summary in open_summary))
current_prices = get_current_prices(unique_tickers)
position_value = current_price * shares if current_price else avg_entry * shares
total_position_value += position_value
total_paper_pl += (current_price - avg_entry) * shares if current_price else 0
total_portfolio_value = cash_balance + total_position_value
# Display portfolio information
col1, col2, col3 = st.columns(3)
with col1:
st.info(f"Available Cash: ${cash_balance:,.2f}")
with col2:
st.info(f"Positions Value: ${total_position_value:,.2f}")
with col3:
st.info(f"Total Portfolio Value: ${total_portfolio_value:,.2f}")
col1, col2 = st.columns(2)
with col1:
account_size = st.number_input("Account Size ($)",
min_value=0.0,
value=total_portfolio_value,
step=1000.0)
risk_percentage = st.number_input("Risk Percentage (%)",
min_value=0.1,
max_value=100.0,
value=1.0,
step=0.1)
use_monte_carlo = st.checkbox("Use Monte Carlo for Analysis", value=True)
if use_monte_carlo:
days_out = st.number_input("Days to Project",
min_value=1,
max_value=30,
value=5,
help="Number of days to project for target price")
confidence_level = st.slider("Confidence Level (%)",
min_value=80,
max_value=99,
value=95)
with col2:
ticker = st.text_input("Ticker Symbol", value="").upper()
entry_price = st.number_input("Entry Price ($)", min_value=0.01, step=0.01)
if not use_monte_carlo:
target_price = st.number_input("Target Price ($)", min_value=0.01, step=0.01)
if st.button("Calculate Position"):
try:
if not ticker:
st.error("Please enter a ticker symbol")
return
# Calculate total invested value and paper P/L
for summary in open_summary:
ticker = summary['ticker']
current_price = current_prices.get(ticker, 0)
shares = summary['total_shares']
avg_entry = summary['avg_entry_price']
# Get historical data for Monte Carlo simulation
position_value = current_price * shares if current_price else avg_entry * shares
total_position_value += position_value
total_paper_pl += (current_price - avg_entry) * shares if current_price else 0
total_portfolio_value = cash_balance + total_position_value
# Display portfolio information
col1, col2, col3 = st.columns(3)
with col1:
st.info(f"Available Cash: ${cash_balance:,.2f}")
with col2:
st.info(f"Positions Value: ${total_position_value:,.2f}")
with col3:
st.info(f"Total Portfolio Value: ${total_portfolio_value:,.2f}")
col1, col2 = st.columns(2)
with col1:
account_size = st.number_input("Account Size ($)",
min_value=0.0,
value=total_portfolio_value,
step=1000.0,
key="personal_account_size")
risk_percentage = st.number_input("Risk Percentage (%)",
min_value=0.1,
max_value=100.0,
value=1.0,
step=0.1,
key="personal_risk_percentage")
use_monte_carlo = st.checkbox("Use Monte Carlo for Analysis", value=True, key="personal_monte_carlo")
if use_monte_carlo:
with st.spinner("Calculating optimal stop loss..."):
df = get_stock_data(
ticker,
datetime.now() - timedelta(days=30), # Last 30 days of data
datetime.now(),
'1m' # Minute data for more accurate simulation
)
days_out = st.number_input("Days to Project",
min_value=1,
max_value=30,
value=5,
help="Number of days to project for target price",
key="personal_days_out")
confidence_level = st.slider("Confidence Level (%)",
min_value=80,
max_value=99,
value=95,
key="personal_confidence_level")
with col2:
ticker = st.text_input("Ticker Symbol", value="", key="personal_ticker").upper()
entry_price = st.number_input("Entry Price ($)", min_value=0.01, step=0.01, key="personal_entry_price")
if not use_monte_carlo:
target_price = st.number_input("Target Price ($)", min_value=0.01, step=0.01, key="personal_target_price")
if st.button("Calculate Position", key="personal_calculate"):
try:
if not ticker:
st.error("Please enter a ticker symbol")
return
if df.empty:
st.error("No data available for the selected ticker")
return
# Initialize Monte Carlo simulator
# Initialize Monte Carlo simulator
simulator = MonteCarloSimulator(df, num_simulations=1000, time_horizon=days_out)
# Calculate stop loss and target prices
stop_loss_price = simulator.calculate_stop_loss(risk_percentage)
target_price = simulator.calculate_target_price(confidence_level)
# Calculate stop loss percentage
stop_loss_percentage = abs((stop_loss_price - entry_price) / entry_price * 100)
else:
stop_loss_percentage = 7.0 # Default value if not using Monte Carlo
calculator = PositionCalculator(
account_size=account_size,
risk_percentage=risk_percentage,
stop_loss_percentage=stop_loss_percentage
)
position = calculator.calculate_position_size(entry_price, target_price)
# Calculate maximum shares possible with available cash
max_shares_by_cash = int(cash_balance / entry_price) if entry_price > 0 else 0
# Adjust shares based on available cash
recommended_shares = min(position['shares'], max_shares_by_cash)
col1, col2 = st.columns(2)
with col1:
if recommended_shares < position['shares']:
st.warning(
f"Position size limited by available cash.\n"
f"Ideal shares: {position['shares']:,}\n"
f"Maximum affordable shares: {recommended_shares:,}"
)
position_value = recommended_shares * entry_price
risk_amount = position['risk_amount'] * (recommended_shares / position['shares'])
st.metric("Recommended Shares", f"{recommended_shares:,}")
st.metric("Position Value", f"${position_value:,.2f}")
st.metric("Risk Amount", f"${risk_amount:,.2f}")
# Get historical data for Monte Carlo simulation
if use_monte_carlo:
with st.spinner("Calculating optimal stop loss..."):
df = get_stock_data(
ticker,
datetime.now() - timedelta(days=30), # Last 30 days of data
datetime.now(),
'1m' # Minute data for more accurate simulation
)
if df.empty:
st.error("No data available for the selected ticker")
return
# Initialize Monte Carlo simulator
simulator = MonteCarloSimulator(df, num_simulations=1000, time_horizon=days_out)
# Calculate stop loss and target prices
stop_loss_price = simulator.calculate_stop_loss(risk_percentage)
target_price = simulator.calculate_target_price(confidence_level)
# Calculate stop loss percentage
stop_loss_percentage = abs((stop_loss_price - entry_price) / entry_price * 100)
else:
st.metric("Number of Shares", f"{position['shares']:,}")
st.metric("Position Value", f"${position['position_value']:,.2f}")
st.metric("Risk Amount", f"${position['risk_amount']:,.2f}")
with col2:
st.metric("Stop Loss Price", f"${position['stop_loss']:.2f}")
st.metric("Potential Loss", f"${position['potential_loss']:,.2f}")
if 'potential_profit' in position:
potential_profit = (target_price - entry_price) * recommended_shares
risk_reward = abs(potential_profit / (position['stop_loss'] - entry_price) / recommended_shares) if recommended_shares > 0 else 0
st.metric("Potential Profit", f"${potential_profit:,.2f}")
st.metric("Risk/Reward Ratio", f"{risk_reward:.2f}")
# Show percentage of cash being used
if recommended_shares > 0:
cash_usage = (recommended_shares * entry_price / cash_balance) * 100
portfolio_usage = (recommended_shares * entry_price / total_portfolio_value) * 100
st.info(
f"This position would use:\n"
f"- {cash_usage:.1f}% of available cash\n"
f"- {portfolio_usage:.1f}% of total portfolio"
stop_loss_percentage = 7.0 # Default value if not using Monte Carlo
calculator = PositionCalculator(
account_size=account_size,
risk_percentage=risk_percentage,
stop_loss_percentage=stop_loss_percentage
)
# Add Monte Carlo metrics if used
position = calculator.calculate_position_size(entry_price, target_price)
# Calculate maximum shares possible with available cash
max_shares_by_cash = int(cash_balance / entry_price) if entry_price > 0 else 0
# Adjust shares based on available cash
recommended_shares = min(position['shares'], max_shares_by_cash)
col1, col2 = st.columns(2)
with col1:
if recommended_shares < position['shares']:
st.warning(
f"Position size limited by available cash.\n"
f"Ideal shares: {position['shares']:,}\n"
f"Maximum affordable shares: {recommended_shares:,}"
)
position_value = recommended_shares * entry_price
risk_amount = position['risk_amount'] * (recommended_shares / position['shares'])
st.metric("Recommended Shares", f"{recommended_shares:,}")
st.metric("Position Value", f"${position_value:,.2f}")
st.metric("Risk Amount", f"${risk_amount:,.2f}")
else:
st.metric("Number of Shares", f"{position['shares']:,}")
st.metric("Position Value", f"${position['position_value']:,.2f}")
st.metric("Risk Amount", f"${position['risk_amount']:,.2f}")
with col2:
st.metric("Stop Loss Price", f"${position['stop_loss']:.2f}")
st.metric("Potential Loss", f"${position['potential_loss']:,.2f}")
if 'potential_profit' in position:
potential_profit = (target_price - entry_price) * recommended_shares
risk_reward = abs(potential_profit / (position['stop_loss'] - entry_price) / recommended_shares) if recommended_shares > 0 else 0
st.metric("Potential Profit", f"${potential_profit:,.2f}")
st.metric("Risk/Reward Ratio", f"{risk_reward:.2f}")
# Show percentage of cash being used
if recommended_shares > 0:
cash_usage = (recommended_shares * entry_price / cash_balance) * 100
portfolio_usage = (recommended_shares * entry_price / total_portfolio_value) * 100
st.info(
f"This position would use:\n"
f"- {cash_usage:.1f}% of available cash\n"
f"- {portfolio_usage:.1f}% of total portfolio"
)
# Add Monte Carlo metrics if used
if use_monte_carlo:
st.subheader("Monte Carlo Analysis")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Stop Loss Price", f"${stop_loss_price:.2f}")
st.metric("Stop Loss %", f"{stop_loss_percentage:.2f}%")
with col2:
st.metric("Target Price", f"${target_price:.2f}")
st.metric("Target %", f"{((target_price - entry_price) / entry_price * 100):.2f}%")
with col3:
st.metric("Days Projected", f"{days_out}")
st.metric("Confidence Level", f"{confidence_level}%")
# Add to watchlist option
st.divider()
st.subheader("Save to Watch List")
if st.button("Prepare for Watch List", key="personal_prepare_watchlist"):
st.session_state.prefill_watchlist = {
'ticker': ticker,
'entry_price': float(entry_price),
'target_price': float(target_price),
'stop_loss': float(position['stop_loss']),
'shares': recommended_shares
}
st.success("Details saved! Switch to Watch Lists tab to complete adding to your watch list.")
except Exception as e:
st.error(f"Error calculating position: {str(e)}")
# Tab 2: Prop Firm Calculator
with tab2:
st.subheader("Prop Firm Calculator")
# Prop firm parameters
st.markdown("### Prop Firm Parameters")
col1, col2 = st.columns(2)
with col1:
buying_power = st.number_input(
"Buying Power ($)",
min_value=1000.0,
value=20000.0,
step=1000.0,
help="Total capital allocated by the prop firm"
)
max_daily_loss = st.number_input(
"Daily Loss Limit ($)",
min_value=100.0,
value=300.0,
step=50.0,
help="Maximum loss allowed in a single trading day"
)
max_total_loss = st.number_input(
"Max Total Loss ($)",
min_value=100.0,
value=900.0,
step=50.0,
help="Maximum total loss allowed during the evaluation period"
)
with col2:
evaluation_days = st.number_input(
"Evaluation Period (Days)",
min_value=5,
value=45,
step=1,
help="Number of days in the evaluation period"
)
risk_percentage = st.number_input(
"Risk Percentage per Trade (%)",
min_value=0.1,
max_value=100.0,
value=1.0,
step=0.1,
help="Percentage of account to risk on each trade"
)
max_position_size = st.number_input(
"Max Position Size (%)",
min_value=1.0,
max_value=100.0,
value=15.0,
step=1.0,
help="Maximum percentage of buying power for a single position"
)
# In the prop firm calculator section, add this after the max_position_size input
day_trading_mode = st.checkbox(
"Day Trading Mode",
value=True,
help="Enable specific settings for day trading"
)
if day_trading_mode:
max_loss_per_trade = st.number_input(
"Max Loss Per Trade ($)",
min_value=10.0,
max_value=max_daily_loss,
value=50.0,
step=10.0,
help="Maximum dollar amount to risk on a single trade"
)
# Calculate how many trades you can take at max loss
max_trades_at_full_loss = int(max_daily_loss / max_loss_per_trade)
st.info(f"You can take up to {max_trades_at_full_loss} trades at maximum loss before hitting daily limit")
# Add position scaling parameters to the top form
use_scaling = st.checkbox(
"Enable Position Scaling",
value=True,
help="Calculate how to add to winning positions"
)
if use_scaling:
col1, col2 = st.columns(2)
with col1:
num_entries = st.number_input(
"Number of Entry Points",
min_value=2,
max_value=5,
value=3,
help="How many times you want to add to your position"
)
scaling_method = st.selectbox(
"Scaling Method",
options=["Equal Size", "Increasing Size", "Decreasing Size"],
index=0,
help="How to distribute position size across entries"
)
with col2:
price_increment = st.number_input(
"Price Movement Between Entries (%)",
min_value=0.1,
max_value=10.0,
value=1.0,
step=0.1,
help="How much the price should move before adding to position"
)
max_scaling_factor = st.number_input(
"Maximum Position Scaling Factor",
min_value=1.0,
max_value=10.0,
value=3.0,
step=0.5,
help="Maximum multiple of initial position size"
)
# Position calculator
st.markdown("### Position Calculator")
col1, col2 = st.columns(2)
with col1:
ticker = st.text_input("Ticker Symbol", value="", key="prop_ticker").upper()
entry_price = st.number_input("Entry Price ($)", min_value=0.01, step=0.01, key="prop_entry_price")
use_monte_carlo = st.checkbox("Use Monte Carlo for Analysis", value=True, key="prop_monte_carlo")
if use_monte_carlo:
st.subheader("Monte Carlo Analysis")
days_out = st.number_input(
"Days to Project",
min_value=1,
max_value=30,
value=5,
help="Number of days to project for target price",
key="prop_days_out"
)
confidence_level = st.slider(
"Confidence Level (%)",
min_value=80,
max_value=99,
value=95,
key="prop_confidence_level"
)
with col2:
if not use_monte_carlo:
target_price = st.number_input("Target Price ($)", min_value=0.01, step=0.01, key="prop_target_price")
stop_loss_price = st.number_input("Stop Loss Price ($)", min_value=0.01, step=0.01, key="prop_stop_loss")
# Calculate daily risk limit as a percentage
daily_risk_pct = (max_daily_loss / buying_power) * 100
st.info(f"Daily risk limit: {daily_risk_pct:.2f}% of account")
# Calculate total risk limit as a percentage
total_risk_pct = (max_total_loss / buying_power) * 100
st.info(f"Total risk limit: {total_risk_pct:.2f}% of account")
if st.button("Calculate Position", key="prop_calculate"):
try:
if not ticker:
st.error("Please enter a ticker symbol")
return
# Get historical data for Monte Carlo simulation
if use_monte_carlo:
with st.spinner("Calculating optimal stop loss..."):
df = get_stock_data(
ticker,
datetime.now() - timedelta(days=30), # Last 30 days of data
datetime.now(),
'1m' # Minute data for more accurate simulation
)
if df.empty:
st.error("No data available for the selected ticker")
return
# Initialize Monte Carlo simulator
simulator = MonteCarloSimulator(df, num_simulations=1000, time_horizon=days_out)
# Calculate stop loss and target prices
stop_loss_price = simulator.calculate_stop_loss(risk_percentage)
target_price = simulator.calculate_target_price(confidence_level)
# Calculate stop loss percentage
stop_loss_percentage = abs((stop_loss_price - entry_price) / entry_price * 100)
else:
stop_loss_percentage = abs((stop_loss_price - entry_price) / entry_price * 100)
# Calculate position size based on risk percentage
calculator = PositionCalculator(
account_size=buying_power,
risk_percentage=risk_percentage,
stop_loss_percentage=stop_loss_percentage
)
position = calculator.calculate_position_size(entry_price, target_price)
# Calculate maximum shares based on daily loss limit
max_shares_by_daily_loss = int(max_daily_loss / abs(entry_price - stop_loss_price)) if entry_price != stop_loss_price else 0
# Calculate maximum shares based on position size limit
max_position_value = buying_power * (max_position_size / 100)
max_shares_by_position_limit = int(max_position_value / entry_price) if entry_price > 0 else 0
# Calculate shares based on fixed dollar risk
max_shares_by_fixed_risk = int(max_loss_per_trade / abs(entry_price - stop_loss_price)) if entry_price != stop_loss_price else 0
# Update recommended shares calculation
recommended_shares = min(
position['shares'],
max_shares_by_daily_loss,
max_shares_by_position_limit,
max_shares_by_fixed_risk
)
# Calculate position metrics
position_value = recommended_shares * entry_price
max_loss = abs(entry_price - stop_loss_price) * recommended_shares
potential_profit = abs(target_price - entry_price) * recommended_shares
risk_reward = potential_profit / max_loss if max_loss > 0 else 0
# Display results
st.markdown("### Position Results")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Stop Loss Price", f"${stop_loss_price:.2f}")
st.metric("Stop Loss %", f"{stop_loss_percentage:.2f}%")
with col2:
st.metric("Target Price", f"${target_price:.2f}")
st.metric("Target %", f"{((target_price - entry_price) / entry_price * 100):.2f}%")
with col3:
st.metric("Days Projected", f"{days_out}")
st.metric("Confidence Level", f"{confidence_level}%")
# Add to watchlist option
st.divider()
st.subheader("Save to Watch List")
if st.button("Prepare for Watch List", key="prepare_watchlist"):
st.session_state.prefill_watchlist = {
'ticker': ticker,
'entry_price': float(entry_price),
'target_price': float(target_price),
'stop_loss': float(position['stop_loss']),
'shares': recommended_shares
}
st.success("Details saved! Switch to Watch Lists tab to complete adding to your watch list.")
st.metric("Recommended Shares", f"{recommended_shares:,}")
st.metric("Position Value", f"${position_value:,.2f}")
st.metric("% of Buying Power", f"{(position_value/buying_power*100):.2f}%")
except Exception as e:
st.error(f"Error calculating position: {str(e)}")
with tab2:
with col2:
st.metric("Stop Loss Price", f"${stop_loss_price:.2f}")
st.metric("Maximum Loss", f"${max_loss:.2f}")
st.metric("% of Daily Limit", f"{(max_loss/max_daily_loss*100):.2f}%")
with col3:
st.metric("Target Price", f"${target_price:.2f}")
st.metric("Potential Profit", f"${potential_profit:.2f}")
st.metric("Risk/Reward Ratio", f"{risk_reward:.2f}")
# Show constraint information
st.subheader("Position Constraints")
constraints = {
"Risk-based position size": position['shares'],
"Daily loss limit": max_shares_by_daily_loss,
"Maximum position size": max_shares_by_position_limit,
"Fixed dollar risk per trade": max_shares_by_fixed_risk
}
# Determine which constraint is active
active_constraint = min(constraints, key=constraints.get)
for constraint, shares in constraints.items():
if constraint == active_constraint:
st.warning(f"**{constraint}**: {shares:,} shares (active constraint)")
else:
st.info(f"{constraint}: {shares:,} shares")
# Add Monte Carlo metrics if used
if use_monte_carlo:
st.subheader("Monte Carlo Analysis")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Stop Loss Price", f"${stop_loss_price:.2f}")
st.metric("Stop Loss %", f"{stop_loss_percentage:.2f}%")
with col2:
st.metric("Target Price", f"${target_price:.2f}")
st.metric("Target %", f"{((target_price - entry_price) / entry_price * 100):.2f}%")
with col3:
st.metric("Days Projected", f"{days_out}")
st.metric("Confidence Level", f"{confidence_level}%")
# Add to watchlist option
st.divider()
st.subheader("Save to Watch List")
if st.button("Prepare for Watch List", key="prop_prepare_watchlist"):
st.session_state.prefill_watchlist = {
'ticker': ticker,
'entry_price': float(entry_price),
'target_price': float(target_price),
'stop_loss': float(stop_loss_price),
'shares': recommended_shares
}
st.success("Details saved! Switch to Watch Lists tab to complete adding to your watch list.")
# Add this after the position results section in the prop firm calculator tab
if use_scaling and recommended_shares > 0:
st.divider()
st.subheader("Position Scaling Strategy")
# Calculate scaling strategy
initial_shares = recommended_shares
max_total_shares = int(initial_shares * max_scaling_factor)
# Calculate shares for each entry point
entry_points = []
# First entry is the initial position
entry_points.append({
"entry_num": 1,
"price": entry_price,
"shares": initial_shares,
"value": initial_shares * entry_price,
"cumulative_shares": initial_shares,
"cumulative_value": initial_shares * entry_price
})
# Calculate remaining entry points
for i in range(2, num_entries + 1):
# Calculate price for this entry
price_movement = (price_increment / 100) * entry_price * (i - 1)
entry_price_i = entry_price + price_movement
# Calculate shares for this entry based on scaling method
if scaling_method == "Equal Size":
shares_i = initial_shares
elif scaling_method == "Increasing Size":
shares_i = int(initial_shares * (1 + (i - 1) * 0.5))
else: # Decreasing Size
shares_i = int(initial_shares * (1 - (i - 1) * 0.25))
shares_i = max(shares_i, int(initial_shares * 0.25)) # Ensure minimum size
# Ensure we don't exceed max total shares
cumulative_shares = entry_points[-1]["cumulative_shares"] + shares_i
if cumulative_shares > max_total_shares:
shares_i = max_total_shares - entry_points[-1]["cumulative_shares"]
if shares_i <= 0:
break
# Add entry point
entry_points.append({
"entry_num": i,
"price": entry_price_i,
"shares": shares_i,
"value": shares_i * entry_price_i,
"cumulative_shares": cumulative_shares,
"cumulative_value": entry_points[-1]["cumulative_value"] + (shares_i * entry_price_i)
})
# Display scaling strategy
st.markdown("#### Ladder Entry Strategy")
# Create a table for the entry points
data = []
for ep in entry_points:
data.append([
f"Entry #{ep['entry_num']}",
f"${ep['price']:.2f}",
f"{ep['shares']:,}",
f"${ep['value']:.2f}",
f"{ep['cumulative_shares']:,}",
f"${ep['cumulative_value']:.2f}"
])
st.table({
"Entry Point": [row[0] for row in data],
"Price": [row[1] for row in data],
"Shares": [row[2] for row in data],
"Position Value": [row[3] for row in data],
"Cumulative Shares": [row[4] for row in data],
"Cumulative Value": [row[5] for row in data]
})
# Calculate average entry price after all entries
if entry_points:
final_entry = entry_points[-1]
# Calculate weighted average entry price correctly
total_value = 0
total_shares = 0
for entry in entry_points:
total_value += entry["price"] * entry["shares"]
total_shares += entry["shares"]
avg_entry = total_value / total_shares if total_shares > 0 else 0
max_position_value = final_entry["cumulative_value"]
max_position_pct = (max_position_value / buying_power) * 100
col1, col2 = st.columns(2)
with col1:
st.metric("Final Position Size", f"{final_entry['cumulative_shares']:,} shares")
st.metric("Average Entry Price", f"${avg_entry:.2f}")
with col2:
st.metric("Total Position Value", f"${max_position_value:.2f}")
st.metric("% of Buying Power", f"{max_position_pct:.2f}%")
# Risk analysis for the scaled position
max_loss_scaled = (avg_entry - stop_loss_price) * final_entry["cumulative_shares"]
max_profit_scaled = (target_price - avg_entry) * final_entry["cumulative_shares"]
risk_reward_scaled = max_profit_scaled / max_loss_scaled if max_loss_scaled > 0 else 0
st.markdown("#### Risk Analysis for Scaled Position")
col1, col2 = st.columns(2)
with col1:
st.metric("Maximum Loss", f"${max_loss_scaled:.2f}")
st.metric("% of Daily Limit", f"{(max_loss_scaled/max_daily_loss*100):.2f}%")
with col2:
st.metric("Potential Profit", f"${max_profit_scaled:.2f}")
st.metric("Risk/Reward Ratio", f"{risk_reward_scaled:.2f}")
except Exception as e:
st.error(f"Error calculating position: {str(e)}")
logger.exception("Error in prop firm calculator")
# Tab 3: Watch Lists
with tab3:
st.subheader("Watch Lists")
# Create new watch list

View File

@ -9,6 +9,7 @@ from pages.trading.trading_system_page import trading_system_page
from pages.trading.trading_plan_page import trading_plan_page
from pages.backtesting.backtesting_page import backtesting_page
from pages.analysis.monte_carlo_page import monte_carlo_page
from pages.analysis.ai_forecast_page import ai_forecast_page
from trading.journal import (
create_trades_table, get_open_trades, get_trade_history,
get_latest_portfolio_value, update_portfolio_value
@ -32,7 +33,9 @@ def main():
st.sidebar.title("Navigation")
st.session_state.page = st.sidebar.radio(
"Go to",
["Strategy Guide", "Trading Journal", "Technical Scanner", "CANSLIM Screener", "Trading System", "Trading Plans", "Backtesting", "Monte Carlo Analysis"]
["Strategy Guide", "Trading Journal", "Technical Scanner", "CANSLIM Screener",
"Trading System", "Trading Plans", "Backtesting", "Monte Carlo Analysis",
"AI Stock Forecast"]
)
# Create necessary tables
@ -56,6 +59,8 @@ def main():
backtesting_page()
elif st.session_state.page == "Monte Carlo Analysis":
monte_carlo_page()
elif st.session_state.page == "AI Stock Forecast":
ai_forecast_page()
if __name__ == "__main__":
main()