Added screener type choice

This commit is contained in:
Bobby Abellana 2025-02-03 22:40:26 -08:00
parent e1d59411fa
commit 9cb4f62a48
No known key found for this signature in database
GPG Key ID: 647714CC45F3647B
6 changed files with 145 additions and 44 deletions

View File

@ -1,8 +1,10 @@
import datetime
from screener.data_fetcher import validate_date_range, fetch_financial_data, get_stocks_in_time_range
from screener.c_canslim import check_quarterly_earnings
from screener.a_canslim import check_annual_eps_growth # New module
from screener.c_canslim import check_quarterly_earnings, check_return_on_equity, check_sales_growth
from screener.a_canslim import check_annual_eps_growth
from screener.csv_appender import append_scores_to_csv
from screener.screeners import SCREENERS # Import categories
from screener.user_input import get_user_screener_selection # Import function
def main():
# 1⃣ Ask user for start and end date
@ -12,7 +14,10 @@ def main():
# 2⃣ Validate and adjust date range if needed
start_date, end_date = validate_date_range(user_start_date, user_end_date, required_quarters=4)
# 3⃣ Get all stock symbols in the given date range dynamically
# 3⃣ Get selected screeners & customization preferences
selected_screeners = get_user_screener_selection() # ✅ Now imported from `user_input.py`
# 4⃣ Get all stock symbols dynamically
symbol_list = get_stocks_in_time_range(start_date, end_date)
if not symbol_list:
@ -21,28 +26,33 @@ def main():
print(f"Processing {len(symbol_list)} stocks within the given date range...")
# 4️⃣ Process each stock symbol
# 5️⃣ Process each stock symbol
for symbol in symbol_list:
data = fetch_financial_data(symbol, start_date, end_date)
if not data:
print(f"Warning: No data returned for {symbol}. Assigning default score.")
scores = {
"EPS_Score": 0.25, # Quarterly EPS Growth
"Annual_EPS_Score": 0.25 # Annual EPS Growth
}
scores = {screener: 0.25 for category in selected_screeners for screener in selected_screeners[category]}
else:
# Extract relevant fields
quarterly_eps = data.get("quarterly_eps", [])
annual_eps = data.get("annual_eps", [])
scores = {}
# 5⃣ Compute CANSLIM Scores
scores = {
"EPS_Score": check_quarterly_earnings(quarterly_eps),
"Annual_EPS_Score": check_annual_eps_growth(annual_eps)
}
# 6⃣ Compute scores dynamically based on user selection
for category, screeners in selected_screeners.items():
for screener, threshold in screeners.items():
if screener == "EPS_Score":
scores[screener] = check_quarterly_earnings(data.get("quarterly_eps", []))
elif screener == "Annual_EPS_Score":
scores[screener] = check_annual_eps_growth(data.get("annual_eps", []))
elif screener == "Sales_Score":
scores[screener] = check_sales_growth(data.get("sales_growth", []))
elif screener == "ROE_Score":
scores[screener] = check_return_on_equity(data.get("roe", []))
# Apply user-defined threshold if applicable
if isinstance(threshold, (int, float)):
scores[screener] = scores[screener] >= threshold
# 6⃣ Append results to a **generic** CSV file in `data/metrics/`
# 7⃣ Append results to CSV
append_scores_to_csv(symbol, scores)
print("Scores saved in data/metrics/stock_scores.csv")

View File

@ -60,16 +60,23 @@ def check_sales_growth(quarterly_sales):
return 1 if growth >= 20 else 0 # Pass if growth is 20%+
def check_return_on_equity(roe):
def check_return_on_equity(roe_values):
"""
Checks for >=17% ROE.
Checks if the most recent Return on Equity (ROE) is at least 17%.
Args:
roe_values (list): List of historical ROE values.
Returns:
float: Score (1 pass, 0 fail, 0.25 insufficient data).
float: 1 (Pass), 0 (Fail), 0.25 (Insufficient data).
"""
if roe is None:
return 0.25
if roe >= 17:
return 1
return 0
if not roe_values or len(roe_values) == 0:
return 0.25 # Not enough data
most_recent_roe = roe_values[-1] # Get the most recent ROE value
if most_recent_roe is None:
return 0.25 # Missing data
return 1 if most_recent_roe >= 17 else 0

View File

@ -6,37 +6,52 @@ BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) #
METRICS_DIR = os.path.join(BASE_DIR, "data", "metrics")
CSV_FILE = os.path.join(METRICS_DIR, "stock_scores.csv")
# Track all unique screeners used in any stock
ALL_HEADERS = set()
def append_scores_to_csv(symbol, scores):
"""
Append stock analysis scores to a generic CSV file in the `data/metrics` directory.
Append stock analysis scores to a generic CSV file in `data/metrics` directory.
Ensures all rows have the same headers, adding new screeners dynamically.
Args:
symbol (str): Stock ticker symbol.
scores (dict): Dictionary of metric scores (e.g., EPS_Score, Sales_Score, Annual_EPS_Score, etc.).
scores (dict): Dictionary of metric scores (e.g., EPS_Score, Sales_Score, etc.).
"""
# Ensure the directory exists
os.makedirs(METRICS_DIR, exist_ok=True)
# Compute the total score (sum of all numeric values in scores)
# Compute Total Score
total_score = sum(scores.values())
# Include `Total_Score` in the scores dictionary
scores["Total_Score"] = total_score
# Define the header dynamically based on the provided scores
headers = ["Symbol"] + list(scores.keys())
# Update tracked headers
ALL_HEADERS.update(scores.keys())
# Preferred order: "Symbol" first, "Total_Score" last
preferred_order = ["Symbol"]
remaining_headers = sorted([h for h in ALL_HEADERS if h not in preferred_order + ["Total_Score"]])
sorted_headers = preferred_order + remaining_headers + ["Total_Score"]
# Check if file exists
file_exists = os.path.isfile(CSV_FILE)
with open(CSV_FILE, 'a', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=headers)
writer = csv.DictWriter(csvfile, fieldnames=sorted_headers)
# Write the header if the file is new
if not file_exists:
# If it's a new file or headers changed, write a new header row
if not file_exists or set(sorted_headers) != set(writer.fieldnames):
csvfile.seek(0) # Move to start of the file
csvfile.truncate() # Clear previous file to rewrite headers
writer.writeheader()
# Write stock data
# Fill missing values with 0.25 (default)
row = {"Symbol": symbol}
row.update(scores)
for header in sorted_headers:
if header == "Symbol":
row[header] = symbol
else:
row[header] = scores.get(header, 0.25) # Use default if missing
writer.writerow(row)

