From 9cb4f62a48b6717038fdb0879b8119cc2a1502d0 Mon Sep 17 00:00:00 2001 From: Bobby Abellana Date: Mon, 3 Feb 2025 22:40:26 -0800 Subject: [PATCH] Added screener type choice --- src/main.py | 44 ++++++++++++++++++++++-------------- src/screener/c_canslim.py | 27 ++++++++++++++-------- src/screener/csv_appender.py | 41 ++++++++++++++++++++++----------- src/screener/data_fetcher.py | 26 +++++++++++++++++---- src/screener/screeners.py | 17 ++++++++++++++ src/screener/user_input.py | 34 ++++++++++++++++++++++++++++ 6 files changed, 145 insertions(+), 44 deletions(-) create mode 100644 src/screener/screeners.py create mode 100644 src/screener/user_input.py diff --git a/src/main.py b/src/main.py index a0db315..7a60e27 100644 --- a/src/main.py +++ b/src/main.py @@ -1,8 +1,10 @@ import datetime from screener.data_fetcher import validate_date_range, fetch_financial_data, get_stocks_in_time_range -from screener.c_canslim import check_quarterly_earnings -from screener.a_canslim import check_annual_eps_growth # New module +from screener.c_canslim import check_quarterly_earnings, check_return_on_equity, check_sales_growth +from screener.a_canslim import check_annual_eps_growth from screener.csv_appender import append_scores_to_csv +from screener.screeners import SCREENERS # Import categories +from screener.user_input import get_user_screener_selection # Import function def main(): # 1️⃣ Ask user for start and end date @@ -12,7 +14,10 @@ def main(): # 2️⃣ Validate and adjust date range if needed start_date, end_date = validate_date_range(user_start_date, user_end_date, required_quarters=4) - # 3️⃣ Get all stock symbols in the given date range dynamically + # 3️⃣ Get selected screeners & customization preferences + selected_screeners = get_user_screener_selection() # ✅ Now imported from `user_input.py` + + # 4️⃣ Get all stock symbols dynamically symbol_list = get_stocks_in_time_range(start_date, end_date) if not symbol_list: @@ -21,28 +26,33 @@ def main(): print(f"Processing {len(symbol_list)} stocks within the given date range...") - # 4️⃣ Process each stock symbol + # 5️⃣ Process each stock symbol for symbol in symbol_list: data = fetch_financial_data(symbol, start_date, end_date) if not data: print(f"Warning: No data returned for {symbol}. Assigning default score.") - scores = { - "EPS_Score": 0.25, # Quarterly EPS Growth - "Annual_EPS_Score": 0.25 # Annual EPS Growth - } + scores = {screener: 0.25 for category in selected_screeners for screener in selected_screeners[category]} else: - # Extract relevant fields - quarterly_eps = data.get("quarterly_eps", []) - annual_eps = data.get("annual_eps", []) + scores = {} - # 5️⃣ Compute CANSLIM Scores - scores = { - "EPS_Score": check_quarterly_earnings(quarterly_eps), - "Annual_EPS_Score": check_annual_eps_growth(annual_eps) - } + # 6️⃣ Compute scores dynamically based on user selection + for category, screeners in selected_screeners.items(): + for screener, threshold in screeners.items(): + if screener == "EPS_Score": + scores[screener] = check_quarterly_earnings(data.get("quarterly_eps", [])) + elif screener == "Annual_EPS_Score": + scores[screener] = check_annual_eps_growth(data.get("annual_eps", [])) + elif screener == "Sales_Score": + scores[screener] = check_sales_growth(data.get("sales_growth", [])) + elif screener == "ROE_Score": + scores[screener] = check_return_on_equity(data.get("roe", [])) + + # Apply user-defined threshold if applicable + if isinstance(threshold, (int, float)): + scores[screener] = scores[screener] >= threshold - # 6️⃣ Append results to a **generic** CSV file in `data/metrics/` + # 7️⃣ Append results to CSV append_scores_to_csv(symbol, scores) print("Scores saved in data/metrics/stock_scores.csv") diff --git a/src/screener/c_canslim.py b/src/screener/c_canslim.py index 6057c09..3b6c79c 100644 --- a/src/screener/c_canslim.py +++ b/src/screener/c_canslim.py @@ -60,16 +60,23 @@ def check_sales_growth(quarterly_sales): return 1 if growth >= 20 else 0 # Pass if growth is 20%+ -def check_return_on_equity(roe): +def check_return_on_equity(roe_values): """ - Checks for >=17% ROE. - + Checks if the most recent Return on Equity (ROE) is at least 17%. + + Args: + roe_values (list): List of historical ROE values. + Returns: - float: Score (1 pass, 0 fail, 0.25 insufficient data). + float: 1 (Pass), 0 (Fail), 0.25 (Insufficient data). """ - if roe is None: - return 0.25 - - if roe >= 17: - return 1 - return 0 + if not roe_values or len(roe_values) == 0: + return 0.25 # Not enough data + + most_recent_roe = roe_values[-1] # Get the most recent ROE value + + if most_recent_roe is None: + return 0.25 # Missing data + + return 1 if most_recent_roe >= 17 else 0 + diff --git a/src/screener/csv_appender.py b/src/screener/csv_appender.py index 9d5e6f7..fbe7a88 100644 --- a/src/screener/csv_appender.py +++ b/src/screener/csv_appender.py @@ -6,37 +6,52 @@ BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) # METRICS_DIR = os.path.join(BASE_DIR, "data", "metrics") CSV_FILE = os.path.join(METRICS_DIR, "stock_scores.csv") +# Track all unique screeners used in any stock +ALL_HEADERS = set() + def append_scores_to_csv(symbol, scores): """ - Append stock analysis scores to a generic CSV file in the `data/metrics` directory. - + Append stock analysis scores to a generic CSV file in `data/metrics` directory. + + Ensures all rows have the same headers, adding new screeners dynamically. + Args: symbol (str): Stock ticker symbol. - scores (dict): Dictionary of metric scores (e.g., EPS_Score, Sales_Score, Annual_EPS_Score, etc.). + scores (dict): Dictionary of metric scores (e.g., EPS_Score, Sales_Score, etc.). """ # Ensure the directory exists os.makedirs(METRICS_DIR, exist_ok=True) - # Compute the total score (sum of all numeric values in scores) + # Compute Total Score total_score = sum(scores.values()) - - # Include `Total_Score` in the scores dictionary scores["Total_Score"] = total_score - # Define the header dynamically based on the provided scores - headers = ["Symbol"] + list(scores.keys()) + # Update tracked headers + ALL_HEADERS.update(scores.keys()) + + # Preferred order: "Symbol" first, "Total_Score" last + preferred_order = ["Symbol"] + remaining_headers = sorted([h for h in ALL_HEADERS if h not in preferred_order + ["Total_Score"]]) + sorted_headers = preferred_order + remaining_headers + ["Total_Score"] # Check if file exists file_exists = os.path.isfile(CSV_FILE) with open(CSV_FILE, 'a', newline='') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=headers) + writer = csv.DictWriter(csvfile, fieldnames=sorted_headers) - # Write the header if the file is new - if not file_exists: + # If it's a new file or headers changed, write a new header row + if not file_exists or set(sorted_headers) != set(writer.fieldnames): + csvfile.seek(0) # Move to start of the file + csvfile.truncate() # Clear previous file to rewrite headers writer.writeheader() - # Write stock data + # Fill missing values with 0.25 (default) row = {"Symbol": symbol} - row.update(scores) + for header in sorted_headers: + if header == "Symbol": + row[header] = symbol + else: + row[header] = scores.get(header, 0.25) # Use default if missing + writer.writerow(row) diff --git a/src/screener/data_fetcher.py b/src/screener/data_fetcher.py index 6bb021a..f1bb038 100644 --- a/src/screener/data_fetcher.py +++ b/src/screener/data_fetcher.py @@ -34,6 +34,8 @@ def fetch_financial_data(symbol, start_date, end_date): Fetch financial data for a given stock symbol, including: - Quarterly EPS for EPS Score - Annual EPS for Annual EPS Score + - Sales Growth + - Return on Equity (ROE) Args: symbol (str): Stock ticker symbol. @@ -41,7 +43,7 @@ def fetch_financial_data(symbol, start_date, end_date): end_date (str or datetime): End date for data retrieval. Returns: - dict: Contains EPS, sales growth, and annual EPS. + dict: Contains EPS, sales growth, annual EPS, and ROE. """ client = create_client() @@ -50,6 +52,8 @@ def fetch_financial_data(symbol, start_date, end_date): filing_date, diluted_eps, revenue, + net_income, + equity, timeframe FROM stock_db.stock_financials WHERE ticker = '{symbol}' @@ -67,18 +71,22 @@ def fetch_financial_data(symbol, start_date, end_date): annual_eps = [] revenues = [] sales_growth = [] + net_incomes = [] + equities = [] for row in result.result_rows: - filing_date, eps, revenue, timeframe = row + filing_date, eps, revenue, net_income, equity, timeframe = row if timeframe == "quarterly": quarterly_eps.append(eps) revenues.append(revenue) + net_incomes.append(net_income) + equities.append(equity) elif timeframe == "annual": annual_eps.append(eps) # Calculate Sales Growth (Quarter-over-Quarter) - for i in range(1, len(revenues)): # Start from index 1 since we compare with previous + for i in range(1, len(revenues)): prev_revenue = revenues[i - 1] current_revenue = revenues[i] if prev_revenue > 0: @@ -89,11 +97,21 @@ def fetch_financial_data(symbol, start_date, end_date): sales_growth.insert(0, None) # First quarter lacks comparison + # Calculate ROE + roe_values = [] + for i in range(len(net_incomes)): + if equities[i] > 0: + roe = (net_incomes[i] / equities[i]) * 100 + else: + roe = None + roe_values.append(roe) + return { "symbol": symbol, "quarterly_eps": quarterly_eps, # Used for EPS_Score "annual_eps": annual_eps, # Used for Annual_EPS_Score - "sales_growth": sales_growth + "sales_growth": sales_growth, + "roe": roe_values # Return on Equity } diff --git a/src/screener/screeners.py b/src/screener/screeners.py new file mode 100644 index 0000000..2a6c4d6 --- /dev/null +++ b/src/screener/screeners.py @@ -0,0 +1,17 @@ +# Define categories and associated metrics +SCREENERS = { + "Fundamentals": { + "EPS_Score": "Checks quarterly EPS growth", + "Annual_EPS_Score": "Checks 3-year annual EPS growth", + "Sales_Score": "Checks quarterly sales growth", + "ROE_Score": "Checks return on equity" + }, + "Volume-Based": { + "Volume_Oscillator_Score": "Checks for unusual volume surges", + "Relative_Volume_Score": "Compares current volume to past volume" + }, + "Technical": { + "SMA_Cross_Score": "Checks if short-term SMA crosses above long-term SMA", + "RSI_Score": "Evaluates RSI to identify overbought/oversold conditions" + } +} diff --git a/src/screener/user_input.py b/src/screener/user_input.py new file mode 100644 index 0000000..8057f3d --- /dev/null +++ b/src/screener/user_input.py @@ -0,0 +1,34 @@ +from screener.screeners import SCREENERS # Import SCREENERS dictionary + +def get_user_screener_selection(): + """ + Ask the user which screeners they want to run and whether to use defaults. + Returns a dictionary of selected screeners with default/customization choices. + """ + selected_screeners = {} + + print("\nAvailable Screener Categories:") + for category in SCREENERS: + print(f"- {category}") + + selected_categories = input("\nEnter the categories you want to screen (comma-separated), or type 'all' for all: ").strip().lower() + + if selected_categories == "all": + selected_categories = SCREENERS.keys() + else: + selected_categories = [c.strip().title() for c in selected_categories.split(",") if c.strip().title() in SCREENERS] + + for category in selected_categories: + print(f"\nCategory: {category}") + use_defaults = input(f"Use default settings for {category}? (y/n): ").strip().lower() + + selected_screeners[category] = {} + + for screener, description in SCREENERS[category].items(): + if use_defaults == "y": + selected_screeners[category][screener] = "default" + else: + custom_value = input(f"{screener} ({description}) - Enter custom threshold or press Enter to use default: ").strip() + selected_screeners[category][screener] = float(custom_value) if custom_value else "default" + + return selected_screeners