如何处理来自binance websocket的多个流数据?

Dro*_*id 3 python json algorithmic-trading websocket binance

我正在使用 unicorn_binance_websocket_api 传输 100 个加密货币和 2 个不同时间范围的价格数据,我想处理这些数据以存储不同加密货币相对于其时间范围的收盘价,然后执行我的策略以查看我需要哪个加密货币和时间范围交易

我将分享关于如何为单个加密货币和单个时间范围编写策略的代码

from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import
BinanceWebSocketApiManager
import json, numpy, talib

binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")

binance_websocket_api_manager.create_stream('kline_1m', 'btcusdt')


closes =[]

RSI_PERIOD = 14
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30

while True:
received_stream_data_json = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
if received_stream_data_json:
    json_data = json.loads(received_stream_data_json)
    candle_data = json_data.get('data',{})
    candle = candle_data.get('k', {})

    symboll = candle.get('s',{})
    timeframe = candle.get('i',{})
    close_prices = candle.get('c',{})
    open_prices = candle.get('o',{})
    is_candle_closed = candle.get('x',{})

    if is_candle_closed:
        closes.append(float(close_prices))

    if len(closes) > RSI_PERIOD:
        np_closes = numpy.array(closes)
        rsi = talib.RSI(np_closes,RSI_PERIOD)
        
    if (rsi[-1] > RSI_OVERBOUGHT):
        print("SELL")

    elif (rsi[-1] < RSI_OVERSOLD):
        print('BUY')
Run Code Online (Sandbox Code Playgroud)

Che*_*603 7

您只需使用该subscribe_to_stream功能并附加您想要观看的其他频道和市场。我试图通过 python-binance 库手动编写这个代码,但它看起来很粗鲁、老套且低效。所以我发现了你的问题并决定使用这个独角兽库,我不得不说,它非常棒。这是我的解决方案,您不需要使用 asyncio 顺便说一句

   class BinanceWs:

    def __init__(self, channels, markets):
        market = 'btcusdt'
        tf = 'kline_1w'
        self.binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")
        stream = self.binance_websocket_api_manager.create_stream(tf, market)
        self.binance_websocket_api_manager.subscribe_to_stream(stream, channels, markets)

    async def run(self):
        while True:
            received_stream_data_json = self.binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
            if received_stream_data_json:
                json_data = json.loads(received_stream_data_json)
                candle_data = json_data.get('data', {})

                candle = candle_data.get('k', {})
                symbol = candle.get('s', {})
                timeframe = candle.get('i', {})
                close_prices = candle.get('c', {})
                open_prices = candle.get('o', {})
                is_candle_closed = candle.get('x', {})
                print(candle_data)
                # do stuff with data ... 

async def main():
    tasks = []

    channels = ['kline_1m', 'kline_5m', 'kline_15m', 'kline_30m', 'kline_1h', 'kline_12h', 'miniTicker']
    markets = {'btcusdt', 'ethusdt', 'ltcusdt'}

    print(f'Main starting streams ... ')
    kl_socket = BinanceWs(channels=channels, markets=markets)
    task = await kl_socket.run()
    tasks.append(task)
    print(tasks)
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)