"""Streamlit front end for the trading system.

Provides three pages selected from the sidebar: a trading journal (open
positions, add/update trades, trade history), a technical scanner, and a
CANSLIM screener.
"""

import os
from datetime import datetime

import pandas as pd
import plotly.graph_objects as go
import pytz
import streamlit as st
from plotly.subplots import make_subplots

from screener.canslim_controller import run_canslim_screener
from screener.scanner_controller import run_technical_scanner
from trading.journal import (
    create_trades_table,
    get_open_trades,
    get_trade_history,
    add_trade,
    update_trade,
    delete_trade,
    TradeEntry,
    get_open_trades_summary,
    get_current_prices,
    generate_position_id,
    get_position_summary,
)
from trading.position_calculator import PositionCalculator

# Directory scanned for CSV reports when no other directory is given.
REPORTS_DIR = "reports"

# Timezone in which journal entry dates/times typed into the UI are interpreted.
JOURNAL_TIMEZONE = 'US/Pacific'


def _rerun():
    """Restart the script run, using whichever rerun API this Streamlit has."""
    if hasattr(st, "rerun"):
        st.rerun()
    else:
        # Older Streamlit releases only expose the experimental name.
        st.experimental_rerun()


def _localize_entry(entry_date, entry_time):
    """Combine a date and a time and attach the journal timezone."""
    naive = datetime.combine(entry_date, entry_time)
    return pytz.timezone(JOURNAL_TIMEZONE).localize(naive)


def _journal_wall_time(dt):
    """Return *dt* expressed in the journal timezone when it is tz-aware.

    Naive datetimes are returned unchanged. Converting aware values first
    keeps the update form from shifting the stored instant when its wall
    clock is later re-localized with ``_localize_entry``.
    """
    if dt.tzinfo is not None:
        return dt.astimezone(pytz.timezone(JOURNAL_TIMEZONE))
    return dt


def init_session_state():
    """Initialize session state variables"""
    if 'page' not in st.session_state:
        st.session_state.page = 'Trading Journal'


def load_scanner_reports(reports_dir=REPORTS_DIR):
    """Load and return available scanner reports.

    Args:
        reports_dir: Directory to look in for ``.csv`` files. Defaults to
            ``"reports"``.

    Returns:
        A list of dicts with keys ``'name'`` (file name), ``'path'`` (joined
        path) and ``'created'`` (datetime built from ``os.path.getctime``),
        sorted newest first. Empty when the directory does not exist.
    """
    if not os.path.isdir(reports_dir):
        return []

    reports = []
    for file_name in os.listdir(reports_dir):
        if not file_name.endswith(".csv"):
            continue
        file_path = os.path.join(reports_dir, file_name)
        if not os.path.isfile(file_path):
            # Skip directories that merely happen to be named "*.csv".
            continue
        # NOTE: getctime is the creation time on Windows but the last
        # metadata-change time on Unix.
        created = datetime.fromtimestamp(os.path.getctime(file_path))
        reports.append({
            'name': file_name,
            'path': file_path,
            'created': created,
        })

    # Newest first.
    return sorted(reports, key=lambda report: report['created'], reverse=True)


def format_datetime(dt):
    """Format datetime for display as ``YYYY-MM-DD HH:MM`` ('' when falsy)."""
    if dt:
        return dt.strftime('%Y-%m-%d %H:%M')
    return ''


def plot_trade_history(trades):
    """Create a cumulative P/L chart using Plotly.

    Args:
        trades: Iterable of trade mappings with ``'exit_price'``,
            ``'entry_price'``, ``'shares'`` and ``'exit_date'`` keys.

    Returns:
        A ``plotly.graph_objects.Figure``, or ``None`` when there are no
        trades or none of them has an exit price.
    """
    if not trades:
        return None

    # Only trades with an exit price contribute realized P/L.
    closed = [trade for trade in trades if trade['exit_price']]
    if not closed:
        return None

    # A running total only makes sense in chronological order; trades with
    # a missing exit date are placed last instead of breaking the sort.
    closed.sort(key=lambda trade: (trade['exit_date'] is None, trade['exit_date']))

    dates = []
    pnl = []
    cumulative_pnl = 0
    for trade in closed:
        cumulative_pnl += (trade['exit_price'] - trade['entry_price']) * trade['shares']
        dates.append(trade['exit_date'])
        pnl.append(cumulative_pnl)

    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=dates,
            y=pnl,
            mode='lines+markers',
            name='Cumulative P/L',
            line=dict(color='blue'),
            # <br> is Plotly's line break inside hover text.
            hovertemplate='Date: %{x}<br>P/L: $%{y:.2f}',
        )
    )
    fig.update_layout(
        title='Cumulative Profit/Loss Over Time',
        xaxis_title='Date',
        yaxis_title='Cumulative P/L ($)',
        hovermode='x unified',
    )
    return fig


