551 lines
28 KiB
Python
551 lines
28 KiB
Python
import os
|
|
import time
|
|
import json
|
|
import inspect
|
|
from typing import List, Dict, Tuple, Any
|
|
from enum import Enum
|
|
import pandas as pd
|
|
import pandas_ta as ta
|
|
from datetime import datetime
|
|
|
|
from .common import logger
|
|
from .config import RuntimeConfig
|
|
from .indicators import fetch_ohlcv, compute_macd_hist, compute_sma
|
|
from .holdings import fetch_holdings_from_kiwoom, get_current_price
|
|
from .notifications import send_telegram
|
|
|
|
|
|
class CheckType(str, Enum):
    """Selects which group of sell rules an evaluation pass should run.

    Members also derive from ``str``, so each one compares equal to its raw
    string value (for example ``CheckType.ALL == "all"``).
    """

    # Stop-loss rules (1-hour cycle).
    STOP_LOSS = "stop_loss"
    # Profit-taking rules (4-hour cycle).
    PROFIT_TAKING = "profit_taking"
    # Every rule.
    ALL = "all"
|
|
|
|
|
|
def make_trade_record(symbol, side, amount, dry_run, price=None, status="simulated"):
    """Build the dictionary that describes a single trade event.

    Args:
        symbol: Instrument identifier; stored as ``symbol`` and as
            ``result["market"]``.
        side: Trade direction label, stored unchanged.
        amount: Trade amount, stored unchanged.
        dry_run: Flag stored unchanged under ``dry_run``.
        price: Optional price stored under ``result["price"]``.
        status: Status label stored under ``result["status"]``.

    Returns:
        A dict with the top-level trade fields, a Unix ``timestamp``, the same
        instant formatted as local-time ``datetime`` text, and a nested
        ``result`` dict that repeats the order details.
    """
    stamp = float(time.time())
    readable_stamp = datetime.fromtimestamp(stamp).strftime("%Y-%m-%d %H:%M:%S")

    # Nested order details; keyword order fixes the key order of the dict.
    order_detail = dict(
        market=symbol,
        side=side,
        amount=amount,
        price=price,
        status=status,
        timestamp=stamp,
    )

    record = dict(symbol=symbol, side=side, amount=amount, timestamp=stamp)
    record["datetime"] = readable_stamp
    record["dry_run"] = dry_run
    record["result"] = order_detail
    return record
