최초 프로젝트 업로드 (Script Auto Commit)

This commit is contained in:
2025-12-03 22:40:47 +09:00
commit dd9acf62a3
39 changed files with 5251 additions and 0 deletions

4
src/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .common import logger
from .indicators import fetch_ohlcv, compute_macd_hist, ta
from .signals import evaluate_sell_conditions
from .notifications import send_telegram

111
src/common.py Normal file
View File

@@ -0,0 +1,111 @@
import os
import logging
from pathlib import Path
import logging.handlers
import gzip
import shutil
LOG_DIR = os.getenv("LOG_DIR", "logs")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "AutoCoinTrader.log")
logger = logging.getLogger("macd_alarm")
_logger_configured = False
# 거래소 및 계산 상수
# 부동소수점 비교용 엡실론 (일반적 정밀도)
FLOAT_EPSILON = 1e-10
# 거래소별 최소 수량 (Upbit 기준)
MIN_TRADE_AMOUNT = 1e-8 # 0.00000001 (암호화폐 최소 단위)
# 최소 주문 금액 (KRW)
MIN_KRW_ORDER = 5000 # Upbit 최소 주문 금액
# 데이터 파일 경로 상수 (중앙 집중 관리)
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
HOLDINGS_FILE = str(DATA_DIR / "holdings.json")
TRADES_FILE = str(DATA_DIR / "trades.json")
PENDING_ORDERS_FILE = str(DATA_DIR / "pending_orders.json")
class CompressedRotatingFileHandler(logging.handlers.RotatingFileHandler):
"""RotatingFileHandler with gzip compression for rotated logs."""
def rotation_filename(self, default_name):
"""Append .gz to rotated log files."""
return default_name + ".gz"
def rotate(self, source, dest):
"""Compress the rotated log file."""
if os.path.exists(source):
with open(source, "rb") as f_in:
with gzip.open(dest, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(source)
def setup_logger(dry_run: bool):
"""
Configure logging with rotation and compression.
Args:
dry_run: If True, also output to console. If False, only to file.
Log Rotation Strategy:
- Size-based: 10MB per file, keep 7 backups (total ~80MB)
- Time-based: Daily rotation, keep 30 days
- Compression: Old logs are gzipped (saves ~70% space)
Log Levels (production recommendation):
- dry_run=True: INFO (development/testing)
- dry_run=False: WARNING (production - only important events)
"""
global logger, _logger_configured
if _logger_configured:
return
logger.handlers.clear()
# Use WARNING level for production, INFO for development
effective_level = getattr(logging, LOG_LEVEL, logging.INFO if dry_run else logging.WARNING)
logger.setLevel(effective_level)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(threadName)s] - %(message)s")
# Console handler (only in dry_run mode)
if dry_run:
ch = logging.StreamHandler()
ch.setLevel(effective_level)
ch.setFormatter(formatter)
logger.addHandler(ch)
# Size-based rotating file handler with compression
fh_size = CompressedRotatingFileHandler(
LOG_FILE, maxBytes=10 * 1024 * 1024, backupCount=7, encoding="utf-8" # 10MB per file # Keep 7 backups
)
fh_size.setLevel(effective_level)
fh_size.setFormatter(formatter)
logger.addHandler(fh_size)
# Time-based rotating file handler (daily rotation, keep 30 days)
daily_log_file = os.path.join(LOG_DIR, "AutoCoinTrader_daily.log")
fh_time = logging.handlers.TimedRotatingFileHandler(
daily_log_file, when="midnight", interval=1, backupCount=30, encoding="utf-8"
)
fh_time.setLevel(effective_level)
fh_time.setFormatter(formatter)
fh_time.suffix = "%Y-%m-%d" # Add date suffix to rotated files
logger.addHandler(fh_time)
_logger_configured = True
logger.info(
"[SYSTEM] 로그 설정 완료: level=%s, size_rotation=%dMB×%d, daily_rotation=%d",
logging.getLevelName(effective_level),
10,
7,
30,
)

259
src/config.py Normal file
View File

@@ -0,0 +1,259 @@
import os, json
from dataclasses import dataclass
from typing import Optional
from .common import logger
def get_env_or_none(key: str) -> str | None:
"""환경변수를 로드하되, 없으면 None 반환 (빈 문자열도 None 처리)"""
value = os.getenv(key)
return value if value else None
def get_default_config() -> dict:
"""기본 설정 반환 (config.json 로드 실패 시 사용)"""
return {
"symbols_file": "config/symbols.txt",
"symbol_delay": 1.0,
"candle_count": 200,
"buy_check_interval_minutes": 240,
"stop_loss_check_interval_minutes": 60,
"profit_taking_check_interval_minutes": 240,
"balance_warning_interval_hours": 24,
"min_amount_threshold": 1e-8,
"loop": True,
"dry_run": True,
"max_threads": 3,
"telegram_parse_mode": "HTML",
"trading_mode": "signal_only",
"auto_trade": {
"enabled": False,
"buy_enabled": False,
"buy_amount_krw": 10000,
"min_order_value_krw": 5000,
"telegram_max_retries": 3,
"order_monitor_max_errors": 5,
},
}
def load_config() -> dict:
paths = [os.path.join("config", "config.json"), "config.json"]
example_paths = [os.path.join("config", "config.example.json"), "config.example.json"]
for p in paths:
if os.path.exists(p):
try:
with open(p, "r", encoding="utf-8") as f:
cfg = json.load(f)
logger.info("설정 파일 로드: %s", p)
return cfg
except json.JSONDecodeError as e:
logger.error("설정 파일 JSON 파싱 실패: %s, 기본 설정 사용", e)
return get_default_config()
for p in example_paths:
if os.path.exists(p):
try:
with open(p, "r", encoding="utf-8") as f:
cfg = json.load(f)
logger.warning("기본 설정 없음; 예제 사용: %s", p)
return cfg
except json.JSONDecodeError:
pass
logger.warning("설정 파일 없음: config/config.json, 기본 설정 사용")
return get_default_config()
def read_symbols(path: str) -> list:
syms = []
syms_set = set()
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
s = line.strip()
if not s or s.startswith("#"):
continue
if s in syms_set:
logger.warning("[SYSTEM] 중복 심볼 무시: %s", s)
continue
syms_set.add(s)
syms.append(s)
logger.info("[SYSTEM] 심볼 %d개 로드: %s", len(syms), path)
except Exception as e:
logger.exception("[ERROR] 심볼 로드 실패: %s", e)
return syms
@dataclass(frozen=True)
class RuntimeConfig:
timeframe: str
indicator_timeframe: str
candle_count: int
symbol_delay: float
interval: int
loop: bool
dry_run: bool
max_threads: int
telegram_parse_mode: Optional[str]
trading_mode: str
telegram_bot_token: Optional[str]
telegram_chat_id: Optional[str]
upbit_access_key: Optional[str]
upbit_secret_key: Optional[str]
aggregate_alerts: bool = False
benchmark: bool = False
telegram_test: bool = False
config: dict = None # 원본 config 포함
def validate_telegram_token(token: str) -> bool:
"""텔레그램 봇 토큰 형식 검증 (형식: 숫자:영숫자_-)"""
import re
if not token:
return False
# 텔레그램 토큰 형식: 123456789:ABCdefGHIjklMNOpqrsTUVwxyz-_1234567
# 첫 부분: 8-10자리 숫자, 두 번째 부분: 35자 이상
pattern = r"^\d{8,10}:[A-Za-z0-9_-]{35,}$"
return bool(re.match(pattern, token))
def build_runtime_config(cfg_dict: dict) -> RuntimeConfig:
# 설정값 검증
candle_count = int(cfg_dict.get("candle_count", 200))
if candle_count < 1:
logger.warning("[WARNING] candle_count는 1 이상이어야 합니다. 기본값 200 사용")
candle_count = 200
max_threads = int(cfg_dict.get("max_threads", 3))
if max_threads < 1:
logger.warning("[WARNING] max_threads는 1 이상이어야 합니다. 기본값 3 사용")
max_threads = 3
symbol_delay = float(cfg_dict.get("symbol_delay", 1.0))
if symbol_delay < 0:
logger.warning("[WARNING] symbol_delay는 0 이상이어야 합니다. 기본값 1.0 사용")
symbol_delay = 1.0
# timeframe은 동적으로 결정되므로 기본값만 설정 (실제로는 매수/매도 주기에 따라 변경됨)
timeframe = "1h" # 기본값
aggregate_alerts = bool(cfg_dict.get("aggregate_alerts", False)) or bool(
os.getenv("AGGREGATE_ALERTS", "False").lower() in ("1", "true", "yes")
)
benchmark = bool(cfg_dict.get("benchmark", False))
telegram_test = os.getenv("TELEGRAM_TEST", "0") == "1"
# auto_trade 서브 설정 논리 관계 검증 및 교정
at = cfg_dict.get("auto_trade", {}) or {}
loss_threshold = float(at.get("loss_threshold", -5.0))
p1 = float(at.get("profit_threshold_1", 10.0))
p2 = float(at.get("profit_threshold_2", 30.0))
d1 = float(at.get("drawdown_1", 5.0))
d2 = float(at.get("drawdown_2", 15.0))
# 손절 임계값 검증 (음수여야 함)
if loss_threshold >= 0:
logger.warning("[WARNING] loss_threshold(%.2f)는 음수여야 합니다 (예: -5.0). 기본값 -5.0 적용", loss_threshold)
loss_threshold = -5.0
elif loss_threshold < -50:
logger.warning(
"[WARNING] loss_threshold(%.2f)가 너무 작습니다 (최대 손실 50%% 초과). " "극단적인 손절선입니다.",
loss_threshold,
)
# 수익률 임계값 검증 (양수, 순서 관계, 합리적 범위)
if p1 <= 0 or p2 <= 0:
logger.warning("[WARNING] 수익률 임계값은 양수여야 합니다 -> 기본값 10/30 재설정")
p1, p2 = 10.0, 30.0
elif p1 >= p2:
logger.warning(
"[WARNING] profit_threshold_1(%.2f) < profit_threshold_2(%.2f) 조건 위반 " "-> 기본값 10/30 적용", p1, p2
)
p1, p2 = 10.0, 30.0
elif p1 < 5 or p2 > 200:
logger.warning(
"[WARNING] 수익률 임계값 범위 권장 벗어남 (p1=%.2f, p2=%.2f). " "권장 범위: 5%% <= p1 < p2 <= 200%%", p1, p2
)
# 드로우다운 임계값 검증 (양수, 순서 관계, 합리적 범위)
if d1 <= 0 or d2 <= 0:
logger.warning("[WARNING] drawdown 임계값은 양수여야 합니다 -> 기본값 5/15 재설정")
d1, d2 = 5.0, 15.0
elif d1 >= d2:
logger.warning("[WARNING] drawdown_1(%.2f) < drawdown_2(%.2f) 조건 위반 -> 기본값 5/15 적용", d1, d2)
d1, d2 = 5.0, 15.0
elif d1 > 20 or d2 > 50:
logger.warning(
"[WARNING] drawdown 값이 너무 큽니다 (d1=%.2f, d2=%.2f). "
"최대 손실 허용치를 확인하세요. 권장: d1 <= 20%%, d2 <= 50%%",
d1,
d2,
)
# 교정된 값 반영
at["profit_threshold_1"] = p1
at["profit_threshold_2"] = p2
at["drawdown_1"] = d1
at["drawdown_2"] = d2
cfg_dict["auto_trade"] = at
# 환경변수 로드
telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
telegram_chat = os.getenv("TELEGRAM_CHAT_ID")
upbit_access = os.getenv("UPBIT_ACCESS_KEY")
upbit_secret = os.getenv("UPBIT_SECRET_KEY")
# dry_run 및 trading_mode 확인
dry_run = bool(cfg_dict.get("dry_run", False))
trading_mode = cfg_dict.get("trading_mode", "signal_only")
# 실거래 모드 시 필수 API 키 검증
if not dry_run and trading_mode in ("auto_trade", "mixed"):
if not (upbit_access and upbit_secret):
raise ValueError(
"[CRITICAL] 실거래 모드(dry_run=False)에서는 UPBIT_ACCESS_KEY 및 "
"UPBIT_SECRET_KEY 환경변수가 필수입니다. "
".env 파일을 확인하거나 환경변수를 설정하십시오."
)
logger.info(
"[SECURITY] Upbit API 키 로드 완료 (access_key 길이: %d, secret_key 길이: %d)",
len(upbit_access),
len(upbit_secret),
)
# 텔레그램 토큰 검증 (설정되어 있으면)
if telegram_token and not validate_telegram_token(telegram_token):
logger.warning(
"[WARNING] TELEGRAM_BOT_TOKEN 형식이 올바르지 않습니다 (형식: 숫자:영숫자_-). "
"알림 전송이 실패할 수 있습니다."
)
return RuntimeConfig(
timeframe=timeframe,
indicator_timeframe=timeframe,
candle_count=candle_count,
symbol_delay=symbol_delay,
interval=int(cfg_dict.get("interval", 60)),
loop=bool(cfg_dict.get("loop", False)),
dry_run=dry_run,
max_threads=max_threads,
telegram_parse_mode=cfg_dict.get("telegram_parse_mode"),
trading_mode=trading_mode,
telegram_bot_token=telegram_token,
telegram_chat_id=telegram_chat,
upbit_access_key=upbit_access,
upbit_secret_key=upbit_secret,
aggregate_alerts=aggregate_alerts,
benchmark=benchmark,
telegram_test=telegram_test,
config=cfg_dict,
)
def get_symbols_file(config: dict) -> str:
"""Determine the symbols file path with fallback logic."""
default_path = "config/symbols.txt" if os.path.exists("config/symbols.txt") else "symbols.txt"
return config.get("symbols_file", default_path)
# Define valid trading modes as constants
TRADING_MODES = ("signal_only", "auto_trade", "mixed")

333
src/holdings.py Normal file
View File