def _render_open_positions_tab():
    """Render per-ticker open position summaries and a portfolio total."""
    st.subheader("Open Positions")
    open_summary = get_open_trades_summary()
    if not open_summary:
        st.info("No open positions")
        return

    # Fetch each ticker's price once, however many summaries mention it.
    unique_tickers = list({summary['ticker'] for summary in open_summary})
    current_prices = get_current_prices(unique_tickers) or {}

    total_portfolio_value = 0
    total_paper_pl = 0

    for summary in open_summary:
        with st.expander(f"{summary['ticker']} Summary"):
            ticker = summary['ticker']
            avg_entry = summary['avg_entry_price']
            current_price = current_prices.get(ticker)
            total_shares = summary['total_shares']
            position_value = avg_entry * total_shares

            col1, col2 = st.columns(2)
            with col1:
                st.metric("Total Shares", f"{total_shares:,}")
                st.metric("Average Entry", f"${avg_entry:.2f}")
                st.metric("Position Value", f"${position_value:.2f}")
            with col2:
                if current_price:
                    current_value = current_price * total_shares
                    paper_pl = (current_price - avg_entry) * total_shares
                    # A zero-cost position has no meaningful percentage.
                    pl_percentage = (
                        (paper_pl / position_value) * 100 if position_value else 0.0
                    )
                    st.metric("Current Price", f"${current_price:.2f}")
                    st.metric("Paper P/L", f"${paper_pl:.2f}", f"{pl_percentage:.2f}%")
                    total_portfolio_value += current_value
                    total_paper_pl += paper_pl
                else:
                    st.caption("Current price unavailable")

    if total_portfolio_value > 0:
        st.markdown("---")
        st.subheader("Portfolio Summary")
        # Market value minus unrealized P/L is what was paid for the shares.
        cost_basis = total_portfolio_value - total_paper_pl
        total_pl_percentage = (total_paper_pl / cost_basis) * 100 if cost_basis else 0.0
        col1, col2 = st.columns(2)
        with col1:
            st.metric("Total Portfolio Value", f"${total_portfolio_value:.2f}")
        with col2:
            st.metric("Total P/L", f"${total_paper_pl:.2f}", f"{total_pl_percentage:.2f}%")


def _render_add_trade_tab():
    """Render the form that records a new trade."""
    st.subheader("Add New Trade")
    ticker = st.text_input("Ticker Symbol").strip().upper()

    # Stays None until a ticker is entered; checked before saving.
    position_id = None
    if ticker:
        # Show existing positions for this ticker
        existing_positions = get_position_summary(ticker)
        if existing_positions:
            st.write(f"Existing {ticker} Positions:")
            for pos in existing_positions:
                st.write(f"Position ID: {pos['position_id']}")
                st.write(f"Total Shares: {pos['total_shares']}")
                st.write(f"Average Entry: ${pos['avg_entry_price']:.2f}")

            add_to_existing = st.checkbox("Add to existing position")
            if add_to_existing:
                position_id = st.selectbox(
                    "Select Position ID",
                    options=[pos['position_id'] for pos in existing_positions],
                    key="position_select",
                )
            else:
                position_id = generate_position_id(ticker)
        else:
            position_id = generate_position_id(ticker)

    col1, col2 = st.columns(2)
    with col1:
        shares = st.number_input("Number of Shares", min_value=1, step=1)
        entry_price = st.number_input("Entry Price", min_value=0.01, step=0.01)
        target_price = st.number_input("Target Price", min_value=0.01, step=0.01)
    with col2:
        stop_loss = st.number_input("Stop Loss", min_value=0.01, step=0.01)
        strategy = st.text_input("Strategy")
        order_type = st.selectbox(
            "Order Type", ["Market", "Limit"], key="add_trade_order_type"
        )

    entry_date = st.date_input("Entry Date")
    entry_time = st.time_input("Entry Time")
    followed_rules = st.checkbox("Followed Trading Rules")
    entry_reason = st.text_area("Entry Reason")
    notes = st.text_area("Notes")

    if st.button("Add Trade"):
        if not ticker:
            # Without a ticker there is no position id to attach the trade to.
            st.error("Ticker symbol is required")
            return
        try:
            trade = TradeEntry(
                ticker=ticker,
                entry_date=_localize_entry(entry_date, entry_time),
                shares=shares,
                entry_price=entry_price,
                target_price=target_price,
                stop_loss=stop_loss,
                strategy=strategy,
                order_type=order_type,
                position_id=position_id,
                followed_rules=followed_rules,
                entry_reason=entry_reason,
                notes=notes,
            )
            add_trade(trade)
            st.success("Trade added successfully!")
            _rerun()
        except Exception as e:
            st.error(f"Error adding trade: {str(e)}")


