feat: 更新 @riwa/api-types 依赖版本至 0.0.138;重构 WebSocket 逻辑以优化订阅和取消订阅功能;调整订单簿组件以支持新的数据结构

This commit is contained in:
2026-01-14 01:47:30 +07:00
parent c3a8908b8d
commit 513e444ec4
6 changed files with 419 additions and 66 deletions

View File

@@ -205,18 +205,14 @@ export class RWADatafeed extends Datafeeds.UDFCompatibleDatafeed {
console.log("[RWADatafeed]: subscribeBars", { symbol: symbolInfo.name, resolution, subscriberUID });
this.subscribers.set(subscriberUID, { callback: onTick, resolution, symbol: symbolInfo.name });
const wsConnection = tradeWebSocket.getSocket();
wsConnection.send({
action: "subscribe",
channels: [{
name: "bar",
symbol: symbolInfo.name,
resolution,
}],
});
tradeWebSocket.subscribeChannel([{
name: "bar",
symbol: symbolInfo.name,
resolution,
}]);
wsConnection.subscribe((message) => {
tradeWebSocket.subscribe((message) => {
const data = message.data as any;
if (data.type !== "bar")
return;
@@ -238,13 +234,10 @@ export class RWADatafeed extends Datafeeds.UDFCompatibleDatafeed {
unsubscribeBars(subscriberUID: string): void {
console.log("[RWADatafeed]: unsubscribeBars", subscriberUID);
const subscriber = this.subscribers.get(subscriberUID);
const wsConnection = tradeWebSocket.getSocket();
wsConnection.send({
action: "unsubscribe",
channels: [{
name: "bar",
symbol: subscriber?.symbol || "",
}],
});
tradeWebSocket.unsubscribeChannel([{
name: "bar",
symbol: subscriber?.symbol as string,
resolution: subscriber?.resolution as ResolutionString,
}]);
}
}

View File

@@ -1,30 +1,364 @@
import type { Treaty } from "@elysiajs/eden";
import type { MarketDataStreaming } from "@/api/types";
import { client } from "@/api";
interface SubscribeMessage {
action: "subscribe" | "unsubscribe" | "ping";
channels: Array<{
name: "depth" | "trades" | "ticker" | "bar";
symbol: string;
resolution?: string;
[key: string]: any;
}>;
}
type MessageData = any;
interface MessageHandler {
id: string;
handler: (data: MessageData) => void;
}
export class TradeWebSocket {
public socket: MarketDataStreaming | null = null;
private socket: MarketDataStreaming | null = null;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 3000;
private heartbeatInterval = 30000; // 30秒心跳
private isManualClose = false;
private connectionPromise: Promise<void> | null = null;
private connectionResolve: (() => void) | null = null;
private messageQueue: SubscribeMessage[] = [];
private messageHandlers: MessageHandler[] = [];
private subscriptions = new Set<string>();
constructor() {
if (!this.socket) {
this.socket = client.api.market_data.streaming.subscribe();
this.connect();
}
/**
* 连接 WebSocket
*/
private connect() {
if (this.socket) {
return this.connectionPromise;
}
this.socket.on("error", () => {
this.socket = null;
// 创建连接 Promise
this.connectionPromise = new Promise((resolve) => {
this.connectionResolve = resolve;
});
try {
this.socket = client.api.market_data.streaming.subscribe();
});
this.socket.on("open", () => {
console.log("TradeWebSocket connected");
this.socket.on("open", () => {
console.log("TradeWebSocket connected");
this.reconnectAttempts = 0;
this.startHeartbeat();
// 连接成功,通知等待的请求
if (this.connectionResolve) {
this.connectionResolve();
this.connectionResolve = null;
}
// 处理消息队列
this.processMessageQueue();
// 重新订阅之前的频道
this.resubscribe();
});
this.socket.on("message", (data) => {
this.handleMessage(data);
});
this.socket.on("error", (error: any) => {
console.error("❌ TradeWebSocket error:", error);
this.handleError();
});
this.socket.on("close", () => {
console.log("🔌 TradeWebSocket closed");
this.stopHeartbeat();
this.socket = null;
this.connectionPromise = null;
if (!this.isManualClose) {
this.reconnect();
}
});
}
catch (error) {
console.error("❌ Failed to create WebSocket:", error);
this.handleError();
}
return this.connectionPromise;
}
/**
* 处理接收到的消息
*/
private handleMessage(data: any) {
// 处理心跳响应
if (data.type === "pong") {
return;
}
// 分发消息给所有订阅者
this.messageHandlers.forEach((handler) => {
try {
handler.handler(data);
}
catch (error) {
console.error("❌ Message handler error:", error);
}
});
}
public getSocket(): MarketDataStreaming {
return this.socket!;
/**
* 错误处理
*/
private handleError() {
this.stopHeartbeat();
this.socket = null;
this.connectionPromise = null;
if (!this.isManualClose) {
this.reconnect();
}
}
/**
* 重连逻辑
*/
private reconnect() {
if (this.reconnectTimer || this.reconnectAttempts >= this.maxReconnectAttempts) {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error("❌ Max reconnect attempts reached");
}
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.min(this.reconnectAttempts, 5);
console.log(`🔄 Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})...`);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, delay);
}
/**
* 启动心跳
*/
private startHeartbeat() {
this.stopHeartbeat();
this.heartbeatTimer = setInterval(() => {
if (this.socket && this.isConnected()) {
try {
this.socket.send({ action: "ping" });
}
catch (error) {
console.error("❌ Heartbeat error:", error);
}
}
}, this.heartbeatInterval);
}
/**
* 停止心跳
*/
private stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
/**
* 处理消息队列
*/
private processMessageQueue() {
if (!this.isConnected()) {
return;
}
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
this.sendImmediate(message);
}
}
}
/**
* 重新订阅所有频道
*/
private resubscribe() {
if (this.subscriptions.size === 0) {
return;
}
const channels = Array.from(this.subscriptions).map((sub) => {
const [name, symbol] = sub.split(":") as any[];
return { name, symbol };
});
if (channels.length > 0) {
this.send({
action: "subscribe",
channels,
});
}
}
/**
* 检查是否已连接
*/
public isConnected(): boolean {
return this.socket !== null && this.connectionResolve === null;
}
/**
* 等待连接建立
*/
public async waitForConnection(): Promise<void> {
if (this.isConnected()) {
return Promise.resolve();
}
if (!this.connectionPromise) {
this.connect();
}
return this.connectionPromise!;
}
/**
* 发送消息(立即发送)
*/
private sendImmediate(message: SubscribeMessage) {
if (!this.socket) {
console.warn("⚠️ Socket not connected, message queued");
return;
}
try {
this.socket.send(message);
// 记录订阅状态
if (message.action === "subscribe") {
message.channels.forEach((channel) => {
const key = channel.symbol ? `${channel.name}:${channel.symbol}` : channel.name;
this.subscriptions.add(key);
});
}
else if (message.action === "unsubscribe") {
message.channels.forEach((channel) => {
const key = channel.symbol ? `${channel.name}:${channel.symbol}` : channel.name;
this.subscriptions.delete(key);
});
}
}
catch (error) {
console.error("❌ Failed to send message:", error);
throw error;
}
}
/**
* 发送消息(带队列)
*/
public async send(message: SubscribeMessage) {
// 等待连接建立
await this.waitForConnection();
// 如果已连接,立即发送
if (this.isConnected()) {
this.sendImmediate(message);
}
else {
// 否则加入队列
this.messageQueue.push(message);
}
}
/**
* 订阅消息
*/
public subscribe(handler: (data: MessageData) => void): string {
const id = `handler_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
this.messageHandlers.push({ id, handler });
return id;
}
/**
* 取消订阅消息
*/
public unsubscribe(id: string) {
const index = this.messageHandlers.findIndex(h => h.id === id);
if (index !== -1) {
this.messageHandlers.splice(index, 1);
}
}
/**
* 订阅频道
*/
public async subscribeChannel(channels: SubscribeMessage["channels"]) {
await this.send({
action: "subscribe",
channels,
});
}
/**
* 取消订阅频道
*/
public async unsubscribeChannel(channels: SubscribeMessage["channels"]) {
await this.send({
action: "unsubscribe",
channels,
});
}
/**
* 关闭连接
*/
public closeSocket() {
this.isManualClose = true;
this.stopHeartbeat();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.socket) {
this.socket.close();
this.socket = null;
}
this.connectionPromise = null;
this.messageQueue = [];
this.subscriptions.clear();
}
/**
* 重置并重新连接
*/
public reset() {
this.isManualClose = false;
this.reconnectAttempts = 0;
this.closeSocket();
this.connect();
}
}