C in CANSLIM
This commit is contained in:
parent
ec31b8701d
commit
cb098278c9
53
src/main.py
53
src/main.py
@ -0,0 +1,53 @@
|
|||||||
|
import datetime
|
||||||
|
from screener.data_fetcher import validate_date_range, fetch_financial_data, get_stocks_in_time_range
|
||||||
|
from screener.c_canslim import check_quarterly_earnings, check_sales_growth, check_return_on_equity
|
||||||
|
from screener.csv_appender import append_scores_to_csv
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# 1️⃣ Ask user for start and end date
|
||||||
|
user_start_date = input("Enter start date (YYYY-MM-DD): ")
|
||||||
|
user_end_date = input("Enter end date (YYYY-MM-DD): ")
|
||||||
|
|
||||||
|
# 2️⃣ Validate and adjust date range if needed
|
||||||
|
start_date, end_date = validate_date_range(user_start_date, user_end_date, required_quarters=4)
|
||||||
|
|
||||||
|
# 3️⃣ Get all stock symbols in the given date range dynamically
|
||||||
|
symbol_list = get_stocks_in_time_range(start_date, end_date)
|
||||||
|
|
||||||
|
if not symbol_list:
|
||||||
|
print("No stocks found within the given date range.")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Processing {len(symbol_list)} stocks within the given date range...")
|
||||||
|
|
||||||
|
# 4️⃣ Process each stock symbol
|
||||||
|
for symbol in symbol_list:
|
||||||
|
data = fetch_financial_data(symbol, start_date, end_date)
|
||||||
|
|
||||||
|
if not data:
|
||||||
|
print(f"Warning: No data returned for {symbol}. Assigning default score.")
|
||||||
|
scores = {
|
||||||
|
"EPS_Score": 0.25, # EPS Growth
|
||||||
|
"Sales_Score": 0.25, # Sales Growth
|
||||||
|
"ROE_Score": 0.25 # Return on Equity
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
# Extract relevant fields
|
||||||
|
quarterly_eps = data.get("eps", [])
|
||||||
|
sales_growth = data.get("sales_growth", [])
|
||||||
|
roe = 18 # Placeholder, modify if needed
|
||||||
|
|
||||||
|
# 5️⃣ Compute CANSLIM Scores
|
||||||
|
scores = {
|
||||||
|
"EPS_Score": check_quarterly_earnings(quarterly_eps),
|
||||||
|
"Sales_Score": check_sales_growth(sales_growth),
|
||||||
|
"ROE_Score": check_return_on_equity(roe)
|
||||||
|
}
|
||||||
|
|
||||||
|
# 6️⃣ Append results to a **generic** CSV file in `data/metrics/`
|
||||||
|
append_scores_to_csv(symbol, scores)
|
||||||
|
|
||||||
|
print("Scores saved in data/metrics/stock_scores.csv")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
75
src/screener/c_canslim.py
Normal file
75
src/screener/c_canslim.py
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
def check_quarterly_earnings(quarterly_eps):
|
||||||
|
"""
|
||||||
|
Checks 25%+ growth in the most recent quarter and, if possible,
|
||||||
|
accelerating growth quarter over quarter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Score (1 pass, 0 fail, 0.25 insufficient data).
|
||||||
|
"""
|
||||||
|
if not quarterly_eps or len(quarterly_eps) < 2:
|
||||||
|
return 0.25 # not enough data
|
||||||
|
|
||||||
|
old_eps = quarterly_eps[-2]
|
||||||
|
new_eps = quarterly_eps[-1]
|
||||||
|
|
||||||
|
if old_eps <= 0:
|
||||||
|
return 0.25 # can't compute growth properly
|
||||||
|
|
||||||
|
growth = (new_eps - old_eps) / abs(old_eps) * 100
|
||||||
|
if growth < 25:
|
||||||
|
return 0 # fails the 25% growth condition
|
||||||
|
|
||||||
|
# Check acceleration if we have at least 3 quarters
|
||||||
|
if len(quarterly_eps) >= 3:
|
||||||
|
older_eps = quarterly_eps[-3]
|
||||||
|
if older_eps <= 0:
|
||||||
|
# can't compare properly, but the 25% is met
|
||||||
|
return 1
|
||||||
|
prev_growth = (old_eps - older_eps) / abs(older_eps) * 100
|
||||||
|
if growth >= prev_growth:
|
||||||
|
# accelerating
|
||||||
|
return 1
|
||||||
|
else:
|
||||||
|
# not accelerating but still >=25% for last quarter
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return 1 # only 2 quarters, meets 25% condition
|
||||||
|
|
||||||
|
def check_sales_growth(quarterly_sales):
|
||||||
|
"""
|
||||||
|
Checks for 20%-25% or higher sales growth in the most recent quarter.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: 1 (Pass), 0 (Fail), 0.25 (Insufficient data).
|
||||||
|
"""
|
||||||
|
if not quarterly_sales or len(quarterly_sales) < 2:
|
||||||
|
return 0.25 # Not enough data
|
||||||
|
|
||||||
|
old_sales = quarterly_sales[-2]
|
||||||
|
new_sales = quarterly_sales[-1]
|
||||||
|
|
||||||
|
# Handle None values before making calculations
|
||||||
|
if old_sales is None or new_sales is None:
|
||||||
|
return 0.25
|
||||||
|
|
||||||
|
if old_sales <= 0:
|
||||||
|
return 0.25 # Can't compute growth properly
|
||||||
|
|
||||||
|
growth = (new_sales - old_sales) / abs(old_sales) * 100
|
||||||
|
|
||||||
|
return 1 if growth >= 20 else 0 # Pass if growth is 20%+
|
||||||
|
|
||||||
|
|
||||||
|
def check_return_on_equity(roe):
|
||||||
|
"""
|
||||||
|
Checks for >=17% ROE.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Score (1 pass, 0 fail, 0.25 insufficient data).
|
||||||
|
"""
|
||||||
|
if roe is None:
|
||||||
|
return 0.25
|
||||||
|
|
||||||
|
if roe >= 17:
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
36
src/screener/csv_appender.py
Normal file
36
src/screener/csv_appender.py
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
import csv
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Get the absolute path to the `data/metrics/` directory (outside `src/`)
|
||||||
|
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) # Go two levels up
|
||||||
|
METRICS_DIR = os.path.join(BASE_DIR, "data", "metrics")
|
||||||
|
CSV_FILE = os.path.join(METRICS_DIR, "stock_scores.csv")
|
||||||
|
|
||||||
|
def append_scores_to_csv(symbol, scores):
|
||||||
|
"""
|
||||||
|
Append stock analysis scores to a generic CSV file in the `data/metrics` directory.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
symbol (str): Stock ticker symbol.
|
||||||
|
scores (dict): Dictionary of metric scores (e.g., C_Score, Sales_Score, etc.).
|
||||||
|
"""
|
||||||
|
# Ensure the directory exists
|
||||||
|
os.makedirs(METRICS_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
# Define the header dynamically based on the provided scores
|
||||||
|
headers = ["Symbol"] + list(scores.keys())
|
||||||
|
|
||||||
|
# Check if file exists
|
||||||
|
file_exists = os.path.isfile(CSV_FILE)
|
||||||
|
|
||||||
|
with open(CSV_FILE, 'a', newline='') as csvfile:
|
||||||
|
writer = csv.DictWriter(csvfile, fieldnames=headers)
|
||||||
|
|
||||||
|
# Write the header if the file is new
|
||||||
|
if not file_exists:
|
||||||
|
writer.writeheader()
|
||||||
|
|
||||||
|
# Write stock data
|
||||||
|
row = {"Symbol": symbol}
|
||||||
|
row.update(scores)
|
||||||
|
writer.writerow(row)
|
||||||
126
src/screener/data_fetcher.py
Normal file
126
src/screener/data_fetcher.py
Normal file
@ -0,0 +1,126 @@
|
|||||||
|
import datetime
|
||||||
|
from db.db_connection import create_client
|
||||||
|
|
||||||
|
def validate_date_range(start_date, end_date, required_quarters=4):
|
||||||
|
"""
|
||||||
|
Ensures we have enough data (e.g., required_quarters) to evaluate
|
||||||
|
the 'C' in CANSLIM. If the date range is too short, adjust or return a warning.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
start_date (str or datetime): User-provided start date.
|
||||||
|
end_date (str or datetime): User-provided end date.
|
||||||
|
required_quarters (int): Number of quarters needed (default: 4).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(datetime, datetime): Tuple of adjusted (start_date, end_date).
|
||||||
|
"""
|
||||||
|
if isinstance(start_date, str):
|
||||||
|
start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d")
|
||||||
|
if isinstance(end_date, str):
|
||||||
|
end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d")
|
||||||
|
|
||||||
|
# Approximate needed delta for required_quarters
|
||||||
|
needed_days = required_quarters * 91 # ~3 months each
|
||||||
|
needed_delta = datetime.timedelta(days=needed_days)
|
||||||
|
|
||||||
|
if (end_date - start_date) < needed_delta:
|
||||||
|
start_date = end_date - needed_delta
|
||||||
|
print("Warning: Provided date range was too short. Adjusted start_date for enough data.")
|
||||||
|
|
||||||
|
return start_date, end_date
|
||||||
|
|
||||||
|
def fetch_financial_data(symbol, start_date, end_date):
|
||||||
|
"""
|
||||||
|
Fetch financial data (EPS, Sales Growth) from stock_financials table
|
||||||
|
for a given stock symbol within a specific date range.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
symbol (str): Stock ticker symbol.
|
||||||
|
start_date (str or datetime): Start date for data retrieval.
|
||||||
|
end_date (str or datetime): End date for data retrieval.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: A dictionary containing calculated EPS and sales growth.
|
||||||
|
"""
|
||||||
|
client = create_client()
|
||||||
|
|
||||||
|
# Query stock_financials for revenue, net income, and EPS
|
||||||
|
query = f"""
|
||||||
|
SELECT
|
||||||
|
filing_date,
|
||||||
|
revenue,
|
||||||
|
net_income,
|
||||||
|
diluted_eps -- Using diluted EPS for accuracy
|
||||||
|
FROM stock_db.stock_financials
|
||||||
|
WHERE ticker = '{symbol}'
|
||||||
|
AND filing_date BETWEEN toDate('{start_date}') AND toDate('{end_date}')
|
||||||
|
AND timeframe = 'quarterly' -- Ensure only quarterly reports are used
|
||||||
|
ORDER BY filing_date ASC
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = client.query(query)
|
||||||
|
|
||||||
|
if not result.result_rows:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# Extracting data
|
||||||
|
dates = []
|
||||||
|
revenues = []
|
||||||
|
net_incomes = []
|
||||||
|
eps_values = []
|
||||||
|
sales_growth = []
|
||||||
|
|
||||||
|
for row in result.result_rows:
|
||||||
|
dates.append(row[0])
|
||||||
|
revenues.append(row[1])
|
||||||
|
net_incomes.append(row[2])
|
||||||
|
eps_values.append(row[3]) # Directly using diluted EPS
|
||||||
|
|
||||||
|
# Calculate Sales Growth (Quarter-over-Quarter)
|
||||||
|
for i in range(1, len(revenues)): # Start from index 1 since we compare with previous
|
||||||
|
prev_revenue = revenues[i - 1]
|
||||||
|
current_revenue = revenues[i]
|
||||||
|
if prev_revenue > 0: # Avoid division by zero
|
||||||
|
growth = ((current_revenue - prev_revenue) / prev_revenue) * 100
|
||||||
|
else:
|
||||||
|
growth = None # Not enough data
|
||||||
|
sales_growth.append(growth)
|
||||||
|
|
||||||
|
# Pad sales_growth list to match length of EPS (since first quarter lacks a previous value)
|
||||||
|
sales_growth.insert(0, None) # First quarter doesn't have a previous comparison
|
||||||
|
|
||||||
|
return {
|
||||||
|
"symbol": symbol,
|
||||||
|
"dates": dates,
|
||||||
|
"eps": eps_values,
|
||||||
|
"sales_growth": sales_growth
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def get_stocks_in_time_range(start_date, end_date):
|
||||||
|
"""
|
||||||
|
Query ClickHouse for all stock symbols that have data within the given date range.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
start_date (str or datetime): Start date in 'YYYY-MM-DD' format.
|
||||||
|
end_date (str or datetime): End date in 'YYYY-MM-DD' format.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of stock symbols that have data in the specified date range.
|
||||||
|
"""
|
||||||
|
if isinstance(start_date, datetime.datetime):
|
||||||
|
start_date = start_date.strftime("%Y-%m-%d")
|
||||||
|
if isinstance(end_date, datetime.datetime):
|
||||||
|
end_date = end_date.strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
client = create_client()
|
||||||
|
|
||||||
|
query = f"""
|
||||||
|
SELECT DISTINCT ticker
|
||||||
|
FROM stock_db.stock_prices_daily
|
||||||
|
WHERE date BETWEEN toDate('{start_date}') AND toDate('{end_date}')
|
||||||
|
"""
|
||||||
|
|
||||||
|
result = client.query(query)
|
||||||
|
|
||||||
|
return [row[0] for row in result.result_rows]
|
||||||
Loading…
Reference in New Issue
Block a user