def _render_update_trade_tab():
    """Render the form that edits one of the open trades."""
    st.subheader("Update Trade")
    open_trades = get_open_trades()
    if not open_trades:
        st.info("No open trades to update")
        return

    trades_by_id = {t['id']: t for t in open_trades}
    trade_id = st.selectbox(
        "Select Trade to Update",
        options=[t['id'] for t in open_trades],
        format_func=lambda x: f"{trades_by_id[x]['ticker']} - {x}",
        key="trade_select",
    )
    trade = trades_by_id[trade_id]

    col1, col2 = st.columns(2)
    with col1:
        new_shares = st.number_input("Shares", value=trade['shares'])
        new_entry = st.number_input("Entry Price", value=float(trade['entry_price']))
        new_target = st.number_input("Target Price", value=float(trade['target_price']))
    with col2:
        new_stop = st.number_input("Stop Loss", value=float(trade['stop_loss']))
        new_strategy = st.text_input("Strategy", value=trade['strategy'])
        new_order_type = st.selectbox(
            "Order Type",
            ["Market", "Limit"],
            index=0 if trade['order_type'] == "Market" else 1,
            key="update_trade_order_type",
        )

    # Prefill from the stored entry timestamp, expressed in the same zone
    # the edited value will be localized to when saved.
    stored_entry = _journal_wall_time(trade['entry_date'])
    entry_date = st.date_input(
        "Entry Date", value=stored_entry.date(), key="update_entry_date"
    )
    entry_time = st.time_input(
        "Entry Time", value=stored_entry.time(), key="update_entry_time"
    )
    new_notes = st.text_area("Notes", value=trade['notes'] if trade['notes'] else "")

    if st.button("Update Trade"):
        try:
            updates = {
                'entry_date': _localize_entry(entry_date, entry_time),
                'shares': new_shares,
                'entry_price': new_entry,
                'target_price': new_target,
                'stop_loss': new_stop,
                'strategy': new_strategy,
                'order_type': new_order_type,
                'notes': new_notes,
            }
            update_trade(trade_id, updates)
            st.success("Trade updated successfully!")
            _rerun()
        except Exception as e:
            st.error(f"Error updating trade: {str(e)}")


def _render_trade_history_tab():
    """Render the cumulative P/L chart and one expander per past trade."""
    st.subheader("Trade History")
    history = get_trade_history()
    if not history:
        st.info("No trade history found")
        return

    # Add P/L chart
    fig = plot_trade_history(history)
    if fig:
        st.plotly_chart(fig, use_container_width=True)

    for trade in history:
        with st.expander(f"{trade['ticker']} - {format_datetime(trade['entry_date'])}"):
            if trade['exit_price']:
                profit_loss = (trade['exit_price'] - trade['entry_price']) * trade['shares']
            else:
                profit_loss = None

            col1, col2 = st.columns(2)
            with col1:
                st.metric("Entry Price", f"${trade['entry_price']:.2f}")
                st.metric("Shares", trade['shares'])
                # Compare against None so a break-even (0.00) trade still shows.
                if profit_loss is not None:
                    st.metric("P/L", f"${profit_loss:.2f}")
            with col2:
                if trade['exit_price']:
                    st.metric("Exit Price", f"${trade['exit_price']:.2f}")
                    st.metric("Exit Date", format_datetime(trade['exit_date']))

            st.text(f"Strategy: {trade['strategy']}")
            if trade['notes']:
                st.text(f"Notes: {trade['notes']}")