|
|
|
|
def evaluate_sell_conditions(
    current_price: float,
    buy_price: float,
    max_price: float,
    holding_info: Dict[str, Any],
    config: Dict[str, Any] | None = None,
    check_type: str = CheckType.ALL
) -> Dict[str, Any]:
    """Evaluate the sell rules for one holding and report whether to sell.

    Rules are checked in priority order and the first match wins:
    stop-loss (rule 1), one-time partial profit taking (rule 3), then the
    full-exit rules (5-1, 5-2, 4-1, 4-2, 2) chosen by the best profit rate
    the position has reached so far.

    Args:
        current_price: Latest price. Must be positive.
        buy_price: Purchase price. Must be positive.
        max_price: Highest price seen; a non-positive value is replaced by
            ``current_price``.
        holding_info: Holding record; only ``partial_sell_done`` is read.
        config: Settings dict; thresholds are read from ``config["auto_trade"]``
            with defaults applied for missing keys.
        check_type: Which rule group to run (``CheckType.STOP_LOSS``,
            ``CheckType.PROFIT_TAKING`` or ``CheckType.ALL``).

    Returns:
        A dict with ``status`` ("hold", "stop_loss", "partial_profit",
        "profit_taking" or "error"), ``sell_ratio``, ``reasons``,
        ``profit_rate``, ``max_drawdown``, ``set_partial_sell_done`` and
        ``check_interval_minutes``.
    """
    config = config or {}
    holding_info = holding_info or {}
    auto_trade_config = config.get("auto_trade", {})
    loss_threshold = float(auto_trade_config.get("loss_threshold", -5.0))
    profit_threshold_1 = float(auto_trade_config.get("profit_threshold_1", 10.0))
    profit_threshold_2 = float(auto_trade_config.get("profit_threshold_2", 30.0))
    drawdown_1 = float(auto_trade_config.get("drawdown_1", 5.0))
    drawdown_2 = float(auto_trade_config.get("drawdown_2", 15.0))
    partial_sell_ratio = float(auto_trade_config.get("partial_sell_ratio", 0.5))

    def _invalid_price_result(reason: str) -> Dict[str, Any]:
        """Log the problem and build the no-sell result for unusable prices."""
        logger.error(reason)
        return {
            "status": "error",
            "sell_ratio": 0.0,
            "reasons": [reason],
            "profit_rate": 0,
            "max_drawdown": 0,
            "set_partial_sell_done": False,
            "check_interval_minutes": 60
        }

    # Price validation.
    if buy_price <= 0:
        return _invalid_price_result(f"비정상 매수가: {buy_price}")

    # A non-positive current price would otherwise compute a profit rate of
    # -100% or worse and fire a full stop-loss sell, or divide by zero below
    # when max_price is also non-positive.
    if current_price <= 0:
        return _invalid_price_result(f"비정상 현재가: {current_price}")

    if max_price <= 0:
        logger.warning(f"비정상 최고가: {max_price}, 현재가로 대체")
        max_price = current_price

    profit_rate = ((current_price - buy_price) / buy_price) * 100
    max_drawdown = ((current_price - max_price) / max_price) * 100

    result = {
        "status": "hold",
        "sell_ratio": 0.0,
        "reasons": [],
        "profit_rate": profit_rate,
        "max_drawdown": max_drawdown,
        "set_partial_sell_done": False,
        "check_interval_minutes": 60  # default: 1 hour
    }

    # Rule filtering by check_type:
    #   "stop_loss":     1-hour rules only (rules 1, 3, 4-2, 5-2)
    #   "profit_taking": 4-hour rules only (rules 2, 4-1, 5-1)
    #   "all":           every rule
    run_hourly_rules = check_type in (CheckType.STOP_LOSS, CheckType.ALL)
    run_four_hour_rules = check_type in (CheckType.PROFIT_TAKING, CheckType.ALL)

    # Rule 1: stop-loss (1-hour cycle) - highest priority.
    if run_hourly_rules and profit_rate <= loss_threshold:
        result.update(status="stop_loss", sell_ratio=1.0, check_interval_minutes=60)
        result["reasons"].append(f"손절(조건1): 수익률 {profit_rate:.2f}% <= {loss_threshold}%")
        return result

    max_profit_rate = ((max_price - buy_price) / buy_price) * 100

    # Rule 3: partial profit taking (1-hour cycle) once profit_threshold_1 is
    # reached. It runs only once per holding; afterwards only the full-exit
    # rules below are evaluated.
    partial_sell_done = holding_info.get("partial_sell_done", False)
    if run_hourly_rules and not partial_sell_done and profit_rate >= profit_threshold_1:
        result.update(status="partial_profit", sell_ratio=partial_sell_ratio, check_interval_minutes=60)
        result["reasons"].append(f"부분 익절(조건3): 수익률 {profit_rate:.2f}% 달성, {int(partial_sell_ratio * 100)}% 매도")
        result["set_partial_sell_done"] = True
        return result

    if max_profit_rate > profit_threshold_2:
        # Rule 5-1: fell drawdown_2 percent from the peak (4-hour cycle).
        if run_four_hour_rules and max_drawdown <= -drawdown_2:
            result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=240)
            result["reasons"].append(f"전량 익절(조건5-1): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_2}%)")
            return result
        # Rule 5-2: profit rate fell back to profit_threshold_2 or lower (1-hour cycle).
        if run_hourly_rules and profit_rate <= profit_threshold_2:
            result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=60)
            result["reasons"].append(f"전량 익절(조건5-2): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 수익률 {profit_rate:.2f}%로 하락 (기준: {profit_threshold_2}%)")
            return result
    elif profit_threshold_1 < max_profit_rate <= profit_threshold_2:
        # Rule 4-1: fell drawdown_1 percent from the peak (4-hour cycle).
        if run_four_hour_rules and max_drawdown <= -drawdown_1:
            result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=240)
            result["reasons"].append(f"전량 익절(조건4-1): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)")
            return result
        # Rule 4-2: profit rate fell back to profit_threshold_1 or lower (1-hour cycle).
        if run_hourly_rules and profit_rate <= profit_threshold_1:
            result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=60)
            result["reasons"].append(f"전량 익절(조건4-2): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 수익률 {profit_rate:.2f}%로 하락 (기준: {profit_threshold_1}%)")
            return result
    elif max_profit_rate <= profit_threshold_1:
        # Rule 2: never exceeded profit_threshold_1 and fell drawdown_1
        # percent from the peak (4-hour cycle).
        if run_four_hour_rules and max_drawdown <= -drawdown_1:
            result.update(status="stop_loss", sell_ratio=1.0, check_interval_minutes=240)
            result["reasons"].append(f"손절(조건2): 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)")
            return result

    result["reasons"].append(f"홀드 (수익률 {profit_rate:.2f}%, 최고점 대비 하락 {max_drawdown:.2f}%)")
    return result
