WebSocketManager¶
The WebSocketManager provides WebSocket connection pooling with built-in features: auto-reconnect, heartbeat, backpressure handling, and subscription management.
Overview¶
Each exchange feed gets its own WebSocketManager instance managing multiple concurrent connections. The manager handles connection lifecycle, automatic reconnection with exponential backoff, and dispatches incoming messages to registered callbacks.
Configuration¶
WebSocketConfig¶
@dataclass
class WebSocketConfig:
url: str # WebSocket endpoint URL
exchange_name: str # Exchange identifier
max_connections: int = 5 # Max concurrent connections
heartbeat_interval: float = 30.0 # Heartbeat interval (seconds)
reconnect_interval: float = 5.0 # Base reconnect interval (seconds)
max_reconnect_attempts: int = 10 # Max reconnection tries
connect_timeout: float = 10.0 # Connection timeout (seconds)
subscription_limit: int = 100 # Max subscriptions per connection
auto_reconnect: bool = True # Enable auto-reconnect
Class Reference¶
WebSocketManager¶
class WebSocketManager:
Methods¶
add_exchange(config: WebSocketConfig) -> WebSocketConnection¶
Add and connect an exchange with the given configuration.
config = WebSocketConfig(
url="wss://stream.binance.com:9443/ws",
exchange_name="BINANCE___SPOT",
max_connections=5,
)
conn = await manager.add_exchange(config)
remove_exchange(exchange_name: str) -> None¶
Disconnect and remove an exchange.
await manager.remove_exchange("BINANCE___SPOT")
subscribe(exchange_name: str, topic: str, symbol: str, callback: Callable) -> str¶
Subscribe to an exchange topic. Returns a subscription ID.
sub_id = await manager.subscribe(
exchange_name="BINANCE___SPOT",
topic="ticker",
symbol="BTCUSDT",
callback=on_ticker,
)
unsubscribe(subscription_id: str) -> None¶
Unsubscribe by subscription ID.
await manager.unsubscribe(sub_id)
is_connected(exchange_name: str) -> bool¶
Check if an exchange is connected.
if manager.is_connected("BINANCE___SPOT"):
...
get_connection(exchange_name: str) -> Optional[WebSocketConnection]¶
Get the connection object for an exchange.
conn = manager.get_connection("BINANCE___SPOT")
close() -> None¶
Close all connections and shut down the manager.
await manager.close()
Connection Lifecycle¶
DISCONNECTED → CONNECTING → CONNECTED → (RECONNECTING) → CLOSED
- CONNECTING: Establishing TCP + TLS handshake
- CONNECTED: WebSocket ready, heartbeats active
- RECONNECTING: Lost connection, attempting reconnect with backoff
- CLOSED: All connections closed, manager shut down
Backpressure Handling¶
When the incoming message rate exceeds processing capacity, the manager applies backpressure by dropping the oldest buffered messages, preventing memory exhaustion.
config = WebSocketConfig(
url="...",
exchange_name="BINANCE___SPOT",
max_connections=5,
# Backpressure is handled automatically
)
中文¶
概述¶
WebSocketManager 提供 WebSocket 连接池,内置自动重连、心跳、背压处理和订阅管理功能。
WebSocketConfig 配置¶
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
url |
str | — | WebSocket 端点 URL |
exchange_name |
str | — | 交易所标识符 |
max_connections |
int | 5 | 最大并发连接数 |
heartbeat_interval |
float | 30.0 | 心跳间隔(秒) |
reconnect_interval |
float | 5.0 | 基础重连间隔(秒) |
max_reconnect_attempts |
int | 10 | 最大重连次数 |
auto_reconnect |
bool | True | 启用自动重连 |
主要方法¶
| 方法 | 说明 |
|---|---|
add_exchange(config) |
添加并连接交易所 |
remove_exchange(name) |
断开并移除交易所 |
subscribe(name, topic, symbol, cb) |
订阅主题 |
unsubscribe(sub_id) |
取消订阅 |
is_connected(name) |
检查连接状态 |
get_connection(name) |
获取连接对象 |
close() |
关闭所有连接 |
使用示例¶
from bt_api_base.websocket_manager import WebSocketManager, WebSocketConfig
config = WebSocketConfig(
url="wss://stream.binance.com:9443/ws",
exchange_name="BINANCE___SPOT",
max_connections=5,
)
manager = WebSocketManager()
await manager.add_exchange(config)
sub_id = await manager.subscribe(
exchange_name="BINANCE___SPOT",
topic="ticker",
symbol="BTCUSDT",
callback=on_ticker,
)