356 lines
15 KiB
Python
356 lines
15 KiB
Python
# data_manager.py
|
|
# -----------------------------------------------------------------------------
|
|
# Finder - Data Management Module
|
|
# -----------------------------------------------------------------------------
|
|
|
|
import pandas as pd
|
|
import yfinance as yf
|
|
from pykrx.stock import get_market_cap_by_ticker, get_market_ticker_name
|
|
import requests
|
|
from io import StringIO
|
|
import time
|
|
import os
|
|
import config # 설정 파일 임포트
|
|
import pandas_ta as ta
|
|
from datetime import datetime
|
|
import pickle
|
|
from typing import Dict, Optional
|
|
|
|
# 데이터 캐시 디렉토리 생성
|
|
if not os.path.exists(config.DATA_CACHE_DIR):
|
|
os.makedirs(config.DATA_CACHE_DIR)
|
|
|
|
def normalize_timezone(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""타임존을 제거하고 UTC 기준으로 정규화
|
|
|
|
Args:
|
|
df: 인덱스가 DatetimeIndex인 데이터프레임
|
|
|
|
Returns:
|
|
타임존이 제거된 데이터프레임
|
|
"""
|
|
if df.index.tz is not None:
|
|
df.index = df.index.tz_localize(None)
|
|
return df
|
|
|
|
def get_kr_tickers(start_rank: int, end_rank: int) -> Dict[str, str]:
|
|
"""
|
|
[v1.0 업그레이드] KOSPI/KOSDAQ 시총 상위 종목을 {티커: 이름} 딕셔너리로 반환
|
|
"""
|
|
print("한국 주식 시가총액 순위 로드 중...")
|
|
today_str = datetime.now().strftime('%Y%m%d')
|
|
cache_file = os.path.join(config.DATA_CACHE_DIR, f"kr_tickers_{start_rank}_{end_rank}_{today_str}.pkl")
|
|
# 캐시가 있으면 바로 로드하여 네트워크 호출을 피함
|
|
if config.LOAD_FROM_LOCAL and os.path.exists(cache_file):
|
|
try:
|
|
with open(cache_file, 'rb') as f:
|
|
tickers_dict = pickle.load(f)
|
|
print(f"한국 티커 캐시 로드: {cache_file}")
|
|
return tickers_dict
|
|
except Exception as e:
|
|
print(f"경고: 티커 캐시 로드 실패({cache_file}): {e}")
|
|
|
|
df_kospi = get_market_cap_by_ticker(date=today_str, market="KOSPI")
|
|
df_kosdaq = get_market_cap_by_ticker(date=today_str, market="KOSDAQ")
|
|
|
|
df_all = pd.concat([df_kospi, df_kosdaq]).sort_values(by="시가총액", ascending=False)
|
|
|
|
# 순위에 따라 슬라이싱
|
|
df_sliced = df_all.iloc[start_rank - 1 : end_rank]
|
|
|
|
# [수정] 리스트 대신 딕셔너리 생성
|
|
tickers_dict = {}
|
|
for ticker in df_sliced.index:
|
|
try:
|
|
# pykrx에서 티커 이름 조회
|
|
name = get_market_ticker_name(ticker)
|
|
tickers_dict[ticker] = name
|
|
except (KeyError, ValueError) as e:
|
|
# 알려진 에러: 조회 실패 시 (예: ETF, ETN 등)
|
|
tickers_dict[ticker] = f"Unknown_KR_{ticker}"
|
|
except Exception as e:
|
|
# 예상치 못한 에러는 로그 출력 후 재발생
|
|
print(f"예상치 못한 에러: {ticker} 이름 조회 - {type(e).__name__}: {e}")
|
|
raise
|
|
|
|
print(f"한국 주식 {len(tickers_dict)}개 종목 로드 완료.")
|
|
# 캐시에 저장
|
|
try:
|
|
with open(cache_file, 'wb') as f:
|
|
pickle.dump(tickers_dict, f)
|
|
except Exception as e:
|
|
print(f"경고: 티커 캐시 저장 실패({cache_file}): {e}")
|
|
return tickers_dict # <-- 딕셔너리 반환
|
|
|
|
def get_us_tickers(start_rank: int, end_rank: int) -> Dict[str, str]:
|
|
"""
|
|
[v1.0 업그레이드] S&P 500 종목을 {티커: 이름} 딕셔너리로 반환
|
|
(403 Forbidden 방지 및 StringIO 경고 수정 포함)
|
|
|
|
Args:
|
|
start_rank: 시작 순위 (1부터 시작)
|
|
end_rank: 종료 순위 (포함)
|
|
|
|
Returns:
|
|
{티커: 종목명} 딕셔너리
|
|
"""
|
|
print("미국 S&P 500 종목 리스트 로드 중 (Wikipedia)...")
|
|
try:
|
|
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
|
|
}
|
|
|
|
response = requests.get(url, headers=headers)
|
|
response.raise_for_status()
|
|
html_content = response.text
|
|
|
|
# [수정] FutureWarning 해결
|
|
tables = pd.read_html(StringIO(html_content))
|
|
|
|
# [v3.2 수정] 'Symbol'과 'Security' 컬럼이 있는 테이블을 찾아 안정성 확보
|
|
sp500_table = None
|
|
for table in tables:
|
|
if 'Symbol' in table.columns and 'Security' in table.columns:
|
|
sp500_table = table
|
|
break
|
|
|
|
if sp500_table is None:
|
|
raise ValueError("Could not find S&P 500 table with 'Symbol' and 'Security' columns in Wikipedia page.")
|
|
|
|
# 순위에 따라 슬라이싱
|
|
df_selected = sp500_table.iloc[start_rank - 1 : end_rank]
|
|
|
|
# [수정] yfinance 티커 형식(. -> -)으로 변환하고 딕셔너리 생성
|
|
df_selected['Symbol_yf'] = df_selected['Symbol'].str.replace('.', '-', regex=False)
|
|
stock_dict = dict(zip(df_selected['Symbol_yf'], df_selected['Security'])) # {티커: 이름}
|
|
|
|
print(f"미국 S&P 500 종목 {len(stock_dict)}개 로드 완료.")
|
|
return stock_dict # <-- 딕셔너리 반환
|
|
|
|
except Exception as e:
|
|
print(f"에러: 미국 S&P 500 리스트 스크래핑 실패 - {e}")
|
|
return {}
|
|
|
|
def get_stock_data(ticker: str, start_date: str, end_date: str) -> pd.DataFrame:
|
|
"""
|
|
[v2.4 업그레이드] 캐시 파일 범위 재사용 기능 추가
|
|
지정된 티커의 주가 데이터를 로드합니다. Pickle(.pkl)과 CSV(.csv)를 모두 지원합니다.
|
|
|
|
Args:
|
|
ticker: 종목 티커 (예: '005930', 'AAPL')
|
|
start_date: 시작 날짜 (YYYY-MM-DD)
|
|
end_date: 종료 날짜 (YYYY-MM-DD)
|
|
|
|
Returns:
|
|
주가 데이터 데이터프레임 (인덱스: Date)
|
|
LOAD_FROM_LOCAL = True일 때:
|
|
- .pkl 파일이 있으면 우선적으로 로드합니다 (속도 향상).
|
|
- 정확히 일치하지 않으면 요청 범위를 포함하는 더 넓은 .pkl 파일을 찾아 재활용합니다.
|
|
- .pkl 파일이 없고 .csv 파일만 있으면, .csv를 로드 후 .pkl로 변환 저장합니다.
|
|
LOAD_FROM_LOCAL = False일 때:
|
|
- API에서 새로 다운로드하고, 보조지표를 계산한 후,
|
|
- SAVE_TO_LOCAL=True이면 .pkl과 .csv 두 가지 포맷으로 모두 저장합니다.
|
|
"""
|
|
|
|
import glob
|
|
|
|
# 요청된 범위 변환
|
|
start_dt = pd.to_datetime(start_date)
|
|
end_dt = pd.to_datetime(end_date)
|
|
|
|
# --- 1. 티커 및 캐시 파일 경로 설정 ---
|
|
if len(ticker) == 6 and ticker.isdigit():
|
|
yf_ticker = f"{ticker}.KS"
|
|
else:
|
|
yf_ticker = ticker
|
|
|
|
base_filename = f"{yf_ticker}_{start_date}_{end_date}"
|
|
pkl_file = os.path.join(config.DATA_CACHE_DIR, f"{base_filename}.pkl")
|
|
csv_file = os.path.join(config.DATA_CACHE_DIR, f"{base_filename}.csv")
|
|
|
|
# --- 2. 로컬 데이터 로드 모드 (LOAD_FROM_LOCAL = True) ---
|
|
if config.LOAD_FROM_LOCAL:
|
|
# 2-1. 정확히 일치하는 .pkl 파일 먼저 체크
|
|
if os.path.exists(pkl_file):
|
|
try:
|
|
df = pd.read_pickle(pkl_file)
|
|
df = normalize_timezone(df)
|
|
if not df.empty:
|
|
# print(f"'{yf_ticker}' Pickle 로드 성공.") # 디버깅용
|
|
return df
|
|
except (pickle.UnpicklingError, EOFError) as e:
|
|
print(f"경고: Pickle 파일 {pkl_file}이 손상되었습니다: {e}")
|
|
except Exception as e:
|
|
print(f"경고: Pickle 로드 예상치 못한 오류 ({pkl_file}): {type(e).__name__}: {e}")
|
|
|
|
# 2-2. 요청 범위를 포함하는 더 넓은 .pkl 파일 찾아 재활용
|
|
try:
|
|
# 같은 티커로 시작하는 모든 pkl 파일 탐색
|
|
pattern = os.path.join(config.DATA_CACHE_DIR, f"{yf_ticker}_*.pkl")
|
|
pkl_files = glob.glob(pattern)
|
|
|
|
# 파일명에서 시작/종료 날짜 추출하여 요청된 범위를 포함하는 파일 찾기
|
|
valid_files = []
|
|
for f in pkl_files:
|
|
try:
|
|
# 파일명 예: "000080.KS_2018-01-01_2023-12-31.pkl"
|
|
basename = os.path.basename(f)
|
|
parts = basename.replace('.pkl', '').split('_')
|
|
if len(parts) >= 3:
|
|
file_start = pd.to_datetime(parts[-2])
|
|
file_end = pd.to_datetime(parts[-1])
|
|
# 요청 범위를 포함하는 파일인지 확인
|
|
if file_start <= start_dt and file_end >= end_dt:
|
|
valid_files.append((f, file_start, file_end))
|
|
except (ValueError, IndexError) as e:
|
|
# 파일명 형식이 맞지 않는 경우 무시
|
|
continue
|
|
|
|
# 유효한 파일이 있으면 가장 범위가 넓은 것 선택
|
|
if valid_files:
|
|
valid_files.sort(key=lambda x: (x[2] - x[1]), reverse=True) # 범위가 넓은 순
|
|
best_file = valid_files[0][0]
|
|
try:
|
|
df = pd.read_pickle(best_file)
|
|
df = normalize_timezone(df)
|
|
if not df.empty:
|
|
# 요청된 범위로 slice
|
|
df_sliced = df.loc[(df.index >= start_dt) & (df.index <= end_dt)]
|
|
if not df_sliced.empty:
|
|
# print(f"'{yf_ticker}' 캐시 {os.path.basename(best_file)}에서 재사용 로드 완료.")
|
|
return df_sliced
|
|
except Exception as e:
|
|
print(f"경고: 캐시 파일 파싱 실패 ({f}): {e}")
|
|
except Exception as e:
|
|
print(f"경고: 캐시 재활용 실패 ({best_file}): {e}")
|
|
except Exception as e:
|
|
print(f"경고: glob 처리 실패: {e}") # .pkl 파일이 없거나 손상된 경우, .csv 파일 체크
|
|
if os.path.exists(csv_file):
|
|
try:
|
|
df = pd.read_csv(csv_file, index_col='Date', parse_dates=True)
|
|
df = normalize_timezone(df)
|
|
if not df.empty:
|
|
# .csv로 로드 성공 시, 다음을 위해 .pkl 파일 생성
|
|
print(f"'{yf_ticker}' CSV 로드 성공. Pickle 파일로 변환 저장합니다...")
|
|
df.to_pickle(pkl_file)
|
|
return df
|
|
except (pd.errors.ParserError, KeyError) as e:
|
|
print(f"경고: CSV 파일 {csv_file} 파싱 실패: {e}")
|
|
except Exception as e:
|
|
print(f"경고: CSV 로드 예상치 못한 오류 ({csv_file}): {type(e).__name__}: {e}")
|
|
|
|
return pd.DataFrame()
|
|
|
|
# --- 3. 신규 다운로드 모드 (LOAD_FROM_LOCAL = False) ---
|
|
print(f"{yf_ticker}: yfinance API로부터 데이터 다운로드 중...")
|
|
try:
|
|
time.sleep(config.API_REQUEST_DELAY)
|
|
data = yf.Ticker(yf_ticker)
|
|
df = data.history(start=start_date, end=end_date, interval="1d")
|
|
|
|
if df.empty and yf_ticker.endswith(".KS"):
|
|
yf_ticker_kq = f"{ticker}.KQ"
|
|
print(f".KS 실패, {yf_ticker_kq} 재시도...")
|
|
time.sleep(config.API_REQUEST_DELAY)
|
|
data_kq = yf.Ticker(yf_ticker_kq)
|
|
df = data_kq.history(start=start_date, end=end_date, interval="1d")
|
|
|
|
if df.empty:
|
|
print(f"경고: {ticker}({yf_ticker}) 데이터 로드 실패. 빈 파일 저장.")
|
|
if config.SAVE_TO_LOCAL:
|
|
# 빈 파일도 pkl과 csv 둘 다 생성하여 불필요한 재시도 방지
|
|
pd.DataFrame().to_pickle(pkl_file)
|
|
pd.DataFrame().to_csv(csv_file)
|
|
return pd.DataFrame()
|
|
|
|
df = normalize_timezone(df)
|
|
|
|
# --- 다운로드 직후 보조지표 계산 ---
|
|
print(f"{yf_ticker}: 보조지표 계산 중...")
|
|
df = calculate_indicators(df)
|
|
|
|
# 4. 계산된 지표 포함하여 두 포맷으로 모두 저장
|
|
if config.SAVE_TO_LOCAL:
|
|
print(f"{yf_ticker}: 데이터 및 지표를 pkl, csv 캐시 파일로 저장 중...")
|
|
df.to_pickle(pkl_file)
|
|
df.to_csv(csv_file)
|
|
|
|
return df
|
|
|
|
except (ConnectionError, TimeoutError) as e:
|
|
print(f"에러: {yf_ticker} 네트워크 오류 - {e}")
|
|
return pd.DataFrame()
|
|
except Exception as e:
|
|
print(f"에러: {yf_ticker} 예상치 못한 오류 - {type(e).__name__}: {e}")
|
|
return pd.DataFrame()
|
|
|
|
def calculate_indicators(df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
[v1.0 업그레이드 / FIX]
|
|
pandas_ta를 사용하여 필요한 모든 보조지표를 계산합니다.
|
|
(KeyError 방지를 위해 '명시적 할당' 방식으로 수정)
|
|
|
|
Args:
|
|
df: OHLCV 데이터를 포함하는 데이터프레임
|
|
|
|
Returns:
|
|
보조지표가 추가된 데이터프레임
|
|
"""
|
|
if df.empty:
|
|
return df
|
|
|
|
try:
|
|
# 1. 이동평균선 계산 (v2.0에서 사용하는 모든 MA)
|
|
# [FIX] 'df.ta.sma(append=True)' 대신 명시적 할당 'df['col'] = ta.sma(...)' 사용
|
|
for ma in config.ALL_MA_LIST:
|
|
df[f'MA_{ma}'] = ta.sma(df['Close'], length=ma)
|
|
|
|
# 2. 거래량 평균 계산
|
|
vol_period = config.STRONG_BREAKTHROUGH_CONDITIONS.get('volume_avg_period', 20)
|
|
vol_ma_name = f'Volume_MA_{vol_period}'
|
|
# [FIX] 'df.ta.sma'가 아닌 'ta.sma'를 직접 호출하여 할당
|
|
df[vol_ma_name] = ta.sma(df['Volume'], length=vol_period)
|
|
|
|
# 3. (선택) v1.0의 다른 지표들도 미리 계산 (향후 전략 확장을 위함)
|
|
|
|
# [FIX] 'df.ta.rsi(append=True)' 대신 명시적 할당
|
|
df['RSI_14'] = ta.rsi(df['Close'], length=14)
|
|
|
|
# bbands와 macd는 여러 컬럼을 반환하므로 df.ta.xxx(append=True) 방식 유지
|
|
df.ta.bbands(length=20, std=2, append=True)
|
|
df.ta.macd(fast=12, slow=26, signal=9, append=True)
|
|
|
|
except (AttributeError, KeyError) as e:
|
|
print(f" [경고] pandas_ta 보조 지표 계산 중 오류 (컬럼 누락): {e}")
|
|
except Exception as e:
|
|
print(f" [경고] pandas_ta 예상치 못한 오류: {type(e).__name__}: {e}")
|
|
|
|
return df
|
|
|
|
def get_financial_data(ticker: str) -> Optional[pd.Series]:
|
|
"""
|
|
yfinance를 사용해 펀더멘털 데이터 (매출액) 로드 (백테스트 결과 출력용)
|
|
|
|
Args:
|
|
ticker: 종목 티커
|
|
|
|
Returns:
|
|
최근 8분기 매출액 Series, 로드 실패 시 None
|
|
"""
|
|
try:
|
|
yf_ticker = yf.Ticker(ticker) # yfinance는 티커 자동 보정 시도
|
|
quarterly_financials = yf_ticker.quarterly_financials
|
|
|
|
if 'Total Revenue' in quarterly_financials.index:
|
|
revenue = quarterly_financials.loc['Total Revenue']
|
|
# 최근 8분기(2년) 데이터 반환
|
|
return revenue.head(8)
|
|
else:
|
|
return None
|
|
except (KeyError, AttributeError) as e:
|
|
print(f"경고: {ticker} 재무 데이터 없음 - {e}")
|
|
return None
|
|
except Exception as e:
|
|
print(f"경고: {ticker} 재무 데이터 예상치 못한 오류 - {type(e).__name__}: {e}")
|
|
return None |