|
|
|
|
def build_sell_message(symbol: str, sell_result: dict, parse_mode: str = "HTML") -> str:
    """Format a sell-signal notification from an evaluation result.

    Args:
        symbol: Instrument identifier shown in the headline.
        sell_result: Dict read for ``status``, ``profit_rate``,
            ``max_drawdown``, ``sell_ratio`` and ``reasons``; missing keys
            fall back to defaults. Only the first reason is shown.
        parse_mode: ``"HTML"`` produces bold-tagged markup; any other value
            produces plain text.

    Returns:
        The multi-line message text, ending with a newline.
    """
    # Function-scope import so the block does not depend on a file-level import.
    from html import escape

    status = sell_result.get("status", "unknown")
    profit = sell_result.get("profit_rate", 0.0)
    drawdown = sell_result.get("max_drawdown", 0.0)
    # round() rather than int(): 0.29 * 100 is 28.999... and would show as 28%.
    ratio = round(sell_result.get("sell_ratio", 0.0) * 100)
    reasons = sell_result.get("reasons", [])
    reason = reasons[0] if reasons else "사유 없음"

    if parse_mode == "HTML":
        # Dynamic text must be escaped: reasons can contain "<=", which would
        # otherwise be read as the start of a tag.
        safe_symbol = escape(str(symbol), quote=False)
        safe_status = escape(str(status), quote=False)
        safe_reason = escape(str(reason), quote=False)
        return (
            f"<b>🔴 매도 신호: {safe_symbol}</b>\n"
            f"상태: <b>{safe_status}</b>\n"
            f"수익률: <b>{profit:.2f}%</b>\n"
            f"최고점 대비: <b>{drawdown:.2f}%</b>\n"
            f"매도 비율: <b>{ratio}%</b>\n"
            f"사유: {safe_reason}\n"
        )

    return (
        f"🔴 매도 신호: {symbol}\n"
        f"상태: {status}\n"
        f"수익률: {profit:.2f}%\n"
        f"최고점 대비: {drawdown:.2f}%\n"
        f"매도 비율: {ratio}%\n"
        f"사유: {reason}\n"
    )
|
|
|
|
def _adjust_sell_ratio_for_min_order(symbol: str, total_amount: float, sell_ratio: float, current_price: float, config: dict) -> float:
|
|
if not (0 < sell_ratio < 1):
|
|
return sell_ratio
|
|
|
|
auto_trade_cfg = (config or {}).get("auto_trade", {})
|
|
try:
|
|
min_order_value = float(auto_trade_cfg.get("min_order_value", 100000))
|
|
except (ValueError, TypeError):
|
|
min_order_value = 100000.0
|
|
|
|
amount_to_sell_partial = total_amount * sell_ratio
|
|
value_to_sell = amount_to_sell_partial * current_price
|
|
value_remaining = (total_amount - amount_to_sell_partial) * current_price
|
|
|
|
if value_to_sell < min_order_value or value_remaining < min_order_value:
|
|
logger.info("[%s] 부분 매도(%.0f%%) 조건 충족했으나, 최소 주문 금액(%.0f) 문제로 전량 매도로 전환합니다. (예상 매도액: %.0f, 예상 잔여액: %.0f)",
|
|
symbol, sell_ratio * 100, min_order_value, value_to_sell, value_remaining)
|
|
return 1.0
|
|
|
|
return sell_ratio
|
|
|
|
def record_trade(trade: dict, trades_file: str = "trades.json"):
    """Append one trade record to the JSON trade-history file.

    The file holds a JSON list. It is read, the record is appended, and the
    whole list is written back. The write goes to a sibling ``.tmp`` file
    first and is then moved over the target with ``os.replace``, so an
    interrupted write cannot leave a truncated history file behind.

    This is best-effort: every failure is logged and swallowed, so the
    function never raises to the caller.

    Args:
        trade: JSON-serialisable trade record.
        trades_file: Path of the history file; created when missing.
    """
    try:
        trades = []
        if os.path.exists(trades_file):
            with open(trades_file, "r", encoding="utf-8") as f:
                try:
                    trades = json.load(f)
                except ValueError as e:
                    # Covers json.JSONDecodeError and UnicodeDecodeError. The
                    # history restarts from an empty list as before, but the
                    # loss is now logged instead of being silent.
                    logger.warning("거래기록 파일을 읽을 수 없어 새 목록으로 시작합니다: %s (%s)", trades_file, e)
                    trades = []

        # Refuse to overwrite a file whose top-level value is not a list.
        if not isinstance(trades, list):
            raise TypeError(f"trade history must be a JSON list, got {type(trades).__name__}")

        trades.append(trade)

        tmp_file = f"{trades_file}.tmp"
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(trades, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, trades_file)
        except Exception:
            # Do not leave a half-written temp file behind.
            if os.path.exists(tmp_file):
                os.remove(tmp_file)
            raise

        logger.debug("거래기록 저장됨: %s", trades_file)
    except Exception as e:
        logger.exception("거래기록 저장 실패: %s", e)
