# File listing metadata: 395 lines, 16 KiB, Python
import streamlit as st
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
import pytz
|
|
from trading.journal import (
|
|
create_trades_table, get_open_trades, get_trade_history,
|
|
add_trade, update_trade, delete_trade, TradeEntry,
|
|
get_open_trades_summary, get_current_prices, generate_position_id,
|
|
get_position_summary
|
|
)
|
|
from trading.position_calculator import PositionCalculator
|
|
from screener.scanner_controller import run_technical_scanner
|
|
from screener.canslim_controller import run_canslim_screener
|
|
import plotly.graph_objects as go
|
|
from plotly.subplots import make_subplots
|
|
|
|
def init_session_state():
    """Seed Streamlit session state with the default page.

    Stores ``'Trading Journal'`` under ``st.session_state.page`` unless a
    ``'page'`` entry already exists.
    """
    if 'page' in st.session_state:
        return
    st.session_state.page = 'Trading Journal'
|
|
|
|
def load_scanner_reports():
    """Return metadata for the CSV scanner reports found in ``reports/``.

    The directory is resolved relative to the current working directory.
    Only regular files whose name ends in ``.csv`` are listed.

    Returns:
        A list of dicts with the keys ``'name'`` (file name), ``'path'``
        (the file name joined onto ``reports``) and ``'created'`` (a naive
        local ``datetime`` built from ``os.path.getctime``), sorted newest
        first. The list is empty when the directory does not exist.
    """
    # ``os`` is not imported at module level, so the import stays local.
    import os
    from datetime import datetime

    reports_dir = "reports"

    # isdir rather than exists: a plain file called "reports" would make
    # os.listdir raise NotADirectoryError.
    if not os.path.isdir(reports_dir):
        return []

    reports = []
    for file_name in os.listdir(reports_dir):
        if not file_name.endswith(".csv"):
            continue

        file_path = os.path.join(reports_dir, file_name)

        # A directory named "something.csv" is not a report and could not
        # be read as CSV by the report viewer.
        if not os.path.isfile(file_path):
            continue

        try:
            # NOTE: getctime is the creation time on Windows but the last
            # metadata-change time on Unix.
            created = datetime.fromtimestamp(os.path.getctime(file_path))
        except OSError:
            # The file disappeared between listing and stat; skip it.
            continue

        reports.append({
            'name': file_name,
            'path': file_path,
            'created': created,
        })

    # Newest first.
    return sorted(reports, key=lambda report: report['created'], reverse=True)
|
|
|
|
def format_datetime(dt):
    """Render ``dt`` as ``YYYY-MM-DD HH:MM``; any falsy value yields ``''``."""
    return dt.strftime('%Y-%m-%d %H:%M') if dt else ''
|
|
|
|
def plot_trade_history(trades):
    """Build a Plotly line chart of cumulative realised profit/loss.

    Args:
        trades: Sequence of mapping-like trades exposing ``'exit_price'``,
            ``'entry_price'``, ``'shares'`` and ``'exit_date'``.

    Returns:
        A ``go.Figure`` with a single "Cumulative P/L" trace, or ``None``
        when ``trades`` is empty or no trade has a truthy ``exit_price``.
    """
    if not trades:
        return None

    # Only trades with a truthy exit price contribute realised P/L.
    closed = [trade for trade in trades if trade['exit_price']]
    if not closed:
        return None

    # A running total only makes sense in chronological exit order; summing
    # in whatever order the history arrives (e.g. newest first) produces a
    # curve that runs backwards along the date axis with wrong totals.
    # Trades without an exit date sort last instead of raising on None.
    closed.sort(key=lambda trade: (trade['exit_date'] is None, trade['exit_date']))

    dates = []
    pnl = []
    cumulative_pnl = 0
    for trade in closed:
        # NOTE(review): this formula treats every trade as a long position.
        cumulative_pnl += (trade['exit_price'] - trade['entry_price']) * trade['shares']
        dates.append(trade['exit_date'])
        pnl.append(cumulative_pnl)

    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=dates,
            y=pnl,
            mode='lines+markers',
            name='Cumulative P/L',
            line=dict(color='blue'),
            hovertemplate='Date: %{x}<br>P/L: $%{y:.2f}<extra></extra>',
        )
    )

    fig.update_layout(
        title='Cumulative Profit/Loss Over Time',
        xaxis_title='Date',
        yaxis_title='Cumulative P/L ($)',
        hovermode='x unified',
    )

    return fig