def trading_journal_page():
    """Render the Trading Journal page with its four tabs."""
    st.header("Trading Journal")

    # Tabs for different journal functions
    tab1, tab2, tab3, tab4 = st.tabs(
        ["Open Positions", "Add Trade", "Update Trade", "Trade History"]
    )
    with tab1:
        _render_open_positions_tab()
    with tab2:
        _render_add_trade_tab()
    with tab3:
        _render_update_trade_tab()
    with tab4:
        _render_trade_history_tab()


def _render_reports(reports, select_key, empty_message):
    """Render a report picker, download button and table for CSV reports.

    Args:
        reports: List of report dicts as returned by ``load_scanner_reports``.
        select_key: Streamlit widget key for the report select box.
        empty_message: Text shown when ``reports`` is empty.
    """
    if not reports:
        st.info(empty_message)
        return

    selected_report = st.selectbox(
        "Select Report",
        options=reports,
        format_func=lambda x: f"{x['name']} ({x['created'].strftime('%Y-%m-%d %H:%M')})",
        key=select_key,
    )
    if selected_report:
        try:
            # Load and display the CSV
            df = pd.read_csv(selected_report['path'])
            st.download_button(
                label="Download Report",
                data=df.to_csv(index=False),
                file_name=selected_report['name'],
                mime='text/csv',
            )
            st.dataframe(df)
        except Exception as e:
            st.error(f"Error loading report: {str(e)}")


def technical_scanner_page():
    """Render the Technical Scanner page (run a scan / view reports)."""
    st.header("Technical Scanner")

    # Create tabs for scanner and reports
    scanner_tab, reports_tab = st.tabs(["Run Scanner", "View Reports"])

    with scanner_tab:
        scanner_type = st.selectbox(
            "Select Scanner",
            ["SunnyBands", "ATR-EMA", "ATR-EMA v2"],
            key="tech_scanner_type",
        )

        interval = st.selectbox(
            "Select Time Interval",
            ["Daily", "5 minute", "15 minute", "30 minute", "1 hour"],
            key="interval_select",
        )

        # Convert interval to format expected by scanner
        interval_map = {
            "Daily": "1d",
            "5 minute": "5m",
            "15 minute": "15m",
            "30 minute": "30m",
            "1 hour": "1h",
        }
        selected_interval = interval_map[interval]

        # Date range selection
        date_col1, date_col2 = st.columns(2)
        with date_col1:
            start_date = st.date_input("Start Date")
        with date_col2:
            end_date = st.date_input("End Date")

        col1, col2 = st.columns(2)
        with col1:
            min_price = st.number_input("Minimum Price", value=5.0, step=0.1)
            max_price = st.number_input("Maximum Price", value=100.0, step=0.1)
        with col2:
            min_volume = st.number_input("Minimum Volume", value=500000, step=100000)
            portfolio_size = st.number_input("Portfolio Size", value=100000.0, step=1000.0)

        if st.button("Run Scanner"):
            if start_date > end_date:
                # Reject an inverted range before starting the scan.
                st.error("Start Date must not be after End Date")
            else:
                with st.spinner("Running scanner..."):
                    try:
                        signals = run_technical_scanner(
                            scanner_choice=scanner_type.lower().replace(" ", "_"),
                            start_date=start_date.strftime("%Y-%m-%d"),
                            end_date=end_date.strftime("%Y-%m-%d"),
                            min_price=min_price,
                            max_price=max_price,
                            min_volume=min_volume,
                            portfolio_size=portfolio_size,
                            interval=selected_interval,
                        )

                        if signals:
                            st.success(f"Found {len(signals)} signals")
                            for signal in signals:
                                with st.expander(
                                    f"{signal['ticker']} - ${signal['entry_price']:.2f}"
                                ):
                                    col1, col2 = st.columns(2)
                                    with col1:
                                        st.metric("Entry Price", f"${signal['entry_price']:.2f}")
                                        st.metric("Target", f"${signal['target_price']:.2f}")
                                        st.metric("Stop Loss", f"${signal['stop_loss']:.2f}")
                                    with col2:
                                        st.metric("Shares", signal['shares'])
                                        st.metric("Position Size", f"${signal['position_size']:.2f}")
                                        st.metric("Risk Amount", f"${abs(signal['risk_amount']):.2f}")
                        else:
                            st.info("No signals found")
                    except Exception as e:
                        st.error(f"Error running scanner: {str(e)}")

    with reports_tab:
        st.subheader("Scanner Reports")
        _render_reports(
            load_scanner_reports(),
            select_key="tech_scanner_report",
            empty_message="No scanner reports found",
        )


