Python Websocket - Data Stream Documentation
The WebSocket API is only available on the Premium, Business, and Enterprise plans. With the WebSocket API you can stream parsed transactions (per pair or per wallet), new pools/tokens, price updates, and more.
This document explains how to use the WebSocket API
and describes the room types available for WebSocket communication.
WebSocketService Class
Below is the WebSocketService
class that can be used to establish WebSocket connections and manage room subscriptions:
import json
import random
import threading
import time
from collections import deque
from typing import Set

import websocket
from pyee import EventEmitter
class WebSocketService:
    """Manage the datastream WebSocket connections and room subscriptions.

    Two connections to the same URL are kept open: rooms whose name contains
    ``"transaction"`` are routed to the transaction socket, every other room
    to the main socket. Incoming messages are re-published through a ``pyee``
    ``EventEmitter`` keyed by room name; register listeners with :meth:`on`.

    Each connection runs on its own daemon thread. A connection that closes
    is re-opened after a capped exponential backoff with random jitter, until
    :meth:`disconnect` is called.
    """

    # Cap for the backoff exponent. Without it ``2 ** reconnect_attempts``
    # eventually becomes too large to multiply with a float (OverflowError)
    # during a long outage, which would stop all further reconnect attempts.
    _MAX_BACKOFF_EXPONENT = 16

    def __init__(self, ws_url: str, max_tracked_transactions: int = 10000):
        """Store the configuration and immediately start connecting.

        Args:
            ws_url: WebSocket URL used for both connections.
            max_tracked_transactions: Number of recent transaction ids kept
                for duplicate suppression. Once exceeded, the oldest id is
                forgotten so memory use stays bounded.

        Raises:
            ValueError: If ``max_tracked_transactions`` is less than 1.
        """
        if max_tracked_transactions < 1:
            raise ValueError("max_tracked_transactions must be at least 1")
        self.ws_url = ws_url
        self.socket = None
        self.transaction_socket = None
        self.reconnect_attempts = 0
        self.reconnect_delay = 2.5  # base backoff, in seconds
        self.reconnect_delay_max = 4.5  # backoff cap before jitter, in seconds
        self.randomization_factor = 0.5
        self.emitter = EventEmitter()
        self.subscribed_rooms: Set[str] = set()
        self.transactions: Set[str] = set()
        # Insertion order of ``transactions`` so the oldest id can be evicted.
        self._transaction_order = deque()
        self._max_tracked_transactions = max_tracked_transactions
        # False between disconnect() and the next connect()/reconnect(), so
        # close callbacks and pending retries do not reopen the sockets.
        self._should_reconnect = True
        # Serializes socket creation: both sockets usually close together,
        # which schedules two retry threads at almost the same time.
        self._connect_lock = threading.Lock()
        self.connect()

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------
    def connect(self):
        """Open whichever of the two sockets is not currently open."""
        self._should_reconnect = True
        self._open_missing_sockets()

    def _open_missing_sockets(self):
        """Create and start only the sockets that are missing.

        A socket that is still alive is left untouched, so losing one
        connection never replaces (and leaks) the other one.
        """
        try:
            with self._connect_lock:
                if self.socket is None:
                    self._launch("main")
                if self.transaction_socket is None:
                    self._launch("transaction")
        except Exception as e:
            print(f"Error connecting to WebSocket: {e}")
            if self._should_reconnect:
                self._schedule_reconnect()

    def _launch(self, socket_type):
        """Build the ``WebSocketApp`` for ``socket_type`` and run it on a daemon thread."""
        app = websocket.WebSocketApp(
            self.ws_url,
            on_open=lambda ws: self.on_open(ws, socket_type),
            # websocket-client >= 1.0 also passes the close status code and
            # reason; older releases pass only ``ws``. Accept both forms.
            on_close=lambda ws, *_close_args: self.on_close(ws, socket_type),
            on_error=lambda ws, error: self.on_error(ws, socket_type, error),
            on_message=self.on_message,
        )
        # Publish the socket before its thread starts so that a close
        # callback fired by an immediately failing connection still finds it.
        if socket_type == "main":
            self.socket = app
        else:
            self.transaction_socket = app
        try:
            threading.Thread(target=app.run_forever, daemon=True).start()
        except Exception:
            self._clear_socket(app, socket_type)
            raise

    def _clear_socket(self, ws, socket_type):
        """Forget ``ws`` if it is the current ``socket_type`` socket.

        Returns:
            True when ``ws`` was the current socket and has been cleared,
            False when it was a stale (already replaced or detached) socket.
        """
        if socket_type == "main" and self.socket is ws:
            self.socket = None
            return True
        if socket_type == "transaction" and self.transaction_socket is ws:
            self.transaction_socket = None
            return True
        return False

    def on_open(self, ws, socket_type):
        """Reset the backoff counter and re-join the rooms served by ``ws``."""
        print(f"Connected to {socket_type} WebSocket server")
        self.reconnect_attempts = 0
        # Snapshot the set: join_room()/leave_room() may run on another thread.
        for room in tuple(self.subscribed_rooms):
            if self._socket_type_for(room) == socket_type:
                ws.send(json.dumps({"type": "join", "room": room}))

    def on_close(self, ws, socket_type):
        """Drop the closed socket and schedule a reconnect unless shutting down."""
        print(f"Disconnected from {socket_type} WebSocket server")
        was_current = self._clear_socket(ws, socket_type)
        # A stale socket closing (already detached by disconnect()) must not
        # clear or reopen anything.
        if was_current and self._should_reconnect:
            self._schedule_reconnect()

    def on_error(self, ws, socket_type, error):
        """Report a connection error instead of letting it pass silently."""
        print(f"{socket_type} WebSocket error: {error}")

    def disconnect(self):
        """Close both sockets, stop reconnecting and forget all subscriptions."""
        self._should_reconnect = False
        # Detach before closing so the resulting close callbacks see a stale
        # socket and do nothing.
        main_socket, self.socket = self.socket, None
        tx_socket, self.transaction_socket = self.transaction_socket, None
        for app in (main_socket, tx_socket):
            if app:
                app.close()
        self.subscribed_rooms.clear()
        self.transactions.clear()
        self._transaction_order.clear()

    def reconnect(self):
        """Schedule a reconnect after a backoff delay (re-enables reconnecting)."""
        self._should_reconnect = True
        self._schedule_reconnect()

    def _next_reconnect_delay(self):
        """Return the next retry delay in seconds.

        The base delay doubles per attempt up to ``reconnect_delay_max``;
        the result varies randomly within +/- half of
        ``base * randomization_factor`` so clients do not retry in lockstep.
        """
        exponent = min(self.reconnect_attempts, self._MAX_BACKOFF_EXPONENT)
        base = min(
            self.reconnect_delay * (2 ** exponent),
            self.reconnect_delay_max,
        )
        spread = base * self.randomization_factor
        return base + spread * random.uniform(-0.5, 0.5)

    def _schedule_reconnect(self):
        """Start a daemon thread that re-opens missing sockets after the delay."""
        print("Reconnecting to WebSocket server")
        wait_seconds = self._next_reconnect_delay()

        def delayed_reconnect():
            time.sleep(wait_seconds)
            # disconnect() may have been called while this thread slept.
            if not self._should_reconnect:
                return
            self.reconnect_attempts += 1
            self._open_missing_sockets()

        threading.Thread(target=delayed_reconnect, daemon=True).start()

    # ------------------------------------------------------------------
    # Incoming messages
    # ------------------------------------------------------------------
    def on_message(self, ws, message):
        """Decode a raw message and emit it to the listeners of its room.

        Only payloads whose ``type`` is ``"message"`` are dispatched. A
        payload whose ``data`` carries a ``tx`` id that was already seen is
        dropped. Rooms containing ``"price:"`` are additionally emitted as
        ``price-by-token:<token>``.
        """
        try:
            payload = json.loads(message)
            if payload["type"] != "message":
                return
            data = payload["data"]
            room = payload["room"]
            tx = data.get("tx")
            if tx:
                if tx in self.transactions:
                    return
                self._remember_transaction(tx)
            if "price:" in room:
                self.emitter.emit(f"price-by-token:{data['token']}", data)
            self.emitter.emit(room, data)
        except Exception as e:
            print(f"Error processing message: {e}")

    def _remember_transaction(self, tx):
        """Record ``tx`` as seen, evicting the oldest ids beyond the limit."""
        self.transactions.add(tx)
        self._transaction_order.append(tx)
        while len(self._transaction_order) > self._max_tracked_transactions:
            self.transactions.discard(self._transaction_order.popleft())

    # ------------------------------------------------------------------
    # Rooms and listeners
    # ------------------------------------------------------------------
    @staticmethod
    def _socket_type_for(room):
        """Return which socket (``"transaction"`` or ``"main"``) serves ``room``."""
        return "transaction" if "transaction" in room else "main"

    @staticmethod
    def _is_connected(app):
        """Return True when ``app`` exists and its underlying socket is connected."""
        return bool(app and app.sock and app.sock.connected)

    def _send_room_command(self, action, room):
        """Send ``action`` (``"join"``/``"leave"``) for ``room`` if its socket is connected."""
        if self._socket_type_for(room) == "transaction":
            app = self.transaction_socket
        else:
            app = self.socket
        if self._is_connected(app):
            app.send(json.dumps({"type": action, "room": room}))

    def join_room(self, room: str):
        """Subscribe to ``room``.

        The room is remembered even when the socket is not connected yet; it
        is joined as soon as that socket opens.
        """
        self.subscribed_rooms.add(room)
        self._send_room_command("join", room)

    def leave_room(self, room: str):
        """Unsubscribe from ``room`` and stop re-joining it on reconnect."""
        self.subscribed_rooms.discard(room)
        self._send_room_command("leave", room)

    def on(self, room: str, listener):
        """Register ``listener`` for data emitted for ``room``."""
        self.emitter.on(room, listener)

    def off(self, room: str, listener):
        """Remove a listener previously registered with :meth:`on`."""
        self.emitter.remove_listener(room, listener)

    def get_socket(self):
        """Return the main socket, or ``None`` when it is not open."""
        return self.socket

    def resubscribe_to_rooms(self):
        """Re-join every remembered room whose socket is currently connected."""
        # Snapshot the set: it may be mutated from another thread meanwhile.
        for room in tuple(self.subscribed_rooms):
            self._send_room_command("join", room)
