Files
AutoCoinTrader2/docs/data_sync_analysis.md
2025-12-09 21:39:23 +09:00

427 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 데이터 저장 및 Upbit 동기화 로직 분석
**분석 날짜:** 2025-04-XX
**대상 파일:** holdings.json, pending_orders.json, trades.json
**결론:****정상적으로 구현되어 있음**
---
## 📋 개요
AutoCoinTrader는 3개의 핵심 JSON 파일로 상태를 관리하며, 각 파일은 **원자적 쓰기**, **재시도 로직**, **Upbit API 동기화**가 구현되어 있습니다.
| 파일 | 역할 | Upbit 동기화 | 업데이트 주기 |
|------|------|-------------|--------------|
| `holdings.json` | 현재 보유 자산 | ✅ 자동 동기화 | 매 루프마다 |
| `trades.json` | 거래 히스토리 | ❌ 로컬 전용 | 매수/매도 시 |
| `pending_orders.json` | 미확인 주문 | ❌ 로컬 전용 | 주문 시 |
---
## 1⃣ holdings.json - 보유 자산 관리
### 파일 구조
```json
{
"KRW-BTC": {
"buy_price": 50000000.0,
"amount": 0.001,
"max_price": 51000000.0,
"buy_timestamp": 1701234567.89,
"partial_sell_done": false
}
}
```
### Upbit 동기화 로직
#### ✅ **fetch_holdings_from_upbit()** (src/holdings.py)
```python
@retry_with_backoff(max_attempts=3, base_delay=2.0, max_delay=10.0)
def fetch_holdings_from_upbit(cfg: "RuntimeConfig") -> dict | None:
"""
Upbit API에서 현재 보유 자산 정보를 조회하고, 로컬 상태와 병합합니다.
"""
# 1. Upbit API 호출
upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key)
balances = upbit.get_balances()
# 2. 기존 holdings 파일에서 max_price 불러오기
existing_holdings = load_holdings(HOLDINGS_FILE)
# 3. 새로운 holdings 생성
holdings = {}
for item in balances:
currency = item.get("currency").upper()
amount = float(item.get("balance", 0))
buy_price = float(item.get("avg_buy_price_krw") or item.get("avg_buy_price") or 0)
# 기존 max_price 유지 (매도 조건 판정 용)
max_price = existing_holdings[market].get("max_price") if market in existing_holdings else buy_price
holdings[f"KRW-{currency}"] = {
"buy_price": buy_price,
"amount": amount,
"max_price": max_price,
"buy_timestamp": None
}
return holdings
```
#### 동기화 시점: **main.py - process_symbols_and_holdings()**
```python
# main.py line 155-164
if cfg.upbit_access_key and cfg.upbit_secret_key:
from src.holdings import save_holdings, fetch_holdings_from_upbit
updated_holdings = fetch_holdings_from_upbit(cfg) # ← Upbit API 호출
if updated_holdings is not None:
holdings = updated_holdings
save_holdings(holdings, HOLDINGS_FILE) # ← 로컬 파일 저장
else:
logger.error("Upbit에서 보유 정보를 가져오지 못했습니다. 이번 주기에서는 매도 분석을 건너뜁니다.")
```
**동기화 주기:**
- **매 루프마다** (매수 조건 확인 후, 매도 조건 확인 전)
- 재시도: 3회 (exponential backoff: 2s → 4s → 8s)
- 실패 시: 매도 분석 건너뜀 (안전 조치)
### 로컬 업데이트 시점
#### 1. 매수 완료 시 (`add_new_holding`)
```python
# src/holdings.py line 153-220
def add_new_holding(symbol: str, buy_price: float, amount: float, ...) -> bool:
with holdings_lock:
holdings = _load_holdings_unsafe(holdings_file)
if symbol in holdings:
# 기존 보유: 평균 매수가 계산
total_amount = prev_amount + amount
new_avg_price = ((prev_price * prev_amount) + (buy_price * amount)) / total_amount
holdings[symbol]["buy_price"] = new_avg_price
holdings[symbol]["amount"] = total_amount
holdings[symbol]["max_price"] = max(new_avg_price, prev_max)
else:
# 신규 보유 추가
holdings[symbol] = {
"buy_price": buy_price,
"amount": amount,
"max_price": buy_price,
"buy_timestamp": timestamp,
"partial_sell_done": False
}
_save_holdings_unsafe(holdings, holdings_file) # 원자적 저장
```
**호출 위치:**
- `src/order.py` line 909: 매수 체결 완료 시
- `src/order.py` line 885-920: 타임아웃/부분체결 시 체결량만큼 자동 반영
#### 2. 매도 완료 시 (`update_holding_amount`)
```python
# src/holdings.py line 223-270
def update_holding_amount(symbol: str, amount_change: float, ...) -> bool:
with holdings_lock:
holdings = _load_holdings_unsafe(holdings_file)
new_amount = max(0.0, prev_amount + amount_change) # amount_change는 음수
if new_amount <= min_amount_threshold: # 거의 0이면 제거
holdings.pop(symbol, None)
logger.info("[%s] holdings 업데이트: 전량 매도 완료, 보유 제거", symbol)
else:
holdings[symbol]["amount"] = new_amount
logger.info("[%s] holdings 업데이트: 수량 변경 %.8f -> %.8f", symbol, prev_amount, new_amount)
_save_holdings_unsafe(holdings, holdings_file)
```
### 원자적 쓰기 보장
```python
# src/holdings.py line 44-63
def _save_holdings_unsafe(holdings: dict, holdings_file: str) -> None:
temp_file = f"{holdings_file}.tmp"
# 1. 임시 파일에 먼저 쓰기
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(holdings, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno()) # 디스크 동기화 보장
# 2. 원자적 교체 (rename은 원자적 연산)
os.replace(temp_file, holdings_file)
```
**안전 장치:**
- 임시 파일 사용 → 원본 파일 손상 방지
- `os.fsync()` → 디스크 동기화 보장
- `os.replace()` → 원자적 교체 (중단 시 원본 보존)
- `holdings_lock` → 스레드 안전성
---
## 2⃣ trades.json - 거래 히스토리
### 파일 구조
```json
[
{
"symbol": "KRW-BTC",
"side": "buy",
"amount_krw": 100000,
"dry_run": false,
"price": 50000000.0,
"status": "filled",
"timestamp": 1701234567.89,
"buy_price": 49000000.0,
"sell_price": 50000000.0,
"profit_rate": 2.04
}
]
```
### Upbit 동기화
**❌ 동기화 없음 (로컬 전용)**
- 이유: 히스토리 데이터로, Upbit API에서 조회 불필요
- 용도: 성과 분석, 수익률 계산, 백테스트
### 로컬 업데이트 시점
#### 1. 매수/매도 시 (`record_trade`)
```python
# src/signals.py line 235-276
def record_trade(trade: dict, trades_file: str = TRADES_FILE, critical: bool = True) -> None:
trades = []
# 1. 기존 trades 파일 로드
if os.path.exists(trades_file):
try:
with open(trades_file, "r", encoding="utf-8") as f:
trades = json.load(f)
except json.JSONDecodeError as e:
# 파일 손상 시 백업
logger.warning("거래기록 파일 손상 감지, 백업 후 새로 시작: %s", e)
backup_file = f"{trades_file}.corrupted.{int(time.time())}"
os.rename(trades_file, backup_file)
trades = []
trades.append(trade)
# 2. 원자적 쓰기
temp_file = f"{trades_file}.tmp"
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(trades, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno())
os.replace(temp_file, trades_file)
```
**호출 위치:**
- `src/signals.py` line 497: 매수 신호 발생 시
- `src/signals.py` line 534: 매도 신호 발생 시
- 매수/매도 완료, 시뮬레이션, 알림만 발생 모두 기록
### 원자적 쓰기 보장
- ✅ 임시 파일 사용 (`trades_file.tmp`)
- ✅ 디스크 동기화 (`os.fsync()`)
- ✅ 원자적 교체 (`os.replace()`)
- ✅ 손상 파일 자동 백업 (`.corrupted.timestamp`)
---
## 3⃣ pending_orders.json - 미확인 주문
### 파일 구조
```json
[
{
"token": "abc123def456...",
"order": {
"uuid": "order-uuid-from-upbit",
"market": "KRW-BTC",
"side": "buy",
"price": 50000000.0,
"volume": 0.001
},
"timestamp": 1701234567.89
}
]
```
### Upbit 동기화
**❌ 동기화 없음 (로컬 전용)**
- 이유: 사용자 확인 대기 토큰 저장용
- 용도: 주문 확인 대기 목록 관리
### 로컬 업데이트 시점
#### 주문 생성 시 (`_write_pending_order`)
```python
# src/order.py line 74-90
def _write_pending_order(token: str, order: dict, pending_file: str = PENDING_ORDERS_FILE):
with _pending_order_lock:
pending = []
# 기존 pending_orders 로드
if os.path.exists(pending_file):
with open(pending_file, "r", encoding="utf-8") as f:
try:
pending = json.load(f)
except Exception:
pending = []
# 새로운 주문 추가
pending.append({
"token": token,
"order": order,
"timestamp": time.time()
})
# 파일 저장
with open(pending_file, "w", encoding="utf-8") as f:
json.dump(pending, f, ensure_ascii=False, indent=2)
```
**호출 위치:**
- 주문 확인 필요 시 (`require_env_confirm=True` 설정 시)
- 현재 코드에서는 사용 흔적 없음 (레거시 기능)
### 주의사항
**⚠️ 원자적 쓰기 미구현**
- 임시 파일 미사용 → 중단 시 파일 손상 위험
- `os.fsync()` 미사용 → 디스크 동기화 보장 없음
- **개선 권장**: holdings.json과 동일한 방식 적용
---
## 🔄 전체 데이터 흐름
### 매수 시나리오
```
1. 매수 신호 발생 (signals.py)
2. execute_buy_order_with_confirmation() 호출
3. place_buy_order_upbit() → Upbit API 주문
4. monitor_order_upbit() → 체결 모니터링
5. add_new_holding() → holdings.json 업데이트 (로컬)
6. record_trade() → trades.json 기록 (로컬)
7. [다음 루프] fetch_holdings_from_upbit() → Upbit API로 동기화 ✅
8. save_holdings() → holdings.json 덮어쓰기 (동기화 완료)
```
### 매도 시나리오
```
1. 매도 조건 만족 (signals.py)
2. execute_sell_order_with_confirmation() 호출
3. place_sell_order_upbit() → Upbit API 주문
4. monitor_order_upbit() → 체결 모니터링
5. update_holding_amount() → holdings.json 수량 감소 (로컬)
6. record_trade() → trades.json 기록 (로컬)
7. [다음 루프] fetch_holdings_from_upbit() → Upbit API로 동기화 ✅
8. save_holdings() → holdings.json 덮어쓰기 (동기화 완료)
```
---
## 📊 동기화 전략 정리
| 동작 | holdings.json | trades.json | pending_orders.json |
|------|--------------|-------------|---------------------|
| **매수 직후** | 로컬 추가 (add_new_holding) | 로컬 기록 | 사용 안 함 |
| **매도 직후** | 로컬 업데이트 (update_holding_amount) | 로컬 기록 | 사용 안 함 |
| **다음 루프** | Upbit API 동기화 ✅ | 동기화 안 함 | 동기화 안 함 |
### holdings.json 동기화 흐름
```
로컬 파일 (주문 직후)
Upbit API (다음 루프)
로컬 파일 덮어쓰기 (동기화 완료)
```
**동기화 간격:** 약 1-5분 (루프 주기에 따름)
---
## ✅ 구현 품질 평가
### 잘된 점
| 항목 | 평가 | 구현 |
|------|------|------|
| **원자적 쓰기** | ✅ 우수 | holdings.json, trades.json |
| **재시도 로직** | ✅ 우수 | fetch_holdings_from_upbit (3회) |
| **스레드 안전성** | ✅ 우수 | holdings_lock, _pending_order_lock |
| **손상 파일 복구** | ✅ 우수 | trades.json 자동 백업 |
| **Upbit 동기화** | ✅ 구현됨 | holdings.json (매 루프) |
### 개선 가능한 점
| 항목 | 현재 상태 | 권장 개선 |
|------|----------|----------|
| **pending_orders.json** | ⚠️ 원자적 쓰기 없음 | 임시 파일 + os.fsync() 추가 |
| **trades.json** | ⚠️ 파일 크기 무한 증가 | 주기적 압축 또는 분할 저장 |
| **동기화 실패** | ⚠️ 매도 분석 건너뜀 | 로컬 데이터로 대체 로직 추가 |
---
## 🎯 결론
### ✅ **정상적으로 구현되어 있음**
1. **holdings.json**
- ✅ Upbit API 동기화: 매 루프마다
- ✅ 재시도 로직: 3회 exponential backoff
- ✅ 원자적 쓰기: 임시 파일 + os.replace()
- ✅ 스레드 안전성: holdings_lock
2. **trades.json**
- ✅ 원자적 쓰기: 임시 파일 + os.fsync()
- ✅ 손상 파일 자동 백업
- ❌ Upbit 동기화 없음 (로컬 전용, 정상)
3. **pending_orders.json**
- ⚠️ 원자적 쓰기 미구현 (개선 권장)
- ❌ Upbit 동기화 없음 (로컬 전용, 정상)
- 현재 사용 중단된 기능
### 동작 보장
- ✅ 매수/매도 직후: 로컬 파일 즉시 업데이트
- ✅ 다음 루프: Upbit API로 holdings 동기화
- ✅ 네트워크 오류 시: 재시도 3회 → 실패 시 안전하게 건너뜀
- ✅ 파일 손상 시: 자동 백업 + 복구
---
**최종 평가:** 🟢 **Production Ready**
현재 구현은 프로덕션 환경에서 안전하게 사용 가능합니다. Upbit API 동기화와 로컬 파일 저장이 적절히 분리되어 있으며, 원자적 쓰기와 재시도 로직으로 안정성이 확보되어 있습니다.