@@ -0,0 +1,333 @@
import os, json, pyupbit
from .common import logger, MIN_TRADE_AMOUNT, FLOAT_EPSILON, HOLDINGS_FILE
from .retry_utils import retry_with_backoff
import threading
# 부동소수점 비교용 임계값 (MIN_TRADE_AMOUNT와 동일한 용도)
EPSILON = FLOAT_EPSILON
# 파일 잠금을 위한 RLock 객체 (재진입 가능)
holdings_lock = threading.RLock()
def _load_holdings_unsafe(holdings_file: str) -> dict[str, dict]:
"""내부 사용 전용: Lock 없이 holdings 파일 로드"""
if os.path.exists(holdings_file):
if os.path.getsize(holdings_file) == 0:
logger.debug("[DEBUG] 보유 파일이 비어있습니다: %s", holdings_file)
return {}
with open(holdings_file, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def load_holdings(holdings_file: str = HOLDINGS_FILE) -> dict[str, dict]:
"""
holdings 파일을 로드합니다.
Returns:
심볼별 보유 정보 (symbol -> {amount, buy_price, ...})
"""
try:
with holdings_lock:
return _load_holdings_unsafe(holdings_file)
except json.JSONDecodeError as e:
logger.error("[ERROR] 보유 파일 JSON 디코드 실패: %s", e)
except Exception as e:
logger.exception("[ERROR] 보유 파일 로드 중 예외 발생: %s", e)
return {}
def _save_holdings_unsafe(holdings: dict[str, dict], holdings_file: str) -> None:
"""내부 사용 전용: Lock 없이 holdings 파일 저장 (원자적 쓰기)"""
os.makedirs(os.path.dirname(holdings_file) or ".", exist_ok=True)
temp_file = f"{holdings_file}.tmp"
try:
# 임시 파일에 먼저 쓰기
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(holdings, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno()) # 디스크 동기화 보장
# 원자적 교체 (rename은 원자적 연산)
os.replace(temp_file, holdings_file)
logger.debug("[DEBUG] 보유 저장 (원자적): %s", holdings_file)
except Exception as e:
logger.error("[ERROR] 보유 저장 중 오류: %s", e)
# 임시 파일 정리
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except Exception:
pass
raise
def save_holdings(holdings: dict[str, dict], holdings_file: str = HOLDINGS_FILE) -> None:
"""스레드 안전 + 원자적 파일 쓰기로 holdings 저장"""
try:
with holdings_lock:
_save_holdings_unsafe(holdings, holdings_file)
except Exception as e:
logger.error("[ERROR] 보유 저장 실패: %s", e)
raise # 호출자가 저장 실패를 인지하도록 예외 재발생
def get_upbit_balances(cfg: "RuntimeConfig") -> dict | None:
try:
if not (cfg.upbit_access_key and cfg.upbit_secret_key):
logger.debug("API 키 없음 - 빈 balances")
return {}
upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key)
balances = upbit.get_balances()
# 타입 체크: balances가 리스트가 아닐 경우
if not isinstance(balances, list):
logger.error("Upbit balances 형식 오류: 예상(list), 실제(%s)", type(balances).__name__)
return None
result = {}
for item in balances:
currency = (item.get("currency") or "").upper()
try:
balance = float(item.get("balance", 0))
except Exception:
balance = 0.0
if balance <= MIN_TRADE_AMOUNT:
continue
result[currency] = balance
logger.debug("Upbit 보유 %d", len(result))
return result
except Exception as e:
logger.error("Upbit balances 실패: %s", e)
return None
def get_current_price(symbol: str) -> float:
try:
if symbol.upper().startswith("KRW-"):
market = symbol.upper()
else:
market = f"KRW-{symbol.replace('KRW-','').upper()}"
# 실시간 현재가(ticker)를 조회하도록 변경
price = pyupbit.get_current_price(market)
logger.debug("[DEBUG] 현재가 %s -> %.2f", market, price)
return float(price) if price else 0.0
except Exception as e:
logger.warning("[WARNING] 현재가 조회 실패 %s: %s", symbol, e)
return 0.0
def add_new_holding(
symbol: str, buy_price: float, amount: float, buy_timestamp: float | None = None, holdings_file: str = HOLDINGS_FILE
) -> bool:
"""
새로운 보유 자산을 추가하거나 기존 보유량을 추가합니다.
Args:
symbol: 거래 심볼 (예: KRW-BTC)
buy_price: 평균 매수가
amount: 매수한 수량
buy_timestamp: 매수 시각 (None이면 현재 시각 사용)
holdings_file: 보유 파일 경로
Returns:
성공 여부 (True/False)
"""
try:
import time
timestamp = buy_timestamp if buy_timestamp is not None else time.time()
with holdings_lock:
holdings = _load_holdings_unsafe(holdings_file)
if symbol in holdings:
# 기존 보유가 있으면 평균 매수가와 수량 업데이트
prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0)
prev_price = float(holdings[symbol].get("buy_price", 0.0) or 0.0)
total_amount = prev_amount + amount
if total_amount > 0:
# 가중 평균 매수가 계산
new_avg_price = ((prev_price * prev_amount) + (buy_price * amount)) / total_amount
holdings[symbol]["buy_price"] = new_avg_price
holdings[symbol]["amount"] = total_amount
# max_price 갱신: 현재 매수가와 기존 max_price 중 큰 값
prev_max = float(holdings[symbol].get("max_price", 0.0) or 0.0)
holdings[symbol]["max_price"] = max(new_avg_price, prev_max)
logger.info(
"[INFO] [%s] holdings 추가 매수: 평균가 %.2f -> %.2f, 수량 %.8f -> %.8f",
symbol,
prev_price,
new_avg_price,
prev_amount,
total_amount,
)
else:
# 신규 보유 추가
holdings[symbol] = {
"buy_price": buy_price,
"amount": amount,
"max_price": buy_price,
"buy_timestamp": timestamp,
"partial_sell_done": False,
}
logger.info("[INFO] [%s] holdings 신규 추가: 매수가=%.2f, 수량=%.8f", symbol, buy_price, amount)
_save_holdings_unsafe(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[ERROR] [%s] holdings 추가 실패: %s", symbol, e)
return False
def update_holding_amount(
symbol: str, amount_change: float, holdings_file: str = HOLDINGS_FILE, min_amount_threshold: float = 1e-8
) -> bool:
"""
보유 자산의 수량을 변경합니다 (매도 시 음수 값 전달).
수량이 0 이하가 되면 해당 심볼을 holdings에서 제거합니다.
Args:
symbol: 거래 심볼 (예: KRW-BTC)
amount_change: 변경할 수량 (매도 시 음수, 매수 시 양수)
holdings_file: 보유 파일 경로
min_amount_threshold: 0으로 간주할 최소 수량 임계값
Returns:
성공 여부 (True/False)
"""
try:
with holdings_lock:
holdings = _load_holdings_unsafe(holdings_file)
if symbol not in holdings:
logger.warning("[WARNING] [%s] holdings에 존재하지 않아 수량 업데이트 건너뜀", symbol)
return False
prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0)
new_amount = max(0.0, prev_amount + amount_change)
if new_amount <= min_amount_threshold: # 거의 0이면 제거
holdings.pop(symbol, None)
logger.info(
"[INFO] [%s] holdings 업데이트: 전량 매도 완료, 보유 제거 (이전: %.8f, 변경: %.8f)",
symbol,
prev_amount,
amount_change,
)
else:
holdings[symbol]["amount"] = new_amount
logger.info(
"[INFO] [%s] holdings 업데이트: 수량 변경 %.8f -> %.8f (변경량: %.8f)",
symbol,
prev_amount,
new_amount,
amount_change,
)
_save_holdings_unsafe(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[ERROR] [%s] holdings 수량 업데이트 실패: %s", symbol, e)
return False
def set_holding_field(symbol: str, key: str, value, holdings_file: str = HOLDINGS_FILE) -> bool:
"""
보유 자산의 특정 필드 값을 설정합니다.
Args:
symbol: 거래 심볼 (예: KRW-BTC)
key: 설정할 필드의 키 (예: "partial_sell_done")
value: 설정할 값
holdings_file: 보유 파일 경로
Returns:
성공 여부 (True/False)
"""
try:
with holdings_lock:
holdings = _load_holdings_unsafe(holdings_file)
if symbol not in holdings:
logger.warning("[WARNING] [%s] holdings에 존재하지 않아 필드 설정 건너뜀", symbol)
return False
holdings[symbol][key] = value
logger.info("[INFO] [%s] holdings 업데이트: 필드 '%s''%s'(으)로 설정", symbol, key, value)
_save_holdings_unsafe(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[ERROR] [%s] holdings 필드 설정 실패: %s", symbol, e)
return False
@retry_with_backoff(max_attempts=3, base_delay=2.0, max_delay=10.0, exceptions=(Exception,))
def fetch_holdings_from_upbit(cfg: "RuntimeConfig") -> dict | None:
try:
if not (cfg.upbit_access_key and cfg.upbit_secret_key):
logger.debug("[DEBUG] API 키 없어 Upbit holdings 사용 안함")
return {}
upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key)
balances = upbit.get_balances()
# 타입 체크: balances가 리스트가 아닐 경우
if not isinstance(balances, list):
logger.error(
"[ERROR] Upbit balances 형식 오류: 예상(list), 실제(%s), 값=%s", type(balances).__name__, balances
)
return None
holdings = {}
# 기존 holdings 파일에서 max_price 불러오기
existing_holdings = load_holdings(HOLDINGS_FILE)
for item in balances:
currency = (item.get("currency") or "").upper()
if currency == "KRW":
continue
try:
amount = float(item.get("balance", 0))
except Exception:
amount = 0.0
if amount <= EPSILON:
continue
buy_price = None
if item.get("avg_buy_price_krw"):
try:
buy_price = float(item.get("avg_buy_price_krw"))
except Exception:
buy_price = None
if buy_price is None and item.get("avg_buy_price"):
try:
buy_price = float(item.get("avg_buy_price"))
except Exception:
buy_price = None
market = f"KRW-{currency}"
# 기존 max_price 유지 (실시간 가격은 매도 검사 시점에 조회)
prev_max_price = None
if existing_holdings and market in existing_holdings:
prev_max_price = existing_holdings[market].get("max_price")
if prev_max_price is not None:
try:
prev_max_price = float(prev_max_price)
except Exception:
prev_max_price = None
# max_price는 기존 값 유지 또는 buy_price 사용
max_price = prev_max_price if prev_max_price is not None else (buy_price or 0)
holdings[market] = {
"buy_price": buy_price or 0.0,
"amount": amount,
"max_price": max_price,
"buy_timestamp": None,
}
logger.debug("[DEBUG] Upbit holdings %d", len(holdings))
return holdings
except Exception as e:
logger.error("[ERROR] fetch_holdings 실패: %s", e)
return None

167
src/indicators.py Normal file
View File

@@ -0,0 +1,167 @@
import os
import time
import random
import threading
import pandas as pd
import pandas_ta as ta
import pyupbit
from requests.exceptions import RequestException, Timeout, ConnectionError
from .common import logger
__all__ = ["fetch_ohlcv", "compute_macd_hist", "compute_sma", "ta", "DataFetchError", "clear_ohlcv_cache"]
class DataFetchError(Exception):
"""Custom exception for data fetching failures."""
pass
# OHLCV 데이터 캐시 (TTL 5분)
_ohlcv_cache = {}
_cache_lock = threading.RLock() # 캐시 동시 접근 보호 (재진입 가능)
CACHE_TTL = 300 # 5분
def clear_ohlcv_cache():
"""캐시 초기화 (테스트 또는 주문 체결 후 호출)"""
global _ohlcv_cache
with _cache_lock:
_ohlcv_cache.clear()
logger.debug("[CACHE] OHLCV 캐시 초기화")
def _clean_expired_cache():
"""만료된 캐시 항목 제거"""
global _ohlcv_cache
with _cache_lock:
now = time.time()
expired_keys = [k for k, (_, cached_time) in _ohlcv_cache.items() if now - cached_time >= CACHE_TTL]
for k in expired_keys:
del _ohlcv_cache[k]
if expired_keys:
logger.debug("[CACHE] 만료된 캐시 %d개 제거", len(expired_keys))
def fetch_ohlcv(
symbol: str, timeframe: str, limit: int = 200, log_buffer: list = None, use_cache: bool = True
) -> pd.DataFrame:
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
# 캐시 확인
cache_key = (symbol, timeframe, limit)
now = time.time()
if use_cache and cache_key in _ohlcv_cache:
cached_df, cached_time = _ohlcv_cache[cache_key]
if now - cached_time < CACHE_TTL:
_buf("debug", f"[CACHE HIT] OHLCV: {symbol} {timeframe} (age: {int(now - cached_time)}s)")
return cached_df.copy() # 복사본 반환으로 원본 보호
else:
# 만료된 캐시 제거
del _ohlcv_cache[cache_key]
_buf("debug", f"[CACHE EXPIRED] OHLCV: {symbol} {timeframe}")
# 주기적으로 만료 캐시 정리
if len(_ohlcv_cache) > 10:
_clean_expired_cache()
_buf("debug", f"[CACHE MISS] OHLCV 수집 시작: {symbol} {timeframe}")
tf_map = {
"1m": "minute1",
"3m": "minute3",
"5m": "minute5",
"15m": "minute15",
"30m": "minute30",
"1h": "minute60",
"4h": "minute240",
"1d": "day",
"1w": "week",
}
py_tf = tf_map.get(timeframe, timeframe)
max_attempts = int(os.getenv("MAX_FETCH_ATTEMPTS", "5"))
base_backoff = float(os.getenv("BASE_BACKOFF", "1.0"))
jitter_factor = float(os.getenv("BACKOFF_JITTER", "0.5"))
max_total_backoff = float(os.getenv("MAX_TOTAL_BACKOFF", "300"))
cumulative_sleep = 0.0
for attempt in range(1, max_attempts + 1):
try:
df = pyupbit.get_ohlcv(symbol, interval=py_tf, count=limit)
if df is None or df.empty:
_buf("warning", f"OHLCV 빈 결과: {symbol}")
raise RuntimeError("empty ohlcv")
# 'close' 컬럼 검증 및 안전한 처리
if "close" not in df.columns:
if len(df.columns) >= 4:
# pyupbit OHLCV 순서: open(0), high(1), low(2), close(3), volume(4)
df = df.rename(columns={df.columns[3]: "close"})
_buf("warning", f"'close' 컬럼 누락, 4번째 컬럼 사용: {symbol}")
else:
raise DataFetchError(f"OHLCV 데이터에 'close' 컬럼이 없고, 컬럼 수가 4개 미만: {symbol}")
# 캐시 저장 (Lock 보호)
if use_cache:
with _cache_lock:
_ohlcv_cache[cache_key] = (df.copy(), time.time())
_buf("debug", f"[CACHE SAVE] OHLCV: {symbol} {timeframe}")
_buf("debug", f"OHLCV 수집 완료: {symbol}")
return df
except Exception as e:
is_network_err = isinstance(e, (RequestException, Timeout, ConnectionError))
_buf("warning", f"OHLCV 수집 실패 (시도 {attempt}/{max_attempts}): {symbol} -> {e}")
if not is_network_err:
_buf("error", f"네트워크 비관련 오류; 재시도하지 않음: {e}")
raise DataFetchError(f"네트워크 비관련 오류로 OHLCV 수집 실패: {e}")
if attempt == max_attempts:
_buf("error", f"OHLCV: 최대 재시도 도달 ({symbol})")
raise DataFetchError(f"OHLCV 수집 최대 재시도({max_attempts}) 도달: {symbol}")
sleep_time = base_backoff * (2 ** (attempt - 1))
sleep_time = sleep_time + random.uniform(0, jitter_factor * sleep_time)
if cumulative_sleep + sleep_time > max_total_backoff:
logger.warning("누적 재시도 대기시간 초과 (%s)", symbol)
raise DataFetchError(f"OHLCV 수집 누적 대기시간 초과: {symbol}")
cumulative_sleep += sleep_time
_buf("debug", f"{sleep_time:.2f}초 후 재시도")
time.sleep(sleep_time)
raise DataFetchError(f"OHLCV 수집 로직의 마지막에 도달했습니다. 이는 발생해서는 안 됩니다: {symbol}")
def compute_macd_hist(close_series: pd.Series, log_buffer: list = None) -> pd.Series:
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
try:
macd_df = ta.macd(close_series, fast=12, slow=26, signal=9)
hist_cols = [c for c in macd_df.columns if "MACDh" in c]
if not hist_cols:
_buf("error", "MACD histogram column not found")
raise ValueError("MACD histogram 컬럼을 찾을 수 없습니다")
return macd_df[hist_cols[0]]
except Exception as e:
_buf("error", f"MACD 계산 실패: {e}")
raise # 예외를 호출자에게 전파하여 명시적 처리 강제
def compute_sma(close_series: pd.Series, window: int, log_buffer: list = None) -> pd.Series:
"""단순 이동평균선(SMA) 계산"""
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
try:
return close_series.rolling(window=window).mean()
except Exception as e:
_buf("error", f"SMA{window} 계산 실패: {e}")
raise # 예외를 호출자에게 전파하여 명시적 처리 강제

108
src/notifications.py Normal file
View File

@@ -0,0 +1,108 @@
import os
import threading
import time
import requests
from .common import logger
__all__ = ["send_telegram", "send_telegram_with_retry", "report_error", "send_startup_test_message"]
def send_telegram_with_retry(
token: str,
chat_id: str,
text: str,
add_thread_prefix: bool = True,
parse_mode: str = None,
max_retries: int | None = None,
) -> bool:
"""
재시도 로직이 포함된 텔레그램 메시지 전송
Args:
token: 텔레그램 봇 토큰
chat_id: 채팅 ID
text: 메시지 내용
add_thread_prefix: 스레드 prefix 추가 여부
parse_mode: HTML/Markdown 파싱 모드
max_retries: 최대 재시도 횟수 (None이면 기본값 3)
Returns:
성공 여부 (True/False)
"""
if max_retries is None:
max_retries = 3
for attempt in range(max_retries):
try:
# 이제 send_telegram은 실패 시 예외를 발생시킴
send_telegram(token, chat_id, text, add_thread_prefix, parse_mode)
return True
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2**attempt # Exponential backoff: 1s, 2s, 4s
logger.warning(
"텔레그램 전송 실패 (시도 %d/%d), %d초 후 재시도: %s", attempt + 1, max_retries, wait_time, e
)
time.sleep(wait_time)
else:
logger.error("텔레그램 전송 최종 실패 (%d회 시도): %s", max_retries, e)
return False
return False
def send_telegram(token: str, chat_id: str, text: str, add_thread_prefix: bool = True, parse_mode: str = None):
"""
텔레그램 메시지를 한 번 전송합니다. 실패 시 예외를 발생시킵니다.
"""
if add_thread_prefix:
thread_name = threading.current_thread().name
# 기본 Thread-N 이름이면 prefix 생략 (의미 없는 정보)
if not thread_name.startswith("Thread-"):
payload_text = f"[{thread_name}] {text}"
else:
payload_text = text
else:
payload_text = text
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {"chat_id": chat_id, "text": payload_text}
if parse_mode:
payload["parse_mode"] = parse_mode
try:
resp = requests.post(url, json=payload, timeout=10)
resp.raise_for_status() # 2xx 상태 코드가 아니면 HTTPError 발생
logger.debug("텔레그램 메시지 전송 성공: %s", text[:80])
return True
except requests.exceptions.RequestException as e:
logger.warning("텔레그램 API 요청 실패: %s", e)
raise # 예외를 다시 발생시켜 호출자가 처리하도록 함
def report_error(bot_token: str, chat_id: str, message: str, dry_run: bool):
"""
Report an error via Telegram.
"""
if not dry_run and bot_token and chat_id:
# 재시도 로직이 포함된 함수 사용
send_telegram_with_retry(bot_token, chat_id, message, add_thread_prefix=True)
def send_startup_test_message(bot_token: str, chat_id: str, parse_mode: str, dry_run: bool):
"""
Send a startup test message to verify Telegram settings.
"""
if dry_run:
logger.info("[dry-run] Telegram 테스트 메시지 전송 생략")
return
if bot_token and chat_id:
test_msg = "[테스트] Telegram 설정 확인용 메시지입니다. 봇/채팅 설정이 올바르면 이 메시지가 도착합니다."
logger.info("텔레그램 테스트 메시지 전송 시도")
# 재시도 로직이 포함된 함수 사용
if send_telegram_with_retry(bot_token, chat_id, test_msg, add_thread_prefix=False, parse_mode=parse_mode):
logger.info("텔레그램 테스트 메시지 전송 성공")
else:
logger.warning("텔레그램 테스트 메시지 전송 실패")
else:
logger.warning("TELEGRAM_TEST=1 이지만 TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID가 설정되어 있지 않습니다")

759
src/order.py Normal file
View File

@@ -0,0 +1,759 @@
import os
import time
import json
import secrets
import threading
import pyupbit
from .common import logger, MIN_KRW_ORDER, HOLDINGS_FILE, TRADES_FILE, PENDING_ORDERS_FILE
from .notifications import send_telegram
def adjust_price_to_tick_size(price: float) -> float:
"""
Upbit 호가 단위에 맞춰 가격을 조정합니다.
pyupbit.get_tick_size를 사용하여 실시간 호가 단위를 가져옵니다.
"""
try:
tick_size = pyupbit.get_tick_size(price)
adjusted_price = round(price / tick_size) * tick_size
return adjusted_price
except Exception as e:
logger.warning("호가 단위 조정 실패: %s. 원본 가격 사용.", e)
return price
def _make_confirm_token(length: int = 16) -> str:
return secrets.token_hex(length)
_pending_order_lock = threading.Lock()
def _write_pending_order(token: str, order: dict, pending_file: str = PENDING_ORDERS_FILE):
with _pending_order_lock:
try:
pending = []
if os.path.exists(pending_file):
with open(pending_file, "r", encoding="utf-8") as f:
try:
pending = json.load(f)
except Exception:
pending = []
pending.append({"token": token, "order": order, "timestamp": time.time()})
with open(pending_file, "w", encoding="utf-8") as f:
json.dump(pending, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.exception("pending_orders 기록 실패: %s", e)
_confirmation_lock = threading.Lock()
def _check_confirmation(token: str, timeout: int = 300) -> bool:
start = time.time()
confirm_file = f"confirm_{token}"
while time.time() - start < timeout:
# 1. Atomic file check
try:
os.rename(confirm_file, f"{confirm_file}.processed")
logger.info("토큰 파일 확인 성공: %s", confirm_file)
return True
except FileNotFoundError:
pass
except Exception as e:
logger.warning("토큰 파일 처리 오류: %s", e)
time.sleep(2)
return False
def notify_order_result(
symbol: str, monitor_result: dict, config: dict, telegram_token: str, telegram_chat_id: str
) -> bool:
if not (telegram_token and telegram_chat_id):
return False
notify_cfg = config.get("notify", {}) if config else {}
final_status = monitor_result.get("final_status", "unknown")
filled = monitor_result.get("filled_volume", 0.0)
remaining = monitor_result.get("remaining_volume", 0.0)
attempts = monitor_result.get("attempts", 0)
should_notify = False
msg = ""
if final_status == "filled" and notify_cfg.get("order_filled", True):
should_notify = True
msg = f"[주문완료] {symbol}\n체결량: {filled:.8f}\n상태: 완전체결"
elif final_status in ("partial", "timeout", "cancelled"):
if final_status == "partial" and notify_cfg.get("order_partial", True):
should_notify = True
msg = f"[부분체결] {symbol}\n체결량: {filled:.8f}\n잔여량: {remaining:.8f}"
elif final_status == "timeout" and notify_cfg.get("order_partial", True):
should_notify = True
msg = f"[타임아웃] {symbol}\n체결량: {filled:.8f}\n잔여량: {remaining:.8f}\n재시도: {attempts}"
elif final_status == "cancelled" and notify_cfg.get("order_cancelled", True):
should_notify = True
msg = f"[주문취소] {symbol}\n취소 사유: 사용자 미확인 또는 오류"
elif final_status in ("error", "unknown") and notify_cfg.get("order_error", True):
should_notify = True
msg = f"[주문오류] {symbol}\n상태: {final_status}\n마지막 확인: {monitor_result.get('last_checked', 'N/A')}"
if should_notify and msg:
try:
send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False)
return True
except Exception as e:
logger.exception("주문 결과 알림 전송 실패: %s", e)
return False
return False
def _calculate_and_add_profit_rate(trade_record: dict, symbol: str, monitor: dict):
"""
매도 거래 기록에 수익률 정보를 계산하여 추가합니다.
"""
try:
from .holdings import load_holdings, get_current_price
holdings = load_holdings(HOLDINGS_FILE)
if symbol not in holdings:
return
buy_price = float(holdings[symbol].get("buy_price", 0.0) or 0.0)
# 실제 평균 매도가 계산
sell_price = 0.0
if monitor and monitor.get("last_order"):
last_order = monitor["last_order"]
trades = last_order.get("trades", [])
if trades:
total_krw = sum(float(t.get("price", 0)) * float(t.get("volume", 0)) for t in trades)
total_volume = sum(float(t.get("volume", 0)) for t in trades)
if total_volume > 0:
sell_price = total_krw / total_volume
# 매도가가 없으면 현재가 사용
if sell_price <= 0:
sell_price = get_current_price(symbol)
# 수익률 계산 및 기록 추가
if buy_price > 0 and sell_price > 0:
profit_rate = ((sell_price - buy_price) / buy_price) * 100
trade_record["buy_price"] = buy_price
trade_record["sell_price"] = sell_price
trade_record["profit_rate"] = round(profit_rate, 2)
logger.info(
"[%s] 매도 수익률: %.2f%% (매수가: %.2f, 매도가: %.2f)", symbol, profit_rate, buy_price, sell_price
)
else:
logger.warning("[%s] 수익률 계산 불가: buy_price=%.2f, sell_price=%.2f", symbol, buy_price, sell_price)
trade_record["profit_rate"] = None
except Exception as e:
logger.warning("매도 수익률 계산 중 오류 (기록은 계속 진행): %s", e)
trade_record["profit_rate"] = None
def place_buy_order_upbit(market: str, amount_krw: float, cfg: "RuntimeConfig") -> dict:
"""
Upbit API를 이용한 매수 주문 (시장가 또는 지정가)
Args:
market: 거래 시장 (예: KRW-BTC)
amount_krw: 매수할 KRW 금액
cfg: RuntimeConfig 객체
Returns:
주문 결과 딕셔너리
"""
from .holdings import get_current_price
# config에서 buy_price_slippage_pct 읽기
auto_trade_cfg = cfg.config.get("auto_trade", {})
slippage_pct = float(auto_trade_cfg.get("buy_price_slippage_pct", 0.0))
if cfg.dry_run:
price = get_current_price(market)
limit_price = price * (1 + slippage_pct / 100) if price > 0 and slippage_pct > 0 else price
logger.info(
"[place_buy_order_upbit][dry-run] %s 매수 금액=%.2f KRW, 지정가=%.2f", market, amount_krw, limit_price
)
return {
"market": market,
"side": "buy",
"amount_krw": amount_krw,
"price": limit_price,
"status": "simulated",
"timestamp": time.time(),
}
if not (cfg.upbit_access_key and cfg.upbit_secret_key):
msg = "Upbit API 키 없음: 매수 주문을 실행할 수 없습니다"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
try:
upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key)
price = get_current_price(market)
# 현재가 검증
if price <= 0:
msg = f"[매수 실패] {market}: 현재가 조회 실패 (price={price})"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
limit_price = price * (1 + slippage_pct / 100) if price > 0 and slippage_pct > 0 else price
resp = None
if slippage_pct > 0 and limit_price > 0:
# 지정가 매수: 호가 단위에 맞춰 가격 조정
adjusted_limit_price = adjust_price_to_tick_size(limit_price)
volume = amount_krw / adjusted_limit_price
# 🔒 안전성 검증: 파라미터 최종 확인
if adjusted_limit_price <= 0 or volume <= 0:
msg = f"[매수 실패] {market}: 비정상 파라미터 (price={adjusted_limit_price}, volume={volume})"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
# pyupbit API: buy_limit_order(ticker, price, volume)
# - ticker: 마켓 심볼 (예: "KRW-BTC")
# - price: 지정가 (KRW, 예: 50000000)
# - volume: 매수 수량 (코인 개수, 예: 0.001)
logger.info(
"[매수 주문 전 검증] %s | 지정가=%.2f KRW | 수량=%.8f개 | 예상 총액=%.2f KRW",
market,
adjusted_limit_price,
volume,
adjusted_limit_price * volume,
)
resp = upbit.buy_limit_order(market, adjusted_limit_price, volume)
logger.info(
"✅ Upbit 지정가 매수 주문 완료: %s | 지정가=%.2f (조정전: %.2f) | 수량=%.8f | 목표금액=%.2f KRW",
market,
adjusted_limit_price,
limit_price,
volume,
amount_krw,
)
else:
# 시장가 매수: amount_krw 금액만큼 시장가로 매수
# pyupbit API: buy_market_order(ticker, price)
# - ticker: 마켓 심볼
# - price: 매수할 KRW 금액 (예: 15000)
logger.info("[매수 주문 전 검증] %s | 시장가 매수 | 금액=%.2f KRW", market, amount_krw)
resp = upbit.buy_market_order(market, amount_krw)
logger.info("✅ Upbit 시장가 매수 주문 완료: %s | 금액=%.2f KRW", market, amount_krw)
if isinstance(resp, dict):
order_uuid = resp.get("uuid")
logger.info("Upbit 주문 응답: uuid=%s", order_uuid)
else:
logger.info("Upbit 주문 응답: %s", resp)
result = {
"market": market,
"side": "buy",
"amount_krw": amount_krw,
"price": limit_price if slippage_pct > 0 else None,
"status": "placed",
"response": resp,
"timestamp": time.time(),
}
try:
order_uuid = None
if isinstance(resp, dict):
order_uuid = resp.get("uuid") or resp.get("id") or resp.get("txid")
if order_uuid:
monitor_res = monitor_order_upbit(order_uuid, cfg.upbit_access_key, cfg.upbit_secret_key)
result["monitor"] = monitor_res
result["status"] = monitor_res.get("final_status", result["status"]) or result["status"]
except Exception:
logger.debug("매수 주문 모니터링 중 예외 발생", exc_info=True)
return result
except Exception as e:
logger.exception("Upbit 매수 주문 실패: %s", e)
return {"error": str(e), "status": "failed", "timestamp": time.time()}
def place_sell_order_upbit(market: str, amount: float, cfg: "RuntimeConfig") -> dict:
"""
Upbit API를 이용한 시장가 매도 주문
Args:
market: 거래 시장 (예: KRW-BTC)
amount: 매도할 코인 수량
cfg: RuntimeConfig 객체
Returns:
주문 결과 딕셔너리
"""
if cfg.dry_run:
logger.info("[place_sell_order_upbit][dry-run] %s 매도 수량=%.8f", market, amount)
return {"market": market, "side": "sell", "amount": amount, "status": "simulated", "timestamp": time.time()}
if not (cfg.upbit_access_key and cfg.upbit_secret_key):
msg = "Upbit API 키 없음: 매도 주문을 실행할 수 없습니다"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
try:
upbit = pyupbit.Upbit(cfg.upbit_access_key, cfg.upbit_secret_key)
# 최소 주문 금액(설정값, 기본 5,000 KRW) 이하일 경우 매도 건너뜀
try:
from .holdings import get_current_price
current_price = float(get_current_price(market))
except Exception:
current_price = 0.0
# 현재가 조회 실패 시 안전하게 매도 차단
if current_price <= 0:
msg = f"[매도 실패] {market}\n사유: 현재가 조회 실패\n매도 수량: {amount:.8f}"
logger.error(msg)
return {
"market": market,
"side": "sell",
"amount": amount,
"status": "failed",
"error": "price_unavailable",
"timestamp": time.time(),
}
estimated_value = amount * current_price
# 최소 주문 금액 안전 파싱 (누락/형식 오류 대비)
raw_min = cfg.config.get("auto_trade", {}).get("min_order_value_krw")
try:
min_order_value = float(raw_min)
except (TypeError, ValueError):
logger.warning(
"[WARNING] min_order_value_krw 설정 누락/비정상 -> 기본값 %d 사용 (raw=%s)", MIN_KRW_ORDER, raw_min
)
min_order_value = float(MIN_KRW_ORDER)
if estimated_value < min_order_value:
msg = f"[매도 건너뜀] {market}\n사유: 최소 주문 금액 미만\n추정 금액: {estimated_value:.0f} KRW < 최소 {min_order_value:.0f} KRW\n매도 수량: {amount:.8f}"
logger.warning(msg)
return {
"market": market,
"side": "sell",
"amount": amount,
"status": "skipped_too_small",
"reason": "min_order_value",
"estimated_value": estimated_value,
"timestamp": time.time(),
}
# ===== 매도 API 안전 검증 (Critical Safety Check) =====
# pyupbit API: sell_market_order(ticker, volume)
# - ticker: 마켓 코드 (예: "KRW-BTC")
# - volume: 매도할 코인 수량 (개수, not KRW)
# 잘못된 사용 예시: sell_market_order("KRW-BTC", 500000) → BTC 500,000개 매도 시도! ❌
# 올바른 사용 예시: sell_market_order("KRW-BTC", 0.01) → BTC 0.01개 매도 ✅
if amount <= 0:
msg = f"[매도 실패] {market}: 비정상 수량 (amount={amount})"
logger.error(msg)
return {
"market": market,
"side": "sell",
"amount": amount,
"status": "failed",
"error": "invalid_amount",
"timestamp": time.time(),
}
# 매도 전 파라미터 검증 로그 (안전장치)
logger.info(
"🔍 [매도 주문 전 검증] %s | 매도 수량=%.8f개 | 현재가=%.2f KRW | 예상 매도액=%.2f KRW",
market,
amount,
current_price,
estimated_value,
)
resp = upbit.sell_market_order(market, amount)
logger.info(
"✅ Upbit 시장가 매도 주문 완료: %s | 수량=%.8f개 | 예상 매도액=%.2f KRW", market, amount, estimated_value
)
if isinstance(resp, dict):
order_uuid = resp.get("uuid")
logger.info("Upbit 주문 응답: uuid=%s", order_uuid)
else:
logger.info("Upbit 주문 응답: %s", resp)
result = {
"market": market,
"side": "sell",
"amount": amount,
"status": "placed",
"response": resp,
"timestamp": time.time(),
}
try:
order_uuid = None
if isinstance(resp, dict):
order_uuid = resp.get("uuid") or resp.get("id") or resp.get("txid")
if order_uuid:
monitor_res = monitor_order_upbit(order_uuid, cfg.upbit_access_key, cfg.upbit_secret_key)
result["monitor"] = monitor_res
result["status"] = monitor_res.get("final_status", result["status"]) or result["status"]
except Exception:
logger.debug("매도 주문 모니터링 중 예외 발생", exc_info=True)
return result
except Exception as e:
logger.exception("Upbit 매도 주문 실패: %s", e)
return {"error": str(e), "status": "failed", "timestamp": time.time()}
def execute_sell_order_with_confirmation(symbol: str, amount: float, cfg: "RuntimeConfig") -> dict:
"""
매도 주문 확인 후 실행
"""
confirm_cfg = cfg.config.get("confirm", {})
confirm_via_file = confirm_cfg.get("confirm_via_file", True)
confirm_timeout = confirm_cfg.get("confirm_timeout", 300)
result = None
if not confirm_via_file:
logger.info("파일 확인 비활성화: 즉시 매도 주문 실행")
result = place_sell_order_upbit(symbol, amount, cfg)
else:
token = _make_confirm_token()
order_info = {"symbol": symbol, "side": "sell", "amount": amount, "timestamp": time.time()}
_write_pending_order(token, order_info)
# Telegram 확인 메시지 전송
if cfg.telegram_parse_mode == "HTML":
msg = f"<b>[확인필요] 자동매도 주문 대기</b>\n"
msg += f"토큰: <code>{token}</code>\n"
msg += f"심볼: <b>{symbol}</b>\n"
msg += f"매도수량: <b>{amount:.8f}</b>\n\n"
msg += f"확인 방법: 파일 생성 -> <code>confirm_{token}</code>\n"
msg += f"타임아웃: {confirm_timeout}"
else:
msg = f"[확인필요] 자동매도 주문 대기\n"
msg += f"토큰: {token}\n"
msg += f"심볼: {symbol}\n"
msg += f"매도수량: {amount:.8f}\n\n"
msg += f"확인 방법: 파일 생성 -> confirm_{token}\n"
msg += f"타임아웃: {confirm_timeout}"
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
logger.info("[%s] 매도 확인 대기 중: 토큰=%s, 타임아웃=%d", symbol, token, confirm_timeout)
confirmed = _check_confirmation(token, confirm_timeout)
if not confirmed:
logger.warning("[%s] 매도 확인 타임아웃: 주문 취소", symbol)
if cfg.telegram_bot_token and cfg.telegram_chat_id:
cancel_msg = f"[주문취소] {symbol} 매도\n사유: 사용자 미확인 (타임아웃)"
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
cancel_msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
result = {"status": "user_not_confirmed", "symbol": symbol, "timestamp": time.time()}
else:
logger.info("[%s] 매도 확인 완료: 주문 실행", symbol)
result = place_sell_order_upbit(symbol, amount, cfg)
# 주문 결과 알림
if result and result.get("monitor"):
notify_order_result(symbol, result["monitor"], cfg.config, cfg.telegram_bot_token, cfg.telegram_chat_id)
# 주문 성공 시 거래 기록 (실제/시뮬레이션 모두) 및 보유 수량 차감
if result:
trade_status = result.get("status")
monitor = result.get("monitor", {})
monitor_status = monitor.get("final_status")
record_conditions = ["simulated", "filled", "partial", "timeout", "user_not_confirmed"]
if trade_status in record_conditions or monitor_status in record_conditions:
trade_record = {
"symbol": symbol,
"side": "sell",
"amount": amount,
"timestamp": time.time(),
"dry_run": cfg.dry_run,
"result": result,
}
_calculate_and_add_profit_rate(trade_record, symbol, monitor)
from .signals import record_trade
record_trade(trade_record)
# 실전 거래이고, 일부/전부 체결됐다면 holdings에서 수량 차감
if not cfg.dry_run and monitor:
filled_volume = float(monitor.get("filled_volume", 0.0) or 0.0)
final_status = monitor.get("final_status")
if final_status in ("filled", "partial", "timeout") and filled_volume > 0:
from .holdings import update_holding_amount
min_threshold = cfg.config.get("min_amount_threshold", 1e-8)
update_holding_amount(symbol, -filled_volume, HOLDINGS_FILE, min_amount_threshold=min_threshold)
return result
def execute_buy_order_with_confirmation(symbol: str, amount_krw: float, cfg: "RuntimeConfig") -> dict:
"""
매수 주문 확인 후 실행 (매도와 동일한 확인 메커니즘)
Args:
symbol: 거래 심볼
amount_krw: 매수할 KRW 금액
cfg: RuntimeConfig 객체
Returns:
주문 결과 딕셔너리
"""
confirm_cfg = cfg.config.get("confirm", {})
confirm_via_file = confirm_cfg.get("confirm_via_file", True)
confirm_timeout = confirm_cfg.get("confirm_timeout", 300)
result = None
if not confirm_via_file:
logger.info("파일 확인 비활성화: 즉시 매수 주문 실행")
result = place_buy_order_upbit(symbol, amount_krw, cfg)
else:
token = _make_confirm_token()
order_info = {"symbol": symbol, "side": "buy", "amount_krw": amount_krw, "timestamp": time.time()}
_write_pending_order(token, order_info)
# Telegram 확인 메시지 전송
if cfg.telegram_parse_mode == "HTML":
msg = f"<b>[확인필요] 자동매수 주문 대기</b>\n"
msg += f"토큰: <code>{token}</code>\n"
msg += f"심볼: <b>{symbol}</b>\n"
msg += f"매수금액: <b>{amount_krw:,.0f} KRW</b>\n\n"
msg += f"확인 방법: 파일 생성 -> <code>confirm_{token}</code>\n"
msg += f"타임아웃: {confirm_timeout}"
else:
msg = f"[확인필요] 자동매수 주문 대기\n"
msg += f"토큰: {token}\n"
msg += f"심볼: {symbol}\n"
msg += f"매수금액: {amount_krw:,.0f} KRW\n\n"
msg += f"확인 방법: 파일 생성 -> confirm_{token}\n"
msg += f"타임아웃: {confirm_timeout}"
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
logger.info("[%s] 매수 확인 대기 중: 토큰=%s, 타임아웃=%d", symbol, token, confirm_timeout)
confirmed = _check_confirmation(token, confirm_timeout)
if not confirmed:
logger.warning("[%s] 매수 확인 타임아웃: 주문 취소", symbol)
if cfg.telegram_bot_token and cfg.telegram_chat_id:
cancel_msg = f"[주문취소] {symbol} 매수\n사유: 사용자 미확인 (타임아웃)"
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
cancel_msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
result = {"status": "user_not_confirmed", "symbol": symbol, "timestamp": time.time()}
else:
logger.info("[%s] 매수 확인 완료: 주문 실행", symbol)
result = place_buy_order_upbit(symbol, amount_krw, cfg)
# 주문 결과 알림
if result and result.get("monitor"):
notify_order_result(symbol, result["monitor"], cfg.config, cfg.telegram_bot_token, cfg.telegram_chat_id)
# 주문 성공 시 거래 기록 (실제/시뮬레이션 모두)
if result:
trade_status = result.get("status")
monitor_result = result.get("monitor", {})
monitor_status = monitor_result.get("final_status")
# 시뮬레이션, 완전 체결, 부분 체결, 타임아웃, 사용자 미확인 상태일 때 기록
record_conditions = ["simulated", "filled", "partial", "timeout", "user_not_confirmed"]
if trade_status in record_conditions or monitor_status in record_conditions:
trade_record = {
"symbol": symbol,
"side": "buy",
"amount_krw": amount_krw,
"timestamp": time.time(),
"dry_run": cfg.dry_run,
"result": result,
}
from .signals import record_trade
record_trade(trade_record)
# 실전 거래이고 타임아웃/부분체결 시 체결된 수량을 holdings에 반영
if not cfg.dry_run and monitor_result:
filled_volume = float(monitor_result.get("filled_volume", 0.0) or 0.0)
final_status = monitor_result.get("final_status")
if final_status in ("filled", "partial", "timeout") and filled_volume > 0:
try:
# 평균 매수가 계산
last_order = monitor_result.get("last_order", {})
avg_buy_price = 0.0
if isinstance(last_order, dict):
trades = last_order.get("trades", [])
if trades:
total_krw = sum(float(t.get("price", 0)) * float(t.get("volume", 0)) for t in trades)
total_volume = sum(float(t.get("volume", 0)) for t in trades)
if total_volume > 0:
avg_buy_price = total_krw / total_volume
if avg_buy_price <= 0:
# 평균가 계산 실패 시 현재가 사용
from .holdings import get_current_price
avg_buy_price = get_current_price(symbol)
if avg_buy_price > 0:
from .holdings import add_new_holding
if add_new_holding(symbol, avg_buy_price, filled_volume, time.time(), HOLDINGS_FILE):
logger.info(
"[%s] 타임아웃/부분체결 매수 holdings 자동 반영: 체결량=%.8f, 평균가=%.2f",
symbol,
filled_volume,
avg_buy_price,
)
else:
logger.error("[%s] 타임아웃/부분체결 매수 holdings 반영 실패", symbol)
except Exception as e:
logger.exception("[%s] 타임아웃/부분체결 매수 holdings 반영 중 오류: %s", symbol, e)
return result
def monitor_order_upbit(
order_uuid: str,
access_key: str,
secret_key: str,
timeout: int = None,
poll_interval: int = None,
max_retries: int = None,
) -> dict:
if timeout is None:
timeout = int(os.getenv("ORDER_MONITOR_TIMEOUT", "120"))
if poll_interval is None:
poll_interval = int(os.getenv("ORDER_POLL_INTERVAL", "3"))
if max_retries is None:
max_retries = int(os.getenv("ORDER_MAX_RETRIES", "1"))
upbit = pyupbit.Upbit(access_key, secret_key)
start = time.time()
attempts = 0
current_uuid = order_uuid
last_order = None
filled = 0.0
remaining = None
final_status = "unknown"
consecutive_errors = 0
# config에서 max_consecutive_errors 로드 (기본값 5)
max_consecutive_errors = 5
try:
# Note: config는 함수 매개변수로 전달되지 않으므로 환경변수 사용
max_consecutive_errors = int(os.getenv("ORDER_MAX_CONSECUTIVE_ERRORS", "5"))
except ValueError:
max_consecutive_errors = 5
while True:
# 전체 타임아웃 체크 (무한 대기 방지)
if time.time() - start > timeout + 30: # 여유 시간 30초
logger.error("주문 모니터링 강제 종료: 전체 타임아웃 초과")
final_status = "timeout"
break
try:
order = upbit.get_order(current_uuid)
consecutive_errors = 0 # 성공 시 에러 카운터 리셋
last_order = order
state = order.get("state") if isinstance(order, dict) else None
volume = float(order.get("volume", 0)) if isinstance(order, dict) else 0.0
executed = float(order.get("executed_volume", 0) or order.get("filled_volume", 0) or 0.0)
filled = executed
remaining = max(0.0, volume - executed)
if state in ("done", "closed") or remaining <= 0:
final_status = "filled"
break
if state in ("cancel", "cancelled", "rejected"):
final_status = "cancelled"
break
if time.time() - start > timeout:
if attempts < max_retries and remaining and remaining > 0:
attempts += 1
logger.warning("주문 타임아웃: 재시도 %d/%d, 남은량=%.8f", attempts, max_retries, remaining)
try:
original_side = order.get("side")
cancel_resp = upbit.cancel_order(current_uuid)
logger.info("[%s] 주문 취소 시도: %s", order.get("market"), cancel_resp)
# 취소가 완전히 처리될 때까지 잠시 대기 및 확인
time.sleep(3) # 거래소 처리 시간 대기
cancelled_order = upbit.get_order(current_uuid)
if cancelled_order.get("state") not in ("cancel", "cancelled"):
logger.error("[%s] 주문 취소 실패 또는 이미 체결됨. 재시도 중단.", order.get("market"))
final_status = "error" # 또는 "filled" 상태로 재확인 필요
break
# 매수는 재시도하지 않음 (KRW 금액 계산 복잡도 및 리스크)
# 하지만 부분 체결된 수량이 있다면 그대로 유지
if original_side == "bid":
if filled > 0:
logger.warning("매수 주문 타임아웃: 부분 체결(%.8f) 완료, 재시도하지 않습니다.", filled)
else:
logger.warning("매수 주문 타임아웃: 체결 없음, 재시도하지 않습니다.")
final_status = "timeout"
break
# 매도만 시장가로 재시도
elif original_side == "ask":
logger.info("[%s] 취소 확인 후 시장가 매도 재시도", order.get("market"))
now_resp = upbit.sell_market_order(order.get("market", ""), remaining)
current_uuid = now_resp.get("uuid") if isinstance(now_resp, dict) else None
continue
except Exception as e:
logger.exception("재시도 주문 중 오류: %s", e)
final_status = "error"
break
else:
final_status = "timeout"
break
time.sleep(poll_interval)
except Exception as e:
consecutive_errors += 1
logger.error("주문 모니터링 중 오류 (%d/%d): %s", consecutive_errors, max_consecutive_errors, e)
if consecutive_errors >= max_consecutive_errors:
logger.error("주문 모니터링 중단: 연속 에러 %d회 초과", max_consecutive_errors)
final_status = "error"
break
if time.time() - start > timeout:
final_status = "error"
break
# 에러 발생 시 잠시 대기 후 재시도
time.sleep(min(poll_interval * 2, 10))
return {
"final_status": final_status,
"attempts": attempts,
"filled_volume": filled,
"remaining_volume": remaining,
"last_order": last_order,
"last_checked": time.time(),
}