# Usage example: create the service, then subscribe to one token's price room.
# The room is joined once the connection opens.
example_room = "price-by-token:GqmEdRD3zGUZdYPeuDeXxCc8Cj1DBmGSYK97TCwSpump"
ws_service = WebSocketService("wss://datastream.solanatracker.io/your-datastream-url-here")
ws_service.join_room(example_room)
def on_price_update(data):
    """Print the token and price carried by a price update payload.

    Args:
        data: Mapping with at least the ``token`` and ``price`` keys.
    """
    line = f"Received price update for {data['token']}: {data['price']}"
    print(line)
def main():
    """Stream price updates for one token until interrupted with Ctrl+C."""
    # Room that carries price updates for a single token.
    room = "price-by-token:GqmEdRD3zGUZdYPeuDeXxCc8Cj1DBmGSYK97TCwSpump"

    # Connect to the datastream endpoint (replace with your own URL).
    endpoint = "wss://datastream.solanatracker.io/your-datastream-url-here"
    service = WebSocketService(endpoint)

    # Subscribe first, then attach the callback that handles each update.
    service.join_room(room)
    service.on(room, on_price_update)

    print("Listening for price updates. Press Ctrl+C to exit.")
    try:
        # Updates are delivered to the listener; this loop only keeps the
        # script running.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping the WebSocket listener...")
    finally:
        # Undo the setup in reverse order: listener, room, connection.
        service.off(room, on_price_update)
        service.leave_room(room)
        service.disconnect()


if __name__ == "__main__":
    main()