View File

@ -34,6 +34,8 @@ def fetch_financial_data(symbol, start_date, end_date):
Fetch financial data for a given stock symbol, including:
- Quarterly EPS for EPS Score
- Annual EPS for Annual EPS Score
- Sales Growth
- Return on Equity (ROE)
Args:
symbol (str): Stock ticker symbol.
@ -41,7 +43,7 @@ def fetch_financial_data(symbol, start_date, end_date):
end_date (str or datetime): End date for data retrieval.
Returns:
dict: Contains EPS, sales growth, and annual EPS.
dict: Contains EPS, sales growth, annual EPS, and ROE.
"""
client = create_client()
@ -50,6 +52,8 @@ def fetch_financial_data(symbol, start_date, end_date):
filing_date,
diluted_eps,
revenue,
net_income,
equity,
timeframe
FROM stock_db.stock_financials
WHERE ticker = '{symbol}'
@ -67,18 +71,22 @@ def fetch_financial_data(symbol, start_date, end_date):
annual_eps = []
revenues = []
sales_growth = []
net_incomes = []
equities = []
for row in result.result_rows:
filing_date, eps, revenue, timeframe = row
filing_date, eps, revenue, net_income, equity, timeframe = row
if timeframe == "quarterly":
quarterly_eps.append(eps)
revenues.append(revenue)
net_incomes.append(net_income)
equities.append(equity)
elif timeframe == "annual":
annual_eps.append(eps)
# Calculate Sales Growth (Quarter-over-Quarter)
for i in range(1, len(revenues)): # Start from index 1 since we compare with previous
for i in range(1, len(revenues)):
prev_revenue = revenues[i - 1]
current_revenue = revenues[i]
if prev_revenue > 0:
@ -89,11 +97,21 @@ def fetch_financial_data(symbol, start_date, end_date):
sales_growth.insert(0, None) # First quarter lacks comparison
# Calculate ROE
roe_values = []
for i in range(len(net_incomes)):
if equities[i] > 0:
roe = (net_incomes[i] / equities[i]) * 100
else:
roe = None
roe_values.append(roe)
return {
"symbol": symbol,
"quarterly_eps": quarterly_eps, # Used for EPS_Score
"annual_eps": annual_eps, # Used for Annual_EPS_Score
"sales_growth": sales_growth
"sales_growth": sales_growth,
"roe": roe_values # Return on Equity
}

17
src/screener/screeners.py Normal file
View File

@ -0,0 +1,17 @@
# Define categories and associated metrics
SCREENERS = {
"Fundamentals": {
"EPS_Score": "Checks quarterly EPS growth",
"Annual_EPS_Score": "Checks 3-year annual EPS growth",
"Sales_Score": "Checks quarterly sales growth",
"ROE_Score": "Checks return on equity"
},
"Volume-Based": {
"Volume_Oscillator_Score": "Checks for unusual volume surges",
"Relative_Volume_Score": "Compares current volume to past volume"
},
"Technical": {
"SMA_Cross_Score": "Checks if short-term SMA crosses above long-term SMA",
"RSI_Score": "Evaluates RSI to identify overbought/oversold conditions"
}
}

View File

@ -0,0 +1,34 @@
from screener.screeners import SCREENERS # Import SCREENERS dictionary
def get_user_screener_selection():
"""
Ask the user which screeners they want to run and whether to use defaults.
Returns a dictionary of selected screeners with default/customization choices.
"""
selected_screeners = {}
print("\nAvailable Screener Categories:")
for category in SCREENERS:
print(f"- {category}")
selected_categories = input("\nEnter the categories you want to screen (comma-separated), or type 'all' for all: ").strip().lower()
if selected_categories == "all":
selected_categories = SCREENERS.keys()
else:
selected_categories = [c.strip().title() for c in selected_categories.split(",") if c.strip().title() in SCREENERS]
for category in selected_categories:
print(f"\nCategory: {category}")
use_defaults = input(f"Use default settings for {category}? (y/n): ").strip().lower()
selected_screeners[category] = {}
for screener, description in SCREENERS[category].items():
if use_defaults == "y":
selected_screeners[category][screener] = "default"
else:
custom_value = input(f"{screener} ({description}) - Enter custom threshold or press Enter to use default: ").strip()
selected_screeners[category][screener] = float(custom_value) if custom_value else "default"
return selected_screeners