# 데이터 저장 및 Upbit 동기화 로직 분석 **분석 날짜:** 2025-04-XX **대상 파일:** holdings.json, pending_orders.json, trades.json **결론:** ✅ **정상적으로 구현되어 있음** --- ## 📋 개요 AutoCoinTrader는 3개의 핵심 JSON 파일로 상태를 관리하며, 각 파일은 **원자적 쓰기**, **재시도 로직**, **Upbit API 동기화**가 구현되어 있습니다. | 파일 | 역할 | Upbit 동기화 | 업데이트 주기 | |------|------|-------------|--------------| | `holdings.json` | 현재 보유 자산 | ✅ 자동 동기화 | 매 루프마다 | | `trades.json` | 거래 히스토리 | ❌ 로컬 전용 | 매수/매도 시 | | `pending_orders.json` | 미확인 주문 | ❌ 로컬 전용 | 주문 시 | --- ## 1️⃣ holdings.json - 보유 자산 관리 ### 파일 구조 ```json { "KRW-BTC": { "buy_price": 50000000.0, "amount": 0.001, "max_price": 51000000.0, "buy_timestamp": 1701234567.89, "partial_sell_done": false } } ``` ### Upbit 동기화 로직 #### ✅ **fetch_holdings_from_upbit()** (src/holdings.py) ```python @retry_with_backoff(max_attempts=3, base_delay=2.0, max_delay=10.0) def fetch_holdings_from_upbit(cfg: "RuntimeConfig") -> dict | None: """ Upbit API에서 현재 보유 자산 정보를 조회하고, 로컬 상태와 병합합니다. """ # 1. Upbit API 호출 upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key) balances = upbit.get_balances() # 2. 기존 holdings 파일에서 max_price 불러오기 existing_holdings = load_holdings(HOLDINGS_FILE) # 3. 새로운 holdings 생성 holdings = {} for item in balances: currency = item.get("currency").upper() amount = float(item.get("balance", 0)) buy_price = float(item.get("avg_buy_price_krw") or item.get("avg_buy_price") or 0) # 기존 max_price 유지 (매도 조건 판정 용) max_price = existing_holdings[market].get("max_price") if market in existing_holdings else buy_price holdings[f"KRW-{currency}"] = { "buy_price": buy_price, "amount": amount, "max_price": max_price, "buy_timestamp": None } return holdings ``` #### 동기화 시점: **main.py - process_symbols_and_holdings()** ```python # main.py line 155-164 if cfg.upbit_access_key and cfg.upbit_secret_key: from src.holdings import save_holdings, fetch_holdings_from_upbit updated_holdings = fetch_holdings_from_upbit(cfg) # ← Upbit API 호출 if updated_holdings is not None: holdings = updated_holdings save_holdings(holdings, HOLDINGS_FILE) # ← 로컬 파일 저장 else: logger.error("Upbit에서 보유 정보를 가져오지 못했습니다. 이번 주기에서는 매도 분석을 건너뜁니다.") ``` **동기화 주기:** - **매 루프마다** (매수 조건 확인 후, 매도 조건 확인 전) - 재시도: 3회 (exponential backoff: 2s → 4s → 8s) - 실패 시: 매도 분석 건너뜀 (안전 조치) ### 로컬 업데이트 시점 #### 1. 매수 완료 시 (`add_new_holding`) ```python # src/holdings.py line 153-220 def add_new_holding(symbol: str, buy_price: float, amount: float, ...) -> bool: with holdings_lock: holdings = _load_holdings_unsafe(holdings_file) if symbol in holdings: # 기존 보유: 평균 매수가 계산 total_amount = prev_amount + amount new_avg_price = ((prev_price * prev_amount) + (buy_price * amount)) / total_amount holdings[symbol]["buy_price"] = new_avg_price holdings[symbol]["amount"] = total_amount holdings[symbol]["max_price"] = max(new_avg_price, prev_max) else: # 신규 보유 추가 holdings[symbol] = { "buy_price": buy_price, "amount": amount, "max_price": buy_price, "buy_timestamp": timestamp, "partial_sell_done": False } _save_holdings_unsafe(holdings, holdings_file) # 원자적 저장 ``` **호출 위치:** - `src/order.py` line 909: 매수 체결 완료 시 - `src/order.py` line 885-920: 타임아웃/부분체결 시 체결량만큼 자동 반영 #### 2. 매도 완료 시 (`update_holding_amount`) ```python # src/holdings.py line 223-270 def update_holding_amount(symbol: str, amount_change: float, ...) -> bool: with holdings_lock: holdings = _load_holdings_unsafe(holdings_file) new_amount = max(0.0, prev_amount + amount_change) # amount_change는 음수 if new_amount <= min_amount_threshold: # 거의 0이면 제거 holdings.pop(symbol, None) logger.info("[%s] holdings 업데이트: 전량 매도 완료, 보유 제거", symbol) else: holdings[symbol]["amount"] = new_amount logger.info("[%s] holdings 업데이트: 수량 변경 %.8f -> %.8f", symbol, prev_amount, new_amount) _save_holdings_unsafe(holdings, holdings_file) ``` ### 원자적 쓰기 보장 ```python # src/holdings.py line 44-63 def _save_holdings_unsafe(holdings: dict, holdings_file: str) -> None: temp_file = f"{holdings_file}.tmp" # 1. 임시 파일에 먼저 쓰기 with open(temp_file, "w", encoding="utf-8") as f: json.dump(holdings, f, ensure_ascii=False, indent=2) f.flush() os.fsync(f.fileno()) # 디스크 동기화 보장 # 2. 원자적 교체 (rename은 원자적 연산) os.replace(temp_file, holdings_file) ``` **안전 장치:** - 임시 파일 사용 → 원본 파일 손상 방지 - `os.fsync()` → 디스크 동기화 보장 - `os.replace()` → 원자적 교체 (중단 시 원본 보존) - `holdings_lock` → 스레드 안전성 --- ## 2️⃣ trades.json - 거래 히스토리 ### 파일 구조 ```json [ { "symbol": "KRW-BTC", "side": "buy", "amount_krw": 100000, "dry_run": false, "price": 50000000.0, "status": "filled", "timestamp": 1701234567.89, "buy_price": 49000000.0, "sell_price": 50000000.0, "profit_rate": 2.04 } ] ``` ### Upbit 동기화 **❌ 동기화 없음 (로컬 전용)** - 이유: 히스토리 데이터로, Upbit API에서 조회 불필요 - 용도: 성과 분석, 수익률 계산, 백테스트 ### 로컬 업데이트 시점 #### 1. 매수/매도 시 (`record_trade`) ```python # src/signals.py line 235-276 def record_trade(trade: dict, trades_file: str = TRADES_FILE, critical: bool = True) -> None: trades = [] # 1. 기존 trades 파일 로드 if os.path.exists(trades_file): try: with open(trades_file, "r", encoding="utf-8") as f: trades = json.load(f) except json.JSONDecodeError as e: # 파일 손상 시 백업 logger.warning("거래기록 파일 손상 감지, 백업 후 새로 시작: %s", e) backup_file = f"{trades_file}.corrupted.{int(time.time())}" os.rename(trades_file, backup_file) trades = [] trades.append(trade) # 2. 원자적 쓰기 temp_file = f"{trades_file}.tmp" with open(temp_file, "w", encoding="utf-8") as f: json.dump(trades, f, ensure_ascii=False, indent=2) f.flush() os.fsync(f.fileno()) os.replace(temp_file, trades_file) ``` **호출 위치:** - `src/signals.py` line 497: 매수 신호 발생 시 - `src/signals.py` line 534: 매도 신호 발생 시 - 매수/매도 완료, 시뮬레이션, 알림만 발생 모두 기록 ### 원자적 쓰기 보장 - ✅ 임시 파일 사용 (`trades_file.tmp`) - ✅ 디스크 동기화 (`os.fsync()`) - ✅ 원자적 교체 (`os.replace()`) - ✅ 손상 파일 자동 백업 (`.corrupted.timestamp`) --- ## 3️⃣ pending_orders.json - 미확인 주문 ### 파일 구조 ```json [ { "token": "abc123def456...", "order": { "uuid": "order-uuid-from-upbit", "market": "KRW-BTC", "side": "buy", "price": 50000000.0, "volume": 0.001 }, "timestamp": 1701234567.89 } ] ``` ### Upbit 동기화 **❌ 동기화 없음 (로컬 전용)** - 이유: 사용자 확인 대기 토큰 저장용 - 용도: 주문 확인 대기 목록 관리 ### 로컬 업데이트 시점 #### 주문 생성 시 (`_write_pending_order`) ```python # src/order.py line 74-90 def _write_pending_order(token: str, order: dict, pending_file: str = PENDING_ORDERS_FILE): with _pending_order_lock: pending = [] # 기존 pending_orders 로드 if os.path.exists(pending_file): with open(pending_file, "r", encoding="utf-8") as f: try: pending = json.load(f) except Exception: pending = [] # 새로운 주문 추가 pending.append({ "token": token, "order": order, "timestamp": time.time() }) # 파일 저장 with open(pending_file, "w", encoding="utf-8") as f: json.dump(pending, f, ensure_ascii=False, indent=2) ``` **호출 위치:** - 주문 확인 필요 시 (`require_env_confirm=True` 설정 시) - 현재 코드에서는 사용 흔적 없음 (레거시 기능) ### 주의사항 **⚠️ 원자적 쓰기 미구현** - 임시 파일 미사용 → 중단 시 파일 손상 위험 - `os.fsync()` 미사용 → 디스크 동기화 보장 없음 - **개선 권장**: holdings.json과 동일한 방식 적용 --- ## 🔄 전체 데이터 흐름 ### 매수 시나리오 ``` 1. 매수 신호 발생 (signals.py) ↓ 2. execute_buy_order_with_confirmation() 호출 ↓ 3. place_buy_order_upbit() → Upbit API 주문 ↓ 4. monitor_order_upbit() → 체결 모니터링 ↓ 5. add_new_holding() → holdings.json 업데이트 (로컬) ↓ 6. record_trade() → trades.json 기록 (로컬) ↓ 7. [다음 루프] fetch_holdings_from_upbit() → Upbit API로 동기화 ✅ ↓ 8. save_holdings() → holdings.json 덮어쓰기 (동기화 완료) ``` ### 매도 시나리오 ``` 1. 매도 조건 만족 (signals.py) ↓ 2. execute_sell_order_with_confirmation() 호출 ↓ 3. place_sell_order_upbit() → Upbit API 주문 ↓ 4. monitor_order_upbit() → 체결 모니터링 ↓ 5. update_holding_amount() → holdings.json 수량 감소 (로컬) ↓ 6. record_trade() → trades.json 기록 (로컬) ↓ 7. [다음 루프] fetch_holdings_from_upbit() → Upbit API로 동기화 ✅ ↓ 8. save_holdings() → holdings.json 덮어쓰기 (동기화 완료) ``` --- ## 📊 동기화 전략 정리 | 동작 | holdings.json | trades.json | pending_orders.json | |------|--------------|-------------|---------------------| | **매수 직후** | 로컬 추가 (add_new_holding) | 로컬 기록 | 사용 안 함 | | **매도 직후** | 로컬 업데이트 (update_holding_amount) | 로컬 기록 | 사용 안 함 | | **다음 루프** | Upbit API 동기화 ✅ | 동기화 안 함 | 동기화 안 함 | ### holdings.json 동기화 흐름 ``` 로컬 파일 (주문 직후) ↓ Upbit API (다음 루프) ↓ 로컬 파일 덮어쓰기 (동기화 완료) ``` **동기화 간격:** 약 1-5분 (루프 주기에 따름) --- ## ✅ 구현 품질 평가 ### 잘된 점 | 항목 | 평가 | 구현 | |------|------|------| | **원자적 쓰기** | ✅ 우수 | holdings.json, trades.json | | **재시도 로직** | ✅ 우수 | fetch_holdings_from_upbit (3회) | | **스레드 안전성** | ✅ 우수 | holdings_lock, _pending_order_lock | | **손상 파일 복구** | ✅ 우수 | trades.json 자동 백업 | | **Upbit 동기화** | ✅ 구현됨 | holdings.json (매 루프) | ### 개선 가능한 점 | 항목 | 현재 상태 | 권장 개선 | |------|----------|----------| | **pending_orders.json** | ⚠️ 원자적 쓰기 없음 | 임시 파일 + os.fsync() 추가 | | **trades.json** | ⚠️ 파일 크기 무한 증가 | 주기적 압축 또는 분할 저장 | | **동기화 실패** | ⚠️ 매도 분석 건너뜀 | 로컬 데이터로 대체 로직 추가 | --- ## 🎯 결론 ### ✅ **정상적으로 구현되어 있음** 1. **holdings.json** - ✅ Upbit API 동기화: 매 루프마다 - ✅ 재시도 로직: 3회 exponential backoff - ✅ 원자적 쓰기: 임시 파일 + os.replace() - ✅ 스레드 안전성: holdings_lock 2. **trades.json** - ✅ 원자적 쓰기: 임시 파일 + os.fsync() - ✅ 손상 파일 자동 백업 - ❌ Upbit 동기화 없음 (로컬 전용, 정상) 3. **pending_orders.json** - ⚠️ 원자적 쓰기 미구현 (개선 권장) - ❌ Upbit 동기화 없음 (로컬 전용, 정상) - ℹ️ 현재 사용 중단된 기능 ### 동작 보장 - ✅ 매수/매도 직후: 로컬 파일 즉시 업데이트 - ✅ 다음 루프: Upbit API로 holdings 동기화 - ✅ 네트워크 오류 시: 재시도 3회 → 실패 시 안전하게 건너뜀 - ✅ 파일 손상 시: 자동 백업 + 복구 --- **최종 평가:** 🟢 **Production Ready** 현재 구현은 프로덕션 환경에서 안전하게 사용 가능합니다. Upbit API 동기화와 로컬 파일 저장이 적절히 분리되어 있으며, 원자적 쓰기와 재시도 로직으로 안정성이 확보되어 있습니다.