|
|
|
|
def _update_df_with_realtime_price(df: pd.DataFrame, symbol: str, timeframe: str, buffer: list) -> pd.DataFrame:
    """Overwrite the last candle with the live price while it is still open.

    The candle length is derived from ``timeframe`` ("<n>m", "<n>h", "D" or
    "W"). If the current UTC time falls inside the last candle's interval,
    its ``close`` is set to the live price and ``high``/``low`` are widened
    to include it. Monthly ("M") and unrecognised timeframes are skipped.
    The frame is modified in place and every outcome is described by a
    message appended to ``buffer``; failures are reported there as well
    instead of being raised.

    Args:
        df: OHLCV frame indexed by candle time; may be None or empty.
        symbol: Instrument whose current price is fetched.
        timeframe: Candle interval code.
        buffer: List that receives log messages.

    Returns:
        The same ``df`` object that was passed in.
    """
    try:
        from datetime import datetime, timezone

        live_price = get_current_price(symbol)
        # Nothing to do without a positive price and at least one candle.
        if not live_price > 0 or df is None or df.empty:
            return df

        last_label = df.index[-1]
        candle_open = last_label
        now_utc = datetime.now(timezone.utc)

        # Candle length in seconds for the given timeframe.
        span_seconds = 0
        try:
            if 'm' in timeframe:
                span_seconds = 60 * int(timeframe.replace('m', ''))
            elif 'h' in timeframe:
                span_seconds = 3600 * int(timeframe.replace('h', ''))
            elif timeframe == 'D':
                span_seconds = 86400
            elif timeframe == 'W':
                span_seconds = 7 * 86400
            elif timeframe == 'M':
                # A month has no fixed length, so monthly candles are skipped.
                buffer.append("warning: 월봉(M)은 실시간 업데이트를 지원하지 않습니다")
                return df
        except (ValueError, AttributeError) as parse_error:
            buffer.append(f"warning: 타임프레임 파싱 실패 ({timeframe}): {parse_error}")
            return df

        if span_seconds <= 0:
            buffer.append(f"warning: interval_seconds 계산 실패 (timeframe={timeframe})")
            return df

        # A naive candle timestamp is treated as UTC for the comparison.
        if candle_open.tzinfo is None:
            candle_open = candle_open.tz_localize(timezone.utc)
        candle_close = candle_open + pd.Timedelta(seconds=span_seconds)

        still_open = candle_open <= now_utc < candle_close
        if not still_open:
            buffer.append(f"info: 새로운 캔들 시작됨, 실시간 업데이트 건너뜀 (timeframe={timeframe})")
            return df

        # The last candle is still forming: fold the live price into it.
        df.loc[last_label, 'close'] = live_price
        df.loc[last_label, 'high'] = max(df.loc[last_label, 'high'], live_price)
        df.loc[last_label, 'low'] = min(df.loc[last_label, 'low'], live_price)
        buffer.append(f"실시간 캔들 업데이트 적용: close={live_price:.2f}, timeframe={timeframe}")
    except Exception as failure:
        buffer.append(f"warning: 실시간 캔들 업데이트 실패: {failure}")
    return df
