EMA Crossover Strategy with Stoploss and Target
Strategy Type
Test Strategy – Purpose-built for functional validation of data flow, signal generation, order placement, and WebSocket-based exits. Tight SL/Target are kept intentional for testing.
Instrument Configuration
Exchange: NSE
Symbol: NHPC
Quantity: 1
Product: MIS
Timeframe (bars): 5m
Historical Lookback: 5 days
Trade Direction Mode: BOTH (supports
LONG
/SHORT
/BOTH
)
Note: Behavior is logic-driven; MIS is used only for convenience during tests.
Indicators Used
EMA 2: Exponential Moving Average over the last 2 closing prices (very short-term).
EMA 4: Exponential Moving Average over the last 4 closing prices (short-term).
Entry Conditions
Fetch 5-minute historical candle data approximately every 5 seconds.
Calculate EMA-2 and EMA-4.
Confirm crossover using the last two closed candles (not the current forming candle).
Buy Signal: Previous candle EMA-2 ≤ EMA-4 and last closed candle EMA-2 > EMA-4.
Sell Signal: Previous candle EMA-2 ≥ EMA-4 and last closed candle EMA-2 < EMA-4.
On confirmed signal:
Place a MARKET order (BUY or SELL).
Capture entry price and compute risk levels:
For BUY: Stoploss = Entry − ₹0.10, Target = Entry + ₹0.20
For SELL: Stoploss = Entry + ₹0.10, Target = Entry − ₹0.20
Exit Conditions
Use WebSocket streaming to receive live LTP updates.
Continuously check if LTP hits the defined stoploss or target.
When triggered, exit via a MARKET order in the opposite direction.
Strategy Architecture
Two threads:
WebSocket Thread: Listens to real-time LTP and checks SL/Target.
Strategy Thread: Periodically fetches historical data, evaluates EMA signals, and initiates trades.
Uses
threading.Event()
for graceful shutdown via CTRL+C.
Shutdown Behavior
On keyboard interrupt, both threads are stopped safely.
WebSocket subscription is removed and the connection is closed.
If a position is open, the strategy attempts to close it with a MARKET order before exit.
The strategy exits cleanly and logs the shutdown.
Complete Code
"""
===============================================================================
EMA CROSSOVER WITH FIXED DATETIME HANDLING
OpenAlgo Trading Bot
===============================================================================
"""
from openalgo import api
import pandas as pd
from datetime import datetime, timedelta
import threading
import time
# ===============================================================================
# TRADING CONFIGURATION
# ===============================================================================
# API Configuration
API_KEY = "openalgo-apikey"
API_HOST = "http://127.0.0.1:5000"
WS_URL = "ws://127.0.0.1:8765"
# Trade Settings
SYMBOL = "NHPC" # Stock to trade
EXCHANGE = "NSE" # Exchange (NSE, BSE, NFO, etc.)
QUANTITY = 1 # Number of shares
PRODUCT = "MIS" # MIS (Intraday) or CNC (Delivery)
# Strategy Parameters
FAST_EMA_PERIOD = 2 # Fast EMA (smaller number)
SLOW_EMA_PERIOD = 4 # Slow EMA (larger number)
CANDLE_TIMEFRAME = "5m" # 1m, 5m, 15m, 30m, 1h, 1d
# Historical Data Lookback
LOOKBACK_DAYS = 3 # Number of days to fetch historical data (1-30)
# Risk Management
STOPLOSS = 0.1 # Stoploss in Rupees
TARGET = 0.2 # Target in Rupees
# Direction Control
TRADE_DIRECTION = "BOTH" # Options: "LONG", "SHORT", "BOTH"
# Signal Check Interval
SIGNAL_CHECK_INTERVAL = 5 # Check for signals every X seconds
# ===============================================================================
# TRADING BOT WITH FIXED DATETIME
# ===============================================================================
class ConfigurableEMABot:
def __init__(self):
"""Initialize the trading bot with configurable parameters"""
# Initialize API client
self.client = api(
api_key=API_KEY,
host=API_HOST,
ws_url=WS_URL
)
# Position tracking
self.position = None
self.entry_price = 0
self.stoploss_price = 0
self.target_price = 0
# Real-time price tracking
self.ltp = None
self.exit_in_progress = False
# Thread control
self.running = True
self.stop_event = threading.Event()
# Instrument for WebSocket
self.instrument = [{"exchange": EXCHANGE, "symbol": SYMBOL}]
# Strategy name
self.strategy_name = f"EMA_{TRADE_DIRECTION}"
# Validate lookback period
if LOOKBACK_DAYS < 1:
print("[WARNING] LOOKBACK_DAYS too small, setting to 1")
self.lookback_days = 1
elif LOOKBACK_DAYS > 30:
print("[WARNING] LOOKBACK_DAYS too large, setting to 30")
self.lookback_days = 30
else:
self.lookback_days = LOOKBACK_DAYS
print("[BOT] OpenAlgo Trading Bot Started")
print(f"[BOT] Direction Mode: {TRADE_DIRECTION}")
print(f"[BOT] Strategy: {FAST_EMA_PERIOD} EMA x {SLOW_EMA_PERIOD} EMA")
print(f"[BOT] Lookback Period: {self.lookback_days} days")
print(f"[BOT] Signal Check Interval: {SIGNAL_CHECK_INTERVAL} seconds")
# ===============================================================================
# WEBSOCKET HANDLER WITH IMMEDIATE EXIT
# ===============================================================================
def on_ltp_update(self, data):
"""Handle real-time LTP updates and place exit orders immediately"""
if data.get("type") == "market_data" and data.get("symbol") == SYMBOL:
self.ltp = float(data["data"]["ltp"])
# Display current status
current_time = datetime.now().strftime("%H:%M:%S")
if self.position and not self.exit_in_progress:
# Calculate real-time P&L
if self.position == "BUY":
unrealized_pnl = (self.ltp - self.entry_price) * QUANTITY
else:
unrealized_pnl = (self.entry_price - self.ltp) * QUANTITY
pnl_sign = "+" if unrealized_pnl > 0 else "-"
print(f"\r[{current_time}] LTP: Rs.{self.ltp:.2f} | "
f"{self.position} @ Rs.{self.entry_price:.2f} | "
f"P&L: {pnl_sign}Rs.{abs(unrealized_pnl):.2f} | "
f"SL: {self.stoploss_price:.2f} | TG: {self.target_price:.2f} ", end="")
# Check and execute exit immediately
exit_reason = None
if self.position == "BUY":
if self.ltp <= self.stoploss_price:
exit_reason = "STOPLOSS HIT"
print(f"\n[ALERT] STOPLOSS HIT! LTP Rs.{self.ltp:.2f} <= SL Rs.{self.stoploss_price:.2f}")
elif self.ltp >= self.target_price:
exit_reason = "TARGET HIT"
print(f"\n[ALERT] TARGET HIT! LTP Rs.{self.ltp:.2f} >= Target Rs.{self.target_price:.2f}")
elif self.position == "SELL":
if self.ltp >= self.stoploss_price:
exit_reason = "STOPLOSS HIT"
print(f"\n[ALERT] STOPLOSS HIT! LTP Rs.{self.ltp:.2f} >= SL Rs.{self.stoploss_price:.2f}")
elif self.ltp <= self.target_price:
exit_reason = "TARGET HIT"
print(f"\n[ALERT] TARGET HIT! LTP Rs.{self.ltp:.2f} <= Target Rs.{self.target_price:.2f}")
# Place exit order immediately if SL/Target hit
if exit_reason and not self.exit_in_progress:
self.exit_in_progress = True
print(f"[EXIT] Placing exit order immediately...")
# Create a new thread for exit to avoid blocking WebSocket
exit_thread = threading.Thread(
target=self.place_exit_order,
args=(exit_reason,)
)
exit_thread.start()
elif not self.position:
print(f"\r[{current_time}] LTP: Rs.{self.ltp:.2f} | No Position | Mode: {TRADE_DIRECTION} | Lookback: {self.lookback_days}d ", end="")
def websocket_thread(self):
"""WebSocket thread for real-time price updates"""
try:
print("[WEBSOCKET] Connecting...")
self.client.connect()
# Subscribe to LTP updates
self.client.subscribe_ltp(self.instrument, on_data_received=self.on_ltp_update)
print(f"[WEBSOCKET] Connected - Monitoring {SYMBOL} in real-time")
# Keep thread alive
while not self.stop_event.is_set():
time.sleep(1)
except Exception as e:
print(f"\n[ERROR] WebSocket error: {e}")
finally:
print("\n[WEBSOCKET] Closing connection...")
try:
self.client.unsubscribe_ltp(self.instrument)
self.client.disconnect()
except:
pass
print("[WEBSOCKET] Connection closed")
# ===============================================================================
# TRADING FUNCTIONS
# ===============================================================================
def get_historical_data(self):
"""Fetch historical candle data with configurable lookback - FIXED VERSION"""
try:
end_date = datetime.now()
start_date = end_date - timedelta(days=self.lookback_days)
print(f"\n[DATA] Fetching {self.lookback_days} days of historical data...")
print(f"[DATA] From: {start_date.strftime('%Y-%m-%d')} To: {end_date.strftime('%Y-%m-%d')}")
data = self.client.history(
symbol=SYMBOL,
exchange=EXCHANGE,
interval=CANDLE_TIMEFRAME,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d")
)
if data is not None and len(data) > 0:
# Handle datetime field properly
if 'datetime' in data.columns:
# Get first and last datetime as strings
first_time = str(data['datetime'].iloc[0])
last_time = str(data['datetime'].iloc[-1])
print(f"[DATA] Received {len(data)} candles from {first_time} to {last_time}")
elif 'date' in data.columns:
first_date = str(data['date'].iloc[0])
last_date = str(data['date'].iloc[-1])
print(f"[DATA] Received {len(data)} candles from {first_date} to {last_date}")
else:
print(f"[DATA] Received {len(data)} candles")
else:
print("[WARNING] No data received from API")
return data
except Exception as e:
print(f"\n[ERROR] Failed to fetch data: {str(e)}")
print(f"[DEBUG] Error type: {type(e).__name__}")
# Try alternative approach without datetime access
try:
end_date = datetime.now()
start_date = end_date - timedelta(days=self.lookback_days)
data = self.client.history(
symbol=SYMBOL,
exchange=EXCHANGE,
interval=CANDLE_TIMEFRAME,
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d")
)
if data is not None and len(data) > 0:
print(f"[DATA] Successfully received {len(data)} candles (alternative method)")
return data
except Exception as e2:
print(f"[ERROR] Alternative fetch also failed: {str(e2)}")
return None
def check_for_signal(self, data):
"""Check for EMA crossover signals with direction filter"""
if data is None:
return None
if len(data) < SLOW_EMA_PERIOD + 2:
print(f"[INFO] Insufficient data. Need at least {SLOW_EMA_PERIOD + 2} candles, have {len(data)}")
return None
try:
# Calculate EMAs
data['fast_ema'] = data['close'].ewm(span=FAST_EMA_PERIOD, adjust=False).mean()
data['slow_ema'] = data['close'].ewm(span=SLOW_EMA_PERIOD, adjust=False).mean()
# Get last two completed candles
prev = data.iloc[-3]
last = data.iloc[-2]
current = data.iloc[-1]
# Display EMA values for debugging
print(f"[DEBUG] Fast EMA: {last['fast_ema']:.2f}, Slow EMA: {last['slow_ema']:.2f}, Close: {current['close']:.2f}")
# Check for BUY signal (Fast EMA crosses above Slow EMA)
if prev['fast_ema'] <= prev['slow_ema'] and last['fast_ema'] > last['slow_ema']:
if TRADE_DIRECTION in ["LONG", "BOTH"]:
print(f"[SIGNAL] BUY - Fast EMA crossed above Slow EMA")
return "BUY"
else:
print(f"[SIGNAL] BUY signal detected but ignored (Mode: {TRADE_DIRECTION})")
return None
# Check for SELL signal (Fast EMA crosses below Slow EMA)
if prev['fast_ema'] >= prev['slow_ema'] and last['fast_ema'] < last['slow_ema']:
if TRADE_DIRECTION in ["SHORT", "BOTH"]:
print(f"[SIGNAL] SELL - Fast EMA crossed below Slow EMA")
return "SELL"
else:
print(f"[SIGNAL] SELL signal detected but ignored (Mode: {TRADE_DIRECTION})")
return None
except Exception as e:
print(f"[ERROR] Error checking signal: {str(e)}")
return None
def get_executed_price(self, order_id):
"""Get actual executed price from order status"""
max_attempts = 5
for attempt in range(max_attempts):
time.sleep(2)
try:
response = self.client.orderstatus(
order_id=order_id,
strategy=self.strategy_name
)
if response.get("status") == "success":
order_data = response.get("data", {})
if order_data.get("order_status") == "complete":
executed_price = float(order_data.get("average_price", 0))
if executed_price > 0:
return executed_price
elif order_data.get("order_status") in ["rejected", "cancelled"]:
print(f"[ERROR] Order {order_data.get('order_status')}")
return None
else:
print(f"[WAITING] Order status: {order_data.get('order_status')}")
except Exception as e:
print(f"[ERROR] Failed to get order status: {e}")
return None
def place_entry_order(self, signal):
"""Place entry order based on direction filter"""
# Double-check direction filter
if signal == "BUY" and TRADE_DIRECTION == "SHORT":
print("[INFO] BUY signal ignored - SHORT only mode")
return False
if signal == "SELL" and TRADE_DIRECTION == "LONG":
print("[INFO] SELL signal ignored - LONG only mode")
return False
print(f"\n[ORDER] Placing {signal} order for {QUANTITY} shares of {SYMBOL}")
try:
response = self.client.placeorder(
strategy=self.strategy_name,
symbol=SYMBOL,
exchange=EXCHANGE,
action=signal,
quantity=QUANTITY,
price_type="MARKET",
product=PRODUCT
)
if response.get("status") == "success":
order_id = response.get("orderid")
print(f"[ORDER] Order placed. ID: {order_id}")
# Get actual executed price
executed_price = self.get_executed_price(order_id)
if executed_price:
self.position = signal
self.entry_price = executed_price
# Set SL and Target
if signal == "BUY":
self.stoploss_price = round(self.entry_price - STOPLOSS, 2)
self.target_price = round(self.entry_price + TARGET, 2)
else: # SELL
self.stoploss_price = round(self.entry_price + STOPLOSS, 2)
self.target_price = round(self.entry_price - TARGET, 2)
print("\n" + "="*60)
print(" TRADE EXECUTED")
print("="*60)
print(f" Direction Mode: {TRADE_DIRECTION}")
print(f" Position: {signal}")
print(f" Entry Price: Rs.{self.entry_price:.2f}")
print(f" Quantity: {QUANTITY}")
print(f" Stoploss: Rs.{self.stoploss_price:.2f}")
print(f" Target: Rs.{self.target_price:.2f}")
print("="*60)
print("\n[INFO] WebSocket monitoring SL/Target in real-time...")
# Reset exit flag after successful entry
self.exit_in_progress = False
return True
else:
print("[ERROR] Could not get executed price")
else:
print(f"[ERROR] Order failed: {response}")
except Exception as e:
print(f"[ERROR] Failed to place order: {e}")
return False
def place_exit_order(self, reason="Manual"):
"""Place exit order - called immediately from WebSocket handler"""
if not self.position:
self.exit_in_progress = False
return
exit_action = "SELL" if self.position == "BUY" else "BUY"
print(f"\n[EXIT] Closing {self.position} position - {reason}")
try:
response = self.client.placeorder(
strategy=self.strategy_name,
symbol=SYMBOL,
exchange=EXCHANGE,
action=exit_action,
quantity=QUANTITY,
price_type="MARKET",
product=PRODUCT
)
if response.get("status") == "success":
order_id = response.get("orderid")
print(f"[EXIT] Exit order placed. ID: {order_id}")
exit_price = self.get_executed_price(order_id)
if exit_price:
# Calculate P&L
if self.position == "BUY":
pnl = (exit_price - self.entry_price) * QUANTITY
else:
pnl = (self.entry_price - exit_price) * QUANTITY
print("\n" + "="*60)
print(" POSITION CLOSED")
print("="*60)
print(f" Reason: {reason}")
print(f" Exit Price: Rs.{exit_price:.2f}")
print(f" Entry Price: Rs.{self.entry_price:.2f}")
print(f" P&L: Rs.{pnl:.2f} [{('PROFIT' if pnl > 0 else 'LOSS')}]")
print("="*60)
else:
print("[WARNING] Exit order placed but could not confirm price")
# Reset position regardless
self.position = None
self.entry_price = 0
self.stoploss_price = 0
self.target_price = 0
self.exit_in_progress = False
else:
print(f"[ERROR] Exit order failed: {response}")
self.exit_in_progress = False # Reset flag to allow retry
except Exception as e:
print(f"[ERROR] Failed to exit: {e}")
self.exit_in_progress = False # Reset flag to allow retry
# ===============================================================================
# STRATEGY THREAD
# ===============================================================================
def strategy_thread(self):
"""Strategy thread for signal generation only (exit handled by WebSocket)"""
print("[STRATEGY] Strategy thread started")
print(f"[STRATEGY] Direction: {TRADE_DIRECTION} trades only")
print(f"[STRATEGY] Checking signals every {SIGNAL_CHECK_INTERVAL} seconds")
print(f"[STRATEGY] Using {self.lookback_days} days of historical data")
# Initial data fetch on startup
initial_data_fetched = False
while not self.stop_event.is_set():
try:
# Only look for entry signals if not in position
if not self.position and not self.exit_in_progress:
data = self.get_historical_data()
if data is not None:
if not initial_data_fetched:
print(f"[STRATEGY] Initial data loaded: {len(data)} candles")
initial_data_fetched = True
signal = self.check_for_signal(data)
if signal:
self.place_entry_order(signal)
else:
if not initial_data_fetched:
print("[WARNING] Waiting for historical data...")
# Check signals at configured interval
time.sleep(SIGNAL_CHECK_INTERVAL)
except Exception as e:
print(f"\n[ERROR] Strategy error: {e}")
time.sleep(10)
# ===============================================================================
# MAIN RUN METHOD
# ===============================================================================
def run(self):
"""Main method to run the bot"""
print("="*60)
print(" EMA CROSSOVER BOT - FIXED VERSION")
print("="*60)
print(f" Symbol: {SYMBOL} | Exchange: {EXCHANGE}")
print(f" Strategy: {FAST_EMA_PERIOD} EMA x {SLOW_EMA_PERIOD} EMA")
print(f" Direction: {TRADE_DIRECTION} trades only")
print(f" Risk: SL Rs.{STOPLOSS} | Target Rs.{TARGET}")
print(f" Timeframe: {CANDLE_TIMEFRAME}")
print(f" Lookback: {self.lookback_days} days")
print(f" Signal Check: Every {SIGNAL_CHECK_INTERVAL} seconds")
print("="*60)
# Display direction mode details
if TRADE_DIRECTION == "LONG":
print(" [MODE] LONG ONLY - Will only take BUY trades")
elif TRADE_DIRECTION == "SHORT":
print(" [MODE] SHORT ONLY - Will only take SELL trades")
else:
print(" [MODE] BOTH - Will take both BUY and SELL trades")
print("="*60)
print("\nPress Ctrl+C to stop the bot\n")
# Start WebSocket thread
ws_thread = threading.Thread(target=self.websocket_thread, daemon=True)
ws_thread.start()
# Give WebSocket time to connect
time.sleep(2)
# Start strategy thread
strat_thread = threading.Thread(target=self.strategy_thread, daemon=True)
strat_thread.start()
try:
# Keep main thread alive
while self.running:
time.sleep(1)
except KeyboardInterrupt:
print("\n\n[SHUTDOWN] Shutting down bot...")
self.running = False
self.stop_event.set()
# Close any open position
if self.position and not self.exit_in_progress:
print("[INFO] Closing open position before shutdown...")
self.place_exit_order("Bot Shutdown")
# Wait for threads to finish
ws_thread.join(timeout=5)
strat_thread.join(timeout=5)
print("[SUCCESS] Bot stopped successfully!")
# ===============================================================================
# START THE BOT
# ===============================================================================
if __name__ == "__main__":
print("\n" + "="*60)
print(" OPENALGO EMA STRATEGY - READY TO RUN")
print("="*60)
print(f" Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Mode: {TRADE_DIRECTION}")
print(f" Lookback: {LOOKBACK_DAYS} days")
print("="*60 + "\n")
# Create and run the bot
bot = ConfigurableEMABot()
bot.run()
Last updated