Websocket Python

Python Websocket - Data Stream Documentation

The Websocket API is only available for Premium, Business and Enterprise plans. With the Websocket API you can stream: Parsed transactions (per pair or for a wallet), receive new pools/tokens, price updates and more.

This document provides information on how to use the WebSocket and the various room types available for WebSocket communication.

WebSocketService Class

Below is the WebSocketService class that can be used to establish WebSocket connections and manage room subscriptions:

import json
import time
import threading
from typing import Set
import websocket
from pyee import EventEmitter
 
class WebSocketService:
    def __init__(self, ws_url: str):
        self.ws_url = ws_url
        self.socket = None
        self.transaction_socket = None
        self.reconnect_attempts = 0
        self.reconnect_delay = 2.5
        self.reconnect_delay_max = 4.5
        self.randomization_factor = 0.5
        self.emitter = EventEmitter()
        self.subscribed_rooms: Set[str] = set()
        self.transactions: Set[str] = set()
 
        self.connect()
 
    def connect(self):
        if self.socket and self.transaction_socket:
            return
 
        try:
            self.socket = websocket.WebSocketApp(
                self.ws_url,
                on_open=lambda ws: self.on_open(ws, "main"),
                on_close=lambda ws: self.on_close(ws, "main"),
                on_message=self.on_message
            )
            self.transaction_socket = websocket.WebSocketApp(
                self.ws_url,
                on_open=lambda ws: self.on_open(ws, "transaction"),
                on_close=lambda ws: self.on_close(ws, "transaction"),
                on_message=self.on_message
            )
 
            threading.Thread(target=self.socket.run_forever, daemon=True).start()
            threading.Thread(target=self.transaction_socket.run_forever, daemon=True).start()
        except Exception as e:
            print(f"Error connecting to WebSocket: {e}")
            self.reconnect()
 
    def on_open(self, ws, socket_type):
        print(f"Connected to {socket_type} WebSocket server")
        self.reconnect_attempts = 0
        self.resubscribe_to_rooms()
 
    def on_close(self, ws, socket_type):
        print(f"Disconnected from {socket_type} WebSocket server")
        if socket_type == "main":
            self.socket = None
        elif socket_type == "transaction":
            self.transaction_socket = None
        self.reconnect()
 
    def on_message(self, ws, message):
        try:
            message = json.loads(message)
            if message["type"] == "message":
                if message["data"].get("tx") and message["data"]["tx"] in self.transactions:
                    return
                elif message["data"].get("tx"):
                    self.transactions.add(message["data"]["tx"])
                
                if "price:" in message["room"]:
                    self.emitter.emit(f"price-by-token:{message['data']['token']}", message["data"])
                self.emitter.emit(message["room"], message["data"])
        except Exception as e:
            print(f"Error processing message: {e}")
 
    def disconnect(self):
        if self.socket:
            self.socket.close()
            self.socket = None
        if self.transaction_socket:
            self.transaction_socket.close()
            self.transaction_socket = None
        self.subscribed_rooms.clear()
        self.transactions.clear()
 
    def reconnect(self):
        print("Reconnecting to WebSocket server")
        delay = min(
            self.reconnect_delay * (2 ** self.reconnect_attempts),
            self.reconnect_delay_max
        )
        jitter = delay * self.randomization_factor
        reconnect_delay = delay + (jitter * (2 * time.time() % 1 - 0.5))
 
        def delayed_reconnect():
            time.sleep(reconnect_delay)
            self.reconnect_attempts += 1
            self.connect()
 
        threading.Thread(target=delayed_reconnect, daemon=True).start()
 
    def join_room(self, room: str):
        self.subscribed_rooms.add(room)
        socket = self.transaction_socket if "transaction" in room else self.socket
        if socket and socket.sock and socket.sock.connected:
            socket.send(json.dumps({"type": "join", "room": room}))
 
    def leave_room(self, room: str):
        self.subscribed_rooms.discard(room)
        socket = self.transaction_socket if "transaction" in room else self.socket
        if socket and socket.sock and socket.sock.connected:
            socket.send(json.dumps({"type": "leave", "room": room}))
 
    def on(self, room: str, listener):
        self.emitter.on(room, listener)
 
    def off(self, room: str, listener):
        self.emitter.remove_listener(room, listener)
 
    def get_socket(self):
        return self.socket
 
    def resubscribe_to_rooms(self):
        if (self.socket and self.socket.sock and self.socket.sock.connected and
            self.transaction_socket and self.transaction_socket.sock and self.transaction_socket.sock.connected):
            for room in self.subscribed_rooms:
                socket = self.transaction_socket if "transaction" in room else self.socket
                socket.send(json.dumps({"type": "join", "room": room}))
 
# Usage example:
ws_service = WebSocketService("wss://datastream.solanatracker.io/your-datastream-url-here")
ws_service.join_room("price-by-token:GqmEdRD3zGUZdYPeuDeXxCc8Cj1DBmGSYK97TCwSpump")
 
 
def on_price_update(data):
    print(f"Received price update for {data['token']}: {data['price']}")
 
def main():
    # Initialize the WebSocket service
    ws_url = "wss://datastream.solanatracker.io/your-datastream-url-here"
    ws_service = WebSocketService(ws_url)
 
    # Join a room (in this case, for a specific token's price updates)
    token_room = "price-by-token:GqmEdRD3zGUZdYPeuDeXxCc8Cj1DBmGSYK97TCwSpump"
    ws_service.join_room(token_room)
 
    # Register the event listener
    ws_service.on(token_room, on_price_update)
 
    print("Listening for price updates. Press Ctrl+C to exit.")
 
    try:
        # Keep the script running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Stopping the WebSocket listener...")
    finally:
        # Clean up
        ws_service.off(token_room, on_price_update)
        ws_service.leave_room(token_room)
        ws_service.disconnect()
 
if __name__ == "__main__":
    main()