|
|
|
|
def _rerun_app():
    """Restart the script run, preferring ``st.rerun`` when it exists."""
    # st.experimental_rerun is deprecated in favour of st.rerun; fall back to
    # the old name only on Streamlit versions that predate st.rerun.
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def _render_open_positions_tab():
    """Show one expander per open position and a portfolio-wide summary."""
    st.subheader("Open Positions")
    open_summary = get_open_trades_summary()
    if not open_summary:
        return

    # One price lookup for the distinct tickers.
    unique_tickers = list({summary['ticker'] for summary in open_summary})
    current_prices = get_current_prices(unique_tickers)

    total_portfolio_value = 0
    total_paper_pl = 0

    for summary in open_summary:
        with st.expander(f"{summary['ticker']} Summary"):
            ticker = summary['ticker']
            avg_entry = summary['avg_entry_price']
            current_price = current_prices.get(ticker)
            total_shares = summary['total_shares']
            # Cost basis of the position (average entry times shares).
            position_value = avg_entry * total_shares

            col1, col2 = st.columns(2)
            with col1:
                st.metric("Total Shares", f"{total_shares:,}")
                st.metric("Average Entry", f"${avg_entry:.2f}")
                st.metric("Position Value", f"${position_value:.2f}")

            with col2:
                # Positions without a (truthy) current price are skipped in
                # both the per-position P/L and the portfolio totals.
                if current_price:
                    current_value = current_price * total_shares
                    paper_pl = (current_price - avg_entry) * total_shares
                    # Guard against a zero cost basis instead of raising
                    # ZeroDivisionError in the middle of the page.
                    pl_percentage = (
                        (paper_pl / position_value) * 100 if position_value else 0.0
                    )

                    st.metric("Current Price", f"${current_price:.2f}")
                    st.metric("Paper P/L", f"${paper_pl:.2f}", f"{pl_percentage:.2f}%")

                    total_portfolio_value += current_value
                    total_paper_pl += paper_pl

    if total_portfolio_value > 0:
        st.markdown("---")
        st.subheader("Portfolio Summary")
        # Market value minus paper P/L is the cost basis of the priced
        # positions; it can be zero, so guard the percentage.
        cost_basis = total_portfolio_value - total_paper_pl
        total_pl_percentage = (total_paper_pl / cost_basis) * 100 if cost_basis else 0.0

        col1, col2 = st.columns(2)
        with col1:
            st.metric("Total Portfolio Value", f"${total_portfolio_value:.2f}")
        with col2:
            st.metric("Total P/L", f"${total_paper_pl:.2f}",
                      f"{total_pl_percentage:.2f}%")


def _render_add_trade_tab():
    """Collect the details of a new trade and store it via ``add_trade``."""
    st.subheader("Add New Trade")

    # Explicit keys throughout this tab: the Update tab reuses labels such as
    # "Strategy", "Notes" and "Order Type", and two widgets created with the
    # same label and arguments get the same auto-generated ID, which makes
    # Streamlit raise a duplicate-widget error.
    ticker = st.text_input("Ticker Symbol", key="add_trade_ticker").upper()
    if not ticker:
        return

    # Show existing positions for this ticker
    existing_positions = get_position_summary(ticker)
    if existing_positions:
        st.write(f"Existing {ticker} Positions:")
        for pos in existing_positions:
            st.write(f"Position ID: {pos['position_id']}")
            st.write(f"Total Shares: {pos['total_shares']}")
            st.write(f"Average Entry: ${pos['avg_entry_price']:.2f}")

        add_to_existing = st.checkbox("Add to existing position",
                                      key="add_trade_to_existing")
        if add_to_existing:
            position_id = st.selectbox(
                "Select Position ID",
                options=[pos['position_id'] for pos in existing_positions],
                key="add_trade_position_id",
            )
        else:
            position_id = generate_position_id(ticker)
    else:
        position_id = generate_position_id(ticker)

    col1, col2 = st.columns(2)
    with col1:
        shares = st.number_input("Number of Shares", min_value=1, step=1,
                                 key="add_trade_shares")
        entry_price = st.number_input("Entry Price", min_value=0.01, step=0.01,
                                      key="add_trade_entry_price")
        target_price = st.number_input("Target Price", min_value=0.01, step=0.01,
                                       key="add_trade_target_price")

    with col2:
        stop_loss = st.number_input("Stop Loss", min_value=0.01, step=0.01,
                                    key="add_trade_stop_loss")
        strategy = st.text_input("Strategy", key="add_trade_strategy")
        order_type = st.selectbox("Order Type", ["Market", "Limit"],
                                  key="add_trade_order_type")

    entry_date = st.date_input("Entry Date", key="add_trade_entry_date")
    entry_time = st.time_input("Entry Time", key="add_trade_entry_time")

    followed_rules = st.checkbox("Followed Trading Rules",
                                 key="add_trade_followed_rules")
    entry_reason = st.text_area("Entry Reason", key="add_trade_entry_reason")
    notes = st.text_area("Notes", key="add_trade_notes")

    if st.button("Add Trade", key="add_trade_submit"):
        try:
            # The entered date and time are interpreted as US/Pacific.
            entry_datetime = datetime.combine(entry_date, entry_time)
            entry_datetime = pytz.timezone('US/Pacific').localize(entry_datetime)

            trade = TradeEntry(
                ticker=ticker,
                entry_date=entry_datetime,
                shares=shares,
                entry_price=entry_price,
                target_price=target_price,
                stop_loss=stop_loss,
                strategy=strategy,
                order_type=order_type,
                position_id=position_id,
                followed_rules=followed_rules,
                entry_reason=entry_reason,
                notes=notes,
            )

            add_trade(trade)
            st.success("Trade added successfully!")
            _rerun_app()
        except Exception as e:
            st.error(f"Error adding trade: {str(e)}")


