feat: Use Polygon.io API for fetching current stock prices

This commit is contained in:
Bobby (aider) 2025-02-19 20:49:37 -08:00
parent 744d717938
commit 544bc146cb

View File

@ -1,10 +1,10 @@
import os
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from trading.position_calculator import PositionCalculator
from utils.common_utils import get_user_input, get_stock_data, get_qualified_stocks
from typing import Optional
import requests
def get_float_input(prompt: str) -> Optional[float]:
return get_user_input(prompt, float)
@ -13,79 +13,56 @@ def get_current_prices(tickers: list) -> dict:
"""Get current prices for multiple tickers using yfinance"""
if not tickers:
return {}
polygon_api_key = os.environ.get("POLYGON_API_KEY")
if not polygon_api_key:
print("Error: POLYGON_API_KEY environment variable not set.")
return {}
prices = {}
try:
# Create a space-separated string of tickers
ticker_str = " ".join(tickers)
# Get data for all tickers at once with more reliable settings
data = yf.download(
ticker_str,
period="1d", # Just today's data
interval="1d", # Daily interval
group_by='ticker',
progress=False,
prepost=True # Include pre/post market prices
)
# Handle single ticker case
if len(tickers) == 1:
if isinstance(data, pd.DataFrame) and 'Close' in data.columns:
prices[tickers[0]] = float(data['Close'].iloc[-1])
else:
# Try alternative method
for ticker in tickers:
url = f"https://api.polygon.io/v2/last/trade/{ticker}?apiKey={polygon_api_key}"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
try:
stock = yf.Ticker(tickers[0])
price = stock.info.get('regularMarketPrice', 0.0)
prices[tickers[0]] = float(price)
except:
prices[tickers[0]] = 0.0
else:
# Handle multiple tickers
for ticker in tickers:
try:
if isinstance(data, pd.DataFrame) and (ticker, 'Close') in data.columns:
prices[ticker] = float(data[ticker]['Close'].iloc[-1])
else:
# Try alternative method
stock = yf.Ticker(ticker)
price = stock.info.get('regularMarketPrice', 0.0)
prices[ticker] = float(price)
except:
prices[ticker] = data['results']['price']
except KeyError:
prices[ticker] = 0.0
# Verify we have prices for all tickers
else:
print(f"Error fetching price for {ticker}: {response.status_code} - {response.text}")
prices[ticker] = 0.0
# Verify that we have prices for all tickers
for ticker in tickers:
if ticker not in prices or prices[ticker] == 0:
try:
stock = yf.Ticker(ticker)
price = stock.info.get('regularMarketPrice', 0.0)
if price:
prices[ticker] = float(price)
except:
if ticker not in prices:
prices[ticker] = 0.0
print(f"Could not fetch price for {ticker}")
prices[ticker] = 0.0
except Exception as e:
print(f"Error in batch price fetch: {e}")
# Fall back to individual ticker fetching
print(f"Error fetching prices: {e}")
# If there's an error, fall back to setting price to 0.0
for ticker in tickers:
if ticker not in prices:
prices[ticker] = 0.0
try:
stock = yf.Ticker(ticker)
price = stock.info.get('regularMarketPrice', 0.0)
prices[ticker] = float(price)
except:
prices[ticker] = 0.0
return prices
def validate_signal_date(signal_date: datetime) -> datetime:
"""
Validate and adjust signal date if needed
Args:
signal_date (datetime): Signal date to validate
Returns:
datetime: Valid signal date (not in future)
"""
@ -94,10 +71,10 @@ def validate_signal_date(signal_date: datetime) -> datetime:
return current_date
return signal_date
def print_signal(signal_data: dict, signal_type: str = "🔍") -> None:
def print_signal(signal_ dict, signal_type: str = "🔍") -> None:
"""
Print standardized signal output
Args:
signal_data (dict): Dictionary containing signal information
signal_type (str): Emoji indicator for signal type (default: 🔍)
@ -117,7 +94,7 @@ def print_signal(signal_data: dict, signal_type: str = "🔍") -> None:
def save_signals_to_csv(signals: list, scanner_name: str) -> None:
"""
Save signals to CSV file with standardized format and naming
Args:
signals (list): List of signal dictionaries
scanner_name (str): Name of the scanner for file naming
@ -125,14 +102,12 @@ def save_signals_to_csv(signals: list, scanner_name: str) -> None:
if not signals:
print("\nNo signals found")
return
output_dir = 'reports'
os.makedirs(output_dir, exist_ok=True)
output_date = datetime.now().strftime("%Y%m%d_%H%M")
output_file = f'{output_dir}/{scanner_name}_{output_date}.csv'
df_signals = pd.DataFrame(signals)
df_signals.to_csv(output_file, index=False)
print(f"\nSaved {len(signals)} signals to {output_file}")