|
|
|
|
def process_symbol(
    symbol: str,
    timeframe: str,
    candle_count: int,
    telegram_token: str,
    telegram_chat_id: str,
    dry_run: bool,
    indicators: dict | None = None,
    indicator_timeframe: str | None = None,
    cfg: RuntimeConfig | None = None,
) -> dict:
    """Evaluate the buy rules for one symbol and record or place the trade.

    Candles are fetched, the last candle is refreshed with the live price,
    and MACD, SMA and ADX values are computed. Three buy rules are checked:

    * 매수조건1: MACD crosses above its signal line or above zero, and the
      short SMA is above the long SMA.
    * 매수조건2: the short SMA crosses above the long SMA, MACD is above its
      signal line, and ADX is above the threshold.
    * 매수조건3: ADX crosses above the threshold, the short SMA is above the
      long SMA, and MACD is above its signal line.

    When any rule matches, the Telegram text is set and a trade is recorded:
    "simulated" in dry-run mode, "filled" after a placed auto-trade order, or
    "notified" otherwise.

    Args:
        symbol: Instrument identifier.
        timeframe: Candle interval; falls back to ``cfg.timeframe``.
        candle_count: Number of candles; falls back to ``cfg.candle_count``.
        telegram_token: Bot token; falls back to ``cfg.telegram_bot_token``.
        telegram_chat_id: Chat id; falls back to ``cfg.telegram_chat_id``.
        dry_run: Simulation flag; ``None`` falls back to ``cfg.dry_run``.
        indicators: Optional indicator parameters (``macd_fast``,
            ``macd_slow``, ``macd_signal``, ``adx_length``, ``adx_threshold``,
            ``sma_short``, ``sma_long``).
        indicator_timeframe: Interval used for indicators; takes precedence
            over ``timeframe`` when set.
        cfg: Optional runtime configuration.

    Returns:
        A dict with ``symbol``, ``summary`` (list of messages), ``telegram``
        (message text or None), ``error`` (None or an error label/text) and,
        after an auto-trade attempt, ``buy_order``. Exceptions are caught,
        logged and reported through ``error``.
    """
    result = {"symbol": symbol, "summary": [], "telegram": None, "error": None}
    try:
        if cfg is not None:
            timeframe = timeframe or cfg.timeframe
            candle_count = candle_count or cfg.candle_count
            telegram_token = telegram_token or cfg.telegram_bot_token
            telegram_chat_id = telegram_chat_id or cfg.telegram_chat_id
            dry_run = cfg.dry_run if dry_run is None else dry_run
            indicator_timeframe = indicator_timeframe or cfg.indicator_timeframe
        buffer = []
        flushed = 0  # count of buffer entries already copied into the summary
        use_tf = indicator_timeframe or timeframe

        df = fetch_ohlcv(symbol, use_tf, candle_count=candle_count, log_buffer=buffer)
        df = _update_df_with_realtime_price(df, symbol, use_tf, buffer)

        result["summary"].extend(buffer[flushed:])
        flushed = len(buffer)
        if df is None or df.empty or len(df) < 3:
            result["summary"].append(f"MACD 계산에 충분한 데이터 없음: {symbol}")
            result["error"] = "insufficient_data"
            return result

        hist = compute_macd_hist(df["close"], log_buffer=buffer)

        # Copy only entries added since the last flush, so histogram log
        # lines are kept even when the fetch step already logged something.
        result["summary"].extend(buffer[flushed:])
        flushed = len(buffer)
        if hist is None or len(hist.dropna()) < 2:
            result["summary"].append(f"MACD 히스토그램 값 부족: {symbol}")
            result["error"] = "insufficient_macd"
            return result

        ind = indicators or {}
        macd_fast = int(ind.get("macd_fast", 12))
        macd_slow = int(ind.get("macd_slow", 26))
        macd_signal = int(ind.get("macd_signal", 9))
        adx_length = int(ind.get("adx_length", 14))
        adx_threshold = float(ind.get("adx_threshold", 25))
        sma_short_len = int(ind.get("sma_short", 5))
        sma_long_len = int(ind.get("sma_long", 200))

        macd_df = ta.macd(df["close"], fast=macd_fast, slow=macd_slow, signal=macd_signal)
        # NOTE(review): pandas_ta appears to return None when there are too
        # few rows; treated here as insufficient data - confirm.
        if macd_df is None:
            result["summary"].append("MACD 데이터 부족: 교차 판별 불가")
            result["error"] = "insufficient_macd"
            return result

        cols = list(macd_df.columns)
        hist_cols = [c for c in cols if "MACDh" in c or "hist" in c.lower()]
        macd_cols = [c for c in cols if ("MACD" in c and c not in hist_cols and not c.lower().endswith("s"))]
        signal_cols = [c for c in cols if ("MACDs" in c or c.lower().endswith("s") or "signal" in c.lower())]

        if not macd_cols or not signal_cols:
            # Fall back to position: first column as MACD, last as signal.
            if len(cols) >= 3:
                macd_col = cols[0]
                signal_col = cols[-1]
            else:
                raise RuntimeError("MACD columns not found")
        else:
            macd_col = macd_cols[0]
            signal_col = signal_cols[0]

        macd_line = macd_df[macd_col].dropna()
        signal_line = macd_df[signal_col].dropna()

        sma_short = compute_sma(df["close"], sma_short_len, log_buffer=buffer)
        sma_long = compute_sma(df["close"], sma_long_len, log_buffer=buffer)

        adx_df = ta.adx(df["high"], df["low"], df["close"], length=adx_length)
        adx_cols = [c for c in adx_df.columns if "ADX" in c.upper()] if adx_df is not None else []
        adx_series = adx_df[adx_cols[0]].dropna() if adx_cols else pd.Series([], dtype=float)

        # dropna() always returns a Series, so only the length needs checking.
        if len(macd_line) < 2 or len(signal_line) < 2:
            result["summary"].append("MACD 데이터 부족: 교차 판별 불가")
            result["error"] = "insufficient_macd"
            return result

        close = float(df["close"].iloc[-1])
        prev_macd = float(macd_line.iloc[-2])
        curr_macd = float(macd_line.iloc[-1])
        prev_signal = float(signal_line.iloc[-2])
        curr_signal = float(signal_line.iloc[-1])

        sma_short_valid = sma_short.dropna()
        sma_long_valid = sma_long.dropna()
        prev_sma_short = float(sma_short_valid.iloc[-2]) if len(sma_short_valid) >= 2 else None
        curr_sma_short = float(sma_short_valid.iloc[-1]) if len(sma_short_valid) >= 1 else None
        prev_sma_long = float(sma_long_valid.iloc[-2]) if len(sma_long_valid) >= 2 else None
        curr_sma_long = float(sma_long_valid.iloc[-1]) if len(sma_long_valid) >= 1 else None
        prev_adx = float(adx_series.iloc[-2]) if len(adx_series) >= 2 else None
        curr_adx = float(adx_series.iloc[-1]) if len(adx_series) >= 1 else None

        cross_macd_signal = (prev_macd < prev_signal and curr_macd > curr_signal)
        cross_macd_zero = (prev_macd < 0 and curr_macd > 0)
        macd_cross_ok = (cross_macd_signal or cross_macd_zero)
        macd_above_signal = (curr_macd > curr_signal)

        sma_condition = (curr_sma_short is not None and curr_sma_long is not None and curr_sma_short > curr_sma_long)
        cross_sma = (
            prev_sma_short is not None and prev_sma_long is not None
            and prev_sma_short < prev_sma_long
            and curr_sma_short is not None and curr_sma_long is not None
            and curr_sma_short > curr_sma_long
        )

        adx_ok = (curr_adx is not None and curr_adx > adx_threshold)
        cross_adx = (prev_adx is not None and curr_adx is not None and prev_adx <= adx_threshold and curr_adx > adx_threshold)

        matches = []
        if macd_cross_ok and sma_condition:
            matches.append("매수조건1")
        if cross_sma and macd_above_signal and adx_ok:
            matches.append("매수조건2")
        if cross_adx and sma_condition and macd_above_signal:
            matches.append("매수조건3")

        if matches:
            result["summary"].append(f"매수 신호발생: {symbol} -> {', '.join(matches)}")
            text = f"매수 신호발생: {symbol} -> {', '.join(matches)}\n가격: {close:.2f}\n"
            result["telegram"] = text

            amount = cfg.config.get("auto_trade", {}).get("buy_amount", 0) if cfg else 0
            trade_recorded = False

            if dry_run:
                trade = make_trade_record(symbol, "buy", amount, True, price=close, status="simulated")
                record_trade(trade, "trades.json")
                trade_recorded = True
            elif cfg is not None and cfg.trading_mode == "auto_trade":
                auto_trade_cfg = cfg.config.get("auto_trade", {})
                buy_enabled = auto_trade_cfg.get("buy_enabled", False)
                buy_amount = auto_trade_cfg.get("buy_amount", 0)
                allowed_symbols = auto_trade_cfg.get("allowed_symbols", [])

                can_auto_buy = buy_enabled and buy_amount > 0
                # A non-empty allow-list restricts auto-buy to listed symbols.
                if allowed_symbols and symbol not in allowed_symbols:
                    can_auto_buy = False

                if can_auto_buy:
                    logger.info("[%s] 자동 매수 조건 충족: 매수 주문 시작 (금액: %d)", symbol, buy_amount)
                    from .order import execute_buy_order_with_confirmation

                    buy_result = execute_buy_order_with_confirmation(
                        symbol=symbol,
                        amount=buy_amount,
                        config=cfg.config,
                        telegram_token=telegram_token,
                        telegram_chat_id=telegram_chat_id,
                        account_number=cfg.kiwoom_account_number,
                        dry_run=False,
                        parse_mode=cfg.telegram_parse_mode,
                    )
                    result["buy_order"] = buy_result

                    # Guard against a missing result before reading its status.
                    if buy_result and buy_result.get("status") == "placed":
                        trade_recorded = True
                        from .holdings import add_new_holding
                        quantity = buy_result.get("quantity", 0)
                        add_new_holding(symbol, close, quantity, time.time(), "holdings.json")

                        trade = make_trade_record(symbol, "buy", buy_amount, False, price=close, status="filled")
                        record_trade(trade, "trades.json")

            # No order was placed and this is not a simulation: record the
            # signal as a notification only.
            if not trade_recorded and not dry_run:
                trade = make_trade_record(symbol, "buy", amount, False, price=close, status="notified")
                record_trade(trade, "trades.json")

        return result
    except Exception as e:
        logger.exception("심볼 처리 중 오류: %s -> %s", symbol, e)
        result["error"] = str(e)
        result["summary"].append(f"심볼 처리 중 오류: {symbol} -> {e}")
        return result