def _render_update_trade_tab():
    """Let the user pick an open trade and edit its stored fields."""
    st.subheader("Update Trade")
    open_trades = get_open_trades()
    if not open_trades:
        st.info("No open trades to update")
        return

    # Index once instead of scanning the list for every option label.
    trades_by_id = {t['id']: t for t in open_trades}

    trade_id = st.selectbox(
        "Select Trade to Update",
        options=[t['id'] for t in open_trades],
        format_func=lambda tid: f"{trades_by_id[tid]['ticker']} - {tid}",
        key="update_trade_id",
    )
    trade = trades_by_id[trade_id]

    # Keys carry the trade id so each trade gets its own widget state and the
    # widgets cannot collide with the same-labelled ones on the Add tab.
    key_prefix = f"update_trade_{trade_id}"

    col1, col2 = st.columns(2)
    with col1:
        new_shares = st.number_input("Shares", value=trade['shares'],
                                     key=f"{key_prefix}_shares")
        new_entry = st.number_input("Entry Price", value=float(trade['entry_price']),
                                    key=f"{key_prefix}_entry_price")
        new_target = st.number_input("Target Price", value=float(trade['target_price']),
                                     key=f"{key_prefix}_target_price")

    with col2:
        new_stop = st.number_input("Stop Loss", value=float(trade['stop_loss']),
                                   key=f"{key_prefix}_stop_loss")
        new_strategy = st.text_input("Strategy", value=trade['strategy'],
                                     key=f"{key_prefix}_strategy")
        # Anything other than "Market" preselects "Limit".
        new_order_type = st.selectbox("Order Type", ["Market", "Limit"],
                                      index=0 if trade['order_type'] == "Market" else 1,
                                      key=f"{key_prefix}_order_type")

    new_notes = st.text_area("Notes", value=trade['notes'] if trade['notes'] else "",
                             key=f"{key_prefix}_notes")

    if st.button("Update Trade", key=f"{key_prefix}_submit"):
        updates = {
            'shares': new_shares,
            'entry_price': new_entry,
            'target_price': new_target,
            'stop_loss': new_stop,
            'strategy': new_strategy,
            'order_type': new_order_type,
            'notes': new_notes,
        }

        try:
            update_trade(trade_id, updates)
            st.success("Trade updated successfully!")
            _rerun_app()
        except Exception as e:
            st.error(f"Error updating trade: {str(e)}")


def _render_trade_history_tab():
    """Show the cumulative P/L chart and one expander per historical trade."""
    st.subheader("Trade History")
    history = get_trade_history()
    if not history:
        st.info("No trade history found")
        return

    # Add P/L chart
    fig = plot_trade_history(history)
    if fig:
        st.plotly_chart(fig, use_container_width=True)

    for trade in history:
        with st.expander(f"{trade['ticker']} - {format_datetime(trade['entry_date'])}"):
            profit_loss = (
                (trade['exit_price'] - trade['entry_price']) * trade['shares']
                if trade['exit_price'] else None
            )

            col1, col2 = st.columns(2)
            with col1:
                st.metric("Entry Price", f"${trade['entry_price']:.2f}")
                st.metric("Shares", trade['shares'])
                # "is not None" so a break-even trade still shows P/L $0.00.
                if profit_loss is not None:
                    st.metric("P/L", f"${profit_loss:.2f}")

            with col2:
                if trade['exit_price']:
                    st.metric("Exit Price", f"${trade['exit_price']:.2f}")
                    st.metric("Exit Date", format_datetime(trade['exit_date']))

            st.text(f"Strategy: {trade['strategy']}")
            if trade['notes']:
                st.text(f"Notes: {trade['notes']}")


