EventBus¶
The EventBus is a pub/sub event dispatcher that supports two operational modes:
- Queue mode (default, backward compatible): Collects events into queues for later consumption
- Callback mode: Directly invokes registered callbacks — essential for CTP SPI / IB EWrapper callback-driven APIs
Overview¶
The EventBus is the backbone of the event-driven architecture in bt_api. All exchange feeds publish events (ticker updates, order fills, errors) through the bus, and consumers subscribe to specific event types.
Error Handling Modes¶
class ErrorHandlerMode(Enum):
LOG = "log" # Log errors and continue
RAISE = "raise" # Raise the first error
COLLECT = "collect" # Collect all errors, return them
Class Reference¶
EventBus¶
class EventBus:
def __init__(
self,
error_mode: ErrorHandlerMode = ErrorHandlerMode.LOG,
queue_class: Type[Queue] = Queue,
):
Methods¶
on(event_type: str, callback: Callable, priority: int = 0) -> None¶
Subscribe a callback to an event type. Higher priority callbacks execute first.
bus.on("order_filled", lambda data: print(f"Order filled: {data}"))
bus.on("tick_update", update_chart, priority=10)
off(event_type: str, callback: Callable) -> None¶
Unsubscribe a callback from an event type.
bus.off("order_filled", my_callback)
emit(event_type: str, data: Any) -> List[Exception]¶
Emit an event with data to all registered callbacks.
errors = bus.emit("order_filled", {"order_id": "123", "filled": 0.5})
clear(event_type: Optional[str] = None) -> None¶
Clear all callbacks for an event type, or all callbacks if no type specified.
bus.clear("order_filled") # Clear specific event
bus.clear() # Clear all events
listener_count(event_type: str) -> int¶
Return the number of listeners for an event type.
count = bus.listener_count("tick_update")
Queue Mode vs Callback Mode¶
Queue mode (default):
bus = EventBus() # Uses Queue internally
bus.on("tick", handler)
Callback mode (for CTP/IB):
bus = EventBus(error_mode=ErrorHandlerMode.CALLBACK)
bus.on("order", ctp_spi.on_order) # Directly calls the callback
中文¶
概述¶
EventBus 是支持两种操作模式的发布/订阅事件分发器:
- Queue 模式(默认,向后兼容):将事件收集到队列供后续消费
- Callback 模式:直接调用注册的回调 — 对于 CTP SPI / IB EWrapper 回调驱动的 API 必不可少
错误处理模式¶
| 模式 | 说明 |
|---|---|
LOG |
记录错误并继续 |
RAISE |
抛出第一个错误 |
COLLECT |
收集所有错误并返回 |
主要方法¶
| 方法 | 说明 |
|---|---|
on(event_type, callback, priority) |
订阅事件回调 |
off(event_type, callback) |
取消订阅 |
emit(event_type, data) |
发布事件 |
clear(event_type) |
清除回调 |
listener_count(event_type) |
获取监听器数量 |
使用示例¶
from bt_api_base.event_bus import EventBus, ErrorHandlerMode
bus = EventBus(error_mode=ErrorHandlerMode.LOG)
bus.on("order_filled", lambda data: print(f"订单成交: {data}"))
errors = bus.emit("order_filled", {"order_id": "123", "filled": 0.5})