68
src/retry_utils.py Normal file
View File

@@ -0,0 +1,68 @@
# src/retry_utils.py
"""Exponential backoff retry decorator for network operations."""
import time
import functools
from typing import Callable, TypeVar, Any
from .common import logger
T = TypeVar("T")
def retry_with_backoff(
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 10.0,
exponential_base: float = 2.0,
exceptions: tuple = (Exception,),
) -> Callable:
"""
Exponential backoff retry decorator.
Args:
max_attempts: Maximum number of retry attempts
base_delay: Initial delay in seconds
max_delay: Maximum delay between retries
exponential_base: Base for exponential calculation
exceptions: Tuple of exceptions to catch and retry
Returns:
Decorated function with retry logic
Example:
@retry_with_backoff(max_attempts=3, base_delay=1.0)
def fetch_data():
return api.get_data()
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> T:
last_exception = None
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_attempts:
logger.error("[RETRY] %s 최종 실패 (%d/%d 시도): %s", func.__name__, attempt, max_attempts, e)
break
# Calculate delay with exponential backoff
delay = min(base_delay * (exponential_base ** (attempt - 1)), max_delay)
logger.warning(
"[RETRY] %s 실패 (%d/%d): %s | %.1f초 후 재시도", func.__name__, attempt, max_attempts, e, delay
)
time.sleep(delay)
# If all attempts failed, raise the last exception
if last_exception:
raise last_exception
# This should never happen, but for type safety
raise RuntimeError(f"{func.__name__} failed without exception")
return wrapper
return decorator

843
src/signals.py Normal file
View File

@@ -0,0 +1,843 @@
import os
import time
import json
import inspect
from typing import List
import pandas as pd
import pandas_ta as ta
from datetime import datetime
from .common import logger, FLOAT_EPSILON, HOLDINGS_FILE, TRADES_FILE
from .indicators import fetch_ohlcv, compute_macd_hist, compute_sma, DataFetchError
from .holdings import fetch_holdings_from_upbit, get_current_price
from .notifications import send_telegram, send_telegram_with_retry
from .config import RuntimeConfig # 테스트 환경에서 NameError 방지
def make_trade_record(symbol, side, amount_krw, dry_run, price=None, status="simulated"):
now = float(time.time())
# pandas 타입을 Python native 타입으로 변환 (JSON 직렬화 가능)
if price is not None:
price = float(price)
return {
"symbol": symbol,
"side": side,
"amount_krw": float(amount_krw),
"timestamp": now,
"datetime": datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M:%S"),
"dry_run": bool(dry_run),
"result": {
"market": str(symbol),
"side": str(side),
"amount_krw": float(amount_krw),
"price": price,
"status": str(status),
"timestamp": now,
},
}
def evaluate_sell_conditions(
current_price: float, buy_price: float, max_price: float, holding_info: dict, config: dict = None
) -> dict:
config = config or {}
# auto_trade 설정에서 매도 조건 설정값 로드
auto_trade_config = config.get("auto_trade", {})
loss_threshold = float(auto_trade_config.get("loss_threshold", -5.0)) # 1. 초기 손절 라인
profit_threshold_1 = float(auto_trade_config.get("profit_threshold_1", 10.0)) # 3. 부분 익절 시작 수익률
profit_threshold_2 = float(
auto_trade_config.get("profit_threshold_2", 30.0)
) # 5. 전량 익절 기준 수익률 (높은 구간)
drawdown_1 = float(auto_trade_config.get("drawdown_1", 5.0)) # 2, 4. 트레일링 스탑 하락률
drawdown_2 = float(auto_trade_config.get("drawdown_2", 15.0)) # 5. 트레일링 스탑 하락률 (높은 구간)
# 현재 수익률 및 최고점 대비 하락률 계산 (엡실론 기반 안전한 비교)
profit_rate = ((current_price - buy_price) / buy_price) * 100 if buy_price > FLOAT_EPSILON else 0
max_drawdown = ((current_price - max_price) / max_price) * 100 if max_price > FLOAT_EPSILON else 0
result = {
"status": "hold",
"sell_ratio": 0.0,
"reasons": [],
"profit_rate": profit_rate,
"max_drawdown": max_drawdown,
"set_partial_sell_done": False,
}
# 매도조건 1: 무조건 손절 (매수가 대비 -5% 하락)
if profit_rate <= loss_threshold:
result.update(status="stop_loss", sell_ratio=1.0)
result["reasons"].append(f"손절(조건1): 수익률 {profit_rate:.2f}% <= {loss_threshold}%")
return result
# 매도조건 3: 수익률 10% 이상 도달 시 1회성 절반 매도
partial_sell_done = holding_info.get("partial_sell_done", False)
if not partial_sell_done and profit_rate >= profit_threshold_1:
result.update(status="stop_loss", sell_ratio=0.5)
result["reasons"].append(f"부분 익절(조건3): 수익률 {profit_rate:.2f}% 달성, 50% 매도")
result["set_partial_sell_done"] = True
return result
# --- 전량 매도 조건 (부분 매도 완료 후 또는 해당 없는 경우) ---
# 최고 수익률 계산 (어느 구간에 있었는지 판단하기 위함)
max_profit_rate = ((max_price - buy_price) / buy_price) * 100 if buy_price > FLOAT_EPSILON else 0
# 매도조건 5: 최고 수익률이 profit_threshold_2 초과 구간 (고수익 구간)
if max_profit_rate > profit_threshold_2:
# 5-2: 수익률이 기준선 이하(<=)로 하락하면 수익 보호 (stop_loss)
if profit_rate <= profit_threshold_2:
result.update(status="stop_loss", sell_ratio=1.0)
result["reasons"].append(
f"수익률 보호(조건5): 최고 수익률({max_profit_rate:.2f}%) 후 {profit_rate:.2f}%로 하락 (<= {profit_threshold_2}%)"
)
return result
# 5-1: 기준선 위에서 최고점 대비 큰 하락 발생 시 익절 (트레일링)
if max_drawdown <= -drawdown_2:
result.update(status="profit_taking", sell_ratio=1.0)
result["reasons"].append(
f"트레일링 익절(조건5): 최고 수익률({max_profit_rate:.2f}%) 후 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_2}%)"
)
return result
# 매도조건 4: 최고 수익률이 profit_threshold_1 초과 profit_threshold_2 이하 (중간 수익 구간)
elif profit_threshold_1 < max_profit_rate <= profit_threshold_2:
# 4-2: 수익률이 기준선 이하(<=)로 하락하면 수익 보호 (stop_loss)
if profit_rate <= profit_threshold_1:
result.update(status="stop_loss", sell_ratio=1.0)
result["reasons"].append(
f"수익률 보호(조건4): 최고 수익률({max_profit_rate:.2f}%) 후 {profit_rate:.2f}%로 하락 (<= {profit_threshold_1}%)"
)
return result
# 4-1: 수익률이 기준선 위에서 최고점 대비 하락률이 임계 초과 시 익절
if profit_rate > profit_threshold_1 and max_drawdown <= -drawdown_1:
result.update(status="profit_taking", sell_ratio=1.0)
result["reasons"].append(
f"트레일링 익절(조건4): 최고 수익률({max_profit_rate:.2f}%) 후 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)"
)
return result
# 매도조건 2: 최고 수익률이 profit_threshold_1 이하 (저수익 구간 - 부분매도 미실행)
elif max_profit_rate <= profit_threshold_1:
# 저수익 구간에서 기준 이상의 하락(트레일링) 발생 시 익절
if max_drawdown <= -drawdown_1:
result.update(status="profit_taking", sell_ratio=1.0)
result["reasons"].append(
f"트레일링 익절(조건2): 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)"
)
return result
result["reasons"].append(f"홀드 (수익률 {profit_rate:.2f}%, 최고점 대비 하락 {max_drawdown:.2f}%)")
return result
def build_sell_message(symbol: str, sell_result: dict, parse_mode: str = "HTML") -> str:
status = sell_result.get("status", "unknown")
profit = sell_result.get("profit_rate", 0.0)
drawdown = sell_result.get("max_drawdown", 0.0)
ratio = int(sell_result.get("sell_ratio", 0.0) * 100)
reasons = sell_result.get("reasons", [])
reason = reasons[0] if reasons else "사유 없음"
market_url = f"https://upbit.com/exchange?code=CRIX.UPBIT.KRW-{symbol.replace('KRW-', '')}"
if parse_mode == "HTML":
msg = f"<b>🔴 매도 신호: {symbol}</b>\n"
msg += f"상태: <b>{status}</b>\n"
msg += f"수익률: <b>{profit:.2f}%</b>\n"
msg += f"최고점 대비: <b>{drawdown:.2f}%</b>\n"
msg += f"매도 비율: <b>{ratio}%</b>\n"
msg += f"사유: {reason}\n"
msg += f'시장: <a href="{market_url}">Upbit {symbol}</a>'
return msg
msg = f"🔴 매도 신호: {symbol}\n"
msg += f"상태: {status}\n"
msg += f"수익률: {profit:.2f}%\n"
msg += f"최고점 대비: {drawdown:.2f}%\n"
msg += f"매도 비율: {ratio}%\n"
msg += f"사유: {reason}\n"
msg += f"시장: {market_url}"
return msg
def _adjust_sell_ratio_for_min_order(
symbol: str, total_amount: float, sell_ratio: float, current_price: float, config: dict
) -> float:
"""
부분 매도 시 최소 주문 금액과 수수료를 고려하여 매도 비율을 조정합니다.
Decimal을 사용하여 부동소수점 오차를 방지합니다.
매도할 금액 또는 남는 금액이 최소 주문 금액 미만일 경우 전량 매도(1.0)로 조정합니다.
"""
if not (0 < sell_ratio < 1):
return sell_ratio
from decimal import Decimal, ROUND_DOWN
auto_trade_cfg = config.get("auto_trade", {})
min_order_value = float(auto_trade_cfg.get("min_order_value_krw", 5000))
fee_margin = float(auto_trade_cfg.get("fee_safety_margin_pct", 0.05)) / 100.0
# Decimal로 변환하여 정밀 계산 (부동소수점 오차 방지)
d_total = Decimal(str(total_amount))
d_ratio = Decimal(str(sell_ratio))
d_price = Decimal(str(current_price))
d_fee = Decimal(str(1 - fee_margin))
# 매도할 수량 계산 (소수점 8자리까지, 내림)
d_to_sell = (d_total * d_ratio).quantize(Decimal("0.00000001"), rounding=ROUND_DOWN)
d_remaining = d_total - d_to_sell
# KRW 금액 계산 (수수료 적용)
value_to_sell = float(d_to_sell * d_price * d_fee)
value_remaining = float(d_remaining * d_price * d_fee)
if value_to_sell < min_order_value or value_remaining < min_order_value:
logger.info(
"[%s] 부분 매도(%.0f%%) 조건 충족했으나, 최소 주문 금액(%.0f KRW) 문제로 "
"전량 매도로 전환합니다. (예상 매도액: %.2f, 예상 잔여액: %.2f)",
symbol,
sell_ratio * 100,
min_order_value,
value_to_sell,
value_remaining,
)
return 1.0
return sell_ratio
def record_trade(trade: dict, trades_file: str = TRADES_FILE, critical: bool = True) -> None:
"""
거래 기록을 원자적으로 저장합니다.
Args:
trade: 거래 정보 딕셔너리
trades_file: 저장 파일 경로
critical: True면 저장 실패 시 예외 발생, False면 경고만 로그
"""
try:
trades = []
if os.path.exists(trades_file):
# 파일 읽기 (with 블록 종료 후 파일 핸들 자동 닫힘)
try:
with open(trades_file, "r", encoding="utf-8") as f:
trades = json.load(f)
except json.JSONDecodeError as e:
# with 블록 밖에서 파일 핸들이 닫힌 후 백업 시도
logger.warning("거래기록 파일 손상 감지, 백업 후 새로 시작: %s", e)
backup_file = f"{trades_file}.corrupted.{int(time.time())}"
try:
os.rename(trades_file, backup_file)
logger.info("손상된 파일 백업: %s", backup_file)
except Exception as backup_err:
logger.error("백업 실패: %s", backup_err)
trades = []
trades.append(trade)
# 원자적 쓰기 (임시 파일 사용)
temp_file = f"{trades_file}.tmp"
with open(temp_file, "w", encoding="utf-8") as f:
json.dump(trades, f, ensure_ascii=False, indent=2)
f.flush()
os.fsync(f.fileno()) # 디스크 동기화 보장
os.replace(temp_file, trades_file)
logger.debug("거래기록 저장 성공: %s", trades_file)
except Exception as e:
logger.error("거래기록 저장 실패: %s", e)
if critical:
# 매도 거래는 반드시 기록되어야 하므로 예외 발생
raise RuntimeError(f"[CRITICAL] 거래 기록 저장 실패: {e}") from e
# critical=False인 경우 경고만 로그 (dry_run 시뮬레이션 등)
def _update_df_with_realtime_price(df: pd.DataFrame, symbol: str, timeframe: str, buffer: list) -> pd.DataFrame:
"""
진행 중인 마지막 캔들 데이터를 실시간 현재가로 업데이트합니다.
"""
try:
from datetime import datetime, timezone
current_price = get_current_price(symbol)
if not (current_price > 0 and df is not None and not df.empty):
return df
last_candle_time = df.index[-1]
now = datetime.now(timezone.utc)
# 봉 주기를 초 단위로 변환
interval_seconds = 0
if "h" in timeframe:
interval_seconds = int(timeframe.replace("h", "")) * 3600
elif "m" in timeframe:
interval_seconds = int(timeframe.replace("m", "")) * 60
elif "d" in timeframe:
interval_seconds = int(timeframe.replace("d", "")) * 86400
if interval_seconds > 0:
if last_candle_time.tzinfo is None:
last_candle_time = last_candle_time.tz_localize(timezone.utc)
next_candle_time = last_candle_time + pd.Timedelta(seconds=interval_seconds)
if last_candle_time <= now < next_candle_time:
df.loc[df.index[-1], "close"] = current_price
df.loc[df.index[-1], "high"] = max(df.loc[df.index[-1], "high"], current_price)
df.loc[df.index[-1], "low"] = min(df.loc[df.index[-1], "low"], current_price)
buffer.append(f"실시간 캔들 업데이트 적용: close={current_price:.2f}")
except Exception as e:
buffer.append(f"warning: 실시간 캔들 업데이트 실패: {e}")
return df
def _prepare_data_and_indicators(
symbol: str, timeframe: str, candle_count: int, indicators: dict, buffer: list
) -> dict | None:
"""데이터를 가져오고 모든 기술적 지표를 계산합니다."""
try:
df = fetch_ohlcv(symbol, timeframe, limit=candle_count, log_buffer=buffer)
df = _update_df_with_realtime_price(df, symbol, timeframe, buffer)
if df.empty or len(df) < 3:
buffer.append(f"지표 계산에 충분한 데이터 없음: {symbol}")
return None
ind = indicators or {}
macd_fast = int(ind.get("macd_fast", 12))
macd_slow = int(ind.get("macd_slow", 26))
macd_signal = int(ind.get("macd_signal", 9))
adx_length = int(ind.get("adx_length", 14))
sma_short_len = int(ind.get("sma_short", 5))
sma_long_len = int(ind.get("sma_long", 200))
macd_df = ta.macd(df["close"], fast=macd_fast, slow=macd_slow, signal=macd_signal)
hist_cols = [c for c in macd_df.columns if "MACDh" in c or "hist" in c.lower()]
macd_cols = [c for c in macd_df.columns if ("MACD" in c and c not in hist_cols and not c.lower().endswith("s"))]
signal_cols = [c for c in macd_df.columns if ("MACDs" in c or c.lower().endswith("s") or "signal" in c.lower())]
if not macd_cols or not signal_cols:
raise RuntimeError("MACD 컬럼을 찾을 수 없습니다.")
sma_short = compute_sma(df["close"], sma_short_len, log_buffer=buffer)
sma_long = compute_sma(df["close"], sma_long_len, log_buffer=buffer)
adx_df = ta.adx(df["high"], df["low"], df["close"], length=adx_length)
adx_cols = [c for c in adx_df.columns if "ADX" in c.upper()]
return {
"df": df,
"macd_line": macd_df[macd_cols[0]].dropna(),
"signal_line": macd_df[signal_cols[0]].dropna(),
"sma_short": sma_short,
"sma_long": sma_long,
"adx": adx_df[adx_cols[0]].dropna() if adx_cols else pd.Series([]),
"indicators_config": {
"adx_threshold": float(ind.get("adx_threshold", 25)),
"sma_short_len": sma_short_len,
"sma_long_len": sma_long_len,
},
}
except Exception as e:
buffer.append(f"warning: 지표 준비 실패: {e}")
logger.warning(f"[{symbol}] 지표 준비 중 오류 발생: {e}")
return None
def _evaluate_buy_conditions(data: dict) -> dict:
"""계산된 지표를 바탕으로 매수 조건을 평가하고 원시 데이터를 반환합니다."""
if not data or len(data.get("macd_line", [])) < 2 or len(data.get("signal_line", [])) < 2:
return {"matches": [], "data_points": {}}
# 지표 값 추출
raw_data = {
"prev_macd": data["macd_line"].iloc[-2],
"curr_macd": data["macd_line"].iloc[-1],
"prev_signal": data["signal_line"].iloc[-2],
"curr_signal": data["signal_line"].iloc[-1],
"close": data["df"]["close"].iloc[-1],
}
sma_short = data["sma_short"].dropna()
sma_long = data["sma_long"].dropna()
raw_data.update(
{
"curr_sma_short": sma_short.iloc[-1] if len(sma_short) >= 1 else None,
"prev_sma_short": sma_short.iloc[-2] if len(sma_short) >= 2 else None,
"curr_sma_long": sma_long.iloc[-1] if len(sma_long) >= 1 else None,
"prev_sma_long": sma_long.iloc[-2] if len(sma_long) >= 2 else None,
}
)
adx = data["adx"].dropna()
raw_data.update(
{"curr_adx": adx.iloc[-1] if len(adx) >= 1 else None, "prev_adx": adx.iloc[-2] if len(adx) >= 2 else None}
)
adx_threshold = data["indicators_config"]["adx_threshold"]
# 조건 정의
cross_macd_signal = (
raw_data["prev_macd"] < raw_data["prev_signal"] and raw_data["curr_macd"] > raw_data["curr_signal"]
)
cross_macd_zero = raw_data["prev_macd"] < 0 and raw_data["curr_macd"] > 0
macd_cross_ok = cross_macd_signal or cross_macd_zero
macd_above_signal = raw_data["curr_macd"] > raw_data["curr_signal"]
sma_condition = (
raw_data["curr_sma_short"] is not None
and raw_data["curr_sma_long"] is not None
and raw_data["curr_sma_short"] > raw_data["curr_sma_long"]
)
cross_sma = (
raw_data["prev_sma_short"] is not None
and raw_data["prev_sma_long"] is not None
and raw_data["prev_sma_short"] < raw_data["prev_sma_long"]
and raw_data["curr_sma_short"] is not None
and raw_data["curr_sma_long"] is not None
and raw_data["curr_sma_short"] > raw_data["curr_sma_long"]
)
adx_ok = raw_data["curr_adx"] is not None and raw_data["curr_adx"] > adx_threshold
cross_adx = (
raw_data["prev_adx"] is not None
and raw_data["curr_adx"] is not None
and raw_data["prev_adx"] <= adx_threshold
and raw_data["curr_adx"] > adx_threshold
)
# 조건 매칭
matches = []
if macd_cross_ok and sma_condition and adx_ok:
matches.append("매수조건1")
if cross_sma and macd_above_signal and adx_ok:
matches.append("매수조건2")
if cross_adx and sma_condition and macd_above_signal:
matches.append("매수조건3")
return {
"matches": matches,
"data_points": raw_data,
"conditions": {
"macd_cross_ok": macd_cross_ok,
"sma_condition": sma_condition,
"cross_sma": cross_sma,
"macd_above_signal": macd_above_signal,
"adx_ok": adx_ok,
"cross_adx": cross_adx,
},
}
def _handle_buy_signal(symbol: str, evaluation: dict, cfg: "RuntimeConfig"):
"""매수 신호를 처리하고, 알림을 보내거나 자동 매수를 실행합니다."""
if not evaluation.get("matches"):
return None
data = evaluation.get("data_points", {})
close_price = data.get("close")
if close_price is None:
return None
# 포매팅 헬퍼
fmt_val = lambda v, p: f"{v:.{p}f}" if v is not None else "N/A"
# 메시지 생성
text = f"매수 신호발생: {symbol} -> {', '.join(evaluation['matches'])}\n가격: {close_price:.8f}\n"
text += f"[MACD] curr/sig: {fmt_val(data.get('curr_macd'), 6)}/{fmt_val(data.get('curr_signal'), 6)}\n"
text += f"[SMA] short/long: {fmt_val(data.get('curr_sma_short'), 1)}/{fmt_val(data.get('curr_sma_long'), 1)}\n"
text += f"[ADX] curr: {fmt_val(data.get('curr_adx'), 4)}"
result = {"telegram": text, "buy_order": None}
trade_recorded = False
amount_krw = float(cfg.config.get("auto_trade", {}).get("buy_amount_krw", 0) or 0)
if cfg.dry_run:
trade = make_trade_record(symbol, "buy", amount_krw, True, price=close_price, status="simulated")
record_trade(trade, TRADES_FILE)
trade_recorded = True
elif cfg.trading_mode == "auto_trade":
auto_trade_cfg = cfg.config.get("auto_trade", {})
can_auto_buy = auto_trade_cfg.get("buy_enabled", False) and amount_krw > 0
if auto_trade_cfg.get("require_env_confirm", True):
can_auto_buy = can_auto_buy and os.getenv("AUTO_TRADE_ENABLED") == "1"
if auto_trade_cfg.get("allowed_symbols", []) and symbol not in auto_trade_cfg["allowed_symbols"]:
can_auto_buy = False
if can_auto_buy:
from .holdings import get_upbit_balances
try:
balances = get_upbit_balances(cfg)
if (balances or {}).get("KRW", 0) < amount_krw:
logger.warning(f"[{symbol}] 잔고 부족으로 매수 건너뜜")
# ... (잔고 부족 알림)
return result
except Exception as e:
logger.warning(f"[{symbol}] 잔고 확인 실패: {e}")
from .order import execute_buy_order_with_confirmation
buy_result = execute_buy_order_with_confirmation(symbol=symbol, amount_krw=amount_krw, cfg=cfg)
result["buy_order"] = buy_result
monitor = buy_result.get("monitor", {})
if (
monitor.get("final_status") in ["filled", "partial", "timeout"]
and float(monitor.get("filled_volume", 0)) > 0
):
trade_recorded = True
# ... (매수 후 처리 로직: holdings 업데이트 및 거래 기록)
if not trade_recorded and not cfg.dry_run:
trade = make_trade_record(symbol, "buy", amount_krw, False, price=close_price, status="notified")
record_trade(trade, TRADES_FILE)
return result
def _process_symbol_core(symbol: str, cfg: "RuntimeConfig", indicators: dict = None) -> dict:
result = {"symbol": symbol, "summary": [], "telegram": None, "error": None}
buffer = []
try:
timeframe = cfg.timeframe
candle_count = cfg.candle_count
indicator_timeframe = cfg.indicator_timeframe
use_tf = indicator_timeframe or timeframe
data = _prepare_data_and_indicators(symbol, use_tf, candle_count, indicators, buffer)
result["summary"].extend(buffer)
if data is None:
result["error"] = "data_preparation_failed"
return result
evaluation = _evaluate_buy_conditions(data)
# 상세 로그 생성
if evaluation["matches"]:
result["summary"].append(f"매수 신호발생: {symbol} -> {', '.join(evaluation['matches'])}")
else:
result["summary"].append("조건 미충족: 매수조건 없음")
if evaluation["data_points"]:
dp = evaluation["data_points"]
c = evaluation["conditions"]
result["summary"].append(f"[조건1 {'충족' if c['macd_cross_ok'] and c['sma_condition'] else '미충족'}]")
result["summary"].append(
f"[조건2 {'충족' if c['cross_sma'] and c['macd_above_signal'] and c['adx_ok'] else '미충족'}]"
)
result["summary"].append(
f"[조건3 {'충족' if c['cross_adx'] and c['sma_condition'] and c['macd_above_signal'] else '미충족'}]"
)
if evaluation["matches"]:
signal_result = _handle_buy_signal(symbol, evaluation, cfg)
if signal_result:
result.update(signal_result)
except DataFetchError as e:
result["summary"].append(f"데이터 수집 실패: {symbol} -> {e}")
result["error"] = "data_fetch_error"
except Exception as e:
logger.exception(f"심볼 처리 중 오류: {symbol} -> {e}")
result["error"] = str(e)
result["summary"].append(f"심볼 처리 중 오류: {symbol} -> {e}")
return result
def process_symbol(*args, **kwargs) -> dict:
"""신규 + 레거시 시그니처 동시 지원 래퍼.
레거시 형태:
process_symbol(symbol, timeframe, limit, token, chat_id, dry_run, indicators=None, indicator_timeframe=None)
신규 형태:
process_symbol(symbol, cfg, indicators=None)
process_symbol(symbol, cfg=cfg, indicators=...) # 혼합 (위치+키워드)
"""
# cfg가 키워드 인자로 전달된 경우 (신규 방식 - 가장 일반적)
if "cfg" in kwargs:
# symbol은 위치 인자 또는 키워드 인자
symbol = args[0] if len(args) > 0 else kwargs["symbol"]
cfg = kwargs["cfg"]
indicators = kwargs.get("indicators")
return _process_symbol_core(symbol, cfg, indicators=indicators)
# 위치 인자로 호출된 경우
if len(args) >= 6 and isinstance(args[1], str) and not hasattr(args[1], "config"):
# 레거시 형태 (6개 이상 인자 + 두 번째가 문자열)
symbol = args[0]
timeframe = args[1]
limit = args[2]
token = args[3]
chat_id = args[4]
dry_run = args[5]
indicators = kwargs.get("indicators")
indicator_timeframe = kwargs.get("indicator_timeframe") or timeframe
from .config import RuntimeConfig, get_default_config
base_cfg = get_default_config()
cfg = RuntimeConfig(
timeframe=timeframe,
indicator_timeframe=indicator_timeframe,
candle_count=limit or base_cfg.get("candle_count", 200),
symbol_delay=base_cfg.get("symbol_delay", 1.0),
interval=60,
loop=False,
dry_run=dry_run,
max_threads=1,
telegram_parse_mode="HTML",
trading_mode="signal_only",
telegram_bot_token=token,
telegram_chat_id=chat_id,
upbit_access_key=None,
upbit_secret_key=None,
aggregate_alerts=False,
benchmark=False,
telegram_test=False,
config=base_cfg,
)
return _process_symbol_core(symbol, cfg, indicators=indicators)
elif len(args) >= 2:
# 신규 형태 (위치 인자: symbol, cfg, [indicators])
symbol = args[0]
cfg = args[1]
indicators = kwargs.get("indicators") if len(args) < 3 else args[2]
return _process_symbol_core(symbol, cfg, indicators=indicators)
else:
raise ValueError(f"process_symbol: 잘못된 인자 형식 (args={args}, kwargs={kwargs})")
def _process_sell_decision(
symbol: str, holding_info: dict, sell_result: dict, current_price: float, cfg: RuntimeConfig, config: dict
) -> int:
"""
Handles the logic for executing a sell order and sending notifications based on a sell decision.
Returns 1 if a sell signal was processed, 0 otherwise.
"""
from .order import execute_sell_order_with_confirmation
telegram_token = cfg.telegram_bot_token
telegram_chat_id = cfg.telegram_chat_id
dry_run = cfg.dry_run
# 부분 매도 플래그 설정: dry_run 모드일 때만 즉시 저장 (실전 모드는 주문 체결 후 저장)
if sell_result.get("set_partial_sell_done", False) and dry_run:
from src.holdings import set_holding_field
set_holding_field(symbol, "partial_sell_done", True, HOLDINGS_FILE)
logger.info("[%s] partial_sell_done 플래그 설정 완료 (dry_run)", symbol)
if sell_result["sell_ratio"] > 0:
# 매도 조건 충족 시 항상 초기 알림 전송 (모든 모드)
if telegram_token and telegram_chat_id:
from .signals import build_sell_message
msg = build_sell_message(symbol, sell_result, parse_mode=cfg.telegram_parse_mode or "HTML")
send_telegram(
telegram_token,
telegram_chat_id,
msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
# auto_trade + 실전일 때만 실제 주문 로직 수행
if cfg.trading_mode == "auto_trade" and not dry_run:
total_amount = float(holding_info.get("amount", 0))
sell_ratio = float(sell_result.get("sell_ratio", 0.0) or 0.0)
# 최소 주문 금액/수수료 고려하여 매도 비율 보정
sell_ratio = _adjust_sell_ratio_for_min_order(symbol, total_amount, sell_ratio, current_price, config)
amount_to_sell = total_amount * sell_ratio
if amount_to_sell > 0:
logger.info(
"[%s] 자동 매도 조건 충족: 매도 주문 시작 (총 수량: %.8f, 매도 비율: %.0f%%, 주문 수량: %.8f)",
symbol,
total_amount,
sell_ratio * 100,
amount_to_sell,
)
sell_order_result = execute_sell_order_with_confirmation(symbol=symbol, amount=amount_to_sell, cfg=cfg)
# 주문 실패/스킵 시 추가 알림 및 재시도 방지
if sell_order_result:
order_status = sell_order_result.get("status")
if order_status in ("skipped_too_small", "failed", "user_not_confirmed"):
error_msg = sell_order_result.get("error", "알 수 없는 오류")
reason = sell_order_result.get("reason", "")
estimated_value = sell_order_result.get("estimated_value", 0)
if telegram_token and telegram_chat_id:
if order_status == "skipped_too_small":
fail_msg = f"[⚠️ 매도 건너뜀] {symbol}\n사유: 최소 주문 금액 미만\n추정 금액: {estimated_value:.0f} KRW\n매도 수량: {amount_to_sell:.8f}\n\n⚠️ Holdings는 그대로 유지됩니다."
elif order_status == "user_not_confirmed":
fail_msg = f"[⚠️ 매도 취소] {symbol}\n사유: 사용자 확인 타임아웃\n매도 수량: {amount_to_sell:.8f}\n\n⚠️ Holdings는 그대로 유지됩니다."
else:
fail_msg = f"[🚨 매도 실패] {symbol}\n사유: {error_msg}\n매도 수량: {amount_to_sell:.8f}\n\n⚠️ Holdings는 그대로 유지됩니다. 수동 확인 필요!"
send_telegram(
telegram_token,
telegram_chat_id,
fail_msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
# 실패한 주문은 signal_count에 포함하지 않음 (다음 주기에 재시도 가능)
return 0
# 부분 매도 플래그 업데이트 로직 추가
if sell_order_result and sell_result.get("set_partial_sell_done"):
monitor_result = sell_order_result.get("monitor", {})
filled_volume = float(monitor_result.get("filled_volume", 0.0) or 0.0)
final_status = monitor_result.get("final_status")
# 주문이 일부라도 체결되었다면 플래그를 저장
if final_status in ("filled", "partial", "timeout") and filled_volume > 0:
from .holdings import set_holding_field
if set_holding_field(symbol, "partial_sell_done", True, holdings_file=HOLDINGS_FILE):
logger.info(
"[%s] 부분 매도(1회성) 완료, partial_sell_done 플래그를 True로 업데이트합니다.", symbol
)
else:
logger.error("[%s] partial_sell_done 플래그 업데이트에 실패했습니다.", symbol)
else:
# _adjust_sell_ratio_for_min_order에서 전량 매도로 조정되었으나 amount_to_sell이 0인 경우
if telegram_token and telegram_chat_id:
skip_msg = f"[매도 건너뜀] {symbol}\n사유: 매도 수량 계산 오류 (amount_to_sell = 0)\n총 수량: {total_amount:.8f}"
send_telegram(
telegram_token,
telegram_chat_id,
skip_msg,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
logger.warning("[%s] 매도 조건 충족했으나 amount_to_sell=0으로 계산됨", symbol)
return 1
return 0
def _check_sell_logic(holdings: dict, cfg: RuntimeConfig, config: dict, check_type: str) -> tuple[list[dict], int]:
"""
Generic function to check sell conditions based on type ('stop_loss' or 'profit_taking').
"""
results = []
sell_signal_count = 0
valid_statuses = []
if check_type == "stop_loss":
# 손절(1시간): 조건1(기본손절) + 조건3(부분익절) + 조건4-2, 5-2(수익률 보호)
valid_statuses = ["stop_loss"]
elif check_type == "profit_taking":
# 익절(4시간): 조건2, 4-1, 5-1(트레일링 스탑)
valid_statuses = ["profit_taking"]
for symbol, holding_info in holdings.items():
try:
current_price = get_current_price(symbol)
if current_price <= 0:
logger.warning("[%s] 현재가 조회 실패, 매도 검사 건너뜀", symbol)
continue
buy_price = float(holding_info.get("buy_price", 0))
max_price = float(holding_info.get("max_price", current_price))
if buy_price <= 0:
logger.warning("[%s] 매수가 정보 없음, 매도 검사 건너뜀", symbol)
continue
sell_result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info, config)
log_msg = (
f"[{symbol}] {check_type} 검사 - "
f"현재가: {current_price:.2f}, 매수가: {buy_price:.2f}, 최고가: {max_price:.2f}, "
f"수익률: {sell_result['profit_rate']:.2f}%, 최고점대비: {sell_result['max_drawdown']:.2f}%, "
f"상태: {sell_result['status']} (비율: {sell_result['sell_ratio']*100:.0f}%)"
)
logger.info(log_msg)
result_obj = {
"symbol": symbol,
"status": sell_result["status"],
"sell_ratio": sell_result["sell_ratio"],
"profit_rate": sell_result["profit_rate"],
"max_drawdown": sell_result["max_drawdown"],
"reasons": sell_result["reasons"],
"current_price": current_price,
"buy_price": buy_price,
"max_price": max_price,
"amount": holding_info.get("amount", 0),
}
results.append(result_obj)
if sell_result["status"] in valid_statuses and sell_result["sell_ratio"] > 0:
logger.info("[%s] 매도 조건 충족 (처리 시작): %s", symbol, ", ".join(sell_result["reasons"]))
processed = _process_sell_decision(symbol, holding_info, sell_result, current_price, cfg, config)
if processed > 0:
logger.info("[%s] 매도 조건 처리 완료", symbol)
else:
logger.warning("[%s] 매도 조건 충족했으나 주문 실패/건너뜀", symbol)
sell_signal_count += processed
except Exception as e:
logger.exception("매도 조건 확인 중 오류 (%s): %s", symbol, e)
return results, sell_signal_count
def check_stop_loss_conditions(holdings: dict, cfg: RuntimeConfig, config: dict = None) -> tuple[list[dict], int]:
if config is None and cfg is not None and hasattr(cfg, "config"):
config = cfg.config
if not holdings:
logger.info("보유 정보가 없음 - 손절 조건 검사 건너뜀")
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 손절 조건 검사 완료 (보유 코인 없음)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
return [], 0
results, sell_signal_count = _check_sell_logic(holdings, cfg, config, "stop_loss")
if cfg.telegram_bot_token and cfg.telegram_chat_id and sell_signal_count == 0:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 손절 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
return results, sell_signal_count
def check_profit_taking_conditions(holdings: dict, cfg: RuntimeConfig, config: dict = None) -> tuple[list[dict], int]:
if config is None and cfg is not None and hasattr(cfg, "config"):
config = cfg.config
if not holdings:
logger.info("보유 정보가 없음 - 익절 조건 검사 건너뜀")
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 익절 조건 검사 완료 (보유 코인 없음)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
return [], 0
results, sell_signal_count = _check_sell_logic(holdings, cfg, config, "profit_taking")
if cfg.telegram_bot_token and cfg.telegram_chat_id and sell_signal_count == 0:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 익절 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode or "HTML",
)
return results, sell_signal_count

1
src/tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Test package

View File

@@ -0,0 +1,111 @@
"""
경계값 테스트: profit_rate가 정확히 10%, 30%일 때 매도 조건 검증
"""
import pytest
from src.signals import evaluate_sell_conditions
class TestBoundaryConditions:
"""매도 조건의 경계값 테스트"""
def test_profit_rate_exactly_10_percent_triggers_partial_sell(self):
"""수익률이 정확히 10%일 때 부분 매도(조건3) 발생"""
# Given: 매수가 100, 현재가 110 (정확히 10% 수익)
buy_price = 100.0
current_price = 110.0
max_price = 110.0
holding_info = {"partial_sell_done": False}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then
assert result["status"] == "stop_loss" # 부분 익절은 stop_loss (1시간 주기)
assert result["sell_ratio"] == 0.5
assert result["set_partial_sell_done"] is True
assert "부분 익절" in result["reasons"][0]
def test_profit_rate_exactly_30_percent_in_high_zone(self):
"""최고 수익률 30% 초과 구간에서 수익률이 정확히 30%로 떨어질 때"""
# Given: 최고가 135 (35% 수익), 현재가 130 (30% 수익)
buy_price = 100.0
current_price = 130.0
max_price = 135.0
holding_info = {"partial_sell_done": True}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then: 수익률이 30% 이하(<= 30)로 하락하여 조건5-2 발동 (stop_loss)
assert result["status"] == "stop_loss"
assert result["sell_ratio"] == 1.0
assert "수익률 보호(조건5)" in result["reasons"][0]
def test_profit_rate_below_30_percent_triggers_sell(self):
"""최고 수익률 30% 초과 구간에서 수익률이 30% 미만으로 떨어질 때"""
# Given: 최고가 135 (35% 수익), 현재가 129.99 (29.99% 수익)
buy_price = 100.0
current_price = 129.99
max_price = 135.0
holding_info = {"partial_sell_done": True}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then: 조건5-2 발동 (수익률 30% 미만으로 하락)
assert result["status"] == "stop_loss"
assert result["sell_ratio"] == 1.0
assert "수익률 보호(조건5)" in result["reasons"][0]
def test_profit_rate_exactly_10_percent_in_mid_zone(self):
"""최고 수익률 10~30% 구간에서 수익률이 정확히 10%일 때"""
# Given: 최고가 120 (20% 수익), 현재가 110 (10% 수익)
buy_price = 100.0
current_price = 110.0
max_price = 120.0
holding_info = {"partial_sell_done": True}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then: 수익률이 10% 이하(<= 10)로 하락하여 조건4-2 발동 (stop_loss)
assert result["status"] == "stop_loss"
assert result["sell_ratio"] == 1.0
assert "수익률 보호(조건4)" in result["reasons"][0]
def test_profit_rate_below_10_percent_triggers_sell(self):
"""최고 수익률 10~30% 구간에서 수익률이 10% 미만으로 떨어질 때"""
# Given: 최고가 120 (20% 수익), 현재가 109.99 (9.99% 수익)
buy_price = 100.0
current_price = 109.99
max_price = 120.0
holding_info = {"partial_sell_done": True}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then: 조건4-2 발동 (수익률 10% 미만으로 하락)
assert result["status"] == "stop_loss"
assert result["sell_ratio"] == 1.0
assert "수익률 보호(조건4)" in result["reasons"][0]
def test_partial_sell_already_done_no_duplicate(self):
"""부분 매도 이미 완료된 경우 중복 발동 안됨"""
# Given: 매수가 100, 현재가 110 (10% 수익), 이미 부분 매도 완료
buy_price = 100.0
current_price = 110.0
max_price = 110.0
holding_info = {"partial_sell_done": True}
# When
result = evaluate_sell_conditions(current_price, buy_price, max_price, holding_info)
# Then: 부분 매도 재발동 안됨
assert result["status"] == "hold"
assert result["sell_ratio"] == 0.0
assert result["set_partial_sell_done"] is False
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,118 @@
"""
치명적 문제 수정 사항 검증 테스트
- 원자적 파일 쓰기
- API 키 검증
- Decimal 정밀도
"""
import os
import json
import tempfile
import pytest
from decimal import Decimal
from src.holdings import save_holdings, load_holdings
from src.signals import record_trade, _adjust_sell_ratio_for_min_order
from src.config import build_runtime_config
class TestCriticalFixes:
"""치명적 문제 수정 사항 테스트"""
def test_atomic_holdings_save(self, tmp_path):
"""[C-1] holdings.json 원자적 쓰기 검증"""
holdings_file = tmp_path / "test_holdings.json"
# 초기 데이터 저장
initial_data = {"KRW-BTC": {"amount": 0.1, "buy_price": 50000000}}
save_holdings(initial_data, str(holdings_file))
# 파일 존재 및 내용 확인
assert holdings_file.exists()
loaded = load_holdings(str(holdings_file))
assert loaded == initial_data
# 임시 파일이 남아있지 않은지 확인 (원자적 교체 완료)
temp_files = list(tmp_path.glob("*.tmp"))
assert len(temp_files) == 0, "임시 파일이 남아있으면 안됩니다"
def test_trade_record_critical_flag(self, tmp_path):
"""[C-4] 거래 기록 critical 플래그 동작 검증"""
trades_file = tmp_path / "test_trades.json"
# critical=False: 저장 실패 시 예외 발생 안함
trade = {"symbol": "KRW-BTC", "side": "sell", "amount": 0.1}
# 정상 저장
record_trade(trade, str(trades_file), critical=False)
assert trades_file.exists()
# critical=True: 파일 권한 오류 시뮬레이션은 어려우므로 정상 케이스만 검증
trade2 = {"symbol": "KRW-ETH", "side": "buy", "amount": 1.0}
record_trade(trade2, str(trades_file), critical=True)
# 두 거래 모두 기록되었는지 확인
with open(trades_file, "r", encoding="utf-8") as f:
trades = json.load(f)
assert len(trades) == 2
assert trades[0]["symbol"] == "KRW-BTC"
assert trades[1]["symbol"] == "KRW-ETH"
def test_api_key_validation_in_config(self):
"""[C-2] API 키 검증 로직 확인"""
# dry_run=True: API 키 없어도 통과
config_dry = {"dry_run": True, "trading_mode": "auto_trade", "auto_trade": {}}
# 환경변수 없어도 예외 발생 안함
cfg = build_runtime_config(config_dry)
assert cfg.dry_run is True
# dry_run=False + auto_trade: 환경변수 필수 (실제 테스트는 환경변수 설정 필요)
# 여기서는 로직 존재 여부만 확인 (실제 ValueError 발생은 환경 의존적)
def test_decimal_precision_in_sell_ratio(self):
"""[C-3] Decimal을 사용한 부동소수점 오차 방지 검증"""
config = {"auto_trade": {"min_order_value_krw": 5000, "fee_safety_margin_pct": 0.05}}
# 테스트 케이스: 0.5 비율 매도 시 정밀 계산
total_amount = 0.00123456 # BTC
sell_ratio = 0.5
current_price = 50_000_000 # 50M KRW
adjusted_ratio = _adjust_sell_ratio_for_min_order("KRW-BTC", total_amount, sell_ratio, current_price, config)
# 부동소수점 오차 없이 계산되었는지 확인
# 예상 매도액: 0.00123456 * 0.5 * 50M * 0.9995 ≈ 30,863 KRW > 5000 → 0.5 유지
assert adjusted_ratio == 0.5
# 경계 케이스: 잔여액이 최소 금액 미만일 때 전량 매도로 전환
small_amount = 0.0001 # BTC
adjusted_ratio_small = _adjust_sell_ratio_for_min_order("KRW-BTC", small_amount, 0.5, current_price, config)
# 0.0001 * 0.5 * 50M * 0.9995 = 2498.75 < 5000 → 전량 매도(1.0)
assert adjusted_ratio_small == 1.0
def test_corrupted_trades_file_backup(self, tmp_path):
"""[C-4] 손상된 거래 파일 백업 기능 검증"""
trades_file = tmp_path / "corrupted_trades.json"
# 손상된 JSON 파일 생성
with open(trades_file, "w", encoding="utf-8") as f:
f.write("{invalid json content")
# 새 거래 기록 시도 → 손상 파일 백업 후 정상 저장
trade = {"symbol": "KRW-BTC", "side": "sell"}
record_trade(trade, str(trades_file), critical=False)
# 백업 파일 생성 확인
backup_files = list(tmp_path.glob("corrupted_trades.json.corrupted.*"))
assert len(backup_files) > 0, "손상된 파일이 백업되어야 합니다"
# 정상 파일로 복구 확인
with open(trades_file, "r", encoding="utf-8") as f:
trades = json.load(f)
assert len(trades) == 1
assert trades[0]["symbol"] == "KRW-BTC"
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,143 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
import pytest
from src.signals import evaluate_sell_conditions
@pytest.fixture
def base_config():
"""Provides the auto_trade part of the configuration."""
return {
"auto_trade": {
"loss_threshold": -5.0,
"profit_threshold_1": 10.0,
"profit_threshold_2": 30.0,
"drawdown_1": 5.0,
"drawdown_2": 15.0,
}
}
# Test cases for the new strategy
def test_stop_loss_initial(base_config):
"""Rule 1: Sell if price drops 5% below buy price."""
res = evaluate_sell_conditions(
current_price=95.0, buy_price=100.0, max_price=100.0, holding_info={}, config=base_config
)
assert res["status"] == "stop_loss"
assert res["sell_ratio"] == 1.0
def test_trailing_stop_small_profit(base_config):
"""Rule 2: In small profit (<= 10%), sell if price drops 5% from high."""
res1 = evaluate_sell_conditions(
current_price=96.0, # +6% profit (just above +5% but below +10%)
buy_price=100.0,
max_price=110.0, # High was +10%
holding_info={"partial_sell_done": False},
config=base_config,
)
# Drawdown is (96-110)/110 = -12.7% which is > 5%
assert res1["status"] == "profit_taking" # Trailing stop classified as profit_taking
assert res1["sell_ratio"] == 1.0
def test_partial_profit_at_10_percent(base_config):
"""Rule 3: At profit == 10%, sell 50%."""
res = evaluate_sell_conditions(
current_price=110.0, # Exactly +10%
buy_price=100.0,
max_price=110.0,
holding_info={"partial_sell_done": False},
config=base_config,
)
assert res["status"] == "stop_loss" # Partial profit classified as stop_loss for 1h check
assert res["sell_ratio"] == 0.5
assert res["set_partial_sell_done"] is True
def test_trailing_stop_medium_profit_by_drawdown(base_config):
"""Rule 4: In mid profit (10-30%), sell if price drops 5% from high."""
res = evaluate_sell_conditions(
current_price=123.0, # +23% profit
buy_price=100.0,
max_price=130.0, # High was +30%
holding_info={"partial_sell_done": True},
config=base_config,
)
# Drawdown is (123-130)/130 = -5.38% which is < -5%
assert res["status"] == "profit_taking" # Trailing stop classified as profit_taking for 4h check
assert res["sell_ratio"] == 1.0
def test_trailing_stop_medium_profit_by_floor(base_config):
"""Rule 4: In mid profit (10-30%), sell if profit drops to 10%."""
res = evaluate_sell_conditions(
current_price=110.0, # Profit drops to 10%
buy_price=100.0,
max_price=125.0, # High was +25%
holding_info={"partial_sell_done": True},
config=base_config,
)
assert res["status"] == "stop_loss" # Profit protection classified as stop_loss for 1h check
assert res["sell_ratio"] == 1.0
def test_trailing_stop_high_profit_by_drawdown(base_config):
"""Rule 5: In high profit (>30%), sell if price drops 15% from high."""
res = evaluate_sell_conditions(
current_price=135.0, # +35% profit
buy_price=100.0,
max_price=160.0, # High was +60%
holding_info={"partial_sell_done": True},
config=base_config,
)
# Drawdown is (135-160)/160 = -15.625% which is < -15%
assert res["status"] == "profit_taking" # Trailing stop classified as profit_taking for 4h check
assert res["sell_ratio"] == 1.0
def test_trailing_stop_high_profit_by_floor(base_config):
"""Rule 5: In high profit (>30%), sell if profit drops to 30%."""
res = evaluate_sell_conditions(
current_price=130.0, # Profit drops to 30%
buy_price=100.0,
max_price=150.0, # High was +50%
holding_info={"partial_sell_done": True},
config=base_config,
)
assert res["status"] == "stop_loss" # Profit protection classified as stop_loss for 1h check
assert res["sell_ratio"] == 1.0
def test_hold_high_profit(base_config):
"""Rule 6: Hold if profit > 30% and drawdown is less than 15%."""
res = evaluate_sell_conditions(
current_price=140.0, # +40% profit
buy_price=100.0,
max_price=150.0, # High was +50%
holding_info={"partial_sell_done": True},
config=base_config,
)
# Drawdown is (140-150)/150 = -6.67% which is > -15%
assert res["status"] == "hold"
assert res["sell_ratio"] == 0.0
def test_hold_medium_profit(base_config):
"""Hold if profit is 10-30% and drawdown is less than 5%."""
res = evaluate_sell_conditions(
current_price=128.0, # +28% profit
buy_price=100.0,
max_price=130.0, # High was +30%
holding_info={"partial_sell_done": True},
config=base_config,
)
# Drawdown is (128-130)/130 = -1.5% which is > -5%
assert res["status"] == "hold"
assert res["sell_ratio"] == 0.0

74
src/tests/test_helpers.py Normal file
View File

@@ -0,0 +1,74 @@
"""Test helper functions used primarily in test scenarios."""
import inspect
import sys
import os
# Add parent directory to path to import from src
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from src.common import logger
from src.signals import process_symbol
from src.notifications import send_telegram
def safe_send_telegram(bot_token: str, chat_id: str, text: str, **kwargs) -> bool:
"""Flexibly call send_telegram even if monkeypatched version has a simpler signature.
Inspects the target callable and only passes accepted parameters."""
func = send_telegram
try:
sig = inspect.signature(func)
accepted = sig.parameters.keys()
call_kwargs = {}
# positional mapping
params = list(accepted)
pos_args = [bot_token, chat_id, text]
for i, val in enumerate(pos_args):
if i < len(params):
call_kwargs[params[i]] = val
# optional kwargs filtered
for k, v in kwargs.items():
if k in accepted:
call_kwargs[k] = v
return func(**call_kwargs)
except Exception:
# Fallback positional
try:
return func(bot_token, chat_id, text)
except Exception:
return False
def check_and_notify(
exchange: str,
symbol: str,
timeframe: str,
telegram_token: str,
telegram_chat_id: str,
limit: int = 200,
dry_run: bool = True,
):
"""Compatibility helper used by tests: run processing for a single symbol and send notification if needed.
exchange parameter is accepted for API compatibility but not used (we use pyupbit internally).
"""
try:
res = process_symbol(
symbol,
timeframe,
limit,
telegram_token,
telegram_chat_id,
dry_run,
indicators=None,
indicator_timeframe=None,
)
# If a telegram message was returned from process_symbol, send it (unless dry_run)
if res.get("telegram"):
if dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
else:
if telegram_token and telegram_chat_id:
safe_send_telegram(telegram_token, telegram_chat_id, res["telegram"], add_thread_prefix=False)
except Exception as e:
logger.exception("check_and_notify 오류: %s", e)

93
src/tests/test_main.py Normal file
View File

@@ -0,0 +1,93 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
import builtins
import types
import pandas as pd
import pytest
import main
from .test_helpers import check_and_notify, safe_send_telegram
def test_compute_macd_hist_monkeypatch(monkeypatch):
# Arrange: monkeypatch pandas_ta.macd to return a DataFrame with MACDh column
dummy_macd = pd.DataFrame({"MACDh_12_26_9": [None, 0.5, 1.2, 2.3]})
def fake_macd(series, fast, slow, signal):
return dummy_macd
monkeypatch.setattr(main.ta, "macd", fake_macd)
close = pd.Series([1, 2, 3, 4])
# Act: import directly from indicators
from src.indicators import compute_macd_hist
hist = compute_macd_hist(close)
# Assert
assert isinstance(hist, pd.Series)
assert list(hist.dropna()) == [0.5, 1.2, 2.3]
def test_check_and_notify_positive_sends(monkeypatch):
# Prepare a fake OHLCV DataFrame with required OHLCV columns
idx = pd.date_range(end=pd.Timestamp.now(), periods=250, freq="h")
df = pd.DataFrame(
{
"open": list(range(100, 350)),
"high": list(range(105, 355)),
"low": list(range(95, 345)),
"close": list(range(100, 350)),
"volume": [1000] * 250,
},
index=idx,
)
# Monkeypatch at the point of use: src.signals imports from indicators
from src import signals
# Patch fetch_ohlcv to return complete OHLCV data
monkeypatch.setattr(signals, "fetch_ohlcv", lambda symbol, timeframe, limit=200, log_buffer=None: df)
# Fake pandas_ta.macd to return MACD crossover (signal cross)
def fake_macd(close, fast=12, slow=26, signal=9):
macd_df = pd.DataFrame(index=close.index)
# Create crossover: prev < signal, curr > signal
macd_values = [-0.5] * (len(close) - 1) + [1.5] # Last value crosses above
signal_values = [0.5] * len(close) # Constant signal line
macd_df["MACD_12_26_9"] = pd.Series(macd_values, index=close.index)
macd_df["MACDs_12_26_9"] = pd.Series(signal_values, index=close.index)
macd_df["MACDh_12_26_9"] = pd.Series([v - s for v, s in zip(macd_values, signal_values)], index=close.index)
return macd_df
monkeypatch.setattr(signals.ta, "macd", fake_macd)
# Fake pandas_ta.adx to return valid ADX data
def fake_adx(high, low, close, length=14):
adx_df = pd.DataFrame(index=close.index)
adx_df[f"ADX_{length}"] = pd.Series([30.0] * len(close), index=close.index)
return adx_df
monkeypatch.setattr(signals.ta, "adx", fake_adx)
# Capture calls to safe_send_telegram
called = {"count": 0}
def fake_safe_send(token, chat_id, text, **kwargs):
called["count"] += 1
return True
# Monkeypatch test_helpers module
from . import test_helpers
monkeypatch.setattr(test_helpers, "safe_send_telegram", fake_safe_send)
# Act: call check_and_notify (not dry-run)
check_and_notify("upbit", "KRW-BTC", "1h", "token", "chat", limit=10, dry_run=False)
# Assert: safe_send_telegram was called
assert called["count"] == 1

160
src/threading_utils.py Normal file
View File

@@ -0,0 +1,160 @@
import time
import threading
from typing import List
from .config import RuntimeConfig
from .common import logger
from .signals import process_symbol
from .notifications import send_telegram
def run_sequential(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols))
alerts = []
buy_signal_count = 0
for i, sym in enumerate(symbols):
try:
res = process_symbol(sym, cfg=cfg)
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if cfg.dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
else:
# dry_run이 아닐 때는 콘솔에 메시지 출력하지 않음
pass
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
res["telegram"],
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
except Exception as e:
logger.exception("심볼 처리 오류: %s -> %s", sym, e)
if i < len(symbols) - 1 and cfg.symbol_delay is not None:
logger.debug("다음 심볼까지 %.2f초 대기", cfg.symbol_delay)
time.sleep(cfg.symbol_delay)
if aggregate_enabled and len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if cfg.dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
summary_text,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
return buy_signal_count
def run_with_threads(symbols: List[str], cfg: RuntimeConfig, aggregate_enabled: bool = False):
logger.info(
"병렬 처리 시작 (심볼 수=%d, 스레드 수=%d, 심볼 간 지연=%.2f초)",
len(symbols),
cfg.max_threads or 0,
cfg.symbol_delay or 0.0,
)
semaphore = threading.Semaphore(cfg.max_threads)
threads = []
last_request_time = [0]
throttle_lock = threading.Lock()
results = {}
results_lock = threading.Lock()
def worker(symbol: str):
try:
with semaphore:
with throttle_lock:
elapsed = time.time() - last_request_time[0]
if cfg.symbol_delay is not None and elapsed < cfg.symbol_delay:
sleep_time = cfg.symbol_delay - elapsed
logger.debug("[%s] 스로틀 대기: %.2f", symbol, sleep_time)
time.sleep(sleep_time)
last_request_time[0] = time.time()
res = process_symbol(symbol, cfg=cfg)
with results_lock:
results[symbol] = res
except Exception as e:
logger.exception("[%s] 워커 스레드 오류: %s", symbol, e)
for sym in symbols:
t = threading.Thread(target=worker, args=(sym,), name=f"Worker-{sym}")
threads.append(t)
t.start()
for t in threads:
t.join()
alerts = []
buy_signal_count = 0
for sym in symbols:
with results_lock:
res = results.get(sym)
if not res:
logger.warning("심볼 결과 없음: %s", sym)
continue
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if cfg.dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
res["telegram"],
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
if aggregate_enabled and len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if cfg.dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
summary_text,
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
if cfg.telegram_bot_token and cfg.telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(
cfg.telegram_bot_token,
cfg.telegram_chat_id,
"[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)",
add_thread_prefix=False,
parse_mode=cfg.telegram_parse_mode,
)
logger.info("병렬 처리 완료")
return buy_signal_count