def trading_journal_page():
    """Render the Trading Journal page.

    Draws four tabs (open positions, add trade, update trade, trade
    history) and fills each one through its private helper, in that order.
    """
    st.header("Trading Journal")

    # Tabs for different journal functions
    tab1, tab2, tab3, tab4 = st.tabs(["Open Positions", "Add Trade", "Update Trade", "Trade History"])

    with tab1:
        _render_open_positions_tab()

    with tab2:
        _render_add_trade_tab()

    with tab3:
        _render_update_trade_tab()

    with tab4:
        _render_trade_history_tab()
|
|
|
|
def _render_signal(signal):
    """Show a single scanner signal as an expander with two metric columns."""
    with st.expander(f"{signal['ticker']} - ${signal['entry_price']:.2f}"):
        price_col, sizing_col = st.columns(2)
        with price_col:
            st.metric("Entry Price", f"${signal['entry_price']:.2f}")
            st.metric("Target", f"${signal['target_price']:.2f}")
            st.metric("Stop Loss", f"${signal['stop_loss']:.2f}")
        with sizing_col:
            st.metric("Shares", signal['shares'])
            st.metric("Position Size", f"${signal['position_size']:.2f}")
            st.metric("Risk Amount", f"${abs(signal['risk_amount']):.2f}")


def technical_scanner_page():
    """Render the Technical Scanner page.

    The "Run Scanner" tab runs the selected scanner on demand and lists its
    signals; the "View Reports" tab lets the user preview and download a
    saved CSV report.
    """
    st.header("Technical Scanner")

    # Create tabs for scanner and reports
    scanner_tab, reports_tab = st.tabs(["Run Scanner", "View Reports"])

    with scanner_tab:
        scanner_type = st.selectbox(
            "Select Scanner",
            ["SunnyBands", "ATR-EMA", "ATR-EMA v2"]
        )

        # NOTE(review): the four filter values below are collected from the
        # UI but are not passed to run_technical_scanner in this function.
        left_col, right_col = st.columns(2)
        with left_col:
            min_price = st.number_input("Minimum Price", value=5.0, step=0.1)
            max_price = st.number_input("Maximum Price", value=100.0, step=0.1)
        with right_col:
            min_volume = st.number_input("Minimum Volume", value=500000, step=100000)
            portfolio_size = st.number_input("Portfolio Size", value=100000.0, step=1000.0)

        run_requested = st.button("Run Scanner")
        if run_requested:
            # The display name is lower-cased and spaces become underscores,
            # e.g. "ATR-EMA v2" -> "atr-ema_v2".
            scanner_key = scanner_type.lower().replace(" ", "_")
            with st.spinner("Running scanner..."):
                try:
                    signals = run_technical_scanner(scanner_key)
                    if not signals:
                        st.info("No signals found")
                    else:
                        st.success(f"Found {len(signals)} signals")
                        for signal in signals:
                            _render_signal(signal)
                except Exception as e:
                    st.error(f"Error running scanner: {str(e)}")

    with reports_tab:
        st.subheader("Scanner Reports")

        def _report_label(report):
            # File name followed by its timestamp, to the minute.
            return f"{report['name']} ({report['created'].strftime('%Y-%m-%d %H:%M')})"

        reports = load_scanner_reports()
        if not reports:
            st.info("No scanner reports found")
        else:
            # Create a selectbox to choose the report
            selected_report = st.selectbox(
                "Select Report",
                options=reports,
                format_func=_report_label
            )

            if selected_report:
                try:
                    # Load the CSV, offer it for download, then display it
                    report_frame = pd.read_csv(selected_report['path'])

                    st.download_button(
                        label="Download Report",
                        data=report_frame.to_csv(index=False),
                        file_name=selected_report['name'],
                        mime='text/csv'
                    )

                    st.dataframe(report_frame)
                except Exception as e:
                    st.error(f"Error loading report: {str(e)}")
|
|
|
|
def main():
    """Configure the app, draw the sidebar navigation and show the chosen page."""
    st.set_page_config(page_title="Trading System", layout="wide")
    init_session_state()

    # Sidebar navigation; the radio's current choice is stored as the page.
    st.sidebar.title("Navigation")
    page_names = ["Trading Journal", "Technical Scanner", "CANSLIM Screener"]
    st.session_state.page = st.sidebar.radio("Go to", page_names)

    # Create necessary tables
    create_trades_table()

    # Display selected page
    selected_page = st.session_state.page
    if selected_page == "CANSLIM Screener":
        st.header("CANSLIM Screener")
        # Add CANSLIM screener implementation
        return

    page_renderers = {
        "Trading Journal": trading_journal_page,
        "Technical Scanner": technical_scanner_page,
    }
    render_page = page_renderers.get(selected_page)
    if render_page is not None:
        render_page()
|
|
|
|
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|