def canslim_screener_page():
    """Render the CANSLIM Screener page (run the screener / view reports)."""
    st.header("CANSLIM Screener")

    # Create tabs for scanner and reports
    scanner_tab, reports_tab = st.tabs(["Run Scanner", "View Reports"])

    with scanner_tab:
        # Date range selection
        col1, col2 = st.columns(2)
        with col1:
            start_date = st.date_input("Start Date")
        with col2:
            end_date = st.date_input("End Date")

        # CANSLIM criteria selection
        st.subheader("Select Screening Criteria")

        c_criteria = st.expander("Current Quarterly Earnings (C)")
        with c_criteria:
            eps_threshold = st.slider("EPS Growth Threshold (%)", 0, 100, 25)
            sales_threshold = st.slider("Sales Growth Threshold (%)", 0, 100, 25)
            roe_threshold = st.slider("ROE Threshold (%)", 0, 50, 17)

        a_criteria = st.expander("Annual Earnings Growth (A)")
        with a_criteria:
            annual_eps_threshold = st.slider("Annual EPS Growth Threshold (%)", 0, 100, 25)

        l_criteria = st.expander("Industry Leadership (L)")
        with l_criteria:
            use_l = st.checkbox("Check Industry Leadership", value=True)
            l_threshold = st.slider("Industry Leadership Score Threshold", 0.0, 1.0, 0.7)

        i_criteria = st.expander("Institutional Sponsorship (I)")
        with i_criteria:
            use_i = st.checkbox("Check Institutional Sponsorship", value=True)
            i_threshold = st.slider("Institutional Sponsorship Score Threshold", 0.0, 1.0, 0.7)

        if st.button("Run CANSLIM Screener"):
            if start_date > end_date:
                # Reject an inverted range before starting the screener.
                st.error("Start Date must not be after End Date")
            else:
                with st.spinner("Running CANSLIM screener..."):
                    try:
                        # Percent sliders are stored as fractions (25 -> 0.25).
                        selected_screeners = {
                            "C": {
                                "EPS_Score": eps_threshold / 100,
                                "Sales_Score": sales_threshold / 100,
                                "ROE_Score": roe_threshold / 100,
                            },
                            "A": {
                                "Annual_EPS_Score": annual_eps_threshold / 100,
                            },
                        }
                        if use_l:
                            selected_screeners["L"] = {"L_Score": l_threshold}
                        if use_i:
                            selected_screeners["I"] = {"I_Score": i_threshold}

                        # The screener is called without arguments, so the
                        # chosen parameters are published via session state.
                        st.session_state.screener_params = {
                            "start_date": start_date.strftime("%Y-%m-%d"),
                            "end_date": end_date.strftime("%Y-%m-%d"),
                            "selected_screeners": selected_screeners,
                        }

                        # TODO: modify run_canslim_screener to accept parameters
                        run_canslim_screener()
                        st.success("Screening complete! Check the Reports tab for results.")
                    except Exception as e:
                        st.error(f"Error running CANSLIM screener: {str(e)}")

    with reports_tab:
        st.subheader("CANSLIM Reports")
        # Uses the shared reports directory; pass a directory to
        # load_scanner_reports() once CANSLIM reports get their own folder.
        _render_reports(
            load_scanner_reports(),
            select_key="canslim_scanner_report",
            empty_message="No CANSLIM reports found",
        )


def main():
    """Configure the app, draw the sidebar and render the selected page."""
    st.set_page_config(page_title="Trading System", layout="wide")
    init_session_state()

    pages = {
        "Trading Journal": trading_journal_page,
        "Technical Scanner": technical_scanner_page,
        "CANSLIM Screener": canslim_screener_page,
    }

    # Sidebar navigation
    st.sidebar.title("Navigation")
    st.session_state.page = st.sidebar.radio("Go to", list(pages))

    # Create necessary tables
    create_trades_table()

    # Display selected page
    render_page = pages.get(st.session_state.page)
    if render_page is not None:
        render_page()


if __name__ == "__main__":
    main()