|
|
|
|
def check_sell_conditions(
    holdings: Dict[str, Dict[str, Any]],
    cfg: RuntimeConfig,
    config: Dict[str, Any] | None = None,
    check_type: str = CheckType.ALL
) -> Tuple[List[Dict[str, Any]], int]:
    """Evaluate the sell rules for every holding and act on the signals.

    For each holding the current price is fetched and
    ``evaluate_sell_conditions`` is run. When a sell is signalled:

    * in live auto-trade mode (``trading_mode == "auto_trade"`` and not
      dry-run) a sell order is placed, and ``partial_sell_done`` is persisted
      after a placed one-time partial sell;
    * otherwise a Telegram notification is sent when credentials are set.

    In dry-run mode the ``partial_sell_done`` flag is persisted as soon as the
    partial-sell rule fires. A heartbeat message is sent when no holding
    produced a sell signal. A failure on one symbol is logged and does not
    stop the others.

    Args:
        holdings: Mapping of symbol to holding record (``buy_price``,
            ``max_price``, ``amount``, ``partial_sell_done`` are read).
        cfg: Runtime configuration.
        config: Settings dict; defaults to ``cfg.config`` when omitted.
        check_type: Which rule group to run (``CheckType.STOP_LOSS``,
            ``CheckType.PROFIT_TAKING`` or ``CheckType.ALL``).

    Returns:
        ``(results, sell_signal_count)``: one result dict per evaluated
        holding, and the number of holdings with a positive sell ratio.
    """
    if config is None and cfg is not None and hasattr(cfg, "config"):
        config = cfg.config
    results = []

    telegram_token = cfg.telegram_bot_token
    telegram_chat_id = cfg.telegram_chat_id
    dry_run = cfg.dry_run
    account_number = cfg.kiwoom_account_number
    trading_mode = cfg.trading_mode
    parse_mode = cfg.telegram_parse_mode or "HTML"
    can_notify = bool(telegram_token and telegram_chat_id)
    no_signal_text = "[알림] 충족된 매도 조건 없음 (프로그램 정상 작동 중)"

    if not holdings:
        logger.info("보유 정보가 없음 - 매도 조건 검사 건너뜀")
        if can_notify:
            send_telegram(telegram_token, telegram_chat_id, no_signal_text, add_thread_prefix=False, parse_mode=parse_mode)
        return [], 0

    # Relative imports, matching the rest of this file, so the same module
    # objects are used however the package is named or loaded.
    from .holdings import set_holding_field
    from .order import execute_sell_order_with_confirmation

    # Orders are placed only in auto-trade mode outside dry-run; every other
    # combination only notifies.
    live_auto_trade = trading_mode == "auto_trade" and not dry_run

    sell_signal_count = 0
    for symbol, holding_info in holdings.items():
        try:
            current_price = get_current_price(symbol)
            if current_price <= 0:
                logger.warning("현재가 조회 실패: %s", symbol)
                continue
            buy_price = float(holding_info.get("buy_price", 0))
            max_price = float(holding_info.get("max_price", current_price))
            if buy_price <= 0:
                logger.warning("매수가 정보 없음: %s", symbol)
                continue

            sell_result = evaluate_sell_conditions(
                current_price,
                buy_price,
                max_price,
                holding_info,
                config,
                check_type
            )

            logger.info("[%s] 매도 조건 검사 - 현재가: %.2f, 매수가: %.2f, 최고가: %.2f, 수익률: %.2f%%, 최고점대비: %.2f%%", symbol, current_price, buy_price, max_price, sell_result["profit_rate"], sell_result["max_drawdown"])
            logger.info("[%s] 매도 상태: %s (비율: %.0f%%), 사유: %s", symbol, sell_result["status"], sell_result["sell_ratio"] * 100, ", ".join(sell_result["reasons"]))

            result = {
                "symbol": symbol,
                "status": sell_result["status"],
                "sell_ratio": sell_result["sell_ratio"],
                "profit_rate": sell_result["profit_rate"],
                "max_drawdown": sell_result["max_drawdown"],
                "reasons": sell_result["reasons"],
                "current_price": current_price,
                "buy_price": buy_price,
                "max_price": max_price,
                "amount": holding_info.get("amount", 0),
            }
            results.append(result)

            # In dry-run the one-time partial-sell flag is stored right away.
            if sell_result.get("set_partial_sell_done", False) and dry_run:
                set_holding_field(symbol, "partial_sell_done", True, "holdings.json")
                logger.info("[%s] partial_sell_done 플래그 설정 완료 (dry_run)", symbol)

            if sell_result["sell_ratio"] > 0:
                sell_signal_count += 1

                if not live_auto_trade:
                    if can_notify:
                        from .signals import build_sell_message
                        msg = build_sell_message(symbol, sell_result, parse_mode=parse_mode)
                        send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False, parse_mode=parse_mode)
                else:
                    total_amount = float(holding_info.get("amount", 0))
                    sell_ratio = float(sell_result.get("sell_ratio", 0.0) or 0.0)

                    # A partial sell too small on either side becomes a full sell.
                    sell_ratio = _adjust_sell_ratio_for_min_order(symbol, total_amount, sell_ratio, current_price, config)

                    # Order quantity is truncated to a whole number.
                    amount_to_sell = int(total_amount * sell_ratio)

                    if amount_to_sell > 0:
                        logger.info("[%s] 자동 매도 조건 충족: 매도 주문 시작 (총 수량: %d, 매도 비율: %.0f%%, 주문 수량: %d)", symbol, total_amount, sell_ratio * 100, amount_to_sell)
                        sell_order_result = execute_sell_order_with_confirmation(
                            symbol=symbol,
                            quantity=amount_to_sell,
                            config=config,
                            telegram_token=telegram_token,
                            telegram_chat_id=telegram_chat_id,
                            account_number=account_number,
                            dry_run=dry_run,
                            parse_mode=parse_mode
                        )

                        # Persist the one-time flag only after the order was placed.
                        if sell_order_result and sell_result.get("set_partial_sell_done"):
                            if sell_order_result.get("status") == "placed":
                                if set_holding_field(symbol, "partial_sell_done", True, holdings_file="holdings.json"):
                                    logger.info("[%s] 부분 매도(1회성) 완료, partial_sell_done 플래그를 True로 업데이트합니다.", symbol)
                                else:
                                    logger.error("[%s] partial_sell_done 플래그 업데이트에 실패했습니다.", symbol)
        except Exception as e:
            logger.exception("매도 조건 확인 중 오류 (%s): %s", symbol, e)

    # Heartbeat: tell the operator the run finished without any sell signal.
    if can_notify and not any(r["sell_ratio"] > 0 for r in results):
        send_telegram(telegram_token, telegram_chat_id, no_signal_text, add_thread_prefix=False, parse_mode=parse_mode)
    return results, sell_signal_count