from __future__ import annotations import json import os import threading import time from typing import TYPE_CHECKING import pyupbit import requests from . import state_manager # [NEW] Import StateManager from .common import FLOAT_EPSILON, HOLDINGS_FILE, MIN_TRADE_AMOUNT, api_rate_limiter, logger from .constants import BALANCE_RETRY_BACKOFF, DEFAULT_RETRY_BACKOFF, DEFAULT_RETRY_COUNT, PRICE_CACHE_TTL from .retry_utils import retry_with_backoff if TYPE_CHECKING: from .config import RuntimeConfig # 부동소수점 비교용 임계값 (MIN_TRADE_AMOUNT와 동일한 용도) EPSILON = FLOAT_EPSILON # 파일 잠금을 위한 RLock 객체 (재진입 가능) holdings_lock = threading.RLock() # 짧은 TTL 캐시 (현재가/잔고) - constants.py에서 import # PRICE_CACHE_TTL은 constants.py에 정의됨 BALANCE_CACHE_TTL = PRICE_CACHE_TTL # 동일한 TTL 사용 _price_cache: dict[str, tuple[float, float]] = {} # market -> (price, ts) _balance_cache: tuple[dict | None, float] = ({}, 0.0) _cache_lock = threading.Lock() def _load_holdings_unsafe(holdings_file: str) -> dict[str, dict]: """내부 사용 전용: Lock 없이 holdings 파일 로드""" if os.path.exists(holdings_file): if os.path.getsize(holdings_file) == 0: logger.debug("[DEBUG] 보유 파일이 비어있습니다: %s", holdings_file) return {} with open(holdings_file, encoding="utf-8") as f: return json.load(f) return {} def load_holdings(holdings_file: str = HOLDINGS_FILE) -> dict[str, dict]: """ holdings 파일을 로드합니다. Returns: 심볼별 보유 정보 (symbol -> {amount, buy_price, ...}) """ try: with holdings_lock: return _load_holdings_unsafe(holdings_file) except json.JSONDecodeError as e: logger.error("[ERROR] 보유 파일 JSON 디코드 실패: %s", e) except OSError as e: logger.exception("[ERROR] 보유 파일 로드 중 입출력 예외 발생: %s", e) raise return {} def _save_holdings_unsafe(holdings: dict[str, dict], holdings_file: str) -> None: """내부 사용 전용: Lock 없이 holdings 파일 저장 (원자적 쓰기)""" os.makedirs(os.path.dirname(holdings_file) or ".", exist_ok=True) temp_file = f"{holdings_file}.tmp" try: # 임시 파일에 먼저 쓰기 with open(temp_file, "w", encoding="utf-8") as f: json.dump(holdings, f, ensure_ascii=False, indent=2) f.flush() os.fsync(f.fileno()) # 디스크 동기화 보장 # 원자적 교체 (rename은 원자적 연산) os.replace(temp_file, holdings_file) # ✅ 보안 개선: 파일 권한 설정 (rw------- = 0o600) try: import stat os.chmod(holdings_file, stat.S_IRUSR | stat.S_IWUSR) # 소유자만 읽기/쓰기 except Exception as e: # Windows에서는 chmod가 제한적이므로 오류 무시 logger.debug("파일 권한 설정 건너뜀 (Windows는 미지원): %s", e) logger.debug("[DEBUG] 보유 저장 (원자적): %s", holdings_file) except OSError as e: logger.error("[ERROR] 보유 저장 중 입출력 오류: %s", e) # 임시 파일 정리 if os.path.exists(temp_file): try: os.remove(temp_file) except Exception: pass raise def save_holdings(holdings: dict[str, dict], holdings_file: str = HOLDINGS_FILE) -> None: """스레드 안전 + 원자적 파일 쓰기로 holdings 저장""" try: with holdings_lock: _save_holdings_unsafe(holdings, holdings_file) except OSError as e: logger.error("[ERROR] 보유 저장 실패: %s", e) raise # 호출자가 저장 실패를 인지하도록 예외 재발생 def update_max_price(symbol: str, current_price: float, holdings_file: str = HOLDINGS_FILE) -> None: """최고가를 갱신합니다 (기존 max_price보다 높을 때만) Args: symbol: 심볼 (예: "KRW-BTC") current_price: 현재 가격 holdings_file: holdings 파일 경로 (더 이상 주된 상태 저장소가 아님) Note: 이제 bot_state.json (StateManager)을 통해 영구 저장됩니다. holdings.json은 캐시 역할로 유지됩니다. """ # 1. StateManager를 통해 영구 저장소 업데이트 state_manager.update_max_price_state(symbol, current_price) # 2. 기존 holdings.json 업데이트 (호환성 유지) with holdings_lock: holdings = load_holdings(holdings_file) if symbol not in holdings: return holding_info = holdings[symbol] # StateManager에서 최신 max_price 가져오기 new_max = state_manager.get_value(symbol, "max_price", 0.0) # holdings 파일에도 반영 (표시용) if new_max > holding_info.get("max_price", 0): holdings[symbol]["max_price"] = new_max save_holdings(holdings, holdings_file) logger.debug("[%s] max_price 동기화 완료: %.2f", symbol, new_max) def get_upbit_balances(cfg: RuntimeConfig) -> dict | None: """ Upbit API를 통해 현재 잔고를 조회합니다. Args: cfg: RuntimeConfig 객체 (Upbit API 키 포함) Returns: 심볼별 잔고 딕셔너리 (예: {"BTC": 0.5, "ETH": 10.0, "KRW": 1000000}) - MIN_TRADE_AMOUNT (1e-8) 이하의 자산은 제외됨 - API 키 미설정 시 빈 딕셔너리 {} 반환 - 네트워크 오류 또는 API 오류 시 None 반환 Raises: Exception: Upbit API 호출 중 발생한 예외는 로깅되고 None 반환 """ global _balance_cache try: if not (cfg.upbit_access_key and cfg.upbit_secret_key): logger.debug("API 키 없음 - 빈 balances") return {} now = time.time() with _cache_lock: cached_balances, ts = _balance_cache if cached_balances is not None and (now - ts) <= BALANCE_CACHE_TTL: return dict(cached_balances) upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key) # 간단한 재시도(최대 3회, 짧은 백오프) last_error: Exception | None = None for attempt in range(3): try: api_rate_limiter.acquire() balances = upbit.get_balances() if not isinstance(balances, list): logger.error("Upbit balances 형식 오류: 예상(list), 실제(%s)", type(balances).__name__) last_error = TypeError("invalid balances type") time.sleep(0.2 * (attempt + 1)) continue result: dict[str, float] = {} for item in balances: currency = (item.get("currency") or "").upper() try: balance = float(item.get("balance", 0)) except Exception: balance = 0.0 # KRW는 극소량 체크 건너뜀 (매수 잔고 확인용) if currency != "KRW" and balance <= MIN_TRADE_AMOUNT: continue result[currency] = balance with _cache_lock: _balance_cache = (result, time.time()) logger.debug("Upbit 보유 %d개", len(result)) return result except (requests.exceptions.RequestException, ValueError, TypeError) as e: # 네트워크/파싱 오류 last_error = e logger.warning("Upbit balances 재시도 %d/3 실패: %s", attempt + 1, e) time.sleep(BALANCE_RETRY_BACKOFF * (attempt + 1)) if last_error: logger.error("Upbit balances 실패: %s", last_error) return None except (requests.exceptions.RequestException, ValueError, TypeError) as e: logger.error("Upbit balances 실패: %s", e) return None def get_current_price(symbol: str) -> float: """ 주어진 심볼의 현재가를 Upbit에서 조회합니다. Args: symbol: 거래 심볼 (예: "BTC", "KRW-BTC", "eth") "KRW-" 접두사 유무 자동 처리 Returns: 현재가 (KRW 기준, float 타입) - 조회 실패 또는 API 오류 시 0.0 반환 Note: - 조회 실패 시 WARNING 레벨로 로깅됨 - 심볼 대소문자 자동 정규화 (예: "btc" → "KRW-BTC") - 재시도 로직 없음 (상위 함수에서 재시도 처리 권장) """ try: if symbol.upper().startswith("KRW-"): market = symbol.upper() else: market = f"KRW-{symbol.replace('KRW-', '').upper()}" now = time.time() with _cache_lock: cached = _price_cache.get(market) if cached: price_cached, ts = cached if (now - ts) <= PRICE_CACHE_TTL: return price_cached last_error: Exception | None = None for attempt in range(DEFAULT_RETRY_COUNT): try: api_rate_limiter.acquire() price = pyupbit.get_current_price(market) if price: price_f = float(price) with _cache_lock: _price_cache[market] = (price_f, time.time()) logger.debug("[DEBUG] 현재가 %s -> %.2f (attempt %d)", market, price_f, attempt + 1) return price_f last_error = ValueError("empty price") except (requests.exceptions.RequestException, ValueError, TypeError) as e: last_error = e logger.warning( "[WARNING] 현재가 조회 실패 %s (재시도 %d/%d): %s", symbol, attempt + 1, DEFAULT_RETRY_COUNT, e ) time.sleep(DEFAULT_RETRY_BACKOFF * (attempt + 1)) if last_error: logger.warning("[WARNING] 현재가 조회 최종 실패 %s: %s", symbol, last_error) except (requests.exceptions.RequestException, ValueError, TypeError) as e: logger.warning("[WARNING] 현재가 조회 실패 %s: %s", symbol, e) return 0.0 def add_new_holding( symbol: str, buy_price: float, amount: float, buy_timestamp: float | None = None, holdings_file: str = HOLDINGS_FILE ) -> bool: """ 새로운 보유 자산을 추가하거나 기존 보유량을 추가합니다. Args: symbol: 거래 심볼 (예: KRW-BTC) buy_price: 평균 매수가 amount: 매수한 수량 buy_timestamp: 매수 시각 (None이면 현재 시각 사용) holdings_file: 보유 파일 경로 Returns: 성공 여부 (True/False) """ try: import time timestamp = buy_timestamp if buy_timestamp is not None else time.time() with holdings_lock: holdings = _load_holdings_unsafe(holdings_file) if symbol in holdings: # 기존 보유가 있으면 평균 매수가와 수량 업데이트 prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0) prev_price = float(holdings[symbol].get("buy_price", 0.0) or 0.0) total_amount = prev_amount + amount if total_amount > 0: # 가중 평균 매수가 계산 new_avg_price = ((prev_price * prev_amount) + (buy_price * amount)) / total_amount holdings[symbol]["buy_price"] = new_avg_price holdings[symbol]["amount"] = total_amount # max_price 갱신: 현재 매수가와 기존 max_price 중 큰 값 prev_max = float(holdings[symbol].get("max_price", 0.0) or 0.0) holdings[symbol]["max_price"] = max(new_avg_price, prev_max) logger.info( "[INFO] [%s] holdings 추가 매수: 평균가 %.2f -> %.2f, 수량 %.8f -> %.8f", symbol, prev_price, new_avg_price, prev_amount, total_amount, ) else: # 신규 보유 추가 holdings[symbol] = { "buy_price": buy_price, "amount": amount, "max_price": buy_price, "buy_timestamp": timestamp, "partial_sell_done": False, } logger.info("[INFO] [%s] holdings 신규 추가: 매수가=%.2f, 수량=%.8f", symbol, buy_price, amount) state_manager.set_value(symbol, "max_price", holdings[symbol]["max_price"]) state_manager.set_value(symbol, "partial_sell_done", False) _save_holdings_unsafe(holdings, holdings_file) return True except (OSError, json.JSONDecodeError, ValueError, TypeError) as e: logger.exception("[ERROR] [%s] holdings 추가 실패: %s", symbol, e) return False def update_holding_amount( symbol: str, amount_change: float, holdings_file: str = HOLDINGS_FILE, min_amount_threshold: float = 1e-8 ) -> bool: """ 보유 자산의 수량을 변경합니다 (매도 시 음수 값 전달). 수량이 0 이하가 되면 해당 심볼을 holdings에서 제거합니다. Args: symbol: 거래 심볼 (예: KRW-BTC) amount_change: 변경할 수량 (매도 시 음수, 매수 시 양수) holdings_file: 보유 파일 경로 min_amount_threshold: 0으로 간주할 최소 수량 임계값 Returns: 성공 여부 (True/False) """ try: with holdings_lock: holdings = _load_holdings_unsafe(holdings_file) if symbol not in holdings: logger.warning("[WARNING] [%s] holdings에 존재하지 않아 수량 업데이트 건너뜀", symbol) return False prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0) new_amount = max(0.0, prev_amount + amount_change) if new_amount <= min_amount_threshold: # 거의 0이면 제거 holdings.pop(symbol, None) logger.info( "[INFO] [%s] holdings 업데이트: 전량 매도 완료, 보유 제거 (이전: %.8f, 변경: %.8f)", symbol, prev_amount, amount_change, ) else: holdings[symbol]["amount"] = new_amount logger.info( "[INFO] [%s] holdings 업데이트: 수량 변경 %.8f -> %.8f (변경량: %.8f)", symbol, prev_amount, new_amount, amount_change, ) _save_holdings_unsafe(holdings, holdings_file) return True except (OSError, json.JSONDecodeError, ValueError, TypeError) as e: logger.exception("[ERROR] [%s] holdings 수량 업데이트 실패: %s", symbol, e) return False def set_holding_field(symbol: str, key: str, value, holdings_file: str = HOLDINGS_FILE) -> bool: """ 보유 자산의 특정 필드 값을 설정합니다. Args: symbol: 거래 심볼 (예: KRW-BTC) key: 설정할 필드의 키 (예: "partial_sell_done") value: 설정할 값 holdings_file: 보유 파일 경로 Returns: 성공 여부 (True/False) """ try: with holdings_lock: holdings = _load_holdings_unsafe(holdings_file) if symbol not in holdings: logger.warning("[WARNING] [%s] holdings에 존재하지 않아 필드 설정 건너뜀", symbol) return False holdings[symbol][key] = value logger.info("[INFO] [%s] holdings 업데이트: 필드 '%s'를 '%s'(으)로 설정", symbol, key, value) _save_holdings_unsafe(holdings, holdings_file) # [NEW] StateManager에도 반영 state_manager.set_value(symbol, key, value) return True except (OSError, json.JSONDecodeError, ValueError, TypeError) as e: logger.exception("[ERROR] [%s] holdings 필드 설정 실패: %s", symbol, e) return False @retry_with_backoff(max_attempts=3, base_delay=2.0, max_delay=10.0, exceptions=(Exception,)) def fetch_holdings_from_upbit(cfg: RuntimeConfig) -> dict | None: """ Upbit API에서 현재 보유 자산 정보를 조회하고, 로컬 상태 정보와 병합합니다. Args: cfg: RuntimeConfig 객체 (Upbit API 키 포함) Returns: 심볼별 보유 정보 딕셔너리 (예: {"KRW-BTC": {...}, "KRW-ETH": {...}}) - 각 심볼: {"buy_price": float, "amount": float, "max_price": float, "buy_timestamp": null} - API 키 미설정 시 빈 딕셔너리 {} 반환 - 네트워크 오류 또는 API 오류 시 None 반환 (상위 함수에서 재시도) Behavior: - Upbit API에서 잔고 정보 조회 (amount, buy_price 등) - **중요**: 로컬 holdings.json의 `max_price`를 안전하게 로드하여 유지 (초기화 방지) - 잔고 0 또는 MIN_TRADE_AMOUNT 미만 자산은 제외 - buy_price 필드 우선순위: avg_buy_price_krw > avg_buy_price Decorator: @retry_with_backoff: 3회 지수 백오프 재시도 (2s → 4s → 8s) """ try: if not (cfg.upbit_access_key and cfg.upbit_secret_key): logger.debug("[DEBUG] API 키 없어 Upbit holdings 사용 안함") return {} upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key) balances = upbit.get_balances() # 타입 체크: balances가 리스트가 아닐 경우 if not isinstance(balances, list): logger.error( "[ERROR] Upbit balances 형식 오류: 예상(list), 실제(%s), 값=%s", type(balances).__name__, balances ) return None new_holdings_map = {} # 로컬 holdings 스냅샷 (StateManager가 비어있을 때 복원용) try: with holdings_lock: local_holdings_snapshot = _load_holdings_unsafe(HOLDINGS_FILE) except Exception: local_holdings_snapshot = {} # 1. API 잔고 먼저 처리 (메모리 맵 구성) for item in balances: currency = (item.get("currency") or "").upper() if currency == "KRW": continue try: amount = float(item.get("balance", 0)) except Exception: amount = 0.0 if amount <= EPSILON: continue # 평균 매수가 파싱 (우선순위: KRW -> 일반) buy_price = None if item.get("avg_buy_price_krw"): try: buy_price = float(item.get("avg_buy_price_krw")) except Exception: pass if buy_price is None and item.get("avg_buy_price"): try: buy_price = float(item.get("avg_buy_price")) except Exception: pass market = f"KRW-{currency}" new_holdings_map[market] = { "buy_price": buy_price or 0.0, "amount": amount, "max_price": buy_price or 0.0, # 기본값으로 매수가 설정 "buy_timestamp": None, } if not new_holdings_map: return {} # 2. StateManager(bot_state.json)에서 영구 상태 병합 # 이전에는 로컬 파일(holdings.json)을 병합했으나, 이제는 StateManager가 Source of Truth입니다. try: for market, new_data in new_holdings_map.items(): # StateManager에서 상태 로드 saved_max = state_manager.get_value(market, "max_price") saved_partial = state_manager.get_value(market, "partial_sell_done") # 로컬 holdings 스냅샷 로드 local_entry = ( local_holdings_snapshot.get(market, {}) if isinstance(local_holdings_snapshot, dict) else {} ) local_max = local_entry.get("max_price") local_partial = local_entry.get("partial_sell_done") current_buy_price = float(new_data.get("buy_price", 0.0) or 0.0) # max_price 복원: 사용 가능한 값 중 최댓값을 선택해 하향 초기화 방지 max_candidates = [current_buy_price] if saved_max is not None: try: max_candidates.append(float(saved_max)) except Exception: pass if local_max is not None: try: max_candidates.append(float(local_max)) except Exception: pass restored_max = max(max_candidates) new_data["max_price"] = restored_max state_manager.set_value(market, "max_price", restored_max) # partial_sell_done 복원: True를 보존하기 위해 StateManager가 False여도 로컬 True를 우선 반영 if bool(saved_partial): new_data["partial_sell_done"] = True state_manager.set_value(market, "partial_sell_done", True) elif local_partial is not None: new_data["partial_sell_done"] = bool(local_partial) state_manager.set_value(market, "partial_sell_done", bool(local_partial)) else: new_data["partial_sell_done"] = False state_manager.set_value(market, "partial_sell_done", False) except Exception as e: logger.warning("[WARNING] StateManager 데이터 병합 중 오류: %s", e) logger.debug("[DEBUG] Upbit holdings %d개 (State 병합 완료)", len(new_holdings_map)) return new_holdings_map except (requests.exceptions.RequestException, ValueError, TypeError) as e: logger.error("[ERROR] fetch_holdings 실패: %s", e) return None def backup_holdings(holdings_file: str = HOLDINGS_FILE) -> str | None: """ holdings.json 파일의 백업을 생성합니다 (선택사항 - 복구 전략). Args: holdings_file: 백업할 보유 파일 경로 Returns: 생성된 백업 파일 경로 (예: data/holdings.json.backup_20251204_120530) 백업 실패 시 None 반환 Note: - 백업 파일명: holdings.json.backup_YYYYMMDD_HHMMSS - 원본 파일이 없으면 None 반환 - 파일 손상 복구 시 수동으로 백업 파일을 원본 위치에 복사 """ try: if not os.path.exists(holdings_file): logger.warning("[WARNING] 백업 대상 파일이 없습니다: %s", holdings_file) return None import shutil from datetime import datetime timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_file = f"{holdings_file}.backup_{timestamp}" shutil.copy2(holdings_file, backup_file) logger.info("[INFO] Holdings 백업 생성: %s", backup_file) return backup_file except Exception as e: logger.error("[ERROR] Holdings 백업 생성 실패: %s", e) return None def restore_holdings_from_backup(backup_file: str, restore_to: str = HOLDINGS_FILE) -> bool: """ 백업 파일에서 holdings.json을 복구합니다 (선택사항 - 복구 전략). Args: backup_file: 백업 파일 경로 (예: data/holdings.json.backup_20251204_120530) restore_to: 복구 대상 경로 (기본값: HOLDINGS_FILE) Returns: 복구 성공 여부 (True/False) Note: - 복구 전에 원본 파일이 백업됨 - 복구 중 오류 발생 시 원본 파일은 손상되지 않음 - 원래 상태로 되돌리려면 복구 전 백업 파일을 확인하세요 """ try: if not os.path.exists(backup_file): logger.error("[ERROR] 백업 파일이 없습니다: %s", backup_file) return False # 현재 파일을 먼저 백업 (이중 백업) if os.path.exists(restore_to): double_backup = backup_holdings(restore_to) logger.info("[INFO] 복구 전 현재 파일 백업: %s", double_backup) import shutil # 복구 shutil.copy2(backup_file, restore_to) logger.info("[INFO] Holdings 복구 완료: %s -> %s", backup_file, restore_to) return True except Exception as e: logger.error("[ERROR] Holdings 복구 실패: %s", e) return False def reconcile_state_and_holdings(holdings_file: str = HOLDINGS_FILE) -> dict[str, dict]: """ StateManager(bot_state)와 holdings.json을 상호 보정합니다. - StateManager를 단일 소스로 두되, 비어있는 경우 holdings에서 값 복원 - holdings의 표시용 필드(max_price, partial_sell_done)를 state 값으로 동기화 Returns: 병합 완료된 holdings dict """ with holdings_lock: holdings_data = _load_holdings_unsafe(holdings_file) state = state_manager.load_state() state_changed = False holdings_changed = False for symbol, entry in list(holdings_data.items()): state_entry = state.get(symbol, {}) # max_price 동기화: state 우선, 없으면 holdings 값으로 채움 h_max = entry.get("max_price") s_max = state_entry.get("max_price") if s_max is None and h_max is not None: state_entry["max_price"] = h_max state_changed = True elif s_max is not None: if h_max != s_max: holdings_data[symbol]["max_price"] = s_max holdings_changed = True # partial_sell_done 동기화: state 우선, 없으면 holdings 값으로 채움 h_partial = entry.get("partial_sell_done") s_partial = state_entry.get("partial_sell_done") if s_partial is None and h_partial is not None: state_entry["partial_sell_done"] = h_partial state_changed = True elif s_partial is not None: if h_partial != s_partial: holdings_data[symbol]["partial_sell_done"] = s_partial holdings_changed = True if state_entry and symbol not in state: state[symbol] = state_entry state_changed = True # holdings에 있지만 state에 없는 심볼을 state에 추가 for symbol in holdings_data.keys(): if symbol not in state: state[symbol] = {} state_changed = True if state_changed: state_manager.save_state(state) if holdings_changed: save_holdings(holdings_data, holdings_file) return holdings_data