Ethereum: Problem with websocket output into dataframe with pandas


This article looks at a common issue when streaming WebSocket data from Binance into a Pandas DataFrame.

Problem: Infinite loop of data output to Pandas DataFrame


Now that you have successfully integrated the WebSocket connection to Binance into your script, it is essential to address another common challenge that arises from this integration. The problem lies in the way the data is collected and stored in a Pandas DataFrame.

When using a WebSocket API such as Binance’s, each message the client receives is typically handled one at a time, often by appending it to a Pandas DataFrame inside the message callback. Because the stream never ends, the DataFrame grows without bound, and re-printing or re-processing it on every message looks like an infinite loop of output.

Why is this happening?

Binance’s WebSocket streams deliver each message with a timestamp and a payload. When you subscribe to multiple streams (e.g., a price ticker and pair volumes), each stream produces its own sequence of messages. Since the connection stays open indefinitely, new messages keep arriving from every stream, so any per-message DataFrame update runs forever.
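
To make this concrete, here is a minimal sketch of the pattern that typically causes the problem; the callback name and the message fields are illustrative, not Binance’s exact payload:

import pandas as pd

df = pd.DataFrame(columns=['timestamp', 'price'])

def on_message(message):
    global df
    # Every message copies the whole DataFrame, so memory use and processing
    # time grow without bound while the socket stays open
    new_row = pd.DataFrame([{'timestamp': message['E'], 'price': message['p']}])
    df = pd.concat([df, new_row], ignore_index=True)
    print(df)  # printed on every message, which looks like an endless loop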

Solution: Handling Infinite Data Output with Pandas

To avoid this infinite data output and prevent your script from running out of memory, you can use several strategies:

1. Use Dask

Dask is a parallel computing library that lets you work with datasets larger than memory on a single machine, without needing a full-fledged cluster. It splits the data into smaller partitions and processes them in parallel, keeping memory usage bounded.

import dask.dataframe as dd
import numpy as np
import pandas as pd

# Create a Dask DataFrame from a 1000-row pandas DataFrame,
# split into 10 partitions of roughly 100 rows each
d = dd.from_pandas(pd.DataFrame({'price': np.random.rand(1000)}), npartitions=10)

# Trigger the computation; each partition is processed in parallel
result = d['price'].mean().compute()
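
In a live stream you do not have the data up front, so a reasonable pattern, sketched here with an assumed batch size and message format, is to collect rows in a plain list, convert full batches to pandas, and hand the combined result to Dask once:

import dask.dataframe as dd
import pandas as pd

batches = []        # completed pandas batches
current = []        # rows for the batch currently being filled
BATCH_SIZE = 1000   # assumed batch size

def add_message(message):
    # Appending to a list is cheap; no DataFrame copy per message
    current.append({'price': float(message['p'])})
    if len(current) >= BATCH_SIZE:
        batches.append(pd.DataFrame(current))
        current.clear()

# Once enough batches have accumulated, build a partitioned Dask DataFrame
# and run the heavy computation once, in parallel
ddf = dd.from_pandas(pd.concat(batches, ignore_index=True), npartitions=10)
mean_price = ddf['price'].mean().compute()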

2. Use NumPy buffers

If the stream delivers large binary payloads, consider collecting them as NumPy arrays and only building a DataFrame once. This is far cheaper than appending to a DataFrame message by message.

import numpy as np
import pandas as pd

# Collect incoming chunks as NumPy arrays in a plain Python list
data = []

# Process each chunk of data in a loop
# (the payload below is a stand-in for bytes read from the WebSocket connection)
for i in range(1000):
    # Interpret the raw bytes as 32-bit integers without per-element copying
    chunk = np.frombuffer(b'chunk_data' * 10, dtype=np.int32)
    data.append(chunk)

# Combine all chunks into a single DataFrame in one step
df = pd.DataFrame({'raw': np.concatenate(data)})

# Now you can run calculations on the entire dataset with Pandas or Dask
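
The point of this approach is that appending a NumPy array to a Python list costs almost nothing per message, and the expensive step of building a DataFrame happens exactly once. A single vectorised pass over the combined frame (using the column name assumed above) then replaces thousands of per-message operations:

# One vectorised computation over the whole dataset
print(df['raw'].describe())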

3. Use a streaming data processing library

Libraries such as Starlette provide asynchronous streaming support, which makes it possible to consume WebSocket messages as they arrive and turn them into DataFrames in batches instead of accumulating everything in memory. The sketch below shows a small Starlette endpoint built around this idea.

from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket
import pandas as pd
import uvicorn

async def websocket_processor(websocket: WebSocket):
    await websocket.accept()
    rows = []
    # Consume messages as a stream instead of accumulating them forever
    async for message in websocket.iter_json():
        rows.append({'content': message['data']})
        # Build a DataFrame once per batch of 100 messages, not once per message
        if len(rows) >= 100:
            df = pd.DataFrame(rows)
            rows.clear()
            # (a Dask DataFrame could be built from df here for parallel processing)
        await websocket.send_json({'buffered': len(rows)})

app = Starlette(routes=[WebSocketRoute('/ws', websocket_processor)])

# Start the server to handle incoming connections
if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
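
To exercise the server above, a minimal client could look like the following; the endpoint path and payload shape are assumptions made for this example, and it uses the third-party websockets package:

import asyncio
import json
import websockets

async def main():
    async with websockets.connect('ws://localhost:8000/ws') as ws:
        await ws.send(json.dumps({'data': 42.0}))
        print(await ws.recv())  # server replies with its current buffer size

asyncio.run(main())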

Conclusion

The apparent infinite loop of data flowing from Binance’s WebSocket streams into a Pandas DataFrame comes down to unbounded, per-message appends. It can be kept under control by batching messages and processing them with Dask, by collecting raw payloads in NumPy buffers and building a single DataFrame at the end, or by consuming the stream with an asynchronous framework such as Starlette.


