최초 프로젝트 업로드 (Script Auto Commit)

This commit is contained in:
2025-12-03 22:37:46 +09:00
commit ed7084dd8f
30 changed files with 3254 additions and 0 deletions

43
.github/copilot-instructions.md vendored Normal file
View File

@@ -0,0 +1,43 @@
<!-- 개발 원칙 가이드 -->
<!-- copilot-instructions.md -->
<!-- -->
<!-- Project_Root/ -->
<!-- ├── .github/ # 깃허브 코파일럿 설정 폴더 -->
<!-- │ └── copilot-instructions.md # 1. 개발 원칙 (AI 페르소나 및 코딩 규칙) -->
<!-- ├── docs/ # 기획 및 설계 문서 -->
<!-- │ ├── PRD.md # 2. 기획 및 로직 설계서 (프로젝트 지도) -->
<!-- │ ├── implementation_plan.md # 3. 단계별 구현 체크리스트 (작업 지시서) -->
<!-- │ └── review_prompt.md # 4. AI 코드 리뷰 지침 (품질 관리) -->
<!-- └── src/ # 소스 코드 -->
<!-- ├── main.py -->
<!-- └── ... -->
# Project Rules & AI Persona
## 1. Role & Persona
- 당신은 Google, Meta 출신의 20년 차 **Principal Software Engineer**입니다.
- **C++ (C++17/20)** 및 **Python (3.11+)** 전문가입니다.
- 코드는 간결하고, 성능 효율적이며, 유지보수 가능해야 합니다.
- 불필요한 서론(대화)을 생략하고 **코드와 핵심 설명**만 출력하십시오.
## 2. Tech Stack & Style
- **Python:**
- 모든 함수에 Type Hinting (typing 모듈) 필수 적용.
- PEP8 스타일 준수 (Black Formatter 기준).
- Docstring은 Google Style을 따름.
- **C++:**
- Modern C++ (Smart Pointers, RAII, move semantics) 적극 활용.
- Raw pointer 사용 금지 (필수적인 경우 주석으로 사유 명시).
- Google C++ Style Guide 준수.
## 3. Coding Principles
- **DRY (Don't Repeat Yourself):** 중복 로직은 반드시 함수나 클래스로 분리.
- **Early Return:** 들여쓰기 깊이(Indentation depth)를 최소화하기 위해 가드 절(Guard Clause) 사용.
- **Error Handling:**
- `try-except` (Python) 또는 `try-catch` (C++)를 남용하지 말 것.
- 예외를 삼키지 말고(Silent Failure 금지), 명확한 에러 로그를 남길 것.
- **Security:** API Key, 비밀번호 등 민감 정보는 절대 하드코딩 금지 (.env 사용).
## 4. Response Rules
- 코드를 수정할 때는 변경된 부분만 보여주지 말고, **문맥 파악이 가능한 전체 함수/블록**을 보여주세요.
- 파일 경로와 이름을 코드 블록 상단에 항상 주석으로 명시하세요.

58
.gitignore vendored Normal file
View File

@@ -0,0 +1,58 @@
# Environment variables
.env
.env.local
.env.*.local
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
venv/
ENV/
env/
.venv
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Test data (in tests/ folder)
tests/holdings.json
tests/holdings.json.example
tests/*.txt
tests/*.log
# Logs (in logs/ folder)
logs/*.log
# Production data (root level)
trades.json
pending_orders.json
confirmed_tokens.txt

35
Dockerfile Normal file
View File

@@ -0,0 +1,35 @@
# Synology DSM용 MACD 알림 봇 Dockerfile
FROM python:3.12-slim
# 필수 패키지 설치
RUN apt-get update && apt-get install -y \
build-essential \
libffi-dev \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# 작업 디렉토리 생성 및 이동
WORKDIR /app
# 소스 복사
COPY . /app
# 파이썬 패키지 설치
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir -r requirements.txt
# 환경변수 예시 파일 복사 (필요시)
# COPY .env.example .env
# 로그 폴더 생성
RUN mkdir -p logs
# 기본 실행 명령
CMD ["python", "main.py"]
# 포트 노출 (필요시)
# EXPOSE 8000
# 시놀로지 DSM에서 빌드 예시:
# docker build -t macd_alarm .
# docker run --env-file .env -v $(pwd)/logs:/app/logs macd_alarm

120
README.md Normal file
View File

@@ -0,0 +1,120 @@
# MACD 알림 봇 (Upbit 기반)
이 프로젝트는 Upbit의 OHLCV 데이터를 `pyupbit`로 가져와 MACD, SMA, ADX를 계산하고, 설정된 매수/매도 조건에 따라 Telegram으로 알림을 보내는 봇입니다.
---
## 프로젝트 구조
최근 프로젝트가 모듈화되어 다음과 같은 구조를 갖습니다:
```
macd_alarm/
├── main.py # 프로그램의 진입점
├── src/ # 모듈화된 코드
│ ├── __init__.py # 패키지 초기화
│ ├── common.py # 로깅 설정
│ ├── config.py # 설정 및 심볼 로드, RuntimeConfig
│ ├── indicators.py # MACD 및 지표 계산
│ ├── holdings.py # 보유 자산 관리
│ ├── order.py # 주문 및 확인
│ ├── signals.py # 매수/매도 신호 처리
│ ├── notifications.py # Telegram 알림
│ ├── threading_utils.py # 멀티스레딩 유틸리티
│ └── tests/ # 테스트 코드
│ ├── test_helpers.py
│ ├── test_main.py
│ └── test_evaluate_sell_conditions.py
└── pytest.ini # pytest 설정
```
---
## 주요 기능
- **config.py**: 설정 파일(`config.json`) 로드, 심볼 목록 읽기, `RuntimeConfig` 데이터클래스로 실행 컨텍스트 관리.
- **indicators.py**: OHLCV 데이터를 가져오고 MACD, SMA, ADX 계산.
- **holdings.py**: 보유 자산 로드, 저장 및 현재 가격 확인.
- **order.py**: 주문 실행 및 결과 확인.
- **signals.py**: 매수/매도 조건 확인 및 기록.
- **notifications.py**: Telegram 알림 전송.
- **threading_utils.py**: 멀티스레딩 실행 지원.
### RuntimeConfig
프로젝트는 `RuntimeConfig` 데이터클래스를 사용하여 설정과 환경 변수를 단일 컨텍스트로 관리합니다. 이를 통해 함수 간 파라미터 전달이 간소화되고, 새로운 설정 옵션을 쉽게 추가할 수 있습니다.
```python
from src.config import build_runtime_config
# config.json 로드 후
cfg = build_runtime_config(config_dict)
# cfg는 다음을 포함:
# - timeframe, indicator_timeframe, candle_count
# - telegram_bot_token, telegram_chat_id
# - upbit_access_key, upbit_secret_key
# - dry_run, max_threads, trading_mode 등
```
---
## 실행 방법
1. **의존성 설치**:
```bash
python -m pip install -r requirements.txt
```
2. **환경 변수 설정** (Telegram 및 Upbit API 키):
PowerShell:
```powershell
$env:TELEGRAM_BOT_TOKEN = "YOUR_BOT_TOKEN"
$env:TELEGRAM_CHAT_ID = "YOUR_CHAT_ID"
$env:UPBIT_ACCESS_KEY = "YOUR_UPBIT_ACCESS_KEY"
$env:UPBIT_SECRET_KEY = "YOUR_UPBIT_SECRET_KEY"
```
또는 `.env` 파일 생성:
```
TELEGRAM_BOT_TOKEN=YOUR_BOT_TOKEN
TELEGRAM_CHAT_ID=YOUR_CHAT_ID
UPBIT_ACCESS_KEY=YOUR_UPBIT_ACCESS_KEY
UPBIT_SECRET_KEY=YOUR_UPBIT_SECRET_KEY
```
3. **설정 파일 준비**:
```bash
copy config.example.json config.json
```
`config.json` 파일을 필요에 따라 수정합니다.
4. **프로그램 실행**:
```bash
python main.py
```
---
## 테스트 실행
1. **pytest 설치**:
```bash
python -m pip install pytest
```
2. **테스트 실행**:
```bash
pytest
```
---
이 문서는 프로젝트의 최신 구조와 실행 방법을 반영하도록 업데이트되었습니다.

54
config/config.json Normal file
View File

@@ -0,0 +1,54 @@
{
"symbols_file": "config/symbols.txt",
"symbol_delay": 1.0,
"candle_count": 200,
"buy_check_interval_minutes": 240,
"stop_loss_check_interval_minutes": 60,
"profit_taking_check_interval_minutes": 240,
"loop": true,
"dry_run": true,
"max_threads": 3,
"telegram_parse_mode": "HTML",
"macd_fast": 12,
"macd_slow": 26,
"macd_signal": 9,
"adx_length": 14,
"adx_threshold": 25,
"sma_short": 5,
"sma_long": 200,
"trading_mode": "auto_trade",
"auto_trade": {
"enabled": true,
"buy_enabled": true,
"buy_amount": 1000000,
"min_order_value": 100000,
"allowed_symbols": [],
"require_env_confirm": false,
"buy_price_slippage_pct": 0.2,
"loss_threshold": -5.0,
"profit_threshold_1": 10.0,
"profit_threshold_2": 30.0,
"drawdown_1": 5.0,
"drawdown_2": 15.0,
"partial_sell_ratio": 0.5
},
"confirm": {
"confirm_via_file": false,
"confirm_timeout": 300
},
"monitor": {
"enabled": true,
"timeout": 120,
"poll_interval": 3,
"max_retries": 1
},
"notify": {
"order_filled": true,
"order_partial": true,
"order_cancelled": true,
"order_error": true
},
"kiwoom": {
"account_number": "YOUR_ACCOUNT_NUMBER"
}
}

4
config/symbols.txt Normal file
View File

@@ -0,0 +1,4 @@
# symbols.txt - 한 줄에 하나의 주식 종목코드 입력
# 빈 줄과 #으로 시작하는 줄은 무시됨
005930
000660

354
docs/DEPLOYMENT.md Normal file
View File

@@ -0,0 +1,354 @@
# 자동매매 활성화 가이드 (Deployment & Safety)
## 개요
이 문서는 MACD 알림 봇의 **자동매매 기능(auto_trade)**을 안전하게 활성화하고 운영하는 방법을 설명합니다.
---
## 1. 사전 준비 (필수)
### 1.1 Upbit API 키 생성
1. [Upbit 홈페이지](https://upbit.com) 로그인
2. 계정 설정 → API 관리 → "Open API" 클릭
3. "오픈 API 키 생성" 버튼 클릭
4. 접근 권한:
-**조회**: 계좌, 자산, 주문내역 (필수)
-**매매**: 매도 주문 (필수)
-**출금**: 해제 권장 (자동매매에 불필요)
5. IP 화이트리스트: 봇이 실행되는 서버 IP 추가 (선택, 권장)
6. Access Key / Secret Key 메모 (비밀 유지!)
### 1.2 Telegram Bot 토큰 & Chat ID 확인
- 기존에 설정했다면 그대로 사용 가능
- 없다면 [BotFather로 새 봇 생성](https://core.telegram.org/bots#botfather) 후 Chat ID 확인
### 1.3 환경변수 설정 (`.env` 파일)
```bash
# Telegram 알림
TELEGRAM_BOT_TOKEN=your_bot_token
TELEGRAM_CHAT_ID=your_chat_id
# Upbit API (자동매매 시 필수)
UPBIT_ACCESS_KEY=your_access_key
UPBIT_SECRET_KEY=your_secret_key
# 선택: 자동매매 활성화 확인 (require_env_confirm=true일 때)
AUTO_TRADE_ENABLED=1
# 선택: 주문 모니터링 타임아웃 (초, 기본 120)
ORDER_MONITOR_TIMEOUT=120
# 선택: 주문 폴링 간격 (초, 기본 3)
ORDER_POLL_INTERVAL=3
# 선택: 재시도 횟수 (기본 1)
ORDER_MAX_RETRIES=1
```
⚠️ `.env` 파일을 `.gitignore`에 추가하여 비밀 정보를 저장소에 커밋하지 마세요.
---
## 2. config.json 설정
### 2.1 기본 설정 (신중하게)
```json
{
"trading_mode": "signal_only",
"auto_trade": {
"enabled": false,
"max_trade_krw": 1000000,
"allowed_symbols": [],
"require_env_confirm": true
},
"confirm": {
"confirm_via_file": true,
"confirm_timeout": 300
},
"monitor": {
"enabled": true,
"timeout": 120,
"poll_interval": 3,
"max_retries": 1
},
"notify": {
"order_filled": true,
"order_partial": true,
"order_cancelled": true,
"order_error": true
},
"dry_run": true
}
```
### 2.2 각 항목 설명
#### `trading_mode`
- `"signal_only"` (기본): Telegram 알림만 전송
- `"auto_trade"`: 자동매매 시도 (수동 확인 필수)
- `"mixed"`: 알림 + 거래 기록
#### `auto_trade`
- `enabled`: `false`로 유지 (기본값). 실제 자동매도 활성화하려면 명시적으로 `true`로 변경
- `buy_enabled`: `false`로 유지 (기본값). 실제 자동매수 활성화하려면 명시적으로 `true`로 변경
- `buy_amount_krw`: 매수 시 사용할 KRW 금액 (예: 10,000)
- `max_trade_krw`: 한 번에 매도할 최대 KRW 금액 (예: 1,000,000)
- `allowed_symbols`: 자동매매 허용 심볼 목록. 빈 배열이면 모든 심볼 허용
- 예: `["KRW-BTC", "KRW-ETH"]` (이 심볼들만 자동매매)
- `require_env_confirm`: `true`이면 환경변수 `AUTO_TRADE_ENABLED=1` 필요 (권장)
#### `confirm`
- `confirm_via_file`: 파일 기반 확인 활성화 여부
- `confirm_timeout`: 확인 대기 시간(초). 이 시간 내에 확인이 없으면 주문 취소
#### `monitor`
- `enabled`: 주문 모니터링 활성화
- `timeout`: 주문 폴링 타임아웃(초)
- `poll_interval`: 주문 상태 확인 간격(초)
- `max_retries`: 타임아웃 시 재시도 횟수
#### `notify`
- `order_filled`: 완전체결 시 알림
- `order_partial`: 부분체결/타임아웃 시 알림
- `order_cancelled`: 주문 취소 시 알림
- `order_error`: 오류 발생 시 알림
---
## 3. 안전 체크리스트 (자동매매 활성화 전)
### ✅ 필수 확인 항목
- [ ] Upbit API 키가 올바르게 발급되었나? (테스트 계좌에서 실제 키 사용하기)
- [ ] `.env` 파일이 `.gitignore`에 있나?
- [ ] `config.json``dry_run``true`로 설정되어 있나? (테스트 단계)
- [ ] `auto_trade.enabled``false`로 설정되어 있나? (테스트 단계)
- [ ] 로그 파일(`logs/macd_alarm.log`)을 확인할 수 있는 환경인가?
### ✅ 단계적 활성화 (점진적 위험 증가)
#### 3.1 단계 1: Signal-Only 모드 (알림만)
```json
{
"trading_mode": "signal_only",
"dry_run": true
}
```
- 실행: `python main.py`
- 기대 결과: 매도/매수 신호 감지 시 Telegram 알림 수신
- 확인: 로그에 오류 없고, Telegram 메시지가 제대로 도착하는가?
#### 3.2 단계 2: Mixed 모드 (알림 + 기록, dry-run)
```json
{
"trading_mode": "mixed",
"auto_trade": {
"enabled": false,
"buy_enabled": false
},
"dry_run": true
}
```
- 실행: `python main.py`
- 기대 결과: 매도 신호 시 Telegram + `trades.json` 기록
- 매수 신호는 `auto_trade.buy_enabled`가 true일 때만 `trades.json`에 기록됨
- 확인: `trades.json`에 매도 기록이 제대로 저장되는가? (매수 기록은 buy_enabled가 true일 때만 저장됨)
#### 3.3 단계 3: Auto-Trade 시뮬레이션 (dry-run, 실제 주문 아님)
```json
{
"trading_mode": "auto_trade",
"auto_trade": {
"enabled": true,
"max_trade_krw": 100000,
"allowed_symbols": [],
"require_env_confirm": true
},
"dry_run": true
}
```
- 환경변수 설정:
```bash
$env:AUTO_TRADE_ENABLED = "1"
```
- 실행: `python main.py`
- 기대 결과:
- 매도 신호 감지 시 Telegram으로 확인 토큰 수신
- 토큰으로 파일 생성 후 주문 진행 (시뮬레이션)
- `pending_orders.json` 및 `trades.json`에 기록
- 확인:
- Telegram 확인 메시지 형식이 명확한가?
- 파일 생성으로 확인 메커니즘이 동작하는가?
- 거래 기록이 제대로 저장되는가?
#### 3.4 단계 4: **실제 자동매도 활성화** (최종 단계, 신중!)
```json
{
"trading_mode": "auto_trade",
"auto_trade": {
"enabled": true,
"buy_enabled": false,
"max_trade_krw": 100000,
"allowed_symbols": ["KRW-BTC"],
"require_env_confirm": true
},
"dry_run": false
}
```
- 환경변수 설정:
```bash
$env:AUTO_TRADE_ENABLED = "1"
```
- Upbit 테스트 계좌 또는 소액 계좌 사용 권장
- 실행: `python main.py`
- 확인:
- Telegram에서 실제 매도 주문 확인 메시지 수신
- 파일 생성으로 주문 실행
- Upbit 계정에서 주문 이력 확인
- 모니터링 후 체결 알림 수신
#### 3.5 단계 5: **실제 자동매수 활성화** (최종 단계, 매우 신중!)
```json
{
"trading_mode": "auto_trade",
"auto_trade": {
"enabled": true,
"buy_enabled": true,
"buy_amount_krw": 10000,
"max_trade_krw": 100000,
"allowed_symbols": ["KRW-BTC"],
"require_env_confirm": true
},
"dry_run": false
}
```
- 환경변수 설정:
```bash
$env:AUTO_TRADE_ENABLED = "1"
```
- ⚠️ **중요**: 자동매수는 자본 손실 위험이 높습니다. 반드시 소액으로 시작하세요.
- Upbit 테스트 계좌 또는 소액 계좌 사용 필수
- 실행: `python main.py`
- 확인:
- 매수 신호 감지 시 Telegram에서 매수 주문 확인 메시지 수신
- 파일 생성으로 주문 실행
- Upbit 계정에서 매수 주문 이력 확인
- 모니터링 후 체결 알림 수신
- `holdings.json`에 새로운 보유 정보 자동 기록
---
## 4. 주문 확인 프로세스
### 4.1 Telegram 기반 확인 (현재 구현)
1. 자동매매 신호 감지 → Telegram 알림 수신
```
[확인필요] 자동매매 주문 대기
토큰: a1b2c3d4
심볼: KRW-BTC
주문량: 0.00010000
```
2. 프로젝트 루트에서 파일 생성 (PowerShell 예):
```powershell
New-Item "confirm_a1b2c3d4" -ItemType File
```
또는 `confirmed_tokens.txt` 파일에 토큰 추가:
```
a1b2c3d4
```
3. 코드가 자동으로 파일/토큰 감지 → 주문 실행
### 4.2 타임아웃
- 기본 300초(5분) 내에 확인이 없으면 자동 취소
- `config.json`의 `confirm.confirm_timeout`으로 조정 가능
---
## 5. 운영 (Troubleshooting)
### 주문이 실행되지 않는 경우
1. **환경변수 확인**:
```bash
echo $env:AUTO_TRADE_ENABLED
# 결과: 1이어야 함
```
2. **API 키 확인**:
```bash
echo $env:UPBIT_ACCESS_KEY
# 결과: 실제 키가 설정되어 있어야 함
```
3. **로그 파일 확인**:
```bash
Get-Content "logs/macd_alarm.log" -Tail 20
# "자동매매 비활성화" 또는 "Upbit API 키 없음" 오류 찾기
```
4. **config.json 문법 확인**:
```bash
python -c "import json; json.load(open('config.json'))"
# JSON 파싱 오류 있으면 출력됨
```
### 부분체결 처리
- 주문량이 크거나 시장 유동성이 낮으면 부분체결 발생 가능
- `ORDER_MAX_RETRIES` 환경변수로 재시도 횟수 조정 (기본 1회)
- 모니터링은 `ORDER_MONITOR_TIMEOUT` 시간(기본 120초) 동안 진행
### 긴급 중지
- 파일 생성하지 않으면 `confirm_timeout` 후 자동 취소
- 또는 `dry_run: true`로 되돌려서 봇 재시작
---
## 6. 성능/안정성 팁
1. **작은 규모로 시작**:
- `max_trade_krw`: 처음엔 10만~100만 KRW 범위
- `allowed_symbols`: 확실한 심볼 몇 개만 선택
2. **로그 모니터링**:
- 실시간: `Get-Content "logs/macd_alarm.log" -Tail 10 -Wait`
- 또는 별도 터미널에서: `tail -f logs/macd_alarm.log`
3. **야간/주말 고려**:
- Upbit는 24/7 운영이지만, 유동성이 낮은 시간대 존재
- 백테스트 후 적절한 시간대에만 매매 권장
4. **백업**:
- `trades.json` 주기적 백업
- 또는 SQLite로 마이그레이션 고려
---
## 7. FAQ
**Q: 실제로 주문이 체결되면 어떻게 되나?**
A: Upbit 계정에서 직접 체결되며, `trades.json` + Telegram 알림으로 기록됩니다.
**Q: 파일 생성을 깜빡하면?**
A: `confirm_timeout` 후 자동 취소되고, `trades.json`에 `user_not_confirmed` 기록됩니다.
**Q: 여러 심볼이 동시에 신호 나면?**
A: 각 심볼마다 독립적인 토큰으로 확인 요청. 병렬 처리 가능.
**Q: 테스트 중 실제 주문을 방지하려면?**
A: `dry_run: true` 또는 `auto_trade.enabled: false`, `auto_trade.buy_enabled: false` 유지.
**Q: 자동매수와 자동매도를 독립적으로 제어할 수 있나?**
A: 네. `auto_trade.enabled`는 매도만, `auto_trade.buy_enabled`는 매수만 제어합니다. 각각 독립적으로 활성화/비활성화 가능합니다.
**Q: 매수 후 holdings.json은 자동으로 업데이트되나?**
A: 네. 매수 체결 완료 시 자동으로 `holdings.json`에 매수가, 수량, 최고가, 매수시간이 기록됩니다.
**Q: 자동매수 금액을 어떻게 설정하나?**
A: `config.json`의 `auto_trade.buy_amount_krw`에 매수할 KRW 금액을 설정하세요. (예: 10000 = 1만원)
---
## 8. 법적 공지
⚠️ **면책**: 이 봇은 교육 목적으로 제작되었습니다. 실제 투자/자동매매 시 발생하는 손실에 대해 개발자는 책임을 지지 않습니다. 충분한 테스트 후 자신의 책임 하에 사용하세요.

33
docs/PRD.md Normal file
View File

@@ -0,0 +1,33 @@
<!-- 기획 및 로직 설계서 -->
<!-- PRD.md -->
# Product Requirements Document (PRD)
## 1. Project Overview
- **프로젝트명:** (예: Stock-Finder-AI)
- **목적:** (예: 미국/한국 주식 시장에서 특정 조건에 맞는 종목을 필터링하고, 매수/매도 신호를 포착하여 알림을 보낸다.)
- **주요 사용자:** 퀀트 투자자, 개인 트레이더
## 2. Core Features (User Stories)
1. **데이터 수집:** 야후 파이낸스 API 및 한국투자증권 API를 통해 일봉/분봉 데이터를 수집한다.
2. **지표 계산:** 수집된 데이터로 RSI, Bollinger Bands, MACD를 계산한다.
3. **필터링 로직:** PBR < 1.0 이면서 RSI < 30인 종목을 추출한다.
4. **알림 발송:** 추출된 종목을 텔레그램 봇으로 전송한다.
## 3. Data Flow & Architecture
- **Input:** 종목 리스트 (Ticker List), 설정된 파라미터 (config.json)
- **Process:**
1. Data Fetcher -> (Raw Data) -> DB 저장
2. Indicator Engine -> (Calculated Data)
3. Screener -> (Filtered List)
- **Output:** JSON 리포트 및 메신저 알림
## 4. File Structure Plan
- /src/data_loader.py : API 연동 및 데이터 수집
- /src/indicators.py : 기술적 지표 계산 로직
- /src/screener.py : 필터링 및 종목 선정 핵심 로직
- /src/notifier.py : 메시지 발송 처리
## 5. Non-Functional Requirements
- **성능:** 2000개 종목 스캔을 3분 이내 완료할 것 (멀티스레딩/비동기 필수).
- **안정성:** API 호출 제한(Rate Limit) 도달 시 자동으로 Backoff/Retry 수행.

View File

@@ -0,0 +1,30 @@
<!-- 단계별 구현 체크리스트 -->
<!-- implementation_plan.md -->
# Implementation Plan
이 문서는 개발 진행 상황을 추적합니다. AI는 이 문서를 참조하여 현재 단계의 작업을 수행해야 합니다.
## Phase 1: 환경 설정 및 기본 구조 [ ]
- [ ] 프로젝트 폴더 구조 생성 및 Git 초기화
- [ ] `copilot-instructions.md``.env` 템플릿 작성
- [ ] Python 가상환경 설정 및 `requirements.txt` 작성 (pandas, numpy, requests 등)
## Phase 2: 데이터 수집 모듈 (Data Fetcher) [ ]
- [ ] `data_loader.py`: 외부 API 연동 클래스 작성
- [ ] API Rate Limit 처리 로직 (Retry/Backoff) 구현
- [ ] 단위 테스트: API 연결 및 데이터 수신 확인
## Phase 3: 핵심 로직 구현 (Core Logic) [ ]
- [ ] `indicators.py`: RSI, MACD 계산 함수 구현
- [ ] `screener.py`: 조건식 필터링 엔진 구현
- [ ] 단위 테스트: 샘플 데이터를 이용한 지표 계산 정확도 검증
## Phase 4: 알림 및 메인 실행 (Interface) [ ]
- [ ] `notifier.py`: 텔레그램/슬랙 메시지 발송 함수
- [ ] `main.py`: 전체 프로세스(수집->계산->필터->알림) 통합 실행
- [ ] 통합 테스트 (Integration Test)
## Phase 5: 최적화 및 리팩토링 [ ]
- [ ] 비동기(asyncio) 적용으로 속도 개선
- [ ] 코드 리뷰 프롬프트(`review_prompt.md`) 기반 자가 점검 수행

88
docs/review_prompt.md Normal file
View File

@@ -0,0 +1,88 @@
<!-- AI 코드 리뷰 지침 -->
<!-- review_prompt.md -->
<!-- -->
<!-- 코드 리뷰 프롬프트(실무용, 핵심 위주) -->
<!-- -->
<!-- [1. 분석 컨텍스트] -->
<!-- 언어/환경: (예: Python 3.11, AWS Lambda) -->
<!-- 코드 목적: (예: 결제 로직 처리) -->
<!-- 핵심 요구: (예: 동시성 문제 해결, 성능 최적화) -->
<!-- -->
<!-- [2. 역할 및 원칙] -->
<!-- 당신은 가장 까다로운 시니어 개발자입니다. 칭찬은 생략하고, 오직 **결함(Bug)**과 **위험 요소(Risk)**만 찾아내십시오. -->
<!-- - 원칙: 코드가 "문제없다"고 가정하지 마십시오. 숨겨진 논리적 오류, 예외 처리 누락, 보안 취약점을 집요하게 파고드십시오. -->
<!-- - 금지: 코드에 없는 내용을 추측하여 지적하지 마십시오(No Hallucination). -->
<!-- -->
<!-- [3. 중점 검토 항목] -->
<!-- 1. 논리 오류: 엣지 케이스(Null, 0, 경계값)에서 로직이 깨지지 않는가? -->
<!-- 2. 안정성: 예외가 발생했을 때 시스템이 안전하게 복구되거나 종료되는가? (Silent Failure 방지) -->
<!-- 3. 보안: SQL 인젝션, XSS, 민감 정보(비번/키) 노출이 있는가? -->
<!-- 4. 효율성: 불필요한 반복문(O(n²))이나 메모리 누수가 있는가? -->
<!-- -->
<!-- [4. 출력 형식] -->
<!-- 발견된 문제가 없다면 "특이사항 없음"이라고 하십시오. 문제가 있다면 아래 양식으로 핵심만 적어주세요. -->
<!-- -->
<!-- 🚨 치명적 문제 (Critical) -->
<!-- - 위치: [라인 번호 또는 코드 스니펫] -->
<!-- - 이유: (왜 위험한지 기술적 설명) -->
<!-- - 해결책: (수정된 코드 블록) -->
<!-- -->
<!-- ⚠️ 개선 제안 (Warning) -->
<!-- - 위치: [라인 번호] -->
<!-- - 내용: (잠재적 위험 또는 가독성/성능 개선 제안) -->
<!-- -->
<!-- 코드 리뷰 프롬프트(심화버전) -->
[1. 분석 컨텍스트]
정확한 분석을 위해 아래 정보를 기반으로 코드를 검토하십시오.
- 언어/프레임워크: (예: Python 3.11, Spring Boot)
- 코드의 목적: (예: 대용량 트래픽 처리, 결제 로직, 데이터 파싱)
- 주요 제약사항: (예: 동시성 처리 필수, 응답속도 중요, 메모리 효율성)
[2. 역할 및 원칙]
당신은 '무관용 원칙'을 가진 수석 소프트웨어 아키텍트입니다.
- 목표: 칭찬보다는 결함(Bug), 보안 취약점, 성능 병목, 유지보수 저해 요소를 찾아내는 데 집중하십시오.
- 금지: 코드에 없는 내용을 추측하여 지적하지 마십시오(Zero Hallucination).
- 기준: "작동한다"에 만족하지 말고, "견고하고 안전한가"를 기준으로 판단하십시오.
[3. 사전 단계: 의도 파악]
분석 전, 이 코드가 수행하는 핵심 로직을 3줄 이내로 요약하여, 당신이 코드를 올바르게 이해했는지 먼저 보여주십시오.
[4. 심층 검토 체크리스트]
다음 항목을 기준으로 코드를 해부하십시오.
1. 논리 및 엣지 케이스 (Logic & Edge Cases)
- 가상 실행: 코드를 한 줄씩 추적하며 변수 상태 변화를 검증했는가?
- 경계값: Null, 빈 값, 음수, 최대값 등 극한의 입력에서 로직이 깨지지 않는가?
- 예외 처리: 에러를 단순히 삼키지 않고(Silent Failure), 적절히 처리하거나 전파하는가?
2. 보안 및 안정성 (Security & Stability)
- 입력 검증: SQL 인젝션, XSS, 버퍼 오버플로우 취약점이 없는가?
- 정보 노출: 비밀번호, API 키, PII(개인정보)가 하드코딩되거나 로그에 남지 않는가?
- 자원 관리: 파일, DB 연결, 메모리 등이 예외 발생 시에도 확실히 해제되는가?
3. 동시성 및 성능 (Concurrency & Performance)
- 동기화: (해당 시) 경쟁 상태(Race Condition), 데드락, 스레드 안전성 문제가 없는가?
- 효율성: 불필요한 중첩 반복문(O(n²)), N+1 쿼리, 중복 연산이 없는가?
[5. 출력 형식: 결함 보고서]
발견된 문제가 없다면 "특이사항 없음"으로 명시하십시오. 문제가 있다면 아래 양식을 엄수해 주세요.
🚨 치명적 문제 (Critical Issues)
(서비스 중단, 데이터 손실/오염, 보안 사고 위험이 있는 경우)
[C-1] 문제 제목
├─ 위치: [파일경로:라인] 또는 [코드 스니펫 3~5줄]
├─ 원인: [기술적 원인 설명]
├─ 재현/조건: [문제가 발생하는 상황]
└─ 해결책: [수정된 코드 블록 (Auto-Fix)]
⚠️ 개선 제안 (Warnings & Improvements)
(성능 저하, 유지보수성 부족, 잠재적 버그)
[W-1] 문제 제목
├─ 위치: [파일경로:라인] 또는 [코드 스니펫]
├─ 분석: [문제점 설명]
└─ 권장 조치: [리팩토링 제안]
✅ 잘된 점 (Strengths)
(핵심적인 장점 1~2가지만 간결하게)

4
docs/sample.md Normal file
View File

@@ -0,0 +1,4 @@
<!-- 사용 방법 (워크플로우) -->
<!-- -->
<!-- rules.md와 PRD.md를 참고해서, implementation_plan.md의 단계로 코드를 작성해줘. -->
<!-- 코드를 review_prompt.md 기준으로 검토해줘. -->

91
git_init.bat.bat Normal file
View File

@@ -0,0 +1,91 @@
@echo off
chcp 65001 > nul
cls
echo ========================================================
echo Git 초기 설정 마법사 V2 (for Gitea)
echo ========================================================
echo.
echo [!] 이 파일은 프로젝트 폴더의 최상위에 위치해야 합니다.
echo [!] Gitea에서 저장소를 생성한 후, HTTPS 주소를 준비해주세요.
echo.
:: 1. 원격 저장소 URL 입력받기
set /p REMOTE_URL="[입력] Gitea 저장소 주소 (HTTPS)를 붙여넣으세요: "
if "%REMOTE_URL%"=="" (
echo [오류] 주소가 입력되지 않았습니다. 창을 닫고 다시 실행해주세요.
pause
exit
)
echo.
echo --------------------------------------------------------
echo [Step 0] Git 사용자 정보 확인...
:: 사용자 이름이 설정되어 있는지 확인합니다.
git config user.name >nul 2>&1
if %ERRORLEVEL% NEQ 0 (
echo - 사용자 정보가 없습니다. 설정을 시작합니다.
echo.
set /p GIT_USER="[입력] 사용자 이름 (예: tae2564): "
set /p GIT_EMAIL="[입력] 이메일 주소 (예: tae2564@gmail.com): "
:: 입력받은 정보를 이 프로젝트에만 적용(local) 할지, PC 전체(global)에 할지 선택
:: 여기서는 편의상 Global로 설정합니다.
git config --global user.name "%GIT_USER%"
git config --global user.email "%GIT_EMAIL%"
echo - 사용자 정보 등록 완료!
) else (
echo - 기존 사용자 정보가 감지되었습니다. 건너뜁니다.
)
echo.
echo [Step 1] 저장소 초기화 중...
git init
echo.
echo [Step 2] .gitignore 파일 생성 중 (Python용)...
if not exist .gitignore (
(
echo __pycache__/
echo *.py[cod]
echo .venv/
echo venv/
echo .env
echo .vscode/
echo .idea/
echo *.log
) > .gitignore
echo - .gitignore 파일이 생성되었습니다.
) else (
echo - .gitignore 파일이 이미 존재하여 건너뜁니다.
)
echo.
echo [Step 3] 파일 담기 및 첫 커밋...
git add .
git commit -m "최초 프로젝트 업로드 (Script Auto Commit)"
echo.
echo [Step 4] 브랜치 이름 변경 (master - main)...
git branch -M main
echo.
echo [Step 5] 원격 저장소 연결...
git remote remove origin 2>nul
git remote add origin %REMOTE_URL%
echo.
echo [Step 6] 서버로 업로드 (Push)...
echo - 로그인 창이 뜨면 아이디와 비밀번호를 입력하세요.
git push -u origin main
echo.
echo ========================================================
if %ERRORLEVEL% == 0 (
echo [성공] 모든 설정이 완료되었습니다!
echo 이제부터는 git_upload.bat 파일을 사용해 수정사항을 올리세요.
) else (
echo [실패] 오류가 발생했습니다. 위 메시지를 확인해주세요.
)
echo ========================================================
pause

1
holdings.json Normal file
View File

@@ -0,0 +1 @@
{}

268
main.py Normal file
View File

@@ -0,0 +1,268 @@
import os
import time
import threading
import argparse
from typing import List, Dict, Tuple
from dotenv import load_dotenv
import logging
# Modular imports
from src.common import logger, setup_logger
from src.config import load_config, read_symbols, get_symbols_file, build_runtime_config, RuntimeConfig
from src.threading_utils import run_sequential, run_with_threads
from src.notifications import send_telegram, report_error, send_startup_test_message
from src.holdings import load_holdings
from src.signals import check_sell_conditions, CheckType
from src.kiwoom_api import get_kiwoom_api
load_dotenv()
def minutes_to_timeframe(minutes: int) -> str:
"""분 단위를 캔들봉 timeframe 문자열로 변환 (예: 60 -> '1h', 240 -> '4h')"""
if minutes < 60:
return f"{minutes}m"
elif minutes % 1440 == 0:
return f"{minutes // 1440}d"
elif minutes % 60 == 0:
return f"{minutes // 60}h"
else:
return f"{minutes}m"
def process_symbols_and_holdings(
cfg: RuntimeConfig,
symbols: List[str],
config: Dict[str, any],
last_buy_check_time: float,
last_sell_check_time: float,
last_profit_taking_check_time: float
) -> Tuple[float, float, float]:
"""매수/매도 조건을 확인하고 실행.
Args:
cfg: 런타임 설정
symbols: 심볼 리스트
config: 설정 딕셔너리
last_buy_check_time: 마지막 매수 체크 시간
last_sell_check_time: 마지막 손절 체크 시간
last_profit_taking_check_time: 마지막 익절 체크 시간
Returns:
(업데이트된 매수 체크 시간, 손절 체크 시간, 익절 체크 시간)
"""
holdings = load_holdings('holdings.json')
held_symbols = set(holdings.keys()) if holdings else set()
buy_candidate_symbols = [s for s in symbols if s not in held_symbols]
current_time = time.time()
buy_signal_count = 0
sell_signal_count = 0
buy_interval_minutes = config.get("buy_check_interval_minutes", 240)
buy_interval = buy_interval_minutes * 60
buy_timeframe = minutes_to_timeframe(buy_interval_minutes)
if current_time - last_buy_check_time >= buy_interval:
logger.info("매수 조건 확인 시작 (주기: %d분, 데이터: %s)", buy_interval_minutes, buy_timeframe)
from src.holdings import get_kiwoom_balances
krw_balance = None
try:
balances = get_kiwoom_balances(cfg.kiwoom_account_number)
krw_balance = balances.get('KRW', 0)
except Exception as e:
logger.warning("Kiwoom 잔고 조회 실패: %s", e)
buy_amount = config.get('auto_trade', {}).get('buy_amount', 1000000)
if krw_balance is not None and krw_balance < buy_amount:
msg = f"[매수 건너뜀] Kiwoom 계좌 잔고 부족: 현재 KRW={krw_balance:.0f}, 필요={buy_amount:.0f}"
logger.warning(msg)
if cfg.telegram_bot_token and cfg.telegram_chat_id:
send_telegram(cfg.telegram_bot_token, cfg.telegram_chat_id, msg, parse_mode=cfg.telegram_parse_mode or "HTML")
else:
from src.config import RuntimeConfig
cfg_with_buy_timeframe = RuntimeConfig(
timeframe=buy_timeframe,
indicator_timeframe=buy_timeframe,
candle_count=cfg.candle_count,
symbol_delay=cfg.symbol_delay,
interval=cfg.interval,
loop=cfg.loop,
dry_run=cfg.dry_run,
max_threads=cfg.max_threads,
telegram_parse_mode=cfg.telegram_parse_mode,
trading_mode=cfg.trading_mode,
telegram_bot_token=cfg.telegram_bot_token,
telegram_chat_id=cfg.telegram_chat_id,
kiwoom_account_number=cfg.kiwoom_account_number,
aggregate_alerts=cfg.aggregate_alerts,
benchmark=cfg.benchmark,
telegram_test=getattr(cfg, 'telegram_test', False),
config=cfg.config
)
if cfg.max_threads > 1:
buy_signal_count = run_with_threads(buy_candidate_symbols, parse_mode=cfg.telegram_parse_mode, aggregate_enabled=cfg.aggregate_alerts, cfg=cfg_with_buy_timeframe)
else:
buy_signal_count = run_sequential(buy_candidate_symbols, parse_mode=cfg.telegram_parse_mode, aggregate_enabled=cfg.aggregate_alerts, cfg=cfg_with_buy_timeframe)
last_buy_check_time = current_time
else:
logger.debug("매수 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (buy_interval - (current_time - last_buy_check_time)) / 60)
# 손절/익절 조건 체크 설정
stop_loss_interval_minutes = config.get("stop_loss_check_interval_minutes", 60)
stop_loss_interval = stop_loss_interval_minutes * 60
profit_taking_interval_minutes = config.get("profit_taking_check_interval_minutes", 240)
profit_taking_interval = profit_taking_interval_minutes * 60
sell_signal_count = 0
# 손절/익절 체크 여부 확인
should_check_stop_loss = (current_time - last_sell_check_time >= stop_loss_interval)
should_check_profit_taking = (current_time - last_profit_taking_check_time >= profit_taking_interval)
# Holdings 업데이트는 한 번만 수행 (손절/익절 체크 전)
if should_check_stop_loss or should_check_profit_taking:
account_number = cfg.kiwoom_account_number
# Holdings 한 번만 업데이트
if account_number:
try:
from src.holdings import save_holdings, fetch_holdings_from_kiwoom
updated_holdings = fetch_holdings_from_kiwoom(account_number)
if updated_holdings is not None:
holdings = updated_holdings
save_holdings(holdings, 'holdings.json')
except Exception as e:
logger.exception("Kiwoom holdings 조회 중 오류: %s", e)
report_error(cfg.telegram_bot_token, cfg.telegram_chat_id, f"[오류] Holdings 조회 실패: {e}", cfg.dry_run)
# 손절 체크 (1시간 주기): 조건1, 조건3, 조건4-2, 조건5-2
if should_check_stop_loss:
logger.info("손절 조건 확인 시작 (주기: %d분)", stop_loss_interval_minutes)
try:
if holdings:
sell_results, count = check_sell_conditions(holdings, cfg=cfg, config=config, check_type=CheckType.STOP_LOSS)
sell_signal_count += count
logger.info("보유 종목 손절 조건 확인 완료: %d개 검사, %d개 매도 신호", len(sell_results), count)
else:
logger.debug("보유 종목 없음")
except Exception as e:
logger.exception("보유 종목 손절 조건 확인 중 오류: %s", e)
report_error(cfg.telegram_bot_token, cfg.telegram_chat_id, f"[오류] 손절 조건 확인 실패: {e}", cfg.dry_run)
last_sell_check_time = current_time
# 익절 체크 (4시간 주기): 조건2, 조건4-1, 조건5-1
if should_check_profit_taking:
logger.info("익절 조건 확인 시작 (주기: %d분)", profit_taking_interval_minutes)
try:
if holdings:
sell_results, count = check_sell_conditions(holdings, cfg=cfg, config=config, check_type=CheckType.PROFIT_TAKING)
sell_signal_count += count
logger.info("보유 종목 익절 조건 확인 완료: %d개 검사, %d개 매도 신호", len(sell_results), count)
else:
logger.debug("보유 종목 없음")
except Exception as e:
logger.exception("보유 종목 익절 조건 확인 중 오류: %s", e)
report_error(cfg.telegram_bot_token, cfg.telegram_chat_id, f"[오류] 익절 조건 확인 실패: {e}", cfg.dry_run)
last_profit_taking_check_time = current_time
else:
if not should_check_stop_loss:
logger.debug("손절 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (stop_loss_interval - (current_time - last_sell_check_time)) / 60)
if not should_check_profit_taking:
logger.debug("익절 조건 확인 대기 중 (다음 확인까지 %.1f분 남음)", (profit_taking_interval - (current_time - last_profit_taking_check_time)) / 60)
logger.info("[요약] 매수 조건 충족 심볼: %d개, 매도 조건 충족 심볼: %d", buy_signal_count, sell_signal_count)
return last_buy_check_time, last_sell_check_time, last_profit_taking_check_time
def execute_benchmark(cfg, symbols):
"""Execute benchmark to compare single-thread and multi-thread performance."""
logger.info("간단 벤치마크 시작: 심볼=%d", len(symbols))
start = time.time()
run_sequential(symbols, parse_mode=cfg.telegram_parse_mode, aggregate_enabled=False, cfg=cfg)
elapsed_single = time.time() - start
logger.info("순차 처리 소요 시간: %.2f", elapsed_single)
start = time.time()
run_with_threads(symbols, parse_mode=cfg.telegram_parse_mode, aggregate_enabled=False, cfg=cfg)
elapsed_parallel = time.time() - start
logger.info("병렬 처리(%d 스레드) 소요 시간: %.2f", cfg.max_threads, elapsed_parallel)
if elapsed_parallel < elapsed_single:
reduction = (elapsed_single - elapsed_parallel) / elapsed_single * 100
logger.info("병렬로 %.1f%% 빨라졌습니다 (권장 스레드=%d).", reduction, cfg.max_threads)
else:
logger.info("병렬이 순차보다 빠르지 않습니다. 네트워크/IO 패턴을 점검하세요.")
def main():
parser = argparse.ArgumentParser(description="주식 자동매매 프로그램")
parser.add_argument("--benchmark", action="store_true", help="벤치마크 실행")
args = parser.parse_args()
config = load_config()
if not config:
logger.error("설정 로드 실패; 종료합니다")
return
cfg = build_runtime_config(config)
setup_logger(cfg.dry_run)
logger.info("=" * 80)
logger.info("주식 자동매매 프로그램 시작")
logger.info("=" * 80)
# Kiwoom API 로그인
get_kiwoom_api()
symbols = read_symbols(get_symbols_file(config))
if not symbols:
logger.error("심볼 로드 실패; 종료합니다")
return
if cfg.telegram_test:
send_startup_test_message(cfg.telegram_bot_token, cfg.telegram_chat_id, cfg.telegram_parse_mode, cfg.dry_run)
if not cfg.dry_run and (not cfg.telegram_bot_token or not cfg.telegram_chat_id):
logger.error("dry-run이 아닐 때 텔레그램 환경변수 필수: TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID")
return
buy_check_minutes = config.get("buy_check_interval_minutes", 240)
stop_loss_check_minutes = config.get("stop_loss_check_interval_minutes", 60)
profit_taking_check_minutes = config.get("profit_taking_check_interval_minutes", 240)
logger.info("설정: symbols=%d, symbol_delay=%.2f초, candle_count=%d, loop=%s, dry_run=%s, max_threads=%d, trading_mode=%s",
len(symbols), cfg.symbol_delay, cfg.candle_count, cfg.loop, cfg.dry_run, cfg.max_threads, cfg.trading_mode)
logger.info("매수 확인 주기: %d분 (%s봉), 손절 확인 주기: %d분 (%s봉), 익절 확인 주기: %d분 (%s봉)",
buy_check_minutes, minutes_to_timeframe(buy_check_minutes),
stop_loss_check_minutes, minutes_to_timeframe(stop_loss_check_minutes),
profit_taking_check_minutes, minutes_to_timeframe(profit_taking_check_minutes))
if args.benchmark:
execute_benchmark(cfg, symbols)
return
last_buy_check_time = 0
last_sell_check_time = 0
last_profit_taking_check_time = 0
if not cfg.loop:
process_symbols_and_holdings(cfg, symbols, config, last_buy_check_time, last_sell_check_time, last_profit_taking_check_time)
else:
buy_check_minutes = config.get("buy_check_interval_minutes", 240)
stop_loss_check_minutes = config.get("stop_loss_check_interval_minutes", 60)
profit_taking_check_minutes = config.get("profit_taking_check_interval_minutes", 240)
loop_interval_minutes = min(buy_check_minutes, stop_loss_check_minutes, profit_taking_check_minutes)
interval_seconds = max(10, loop_interval_minutes * 60)
logger.info("루프 모드 시작: %d분 간격 (매수확인: %d분마다, 손절확인: %d분마다, 익절확인: %d분마다)",
loop_interval_minutes, buy_check_minutes, stop_loss_check_minutes, profit_taking_check_minutes)
try:
while True:
try:
last_buy_check_time, last_sell_check_time, last_profit_taking_check_time = process_symbols_and_holdings(
cfg, symbols, config, last_buy_check_time, last_sell_check_time, last_profit_taking_check_time)
except Exception as e:
logger.exception("루프 내 작업 중 오류: %s", e)
report_error(cfg.telegram_bot_token, cfg.telegram_chat_id, f"[오류] 루프 내 작업 실패: {e}", cfg.dry_run)
logger.info("다음 실행까지 %d초 대기", interval_seconds)
time.sleep(interval_seconds)
except KeyboardInterrupt:
logger.info("사용자가 루프를 중단함")
if __name__ == "__main__":
main()

2
pytest.ini Normal file
View File

@@ -0,0 +1,2 @@
[pytest]
testpaths = src/tests

7
requirements.txt Normal file
View File

@@ -0,0 +1,7 @@
pyupbit
pandas
pandas_ta
requests
python-dotenv
pytest
pykiwoom

4
src/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .common import logger
from .indicators import fetch_ohlcv, compute_macd_hist, ta
from .signals import evaluate_sell_conditions
from .notifications import send_telegram

26
src/common.py Normal file
View File

@@ -0,0 +1,26 @@
import os
import logging
from pathlib import Path
import logging.handlers
LOG_DIR = os.getenv("LOG_DIR", "logs")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "AutoStockTrader.log")
logger = logging.getLogger("macd_alarm")
def setup_logger(dry_run: bool):
global logger
logger.handlers.clear()
logger.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(threadName)s] - %(message)s")
if dry_run:
ch = logging.StreamHandler()
ch.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
ch.setFormatter(formatter)
logger.addHandler(ch)
fh = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes=10 * 1024 * 1024, backupCount=7, encoding="utf-8")
fh.setLevel(getattr(logging, LOG_LEVEL, logging.INFO))
fh.setFormatter(formatter)
logger.addHandler(fh)

94
src/config.py Normal file
View File

@@ -0,0 +1,94 @@
import os, json
from dataclasses import dataclass
from typing import Optional, Dict, Any
from .common import logger
def load_config() -> dict:
paths = [os.path.join('config','config.json'), 'config.json']
example_paths = [os.path.join('config','config.example.json'), 'config.example.json']
for p in paths:
if os.path.exists(p):
with open(p,'r',encoding='utf-8') as f:
cfg = json.load(f)
logger.info('설정 파일 로드: %s', p)
return cfg
for p in example_paths:
if os.path.exists(p):
with open(p,'r',encoding='utf-8') as f:
cfg = json.load(f)
logger.warning('기본 설정 없음; 예제 사용: %s', p)
return cfg
logger.error('설정 파일 없음: config/config.json 확인')
return None
def read_symbols(path: str) -> list:
syms = []
try:
with open(path,'r',encoding='utf-8') as f:
for line in f:
s=line.strip()
if not s or s.startswith('#'): continue
syms.append(s)
logger.info('심볼 %d개 로드: %s', len(syms), path)
except Exception as e:
logger.exception('심볼 로드 실패: %s', e)
return syms
@dataclass(frozen=True)
class RuntimeConfig:
timeframe: str
indicator_timeframe: str
candle_count: int
symbol_delay: float
interval: int
loop: bool
dry_run: bool
max_threads: int
telegram_parse_mode: Optional[str]
trading_mode: str
telegram_bot_token: Optional[str]
telegram_chat_id: Optional[str]
kiwoom_account_number: Optional[str]
aggregate_alerts: bool = False
benchmark: bool = False
telegram_test: bool = False
config: Optional[Dict[str, Any]] = None # 원본 config 포함
def build_runtime_config(cfg_dict: dict) -> RuntimeConfig:
# timeframe은 동적으로 결정되므로 기본값만 설정 (실제로는 매수/매도 주기에 따라 변경됨)
timeframe = "1h" # 기본값
aggregate_alerts = bool(cfg_dict.get("aggregate_alerts", False)) or bool(os.getenv("AGGREGATE_ALERTS", "False").lower() in ("1", "true", "yes"))
benchmark = bool(cfg_dict.get("benchmark", False))
telegram_test = os.getenv("TELEGRAM_TEST", "0") == "1"
return RuntimeConfig(
timeframe=timeframe,
indicator_timeframe=timeframe,
candle_count=int(cfg_dict.get("candle_count", 200)),
symbol_delay=float(cfg_dict.get("symbol_delay", 1.0)),
interval=int(cfg_dict.get("interval", 60)),
loop=bool(cfg_dict.get("loop", False)),
dry_run=bool(cfg_dict.get("dry_run", False)),
max_threads=int(cfg_dict.get("max_threads", 3)),
telegram_parse_mode=cfg_dict.get("telegram_parse_mode"),
trading_mode=cfg_dict.get("trading_mode", "signal_only"),
telegram_bot_token=os.getenv("TELEGRAM_BOT_TOKEN"),
telegram_chat_id=os.getenv("TELEGRAM_CHAT_ID"),
kiwoom_account_number=cfg_dict.get("kiwoom", {}).get("account_number"),
aggregate_alerts=aggregate_alerts,
benchmark=benchmark,
telegram_test=telegram_test,
config=cfg_dict,
)
def get_symbols_file(config: dict) -> str:
"""Determine the symbols file path with fallback logic."""
default_path = "config/symbols.txt" if os.path.exists("config/symbols.txt") else "symbols.txt"
return config.get("symbols_file", default_path)
# Define valid trading modes as constants
TRADING_MODES = ("signal_only", "auto_trade", "mixed")

245
src/holdings.py Normal file
View File

@@ -0,0 +1,245 @@
import os, json
import threading
from .common import logger
from .kiwoom_api import get_kiwoom_api
# 스레드 동기화를 위한 Lock 객체
_holdings_lock = threading.Lock()
def _unsafe_load_holdings(holdings_file: str) -> dict:
"""Lock 없이 홀딩 파일을 로드 (내부용)"""
try:
if os.path.exists(holdings_file):
if os.path.getsize(holdings_file) == 0:
logger.debug("보유 파일이 비어있습니다: %s", holdings_file)
return {}
with open(holdings_file, 'r', encoding='utf-8') as f:
return json.load(f)
except json.JSONDecodeError as e:
logger.warning('보유 파일 로드 실패: %s', e)
return {}
def _unsafe_save_holdings(holdings: dict, holdings_file: str):
"""Lock 없이 홀딩 파일을 저장 (내부용)"""
try:
os.makedirs(os.path.dirname(holdings_file) or '.', exist_ok=True)
with open(holdings_file, 'w', encoding='utf-8') as f:
json.dump(holdings, f, ensure_ascii=False, indent=2)
logger.debug('보유 저장: %s', holdings_file)
except Exception as e:
logger.error('보유 저장 실패: %s', e)
def load_holdings(holdings_file: str = 'holdings.json') -> dict:
"""스레드에 안전하게 홀딩 파일을 로드"""
with _holdings_lock:
return _unsafe_load_holdings(holdings_file)
def save_holdings(holdings: dict, holdings_file: str = 'holdings.json'):
"""스레드에 안전하게 홀딩 파일을 저장"""
with _holdings_lock:
_unsafe_save_holdings(holdings, holdings_file)
def get_kiwoom_balances(account_number: str) -> dict:
try:
api = get_kiwoom_api()
deposit_info, _ = api.get_account_info(account_number)
balance = 0
possible_keys = ['d+2추정예수금', 'D+2추정예수금', '예수금', '주문가능금액', '출금가능금액']
if isinstance(deposit_info, dict):
for key in possible_keys:
if key in deposit_info:
try:
balance = int(deposit_info[key])
logger.debug(f'Kiwoom 잔고 조회 성공 (키: {key}): {balance}')
break
except (ValueError, TypeError):
continue
if balance == 0:
logger.warning(f'예수금 키를 찾을 수 없음. 사용 가능한 키: {list(deposit_info.keys())}')
else:
logger.error(f'deposit_info 형식 오류: {type(deposit_info)}')
return {'KRW': balance}
except Exception as e:
logger.error('Kiwoom balances 실패: %s', e)
return {}
def get_current_price(symbol: str) -> float:
try:
api = get_kiwoom_api()
data = api.get_ohlcv(symbol, "D")
if data is None:
logger.debug('현재가 조회 실패 (None): %s', symbol)
return 0.0
if hasattr(data, 'iloc'):
close_col = next((c for c in ['close', '현재가', '종가'] if c in data.columns), None)
if close_col:
price = float(data[close_col].iloc[0])
else:
logger.warning('close 컬럼 없음: %s, columns: %s', symbol, data.columns.tolist())
return 0.0
elif isinstance(data, dict):
close_col = next((c for c in ['close', '현재가', '종가'] if c in data), None)
if close_col:
price_data = data[close_col]
price = float(price_data[0] if isinstance(price_data, (list, tuple)) else price_data)
else:
logger.warning('close 키 없음: %s, keys: %s', symbol, data.keys())
return 0.0
else:
logger.warning('알 수 없는 데이터 형식: %s, type: %s', symbol, type(data))
return 0.0
logger.debug('현재가 %s -> %.2f', symbol, price)
return price if price > 0 else 0.0
except Exception as e:
logger.debug('현재가 실패 %s: %s', symbol, e)
return 0.0
def add_new_holding(symbol: str, buy_price: float, amount: float, buy_timestamp: float = None, holdings_file: str = "holdings.json") -> bool:
with _holdings_lock:
try:
import time
holdings = _unsafe_load_holdings(holdings_file)
timestamp = buy_timestamp if buy_timestamp is not None else time.time()
if symbol in holdings:
prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0)
prev_price = float(holdings[symbol].get("buy_price", 0.0) or 0.0)
total_amount = prev_amount + amount
if total_amount > 0:
new_avg_price = ((prev_price * prev_amount) + (buy_price * amount)) / total_amount
holdings[symbol]["buy_price"] = new_avg_price
holdings[symbol]["amount"] = total_amount
prev_max = float(holdings[symbol].get("max_price", 0.0) or 0.0)
holdings[symbol]["max_price"] = max(new_avg_price, prev_max)
logger.info("[%s] holdings 추가 매수: 평균가 %.2f -> %.2f, 수량 %.8f -> %.8f",
symbol, prev_price, new_avg_price, prev_amount, total_amount)
else:
holdings[symbol] = {
"buy_price": buy_price,
"amount": amount,
"max_price": buy_price,
"buy_timestamp": timestamp,
"partial_sell_done": False
}
logger.info("[%s] holdings 신규 추가: 매수가=%.2f, 수량=%.8f", symbol, buy_price, amount)
_unsafe_save_holdings(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[%s] holdings 추가 실패: %s", symbol, e)
return False
def update_holding_amount(symbol: str, amount_change: float, holdings_file: str = "holdings.json") -> bool:
with _holdings_lock:
try:
holdings = _unsafe_load_holdings(holdings_file)
if symbol not in holdings:
logger.warning("[%s] holdings에 존재하지 않아 수량 업데이트 건너뜀", symbol)
return False
prev_amount = float(holdings[symbol].get("amount", 0.0) or 0.0)
new_amount = max(0.0, prev_amount + amount_change)
if new_amount <= 1e-8:
holdings.pop(symbol, None)
logger.info("[%s] holdings 업데이트: 전량 매도 완료, 보유 제거 (이전: %.8f, 변경: %.8f)",
symbol, prev_amount, amount_change)
else:
holdings[symbol]["amount"] = new_amount
logger.info("[%s] holdings 업데이트: 수량 변경 %.8f -> %.8f (변경량: %.8f)",
symbol, prev_amount, new_amount, amount_change)
_unsafe_save_holdings(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[%s] holdings 수량 업데이트 실패: %s", symbol, e)
return False
def set_holding_field(symbol: str, key: str, value, holdings_file: str = "holdings.json") -> bool:
with _holdings_lock:
try:
holdings = _unsafe_load_holdings(holdings_file)
if symbol not in holdings:
logger.warning("[%s] holdings에 존재하지 않아 필드 설정 건너뜀", symbol)
return False
holdings[symbol][key] = value
logger.info("[%s] holdings 업데이트: 필드 '%s''%s'(으)로 설정", symbol, key, value)
_unsafe_save_holdings(holdings, holdings_file)
return True
except Exception as e:
logger.exception("[%s] holdings 필드 설정 실패: %s", symbol, e)
return False
def fetch_holdings_from_kiwoom(account_number: str) -> dict:
try:
api = get_kiwoom_api()
_, balance_info = api.get_account_info(account_number)
if balance_info is None:
logger.warning('balance_info가 None')
return {}
holdings = {}
# fetch_holdings_from_kiwoom는 외부 상태를 직접 바꾸지 않고 정보를 가져오므로,
# 기존 holdings.json을 스레드 안전하게 로드합니다.
existing_holdings = load_holdings('holdings.json')
items = []
if hasattr(balance_info, 'iterrows'):
items = balance_info.to_dict('records')
elif isinstance(balance_info, list):
items = balance_info
else:
logger.error(f'balance_info 형식 오류: {type(balance_info)}')
return {}
for item in items:
try:
symbol = item.get('종목번호', item.get('종목코드', ''))
amount = item.get('보유수량', item.get('잔고수량', 0))
buy_price = item.get('매입가', item.get('평균단가', 0))
current_price = item.get('현재가', 0)
if isinstance(amount, str): amount = int(amount.replace('+', '').replace('-', '').strip())
else: amount = int(amount)
if isinstance(buy_price, str): buy_price = int(buy_price.replace('+', '').replace('-', '').strip())
else: buy_price = int(buy_price)
if isinstance(current_price, str): current_price = int(current_price.replace('+', '').replace('-', '').strip())
else: current_price = int(current_price)
if not symbol or amount <= 0:
continue
prev_max_price = existing_holdings.get(symbol, {}).get('max_price', 0)
max_price = max(current_price, prev_max_price)
holdings[symbol] = {
'buy_price': buy_price,
'amount': amount,
'max_price': max_price,
'buy_timestamp': existing_holdings.get(symbol, {}).get('buy_timestamp'),
'partial_sell_done': existing_holdings.get(symbol, {}).get('partial_sell_done', False),
}
except Exception as e:
logger.warning(f'항목 처리 실패: {e}, item: {item}')
continue
logger.debug('Kiwoom holdings %d', len(holdings))
return holdings
except Exception as e:
logger.error('fetch_holdings 실패: %s', e)
return {}

192
src/indicators.py Normal file
View File

@@ -0,0 +1,192 @@
import os
import time
import random
import pandas as pd
import pandas_ta as ta
from .common import logger
from .kiwoom_api import get_kiwoom_api
__all__ = ["fetch_ohlcv", "compute_macd_hist", "compute_sma", "ta"]
def fetch_ohlcv(symbol: str, timeframe: str, candle_count: int = 200, log_buffer: list = None) -> pd.DataFrame:
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
_buf("debug", f"OHLCV 수집 시작: {symbol} {timeframe}")
# Kiwoom API 타임프레임 매핑
# 분봉: "1", "3", "5", "15", "30", "60" (분 단위)
# 일/주/월봉: "D", "W", "M"
# 주의: 4시간봉은 Kiwoom API가 직접 지원하지 않으므로 60분봉 데이터를 리샘플링
tf_map = {
"1m": "1", "3m": "3", "5m": "5", "15m": "15", "30m": "30",
"1h": "60",
"4h": "60", # 60분봉 데이터를 가져와서 4시간으로 리샘플링
"D": "D", "W": "W", "M": "M"
}
# 리샘플링이 필요한 타임프레임 체크
needs_resampling = timeframe == "4h"
original_timeframe = timeframe
original_candle_count = candle_count
# 4시간봉은 60분봉 4개를 합쳐야 하므로 candle_count를 4배로 조정
if needs_resampling:
candle_count = candle_count * 4
_buf("debug", f"4시간봉 요청으로 candle_count 조정: {original_candle_count} -> {candle_count}")
tf = tf_map.get(timeframe, "D") # 기본값 'D' (일봉)
if timeframe not in tf_map:
_buf("warning", f"지원하지 않는 타임프레임 '{timeframe}', 일봉(D)으로 대체")
needs_resampling = False
candle_count = original_candle_count
max_attempts = int(os.getenv("MAX_FETCH_ATTEMPTS", "5"))
base_backoff = float(os.getenv("BASE_BACKOFF", "1.0"))
jitter_factor = float(os.getenv("BACKOFF_JITTER", "0.5"))
max_total_backoff = float(os.getenv("MAX_TOTAL_BACKOFF", "300"))
cumulative_sleep = 0.0
for attempt in range(1, max_attempts + 1):
try:
api = get_kiwoom_api()
df = api.get_ohlcv(symbol, tf)
if df is None or (hasattr(df, 'empty') and df.empty):
_buf("warning", f"OHLCV 빈 결과: {symbol}")
raise RuntimeError("empty ohlcv")
# 컬럼명 확인 및 변경
# opt10081 (일/주/월봉): '일자', '시가', '고가', '저가', '현재가', '거래량' 등
# opt10080 (분봉): '체결시간', '시가', '고가', '저가', '현재가', '거래량' 등
column_mapping = {
'일자': 'timestamp', # 일/주/월봉
'체결시간': 'timestamp', # 분봉
'시간': 'timestamp', # 대체 가능한 컬럼명
'시가': 'open',
'고가': 'high',
'저가': 'low',
'현재가': 'close',
'종가': 'close', # 일부 응답에서 '종가'를 사용할 수 있음
'거래량': 'volume'
}
# 존재하는 컬럼만 rename
rename_dict = {k: v for k, v in column_mapping.items() if k in df.columns}
df = df.rename(columns=rename_dict)
# 필수 컬럼 확인
required_cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
_buf("error", f"필수 컬럼 누락: {missing_cols}, 존재하는 컬럼: {df.columns.tolist()}")
raise RuntimeError(f"missing required columns: {missing_cols}")
# timestamp 처리
# 분봉: 'YYYYMMDDHHMMSS' (14자리), 일/주/월봉: 'YYYYMMDD' (8자리)
if df['timestamp'].dtype == 'object':
# 첫 번째 값으로 형식 판별
first_value = str(df['timestamp'].iloc[0]).strip()
if len(first_value) >= 14: # 분봉 데이터 (YYYYMMDDHHMMSS)
df['timestamp'] = pd.to_datetime(df['timestamp'], format='%Y%m%d%H%M%S', errors='coerce')
_buf("debug", f"분봉 timestamp 파싱: {first_value[:14]}")
else: # 일/주/월봉 데이터 (YYYYMMDD)
df['timestamp'] = pd.to_datetime(df['timestamp'], format='%Y%m%d', errors='coerce')
_buf("debug", f"일/주/월봉 timestamp 파싱: {first_value}")
df.set_index('timestamp', inplace=True)
# timestamp 파싱 실패 확인
if df.index.isnull().any():
null_count = df.index.isnull().sum()
_buf("warning", f"timestamp 파싱 실패한 행 {null_count}개 발견, 제거합니다")
df = df[df.index.notnull()]
# 데이터 타입을 float으로 변환
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = pd.to_numeric(df[col], errors='coerce').astype(float)
# NaN 값이 있는 행 제거
df = df.dropna()
if df.empty:
_buf("warning", f"데이터 변환 후 빈 결과: {symbol}")
raise RuntimeError("empty after conversion")
# 4시간봉 리샘플링 처리
if needs_resampling and original_timeframe == "4h":
try:
_buf("debug", f"4시간봉 리샘플링 시작: {symbol}, 원본 rows={len(df)}")
# 4시간 간격으로 리샘플링 (OHLCV 재집계)
df_4h = df.resample('4H').agg({
'open': 'first', # 시가: 첫 번째 값
'high': 'max', # 고가: 최대값
'low': 'min', # 저가: 최소값
'close': 'last', # 종가: 마지막 값
'volume': 'sum' # 거래량: 합계
}).dropna()
if df_4h.empty:
_buf("warning", f"4시간봉 리샘플링 후 빈 데이터: {symbol}, 원본 데이터 사용")
else:
df = df_4h
_buf("debug", f"4시간봉 리샘플링 완료: {symbol}, rows={len(df)}")
except Exception as e:
_buf("warning", f"4시간봉 리샘플링 실패: {symbol}, {e} - 60분봉 데이터 사용")
# 리샘플링된 경우 원래 요청한 candle_count로 제한
final_candle_count = original_candle_count if needs_resampling else candle_count
_buf("debug", f"OHLCV 수집 완료: {symbol}, timeframe={original_timeframe}, rows={len(df)}, returning={min(final_candle_count, len(df))}")
return df.head(final_candle_count)
except Exception as e:
_buf("warning", f"OHLCV 수집 실패 (시도 {attempt}/{max_attempts}): {symbol} -> {e}")
if attempt == max_attempts:
_buf("error", f"OHLCV: 최대 재시도 도달 ({symbol})")
return pd.DataFrame(columns=["open","high","low","close","volume"]).set_index(pd.Index([], name="timestamp"))
sleep_time = base_backoff * (2 ** (attempt - 1))
sleep_time = sleep_time + random.uniform(0, jitter_factor * sleep_time)
if cumulative_sleep + sleep_time > max_total_backoff:
logger.warning("누적 재시도 대기시간 초과 (%s)", symbol)
return pd.DataFrame(columns=["open","high","low","close","volume"]).set_index(pd.Index([], name="timestamp"))
cumulative_sleep += sleep_time
_buf("debug", f"{sleep_time:.2f}초 후 재시도")
time.sleep(sleep_time)
return pd.DataFrame(columns=["open","high","low","close","volume"]).set_index(pd.Index([], name="timestamp"))
def compute_macd_hist(close_series: pd.Series, log_buffer: list = None) -> pd.Series:
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
try:
macd_df = ta.macd(close_series, fast=12, slow=26, signal=9)
hist_cols = [c for c in macd_df.columns if "MACDh" in c]
if not hist_cols:
_buf("error", "MACD histogram column not found")
return pd.Series([])
return macd_df[hist_cols[0]]
except Exception as e:
_buf("error", f"MACD 계산 실패: {e}")
return pd.Series([])
def compute_sma(close_series: pd.Series, window: int, log_buffer: list = None) -> pd.Series:
"""단순 이동평균선(SMA) 계산"""
def _buf(level: str, msg: str):
if log_buffer is not None:
log_buffer.append(f"{level}: {msg}")
else:
getattr(logger, level.lower())(msg)
try:
return close_series.rolling(window=window).mean()
except Exception as e:
_buf("error", f"SMA{window} 계산 실패: {e}")
return pd.Series([])

223
src/kiwoom_api.py Normal file
View File

@@ -0,0 +1,223 @@
import os
import sys
import threading
from PyQt5.QtWidgets import QApplication
from pykiwoom.kiwoom import *
import logging
logger = logging.getLogger(__name__)
class KiwoomAPI:
def __init__(self):
self.kiwoom = Kiwoom()
self._login_lock = threading.Lock()
self._last_connection_check = 0
self._connection_check_interval = 60 # 연결 상태 확인 간격 (60초)
self.login()
def login(self):
"""Kiwoom API 로그인"""
with self._login_lock:
try:
self.kiwoom.CommConnect(block=True)
logger.info("Kiwoom API에 로그인 성공")
return True
except Exception as e:
logger.error(f"Kiwoom API 로그인 실패: {e}")
return False
def is_connected(self) -> bool:
"""
현재 연결 상태 확인
반환값: 0=연결안됨, 1=연결완료
"""
try:
state = self.kiwoom.GetConnectState()
return state == 1
except Exception as e:
logger.warning(f"연결 상탄 확인 실패: {e}")
return False
def ensure_connected(self) -> bool:
"""
연결 상탈 확인 및 필요시 재연결
주기적으로 호출하도록 간격 제한 포함
"""
import time
current_time = time.time()
# 마지막 확인 후 일정 시간이 지나지 않았으면 검사 생략
if current_time - self._last_connection_check < self._connection_check_interval:
return True
self._last_connection_check = current_time
if self.is_connected():
logger.debug("연결 상탄 정상")
return True
logger.warning("Kiwoom API 연결이 끊어졌습니다. 재연결을 시도합니다...")
# 최대 3회 재시도
for attempt in range(1, 4):
logger.info(f"재연결 시도 {attempt}/3")
if self.login():
if self.is_connected():
logger.info("재연결 성공")
return True
if attempt < 3:
wait_time = attempt * 5 # 5, 10초 대기
logger.info(f"{wait_time}초 후 재시도...")
time.sleep(wait_time)
logger.error("재연결 실패: 3회 모두 실패")
# 텔레그램 알림 (환경변수에서 토큰 가져오기)
try:
bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
chat_id = os.getenv("TELEGRAM_CHAT_ID")
if bot_token and chat_id:
import requests
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
message = "🔴 [긴급] Kiwoom API 재연결 실패\n\n3회 재시도 모두 실패했습니다.\n프로그램 관리자의 확인이 필요합니다."
requests.post(url, json={"chat_id": chat_id, "text": message}, timeout=10)
except Exception as e:
logger.warning(f"재연결 실패 알림 전송 실패: {e}")
return False
def get_account_info(self, account_number):
"""
계좌 정보 조회 (예수금 및 잔고)
"""
# 연결 상탄 확인 및 재연결
if not self.ensure_connected():
raise ConnectionError("Kiwoom API 연결이 끊어졌고 재연결에 실패했습니다")
try:
# 예수금 정보
deposit_info = self.kiwoom.block_request("opw00001",
계좌번호=account_number,
비밀번호="",
비밀번호입력매체구분="00",
조회구분=2,
output="예수금상세현황",
next=0)
# 계좌평가잔고내역
balance_info = self.kiwoom.block_request("opw00018",
계좌번호=account_number,
비밀번호="",
비밀번호입력매체구분="00",
조회구분=1,
output="계좌평가잔고개별합산",
next=0)
return deposit_info, balance_info
except Exception as e:
logger.error(f"계좌 정보 조회 실패: {e}")
raise
def get_ohlcv(self, code, timeframe):
"""
OHLCV 데이터 조회
timeframe: '1', '3', '5', ... (분봉), 'D'(일봉), 'W'(주봉), 'M'(월봉)
"""
# 연결 상탄 확인 및 재연결
if not self.ensure_connected():
logger.error(f"API 연결 끊김: {code} OHLCV 조회 실패")
return None
try:
# timeframe이 숫자인지 문자인지 확인하여 API 분기
if timeframe.isdigit():
# 분봉/시간봉 데이터 요청 (opt10080)
df = self.kiwoom.block_request("opt10080",
종목코드=code,
틱범위=timeframe,
수정주가구분=1,
output="주식분봉차트조회",
next=0)
logger.debug(f"분봉 데이터 요청: {code}, {timeframe}")
else:
# 일/주/월봉 데이터 요청 (opt10081)
df = self.kiwoom.block_request("opt10081",
종목코드=code,
틱범위=timeframe,
수정주가구분=1,
output="주식일봉차트조회",
next=0)
logger.debug(f"일/주/월봉 데이터 요청: {code}, {timeframe}")
if df is None or (hasattr(df, 'empty') and df.empty):
logger.warning(f"OHLCV 조회 결과 없음: {code} ({timeframe})")
return None
return df
except Exception as e:
logger.error(f"OHLCV 조회 실패: {code}, {timeframe}, {e}")
return None
def send_order(self, order_type, code, quantity, price, account_number):
"""
주문 실행
order_type: 1=신규매수, 2=신규매도, 3=매수취소, 4=매도취소, 5=매수정정, 6=매도정정
code: 종목코드 (6자리)
quantity: 주문수량
price: 주문가격 (0이면 시장가)
"""
# 연결 상탄 확인 및 재연결 (주문은 매우 중요하므로 반드시 확인)
if not self.ensure_connected():
error_msg = f"주문 실패: API 연결이 끊어졌고 재연결에 실패했습니다"
logger.error(error_msg)
raise ConnectionError(error_msg)
try:
# 시장가 주문의 경우 가격을 0으로, 지정가 주문의 경우 해당 가격으로 설정
price_to_send = 0 if price == 0 else int(price)
# 거래구분: 지정가="00", 시장가="03"
order_price_type = "03" if price == 0 else "00"
result = self.kiwoom.SendOrder(
"자동매매", # 사용자구분명
"0101", # 화면번호
account_number, # 계좌번호
order_type, # 주문유형
code, # 종목코드
quantity, # 주문수량
price_to_send, # 주문가격
order_price_type, # 거래구분 (00: 지정가, 03: 시장가)
"" # 원주문번호
)
logger.info(f"주문 실행: type={order_type}, code={code}, qty={quantity}, price={price_to_send}, order_type={order_price_type}, result={result}")
return result
except Exception as e:
logger.error(f"주문 실행 실패: {e}")
raise
# Singleton instance
kiwoom_instance = None
def get_kiwoom_api():
global kiwoom_instance
if kiwoom_instance is None:
app = QApplication.instance()
if not app:
app = QApplication(sys.argv)
kiwoom_instance = KiwoomAPI()
return kiwoom_instance
if __name__ == '__main__':
# For testing purposes
app = QApplication(sys.argv)
api = get_kiwoom_api()
# Example: Get account info
account_num = "YOUR_ACCOUNT_NUMBER" # 실제 계좌번호로 변경 필요
deposit, balance = api.get_account_info(account_num)
print("예수금 정보:", deposit)
print("계좌잔고:", balance)
# Example: Get OHLCV data
ohlcv = api.get_ohlcv("005930", "D") # 삼성전자, 일봉
print("삼성전자 일봉 데이터:", ohlcv)

67
src/notifications.py Normal file
View File

@@ -0,0 +1,67 @@
import threading
import requests
import time
from .common import logger
__all__ = ["send_telegram", "report_error", "send_startup_test_message"]
def send_telegram(bot_token: str, chat_id: str, text: str, add_thread_prefix: bool = True, parse_mode: str = None) -> bool:
if add_thread_prefix:
thread_name = threading.current_thread().name
payload_text = f"[{thread_name}] {text}"
else:
payload_text = text
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {"chat_id": chat_id, "text": payload_text}
if parse_mode:
payload["parse_mode"] = parse_mode
max_attempts = 4
backoff = 1.0
cumulative_sleep = 0.0
for attempt in range(1, max_attempts + 1):
try:
resp = requests.post(url, json=payload, timeout=10)
if resp.status_code == 200:
return True
else:
logger.warning("텔레그램 전송 실패 (시도 %d/%d) status=%s", attempt, max_attempts, resp.status_code)
except Exception as e:
logger.warning("텔레그램 예외 (시도 %d/%d): %s", attempt, max_attempts, e)
if attempt == max_attempts:
logger.error("텔레그램 최대 재시도 도달")
return False
sleep_time = backoff * attempt
cumulative_sleep += sleep_time
if cumulative_sleep > 30:
logger.error("텔레그램 누적 대기시간 초과")
return False
time.sleep(sleep_time)
return False
def report_error(bot_token: str, chat_id: str, message: str, dry_run: bool):
"""
Report an error via Telegram.
"""
if not dry_run and bot_token and chat_id:
try:
send_telegram(bot_token, chat_id, message)
except Exception:
logger.exception("에러 알림 전송 실패")
def send_startup_test_message(bot_token: str, chat_id: str, parse_mode: str, dry_run: bool):
"""
Send a startup test message to verify Telegram settings.
"""
if dry_run:
logger.info("[dry-run] Telegram 테스트 메시지 전송 생략")
return
if bot_token and chat_id:
test_msg = "[테스트] Telegram 설정 확인용 메시지입니다. 봇/채팅 설정이 올바르면 이 메시지가 도착합니다."
logger.info("텔레그램 테스트 메시지 전송 시도")
if send_telegram(bot_token, chat_id, test_msg, add_thread_prefix=False, parse_mode=parse_mode):
logger.info("텔레그램 테스트 메시지 전송 성공")
else:
logger.warning("텔레그램 테스트 메시지 전송 실패")
else:
logger.warning("TELEGRAM_TEST=1 이지만 TELEGRAM_BOT_TOKEN/TELEGRAM_CHAT_ID가 설정되어 있지 않습니다")

317
src/order.py Normal file
View File

@@ -0,0 +1,317 @@
import os
import time
import json
import secrets
from .common import logger
from .notifications import send_telegram
from .kiwoom_api import get_kiwoom_api
def _make_confirm_token(length: int = 16) -> str:
return secrets.token_hex(length)
def _write_pending_order(token: str, order: dict, pending_file: str = "pending_orders.json"):
try:
pending = []
if os.path.exists(pending_file):
with open(pending_file, "r", encoding="utf-8") as f:
try:
pending = json.load(f)
except Exception:
pending = []
pending.append({"token": token, "order": order, "timestamp": time.time()})
with open(pending_file, "w", encoding="utf-8") as f:
json.dump(pending, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.exception("pending_orders 기록 실패: %s", e)
def _check_confirmation(token: str, timeout: int = 300) -> bool:
start = time.time()
confirm_file = f"confirm_{token}"
tokens_file = "confirmed_tokens.txt"
while time.time() - start < timeout:
if os.path.exists(confirm_file):
try:
os.remove(confirm_file)
except Exception:
pass
return True
if os.path.exists(tokens_file):
try:
with open(tokens_file, "r", encoding="utf-8") as f:
for line in f:
if line.strip() == token:
return True
except Exception:
pass
time.sleep(2)
return False
def notify_order_result(symbol: str, order_result: dict, config: dict, telegram_token: str, telegram_chat_id: str) -> bool:
if not (telegram_token and telegram_chat_id):
return False
notify_cfg = config.get("notify", {}) if config else {}
status = order_result.get("status", "unknown")
should_notify = False
msg = ""
if status == "placed" and notify_cfg.get("order_filled", True):
should_notify = True
msg = f"[주문완료] {symbol}\n상태: 주문 성공"
elif status in ("error", "failed") and notify_cfg.get("order_error", True):
should_notify = True
msg = f"[주문오류] {symbol}\n상태: {status}\n에러: {order_result.get('error')}"
if should_notify and msg:
try:
send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False)
return True
except Exception as e:
logger.exception("주문 결과 알림 전송 실패: %s", e)
return False
return False
def _calculate_and_add_profit_rate(trade_record: dict, symbol: str, sell_price: float):
try:
from .holdings import load_holdings
holdings = load_holdings("holdings.json")
if symbol not in holdings:
return
buy_price = float(holdings[symbol].get("buy_price", 0.0) or 0.0)
if buy_price > 0 and sell_price > 0:
profit_rate = ((sell_price - buy_price) / buy_price) * 100
trade_record["buy_price"] = buy_price
trade_record["sell_price"] = sell_price
trade_record["profit_rate"] = round(profit_rate, 2)
logger.info("[%s] 매도 수익률: %.2f%% (매수가: %.2f, 매도가: %.2f)",
symbol, profit_rate, buy_price, sell_price)
else:
logger.warning("[%s] 수익률 계산 불가: buy_price=%.2f, sell_price=%.2f", symbol, buy_price, sell_price)
trade_record["profit_rate"] = None
except Exception as e:
logger.warning("매도 수익률 계산 중 오류 (기록은 계속 진행): %s", e)
trade_record["profit_rate"] = None
def place_buy_order_kiwoom(market: str, amount: int, config: dict, account_number: str, dry_run: bool = True) -> dict:
from .holdings import get_current_price
if dry_run:
price = get_current_price(market)
quantity = int(amount // price) if price > 0 else 0
logger.info("[place_buy_order_kiwoom][dry-run] %s 매수 금액=%d, 예상 수량=%d", market, amount, quantity)
return {"market": market, "side": "buy", "amount": amount, "price": price, "status": "simulated", "timestamp": time.time()}
if not account_number:
msg = "Kiwoom 계좌번호 없음: 매수 주문을 실행할 수 없습니다"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
try:
api = get_kiwoom_api()
price = get_current_price(market)
if price <= 0:
msg = f"현재가 조회 실패로 매수 불가: {market}"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
quantity = int(amount // price)
min_order_value = config.get("auto_trade", {}).get("min_order_value", 100000)
if amount < min_order_value:
msg = f"시장가 매수 건너뜀: 주문 금액 {amount} < 최소 {min_order_value}"
logger.warning(msg)
return {"market": market, "side": "buy", "amount": amount, "status": "skipped_too_small", "reason": "min_order_value", "timestamp": time.time()}
# 시장가 매수 주문 (가격 0)
order_result = api.send_order(1, market, quantity, 0, account_number)
logger.info("Kiwoom 시장가 매수 주문: %s, 금액=%d, 수량=%d", market, amount, quantity)
logger.info("Kiwoom 주문 응답: %s", order_result)
status = "placed" if order_result == 0 else "failed"
result = {"market": market, "side": "buy", "amount": amount, "quantity": quantity, "status": status, "response": order_result, "timestamp": time.time()}
return result
except Exception as e:
logger.exception("Kiwoom 매수 주문 실패: %s", e)
return {"error": str(e), "status": "failed", "timestamp": time.time()}
def place_sell_order_kiwoom(market: str, quantity: int, config: dict, account_number: str, dry_run: bool = True) -> dict:
"""
매도 주문 실행
market: 종목코드
quantity: 매도 수량 (주)
"""
if dry_run:
logger.info("[place_sell_order_kiwoom][dry-run] %s 매도 수량=%d", market, quantity)
return {"market": market, "side": "sell", "quantity": quantity, "status": "simulated", "timestamp": time.time()}
if not account_number:
msg = "Kiwoom 계좌번호 없음: 매도 주문을 실행할 수 없습니다"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
try:
api = get_kiwoom_api()
# 매도 수량 검증
if quantity <= 0:
msg = f"잘못된 매도 수량: {quantity}"
logger.error(msg)
return {"error": msg, "status": "failed", "timestamp": time.time()}
# 시장가 매도 주문 (가격 0)
order_result = api.send_order(2, market, int(quantity), 0, account_number)
logger.info("Kiwoom 시장가 매도 주문: %s, 수량=%d", market, quantity)
logger.info("Kiwoom 주문 응답: %s", order_result)
status = "placed" if order_result == 0 else "failed"
result = {"market": market, "side": "sell", "quantity": quantity, "status": status, "response": order_result, "timestamp": time.time()}
return result
except Exception as e:
logger.exception("Kiwoom 매도 주문 실패: %s", e)
return {"error": str(e), "status": "failed", "timestamp": time.time()}
def execute_sell_order_with_confirmation(symbol: str, quantity: int, config: dict, telegram_token: str, telegram_chat_id: str, account_number: str, dry_run: bool, parse_mode: str = "HTML") -> dict:
"""
확인 절차를 거쳐 매도 주문 실행
symbol: 종목코드
quantity: 매도 수량 (주)
"""
confirm_cfg = config.get("confirm", {})
confirm_via_file = confirm_cfg.get("confirm_via_file", True)
confirm_timeout = confirm_cfg.get("confirm_timeout", 300)
result = None
if not confirm_via_file:
logger.info("파일 확인 비활성화: 즉시 매도 주문 실행")
result = place_sell_order_kiwoom(symbol, quantity, config, account_number, dry_run)
else:
token = _make_confirm_token()
order_info = {"symbol": symbol, "side": "sell", "quantity": quantity, "timestamp": time.time()}
_write_pending_order(token, order_info)
if parse_mode == "HTML":
msg = f"<b>[확인필요] 자동매도 주문 대기</b>\n"
msg += f"토큰: <code>{token}</code>\n"
msg += f"심볼: <b>{symbol}</b>\n"
msg += f"매도수량: <b>{quantity:,d}주</b>\n\n"
msg += f"확인 방법:\n"
msg += f"1. 파일 생성: <code>confirm_{token}</code>\n"
msg += f"2. 또는 <code>confirmed_tokens.txt</code>에 토큰 추가\n"
msg += f"타임아웃: {confirm_timeout}"
else:
msg = f"[확인필요] 자동매도 주문 대기\n"
msg += f"토큰: {token}\n"
msg += f"심볼: {symbol}\n"
msg += f"매도수량: {quantity:,d}\n\n"
msg += f"확인 방법:\n"
msg += f"1. 파일 생성: confirm_{token}\n"
msg += f"2. 또는 confirmed_tokens.txt에 토큰 추가\n"
msg += f"타임아웃: {confirm_timeout}"
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False, parse_mode=parse_mode)
logger.info("[%s] 매도 확인 대기 중: 토큰=%s, 타임아웃=%d", symbol, token, confirm_timeout)
confirmed = _check_confirmation(token, confirm_timeout)
if not confirmed:
logger.warning("[%s] 매도 확인 타임아웃: 주문 취소", symbol)
if telegram_token and telegram_chat_id:
cancel_msg = f"[주문취소] {symbol} 매도\n사유: 사용자 미확인 (타임아웃)"
send_telegram(telegram_token, telegram_chat_id, cancel_msg, add_thread_prefix=False, parse_mode=parse_mode)
result = {"status": "user_not_confirmed", "symbol": symbol, "timestamp": time.time()}
else:
logger.info("[%s] 매도 확인 완료: 주문 실행", symbol)
result = place_sell_order_kiwoom(symbol, quantity, config, account_number, dry_run)
if result:
notify_order_result(symbol, result, config, telegram_token, telegram_chat_id)
trade_status = result.get("status")
if trade_status in ["simulated", "placed", "user_not_confirmed"]:
from .holdings import get_current_price
sell_price = get_current_price(symbol) if trade_status == "placed" else 0
trade_record = {"symbol": symbol, "side": "sell", "quantity": quantity, "timestamp": time.time(), "dry_run": dry_run, "result": result}
_calculate_and_add_profit_rate(trade_record, symbol, sell_price)
from .signals import record_trade
record_trade(trade_record)
if not dry_run and result.get("status") == "placed":
from .holdings import update_holding_amount
update_holding_amount(symbol, -quantity, "holdings.json")
return result
def execute_buy_order_with_confirmation(symbol: str, amount: int, config: dict, telegram_token: str, telegram_chat_id: str, account_number: str, dry_run: bool, parse_mode: str = "HTML") -> dict:
confirm_cfg = config.get("confirm", {})
confirm_via_file = confirm_cfg.get("confirm_via_file", True)
confirm_timeout = confirm_cfg.get("confirm_timeout", 300)
result = None
if not confirm_via_file:
logger.info("파일 확인 비활성화: 즉시 매수 주문 실행")
result = place_buy_order_kiwoom(symbol, amount, config, account_number, dry_run)
else:
token = _make_confirm_token()
order_info = {"symbol": symbol, "side": "buy", "amount": amount, "timestamp": time.time()}
_write_pending_order(token, order_info)
if parse_mode == "HTML":
msg = f"<b>[확인필요] 자동매수 주문 대기</b>\n"
msg += f"토큰: <code>{token}</code>\n"
msg += f"심볼: <b>{symbol}</b>\n"
msg += f"매수금액: <b>{amount:,.0f}</b>\n\n"
msg += f"확인 방법:\n"
msg += f"1. 파일 생성: <code>confirm_{token}</code>\n"
msg += f"2. 또는 <code>confirmed_tokens.txt</code>에 토큰 추가\n"
msg += f"타임아웃: {confirm_timeout}"
else:
msg = f"[확인필요] 자동매수 주문 대기\n"
msg += f"토큰: {token}\n"
msg += f"심볼: {symbol}\n"
msg += f"매수금액: {amount:,.0f}\n\n"
msg += f"확인 방법:\n"
msg += f"1. 파일 생성: confirm_{token}\n"
msg += f"2. 또는 confirmed_tokens.txt에 토큰 추가\n"
msg += f"타임아웃: {confirm_timeout}"
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False, parse_mode=parse_mode)
logger.info("[%s] 매수 확인 대기 중: 토큰=%s, 타임아웃=%d", symbol, token, confirm_timeout)
confirmed = _check_confirmation(token, confirm_timeout)
if not confirmed:
logger.warning("[%s] 매수 확인 타임아웃: 주문 취소", symbol)
if telegram_token and telegram_chat_id:
cancel_msg = f"[주문취소] {symbol} 매수\n사유: 사용자 미확인 (타임아웃)"
send_telegram(telegram_token, telegram_chat_id, cancel_msg, add_thread_prefix=False, parse_mode=parse_mode)
result = {"status": "user_not_confirmed", "symbol": symbol, "timestamp": time.time()}
else:
logger.info("[%s] 매수 확인 완료: 주문 실행", symbol)
result = place_buy_order_kiwoom(symbol, amount, config, account_number, dry_run)
if result:
notify_order_result(symbol, result, config, telegram_token, telegram_chat_id)
trade_status = result.get("status")
if trade_status in ["simulated", "placed", "user_not_confirmed"]:
trade_record = {
"symbol": symbol,
"side": "buy",
"amount": amount,
"timestamp": time.time(),
"dry_run": dry_run,
"result": result
}
from .signals import record_trade
record_trade(trade_record)
return result

551
src/signals.py Normal file
View File

@@ -0,0 +1,551 @@
import os
import time
import json
import inspect
from typing import List, Dict, Tuple, Any
from enum import Enum
import pandas as pd
import pandas_ta as ta
from datetime import datetime
from .common import logger
from .config import RuntimeConfig
from .indicators import fetch_ohlcv, compute_macd_hist, compute_sma
from .holdings import fetch_holdings_from_kiwoom, get_current_price
from .notifications import send_telegram
class CheckType(str, Enum):
"""매도 조건 체크 타입."""
STOP_LOSS = "stop_loss" # 손절 조건 (1시간 주기)
PROFIT_TAKING = "profit_taking" # 익절 조건 (4시간 주기)
ALL = "all" # 모든 조건
def make_trade_record(symbol, side, amount, dry_run, price=None, status="simulated"):
now = float(time.time())
return {
"symbol": symbol,
"side": side,
"amount": amount,
"timestamp": now,
"datetime": datetime.fromtimestamp(now).strftime("%Y-%m-%d %H:%M:%S"),
"dry_run": dry_run,
"result": {
"market": symbol,
"side": side,
"amount": amount,
"price": price,
"status": status,
"timestamp": now
}
}
def evaluate_sell_conditions(
current_price: float,
buy_price: float,
max_price: float,
holding_info: Dict[str, Any],
config: Dict[str, Any] | None = None,
check_type: str = CheckType.ALL
) -> Dict[str, Any]:
"""매도 조건을 평가하여 매도 여부와 비율을 반환.
Args:
current_price: 현재 가격
buy_price: 매수 가격
max_price: 최고 가격
holding_info: 보유 정보 (partial_sell_done 포함)
config: 설정 딕셔너리
check_type: 체크 타입 (CheckType.STOP_LOSS, CheckType.PROFIT_TAKING, CheckType.ALL)
Returns:
매도 조건 평가 결과 딕셔너리
"""
config = config or {}
auto_trade_config = config.get("auto_trade", {})
loss_threshold = float(auto_trade_config.get("loss_threshold", -5.0))
profit_threshold_1 = float(auto_trade_config.get("profit_threshold_1", 10.0))
profit_threshold_2 = float(auto_trade_config.get("profit_threshold_2", 30.0))
drawdown_1 = float(auto_trade_config.get("drawdown_1", 5.0))
drawdown_2 = float(auto_trade_config.get("drawdown_2", 15.0))
partial_sell_ratio = float(auto_trade_config.get("partial_sell_ratio", 0.5))
# 가격 유효성 검증
if buy_price <= 0:
logger.error(f"비정상 매수가: {buy_price}")
return {
"status": "error",
"sell_ratio": 0.0,
"reasons": [f"비정상 매수가: {buy_price}"],
"profit_rate": 0,
"max_drawdown": 0,
"set_partial_sell_done": False,
"check_interval_minutes": 60
}
if max_price <= 0:
logger.warning(f"비정상 최고가: {max_price}, 현재가로 대체")
max_price = current_price
profit_rate = ((current_price - buy_price) / buy_price) * 100
max_drawdown = ((current_price - max_price) / max_price) * 100
result = {
"status": "hold",
"sell_ratio": 0.0,
"reasons": [],
"profit_rate": profit_rate,
"max_drawdown": max_drawdown,
"set_partial_sell_done": False,
"check_interval_minutes": 60 # 기본값: 1시간
}
# check_type에 따른 조건 필터링
# "stop_loss": 1시간 주기 조건만 (조건1, 조건3, 조건4-2, 조건5-2)
# "profit_taking": 4시간 주기 조건만 (조건2, 조건4-1, 조건5-1)
# "all": 모든 조건
# 조건 우선순위:
# 1. 손절 (조건1) - 최우선
# 2. 부분 익절 (조건3) - 수익 실현 시작 (1회성)
# 3. 전량 익절 (조건4, 5) - 최고점 대비 하락 또는 수익률 하락
# 조건1: 손절 -5% (1시간 주기) - 손실 방지 최우선
if check_type in (CheckType.STOP_LOSS, CheckType.ALL) and profit_rate <= loss_threshold:
result.update(status="stop_loss", sell_ratio=1.0, check_interval_minutes=60)
result["reasons"].append(f"손절(조건1): 수익률 {profit_rate:.2f}% <= {loss_threshold}%")
return result
max_profit_rate = ((max_price - buy_price) / buy_price) * 100
# 조건3: 부분 익절 10% (1시간 주기) - profit_threshold_1 도달 시 일부 수익 확정
# 주의: 부분 익절은 1회만 실행되며, 이후 전량 익절 조건만 평가됨
partial_sell_done = holding_info.get("partial_sell_done", False)
if check_type in (CheckType.STOP_LOSS, CheckType.ALL) and not partial_sell_done and profit_rate >= profit_threshold_1:
result.update(status="partial_profit", sell_ratio=partial_sell_ratio, check_interval_minutes=60)
result["reasons"].append(f"부분 익절(조건3): 수익률 {profit_rate:.2f}% 달성, {int(partial_sell_ratio * 100)}% 매도")
result["set_partial_sell_done"] = True
return result
if max_profit_rate > profit_threshold_2:
# 조건5-1: 최고점 -15% 하락 (4시간 주기)
if check_type in (CheckType.PROFIT_TAKING, CheckType.ALL) and max_drawdown <= -drawdown_2:
result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=240)
result["reasons"].append(f"전량 익절(조건5-1): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_2}%)")
return result
# 조건5-2: 수익률 30% 이하 하락 (1시간 주기)
if check_type in (CheckType.STOP_LOSS, CheckType.ALL) and profit_rate <= profit_threshold_2:
result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=60)
result["reasons"].append(f"전량 익절(조건5-2): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 수익률 {profit_rate:.2f}%로 하락 (기준: {profit_threshold_2}%)")
return result
elif profit_threshold_1 < max_profit_rate <= profit_threshold_2:
# 조건4-1: 최고점 -5% 하락 (4시간 주기)
if check_type in (CheckType.PROFIT_TAKING, CheckType.ALL) and max_drawdown <= -drawdown_1:
result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=240)
result["reasons"].append(f"전량 익절(조건4-1): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)")
return result
# 조건4-2: 수익률 10% 이하 하락 (1시간 주기)
if check_type in (CheckType.STOP_LOSS, CheckType.ALL) and profit_rate <= profit_threshold_1:
result.update(status="profit_taking", sell_ratio=1.0, check_interval_minutes=60)
result["reasons"].append(f"전량 익절(조건4-2): 최고 수익률({max_profit_rate:.2f}%) 달성 후, 수익률 {profit_rate:.2f}%로 하락 (기준: {profit_threshold_1}%)")
return result
elif max_profit_rate <= profit_threshold_1:
# 조건2: 10% 이하 + 최고점 -5% (4시간 주기)
if check_type in (CheckType.PROFIT_TAKING, CheckType.ALL) and max_drawdown <= -drawdown_1:
result.update(status="stop_loss", sell_ratio=1.0, check_interval_minutes=240)
result["reasons"].append(f"손절(조건2): 최고점 대비 {abs(max_drawdown):.2f}% 하락 (기준: {drawdown_1}%)")
return result
result["reasons"].append(f"홀드 (수익률 {profit_rate:.2f}%, 최고점 대비 하락 {max_drawdown:.2f}%)")
return result
def build_sell_message(symbol: str, sell_result: dict, parse_mode: str = "HTML") -> str:
status = sell_result.get("status", "unknown")
profit = sell_result.get("profit_rate", 0.0)
drawdown = sell_result.get("max_drawdown", 0.0)
ratio = int(sell_result.get("sell_ratio", 0.0) * 100)
reasons = sell_result.get("reasons", [])
reason = reasons[0] if reasons else "사유 없음"
if parse_mode == "HTML":
msg = f"<b>🔴 매도 신호: {symbol}</b>\n"
msg += f"상태: <b>{status}</b>\n"
msg += f"수익률: <b>{profit:.2f}%</b>\n"
msg += f"최고점 대비: <b>{drawdown:.2f}%</b>\n"
msg += f"매도 비율: <b>{ratio}%</b>\n"
msg += f"사유: {reason}\n"
return msg
msg = f"🔴 매도 신호: {symbol}\n"
msg += f"상태: {status}\n"
msg += f"수익률: {profit:.2f}%\n"
msg += f"최고점 대비: {drawdown:.2f}%\n"
msg += f"매도 비율: {ratio}%\n"
msg += f"사유: {reason}\n"
return msg
def _adjust_sell_ratio_for_min_order(symbol: str, total_amount: float, sell_ratio: float, current_price: float, config: dict) -> float:
if not (0 < sell_ratio < 1):
return sell_ratio
auto_trade_cfg = (config or {}).get("auto_trade", {})
try:
min_order_value = float(auto_trade_cfg.get("min_order_value", 100000))
except (ValueError, TypeError):
min_order_value = 100000.0
amount_to_sell_partial = total_amount * sell_ratio
value_to_sell = amount_to_sell_partial * current_price
value_remaining = (total_amount - amount_to_sell_partial) * current_price
if value_to_sell < min_order_value or value_remaining < min_order_value:
logger.info("[%s] 부분 매도(%.0f%%) 조건 충족했으나, 최소 주문 금액(%.0f) 문제로 전량 매도로 전환합니다. (예상 매도액: %.0f, 예상 잔여액: %.0f)",
symbol, sell_ratio * 100, min_order_value, value_to_sell, value_remaining)
return 1.0
return sell_ratio
def record_trade(trade: dict, trades_file: str = "trades.json"):
try:
trades = []
if os.path.exists(trades_file):
with open(trades_file, "r", encoding="utf-8") as f:
try:
trades = json.load(f)
except Exception:
trades = []
trades.append(trade)
with open(trades_file, "w", encoding="utf-8") as f:
json.dump(trades, f, ensure_ascii=False, indent=2)
logger.debug("거래기록 저장됨: %s", trades_file)
except Exception as e:
logger.exception("거래기록 저장 실패: %s", e)
def _update_df_with_realtime_price(df: pd.DataFrame, symbol: str, timeframe: str, buffer: list) -> pd.DataFrame:
"""
실시간 가격으로 현재 캔들 업데이트
타임프레임에 따라 현재 캔들이 아직 진행 중인지 판단하여 업데이트
"""
try:
from datetime import datetime, timezone
current_price = get_current_price(symbol)
if not (current_price > 0 and df is not None and not df.empty):
return df
last_candle_time = df.index[-1]
now = datetime.now(timezone.utc)
# 타임프레임별 interval_seconds 계산
interval_seconds = 0
try:
if 'm' in timeframe:
interval_seconds = int(timeframe.replace('m', '')) * 60
elif 'h' in timeframe:
interval_seconds = int(timeframe.replace('h', '')) * 3600
elif timeframe == 'D':
interval_seconds = 86400
elif timeframe == 'W':
interval_seconds = 86400 * 7
elif timeframe == 'M':
# 월봉은 정확한 계산이 어려우므로 업데이트 건너뜀
buffer.append("warning: 월봉(M)은 실시간 업데이트를 지원하지 않습니다")
return df
except (ValueError, AttributeError) as e:
buffer.append(f"warning: 타임프레임 파싱 실패 ({timeframe}): {e}")
return df
if interval_seconds > 0:
if last_candle_time.tzinfo is None:
last_candle_time = last_candle_time.tz_localize(timezone.utc)
next_candle_time = last_candle_time + pd.Timedelta(seconds=interval_seconds)
if last_candle_time <= now < next_candle_time:
# 현재 캔들이 아직 진행 중이므로 실시간 가격으로 업데이트
df.loc[df.index[-1], 'close'] = current_price
df.loc[df.index[-1], 'high'] = max(df.loc[df.index[-1], 'high'], current_price)
df.loc[df.index[-1], 'low'] = min(df.loc[df.index[-1], 'low'], current_price)
buffer.append(f"실시간 캔들 업데이트 적용: close={current_price:.2f}, timeframe={timeframe}")
else:
buffer.append(f"info: 새로운 캔들 시작됨, 실시간 업데이트 건너뜀 (timeframe={timeframe})")
else:
buffer.append(f"warning: interval_seconds 계산 실패 (timeframe={timeframe})")
except Exception as e:
buffer.append(f"warning: 실시간 캔들 업데이트 실패: {e}")
return df
def process_symbol(symbol: str, timeframe: str, candle_count: int, telegram_token: str, telegram_chat_id: str, dry_run: bool, indicators: dict = None, indicator_timeframe: str = None, cfg: RuntimeConfig | None = None) -> dict:
result = {"symbol": symbol, "summary": [], "telegram": None, "error": None}
try:
if cfg is not None:
timeframe = timeframe or cfg.timeframe
candle_count = candle_count or cfg.candle_count
telegram_token = telegram_token or cfg.telegram_bot_token
telegram_chat_id = telegram_chat_id or cfg.telegram_chat_id
dry_run = cfg.dry_run if dry_run is None else dry_run
indicator_timeframe = indicator_timeframe or cfg.indicator_timeframe
buffer = []
use_tf = indicator_timeframe or timeframe
df = fetch_ohlcv(symbol, use_tf, candle_count=candle_count, log_buffer=buffer)
df = _update_df_with_realtime_price(df, symbol, use_tf, buffer)
if buffer:
for b in buffer:
result["summary"].append(b)
if df.empty or len(df) < 3:
result["summary"].append(f"MACD 계산에 충분한 데이터 없음: {symbol}")
result["error"] = "insufficient_data"
return result
hist = compute_macd_hist(df["close"], log_buffer=buffer)
if buffer and len(result["summary"]) == 0:
for b in buffer:
result["summary"].append(b)
if len(hist.dropna()) < 2:
result["summary"].append(f"MACD 히스토그램 값 부족: {symbol}")
result["error"] = "insufficient_macd"
return result
ind = indicators or {}
macd_fast = int(ind.get("macd_fast", 12))
macd_slow = int(ind.get("macd_slow", 26))
macd_signal = int(ind.get("macd_signal", 9))
adx_length = int(ind.get("adx_length", 14))
adx_threshold = float(ind.get("adx_threshold", 25))
sma_short_len = int(ind.get("sma_short", 5))
sma_long_len = int(ind.get("sma_long", 200))
macd_df = ta.macd(df["close"], fast= macd_fast, slow= macd_slow, signal= macd_signal)
cols = list(macd_df.columns)
hist_cols = [c for c in cols if "MACDh" in c or "hist" in c.lower()]
macd_cols = [c for c in cols if ("MACD" in c and c not in hist_cols and not c.lower().endswith("s"))]
signal_cols = [c for c in cols if ("MACDs" in c or c.lower().endswith("s") or "signal" in c.lower())]
if not macd_cols or not signal_cols:
if len(cols) >= 3:
macd_col = cols[0]
signal_col = cols[-1]
else:
raise RuntimeError("MACD columns not found")
else:
macd_col = macd_cols[0]
signal_col = signal_cols[0]
macd_line = macd_df[macd_col].dropna()
signal_line = macd_df[signal_col].dropna()
sma_short = compute_sma(df["close"], sma_short_len, log_buffer=buffer)
sma_long = compute_sma(df["close"], sma_long_len, log_buffer=buffer)
adx_df = ta.adx(df["high"], df["low"], df["close"], length=adx_length)
adx_cols = [c for c in adx_df.columns if "ADX" in c.upper()]
adx_series = adx_df[adx_cols[0]].dropna() if adx_cols else pd.Series([])
if macd_line is None or signal_line is None:
if hist is not None and len(hist.dropna()) >= 2:
slope = float(hist.dropna().iloc[-1] - hist.dropna().iloc[-2])
if slope > 0:
close_price = float(df["close"].iloc[-1])
result["summary"].append(f"매수 신호발생 (히스토그램 기울기): {symbol}")
result["telegram"] = f"매수 신호발생: {symbol} -> 히스토그램 기울기 양수\n가격: {close_price:.2f}\n사유: histogram slope {slope:.6f}"
return result
result["summary"].append("조건 미충족: 히스토그램 기울기 음수")
result["error"] = "insufficient_macd"
return result
result["summary"].append("MACD 데이터 부족: 교차 판별 불가")
result["error"] = "insufficient_macd"
return result
if len(macd_line) < 2 or len(signal_line) < 2:
result["summary"].append("MACD 데이터 부족: 교차 판별 불가")
return result
close = float(df["close"].iloc[-1])
prev_macd = float(macd_line.iloc[-2])
curr_macd = float(macd_line.iloc[-1])
prev_signal = float(signal_line.iloc[-2])
curr_signal = float(signal_line.iloc[-1])
prev_sma_short = float(sma_short.dropna().iloc[-2]) if len(sma_short.dropna()) >= 2 else None
curr_sma_short = float(sma_short.dropna().iloc[-1]) if len(sma_short.dropna()) >= 1 else None
prev_sma_long = float(sma_long.dropna().iloc[-2]) if len(sma_long.dropna()) >= 2 else None
curr_sma_long = float(sma_long.dropna().iloc[-1]) if len(sma_long.dropna()) >= 1 else None
prev_adx = float(adx_series.iloc[-2]) if len(adx_series) >= 2 else None
curr_adx = float(adx_series.iloc[-1]) if len(adx_series) >= 1 else None
cross_macd_signal = (prev_macd < prev_signal and curr_macd > curr_signal)
cross_macd_zero = (prev_macd < 0 and curr_macd > 0)
macd_cross_ok = (cross_macd_signal or cross_macd_zero)
macd_above_signal = (curr_macd > curr_signal)
sma_condition = (curr_sma_short is not None and curr_sma_long is not None and curr_sma_short > curr_sma_long)
cross_sma = (prev_sma_short is not None and prev_sma_long is not None and prev_sma_short < prev_sma_long and curr_sma_short is not None and curr_sma_long is not None and curr_sma_short > curr_sma_long)
adx_ok = (curr_adx is not None and curr_adx > adx_threshold)
cross_adx = (prev_adx is not None and curr_adx is not None and prev_adx <= adx_threshold and curr_adx > adx_threshold)
matches = []
if macd_cross_ok and sma_condition:
matches.append("매수조건1")
if cross_sma and macd_above_signal and adx_ok:
matches.append("매수조건2")
if cross_adx and sma_condition and macd_above_signal:
matches.append("매수조건3")
if matches:
result["summary"].append(f"매수 신호발생: {symbol} -> {', '.join(matches)}")
text = f"매수 신호발생: {symbol} -> {', '.join(matches)}\n가격: {close:.2f}\n"
result["telegram"] = text
amount = cfg.config.get("auto_trade", {}).get("buy_amount", 0) if cfg else 0
trade_recorded = False
if dry_run:
trade = make_trade_record(symbol, "buy", amount, True, price=close, status="simulated")
record_trade(trade, "trades.json")
trade_recorded = True
elif cfg is not None and cfg.trading_mode == "auto_trade":
auto_trade_cfg = cfg.config.get("auto_trade", {})
buy_enabled = auto_trade_cfg.get("buy_enabled", False)
buy_amount = auto_trade_cfg.get("buy_amount", 0)
allowed_symbols = auto_trade_cfg.get("allowed_symbols", [])
can_auto_buy = buy_enabled and buy_amount > 0
if allowed_symbols and symbol not in allowed_symbols:
can_auto_buy = False
if can_auto_buy:
logger.info("[%s] 자동 매수 조건 충족: 매수 주문 시작 (금액: %d)", symbol, buy_amount)
from .order import execute_buy_order_with_confirmation
buy_result = execute_buy_order_with_confirmation(symbol=symbol, amount=buy_amount, config=cfg.config, telegram_token=telegram_token, telegram_chat_id=telegram_chat_id, account_number=cfg.kiwoom_account_number, dry_run=False, parse_mode=cfg.telegram_parse_mode)
result["buy_order"] = buy_result
if buy_result.get("status") == "placed":
trade_recorded = True
from .holdings import add_new_holding
quantity = buy_result.get("quantity", 0)
add_new_holding(symbol, close, quantity, time.time(), "holdings.json")
trade = make_trade_record(symbol, "buy", buy_amount, False, price=close, status="filled")
record_trade(trade, "trades.json")
if not trade_recorded and not dry_run:
trade = make_trade_record(symbol, "buy", amount, False, price=close, status="notified")
record_trade(trade, "trades.json")
return result
except Exception as e:
logger.exception("심볼 처리 중 오류: %s -> %s", symbol, e)
result["error"] = str(e)
result["summary"].append(f"심볼 처리 중 오류: {symbol} -> {e}")
return result
def check_sell_conditions(
holdings: Dict[str, Dict[str, Any]],
cfg: RuntimeConfig,
config: Dict[str, Any] | None = None,
check_type: str = CheckType.ALL
) -> Tuple[List[Dict[str, Any]], int]:
"""보유 종목의 매도 조건을 확인.
Args:
holdings: 보유 종목 딕셔너리
cfg: 런타임 설정
config: 설정 딕셔너리
check_type: 체크 타입 (CheckType.STOP_LOSS, CheckType.PROFIT_TAKING, CheckType.ALL)
Returns:
(매도 결과 리스트, 매도 신호 개수)
"""
if config is None and cfg is not None and hasattr(cfg, "config"):
config = cfg.config
results = []
telegram_token = cfg.telegram_bot_token
telegram_chat_id = cfg.telegram_chat_id
dry_run = cfg.dry_run
account_number = cfg.kiwoom_account_number
trading_mode = cfg.trading_mode
if not holdings:
logger.info("보유 정보가 없음 - 매도 조건 검사 건너뜀")
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, "[알림] 충족된 매도 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode or "HTML")
return [], 0
sell_signal_count = 0
from src.order import execute_sell_order_with_confirmation
for symbol, holding_info in holdings.items():
try:
current_price = get_current_price(symbol)
if current_price <= 0:
logger.warning("현재가 조회 실패: %s", symbol)
continue
buy_price = float(holding_info.get("buy_price", 0))
max_price = float(holding_info.get("max_price", current_price))
if buy_price <= 0:
logger.warning("매수가 정보 없음: %s", symbol)
continue
sell_result = evaluate_sell_conditions(
current_price,
buy_price,
max_price,
holding_info,
config,
check_type
)
logger.info("[%s] 매도 조건 검사 - 현재가: %.2f, 매수가: %.2f, 최고가: %.2f, 수익률: %.2f%%, 최고점대비: %.2f%%", symbol, current_price, buy_price, max_price, sell_result["profit_rate"], sell_result["max_drawdown"])
logger.info("[%s] 매도 상태: %s (비율: %.0f%%), 사유: %s", symbol, sell_result["status"], sell_result["sell_ratio"] * 100, ", ".join(sell_result["reasons"]))
result = {"symbol": symbol, "status": sell_result["status"], "sell_ratio": sell_result["sell_ratio"], "profit_rate": sell_result["profit_rate"], "max_drawdown": sell_result["max_drawdown"], "reasons": sell_result["reasons"], "current_price": current_price, "buy_price": buy_price, "max_price": max_price, "amount": holding_info.get("amount", 0)}
results.append(result)
if sell_result.get("set_partial_sell_done", False) and dry_run:
from src.holdings import set_holding_field
set_holding_field(symbol, "partial_sell_done", True, "holdings.json")
logger.info("[%s] partial_sell_done 플래그 설정 완료 (dry_run)", symbol)
if sell_result["sell_ratio"] > 0:
sell_signal_count += 1
if ((cfg.trading_mode == "auto_trade" and dry_run) or cfg.trading_mode != "auto_trade"):
if telegram_token and telegram_chat_id:
from .signals import build_sell_message
msg = build_sell_message(symbol, sell_result, parse_mode=cfg.telegram_parse_mode or "HTML")
send_telegram(telegram_token, telegram_chat_id, msg, add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode or "HTML")
if cfg.trading_mode == "auto_trade" and not dry_run:
total_amount = float(holding_info.get("amount", 0))
sell_ratio = float(sell_result.get("sell_ratio", 0.0) or 0.0)
sell_ratio = _adjust_sell_ratio_for_min_order(symbol, total_amount, sell_ratio, current_price, config)
amount_to_sell = int(total_amount * sell_ratio)
if amount_to_sell > 0:
logger.info("[%s] 자동 매도 조건 충족: 매도 주문 시작 (총 수량: %d, 매도 비율: %.0f%%, 주문 수량: %d)", symbol, total_amount, sell_ratio * 100, amount_to_sell)
sell_order_result = execute_sell_order_with_confirmation(
symbol=symbol,
quantity=amount_to_sell,
config=config,
telegram_token=telegram_token,
telegram_chat_id=telegram_chat_id,
account_number=account_number,
dry_run=dry_run,
parse_mode=cfg.telegram_parse_mode or "HTML"
)
if sell_order_result and sell_result.get("set_partial_sell_done"):
if sell_order_result.get("status") == "placed":
from .holdings import set_holding_field
if set_holding_field(symbol, "partial_sell_done", True, holdings_file="holdings.json"):
logger.info("[%s] 부분 매도(1회성) 완료, partial_sell_done 플래그를 True로 업데이트합니다.", symbol)
else:
logger.error("[%s] partial_sell_done 플래그 업데이트에 실패했습니다.", symbol)
except Exception as e:
logger.exception("매도 조건 확인 중 오류 (%s): %s", symbol, e)
if telegram_token and telegram_chat_id and not any(r["sell_ratio"] > 0 for r in results):
send_telegram(telegram_token, telegram_chat_id, "[알림] 충족된 매도 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=cfg.telegram_parse_mode or "HTML")
return results, sell_signal_count

1
src/tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Test package

View File

@@ -0,0 +1,81 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
import time
import pytest
from src.signals import evaluate_sell_conditions
@pytest.fixture
def config():
return {
"loss_threshold": -5.0,
"profit_threshold_1": 10.0,
"profit_threshold_2": 30.0,
"drawdown_1": 5.0,
"drawdown_2": 15.0,
}
def test_condition1_full_stop_loss(config):
# current price down 6% from buy -> full sell (condition1)
buy = 100.0
curr = 94.0 # -6%
maxp = 100.0
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["status"] in ("stop_loss", "손절", "loss_cut") or res["sell_ratio"] == 1.0
def test_condition2_stop_on_drawdown_small_profit(config):
# profit <= 10% and drawdown >= drawdown_1 (5%) -> full sell
buy = 100.0
maxp = 105.0
curr = 100.0 # profit 0%, drawdown from max -4.76% (not enough)
# adjust to make drawdown >5
maxp = 110.0
curr = 104.0 # profit 4%, drawdown ~5.45% -> full sell
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["sell_ratio"] == 1.0
def test_condition3_partial_take_profit(config):
# profit between 10% and 30% -> partial sell (50%)
buy = 100.0
maxp = 115.0
curr = 112.0 # profit 12% -> partial
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["sell_ratio"] == 0.5
def test_condition4_full_take_if_drawdown_in_10_30(config):
# profit 20% and drawdown from max >= drawdown_1 (5%) -> full sell
buy = 100.0
maxp = 130.0
curr = 120.0 # profit 20%, drawdown from max ~7.69% -> full
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["sell_ratio"] == 1.0
def test_condition5_high_profit_drawdown(config):
# profit >=30% and drawdown >=drawdown_2 (15%) -> full sell
buy = 100.0
maxp = 150.0
curr = 125.0 # profit 25% (not enough) -> adjust
curr = 140.0 # profit 40%, drawdown from max ~6.66% -> not full unless drawdown >=15
# make drawdown >=15
maxp = 170.0
curr = 145.0 # profit 45%, drawdown ~14.7% (just below)
maxp = 180.0
curr = 145.0 # drawdown 19.44% -> full
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["sell_ratio"] == 1.0
def test_condition6_hold(config):
# profit >=30% and drawdown less than drawdown_2 -> hold
buy = 100.0
maxp = 150.0
curr = 140.0 # profit 40%, drawdown ~6.66% -> hold
res = evaluate_sell_conditions(curr, buy, maxp, config)
assert res["sell_ratio"] == 0.0

56
src/tests/test_helpers.py Normal file
View File

@@ -0,0 +1,56 @@
"""Test helper functions used primarily in test scenarios."""
import inspect
import sys
import os
# Add parent directory to path to import from src
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
from src.common import logger
from src.signals import process_symbol
from src.notifications import send_telegram
def safe_send_telegram(bot_token: str, chat_id: str, text: str, **kwargs) -> bool:
"""Flexibly call send_telegram even if monkeypatched version has a simpler signature.
Inspects the target callable and only passes accepted parameters."""
func = send_telegram
try:
sig = inspect.signature(func)
accepted = sig.parameters.keys()
call_kwargs = {}
# positional mapping
params = list(accepted)
pos_args = [bot_token, chat_id, text]
for i, val in enumerate(pos_args):
if i < len(params):
call_kwargs[params[i]] = val
# optional kwargs filtered
for k, v in kwargs.items():
if k in accepted:
call_kwargs[k] = v
return func(**call_kwargs)
except Exception:
# Fallback positional
try:
return func(bot_token, chat_id, text)
except Exception:
return False
def check_and_notify(exchange: str, symbol: str, timeframe: str, telegram_token: str, telegram_chat_id: str, candle_count: int = 200, dry_run: bool = True):
"""Compatibility helper used by tests: run processing for a single symbol and send notification if needed.
exchange parameter is accepted for API compatibility but not used (we use pyupbit internally).
"""
try:
res = process_symbol(symbol, timeframe, candle_count, telegram_token, telegram_chat_id, dry_run, indicators=None, indicator_timeframe=None)
# If a telegram message was returned from process_symbol, send it (unless dry_run)
if res.get("telegram"):
if dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
else:
if telegram_token and telegram_chat_id:
safe_send_telegram(telegram_token, telegram_chat_id, res["telegram"], add_thread_prefix=False)
except Exception as e:
logger.exception("check_and_notify 오류: %s", e)

67
src/tests/test_main.py Normal file
View File

@@ -0,0 +1,67 @@
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
import builtins
import types
import pandas as pd
import pytest
import main
from .test_helpers import check_and_notify, safe_send_telegram
def test_compute_macd_hist_monkeypatch(monkeypatch):
# Arrange: monkeypatch pandas_ta.macd to return a DataFrame with MACDh column
dummy_macd = pd.DataFrame({"MACDh_12_26_9": [None, 0.5, 1.2, 2.3]})
def fake_macd(series, fast, slow, signal):
return dummy_macd
monkeypatch.setattr(main.ta, "macd", fake_macd)
close = pd.Series([1, 2, 3, 4])
# Act: import directly from indicators
from src.indicators import compute_macd_hist
hist = compute_macd_hist(close)
# Assert
assert isinstance(hist, pd.Series)
assert list(hist.dropna()) == [0.5, 1.2, 2.3]
def test_check_and_notify_positive_sends(monkeypatch):
# Prepare a fake OHLCV DataFrame
idx = pd.date_range(end=pd.Timestamp.now(), periods=4, freq="h")
df = pd.DataFrame({"close": [100, 110, 120, 140]}, index=idx)
# Monkeypatch at the point of use: src.signals imports from indicators
from src import signals
# Patch fetch_ohlcv and compute_macd_hist in signals module
monkeypatch.setattr(signals, "fetch_ohlcv", lambda symbol, timeframe, candle_count=200, log_buffer=None: df)
# Return histogram with at least 2 non-NA values and matching df index
monkeypatch.setattr(signals, "compute_macd_hist", lambda close_series, log_buffer=None: pd.Series([0.0, 0.5, 1.0, 5.0], index=df.index))
# Monkeypatch pandas_ta.macd to raise exception so histogram fallback path is used
def fake_macd_fail(*args, **kwargs):
raise RuntimeError("force histogram fallback")
monkeypatch.setattr(signals.ta, "macd", fake_macd_fail)
# Capture calls to safe_send_telegram
called = {"count": 0}
def fake_safe_send(token, chat_id, text, **kwargs):
called["count"] += 1
return True
# Monkeypatch test_helpers module
from . import test_helpers
monkeypatch.setattr(test_helpers, "safe_send_telegram", fake_safe_send)
# Act: call check_and_notify (not dry-run)
check_and_notify("upbit", "KRW-BTC", "1h", "token", "chat", candle_count=10, dry_run=False)
# Assert: safe_send_telegram was called
assert called["count"] == 1

138
src/threading_utils.py Normal file
View File

@@ -0,0 +1,138 @@
import time
import threading
from typing import List
from .config import RuntimeConfig
from .common import logger
from .signals import process_symbol
from .notifications import send_telegram
def run_sequential(symbols: List[str], timeframe: str | None = None, indicator_timeframe: str | None = None, candle_count: int | None = None, telegram_token: str | None = None, telegram_chat_id: str | None = None, dry_run: bool | None = None, symbol_delay: float | None = None, parse_mode: str | None = None, aggregate_enabled: bool = False, cfg: RuntimeConfig | None = None):
if cfg is not None:
timeframe = timeframe or cfg.timeframe
indicator_timeframe = indicator_timeframe or cfg.indicator_timeframe
candle_count = cfg.candle_count if (candle_count is None) else candle_count
telegram_token = telegram_token or cfg.telegram_bot_token
telegram_chat_id = telegram_chat_id or cfg.telegram_chat_id
dry_run = cfg.dry_run if (dry_run is None) else dry_run
symbol_delay = cfg.symbol_delay if (symbol_delay is None) else symbol_delay
parse_mode = parse_mode or cfg.telegram_parse_mode
logger.info("순차 처리 시작 (심볼 수=%d)", len(symbols))
alerts = []
buy_signal_count = 0
for i, sym in enumerate(symbols):
try:
res = process_symbol(sym, timeframe, candle_count, telegram_token, telegram_chat_id, dry_run, indicators=None, indicator_timeframe=indicator_timeframe, cfg=cfg)
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
else:
# dry_run이 아닐 때는 콘솔에 메시지 출력하지 않음
pass
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, res["telegram"], add_thread_prefix=False, parse_mode=parse_mode)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
except Exception as e:
logger.exception("심볼 처리 오류: %s -> %s", sym, e)
if i < len(symbols) - 1 and symbol_delay is not None:
logger.debug("다음 심볼까지 %.2f초 대기", symbol_delay)
time.sleep(symbol_delay)
if aggregate_enabled and len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, summary_text, add_thread_prefix=False, parse_mode=parse_mode)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
if telegram_token and telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(telegram_token, telegram_chat_id, "[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=parse_mode)
return buy_signal_count
def run_with_threads(symbols: List[str], timeframe: str | None = None, indicator_timeframe: str | None = None, candle_count: int | None = None, telegram_token: str | None = None, telegram_chat_id: str | None = None, dry_run: bool | None = None, symbol_delay: float | None = None, max_threads: int | None = None, parse_mode: str | None = None, aggregate_enabled: bool = False, cfg: RuntimeConfig | None = None):
if cfg is not None:
timeframe = timeframe or cfg.timeframe
indicator_timeframe = indicator_timeframe or cfg.indicator_timeframe
candle_count = cfg.candle_count if (candle_count is None) else candle_count
telegram_token = telegram_token or cfg.telegram_bot_token
telegram_chat_id = telegram_chat_id or cfg.telegram_chat_id
dry_run = cfg.dry_run if (dry_run is None) else dry_run
symbol_delay = cfg.symbol_delay if (symbol_delay is None) else symbol_delay
max_threads = cfg.max_threads if (max_threads is None) else max_threads
parse_mode = parse_mode or cfg.telegram_parse_mode
logger.info("병렬 처리 시작 (심볼 수=%d, 스레드 수=%d, 심볼 간 지연=%.2f초)", len(symbols), max_threads or 0, symbol_delay or 0.0)
semaphore = threading.Semaphore(max_threads)
threads = []
last_request_time = [0]
throttle_lock = threading.Lock()
results = {}
results_lock = threading.Lock()
def worker(symbol: str):
try:
with semaphore:
with throttle_lock:
elapsed = time.time() - last_request_time[0]
if symbol_delay is not None and elapsed < symbol_delay:
sleep_time = symbol_delay - elapsed
logger.debug("[%s] 스로틀 대기: %.2f", symbol, sleep_time)
time.sleep(sleep_time)
last_request_time[0] = time.time()
res = process_symbol(symbol, timeframe, candle_count, telegram_token, telegram_chat_id, dry_run, indicators=None, indicator_timeframe=indicator_timeframe, cfg=cfg)
with results_lock:
results[symbol] = res
except Exception as e:
logger.exception("[%s] 워커 스레드 오류: %s", symbol, e)
for sym in symbols:
t = threading.Thread(target=worker, args=(sym,), name=f"Worker-{sym}")
threads.append(t)
t.start()
for t in threads:
t.join()
alerts = []
buy_signal_count = 0
for sym in symbols:
with results_lock:
res = results.get(sym)
if not res:
logger.warning("심볼 결과 없음: %s", sym)
continue
for line in res.get("summary", []):
logger.info(line)
if res.get("telegram"):
buy_signal_count += 1
if dry_run:
logger.info("[dry-run] 알림 내용:\n%s", res["telegram"])
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, res["telegram"], add_thread_prefix=False, parse_mode=parse_mode)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 메시지 전송 불가")
alerts.append({"symbol": sym, "text": res["telegram"]})
if aggregate_enabled and len(alerts) > 1:
summary_lines = [f"알림 발생 심볼 수: {len(alerts)}", "\n"]
summary_lines += [f"- {a['symbol']}" for a in alerts]
summary_text = "\n".join(summary_lines)
if dry_run:
logger.info("[dry-run] 알림 요약:\n%s", summary_text)
else:
if telegram_token and telegram_chat_id:
send_telegram(telegram_token, telegram_chat_id, summary_text, add_thread_prefix=False, parse_mode=parse_mode)
else:
logger.warning("텔레그램 토큰/채팅 ID가 설정되지 않아 요약 메시지 전송 불가")
# 매수 조건이 하나도 충족되지 않은 경우 알림 전송
if telegram_token and telegram_chat_id and not any(a.get("text") for a in alerts):
send_telegram(telegram_token, telegram_chat_id, "[알림] 충족된 매수 조건 없음 (프로그램 정상 작동 중)", add_thread_prefix=False, parse_mode=parse_mode)
logger.info("병렬 처리 완료")
return buy_signal_count