Vizium Realtime Chain Monitor API

RTCM API Documentation and guide

API Definitions

Client

Enums

AddressSubDir (Enum)
+--------+--------+
| "both" | "BOTH" |
| "from" | "FROM" |
| "to"   | "TO"   |
+--------+--------+
Network (Enum)
+-------+----------------------------------+
| Val   | Def                              |
|-------+----------------------------------|
| eth   | ethereum, chainId=1              |
| arb   | arbitrum, chainId=42161          |
| opt   | optimism, chainId=10             |
| bsc   | binanceSC, chainId=56            |
| rop   | ropsten, chainId=3               |
| ply   | polygon, chainId=137             |
| gno   | gnosis, chainId=100              |
| arbn  | arbitrum_nova, chainId=42170     |
| sep   | sepolia, chainId=11155111        |
| bp0   | BP_ARB_TESTNET, chainId=601574   |
| ava   | avalanche_c_chain, chainId=43114 |
+-------+----------------------------------+

Messages

Address
Address monitoring subscription request. dir indicates whether to monitor activity to the given address, from it, or in both directions
+---------+---------------+------------+
| field   | type          | required   |
|---------+---------------+------------|
| nwk     | string        | True       |
| addr    | string        | True       |
| dir     | AddressSubDir | True       |
| opts    | dict          | False      |
+---------+---------------+------------+

Event
Event monitoring subscription request. addr is the address of the contract and eName is the ABI-defined event name
+---------+------------+------------+
| field   | type       | required   |
|---------+------------+------------|
| nwk     | Network    | True       |
| eName   | string     | True       |
| addr    | hex string | False      |
| vzLabel | string     | False      |
| opts    | dict       | False      |
+---------+------------+------------+
Additional Constraints:
	`addr` and `vzLabel` cannot both be defined
	either `addr` or `vzLabel` is required
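
The constraint above can be checked on the client before a request is sent. The following is a minimal sketch, not part of the Vizium client: the helper name build_event_sub is hypothetical, and the "type" and "sType" keys are taken from the example subscription messages later in this guide.

```python
def build_event_sub(cl_sub_id, nwk, e_name, addr=None, vz_label=None, opts=None):
    """Build a 'sub' message for an Event (hypothetical helper).

    Enforces the documented rule that exactly one of addr / vzLabel is set.
    """
    if (addr is None) == (vz_label is None):
        raise ValueError("exactly one of addr or vzLabel must be provided")
    data = {"sType": "event", "nwk": nwk, "eName": e_name}
    if addr is not None:
        data["addr"] = addr
    else:
        data["vzLabel"] = vz_label
    if opts:
        data["opts"] = opts
    return {"type": "sub", "clSubId": cl_sub_id, "data": data}


# Usage: subscribe by label; passing both (or neither) identifier raises ValueError.
msg = build_event_sub(0, "eth", "Transfer", vz_label="USDC")
print(msg)
```

Rejecting the request locally avoids a round trip that would otherwise end in an error response from the server.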

ServerGet
Get service data based on key
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| dKey    | string | True       |
+---------+--------+------------+

Sub
New Subscription Request
+---------+------------------+------------+
| field   | type             | required   |
|---------+------------------+------------|
| clSubId | integer          | True       |
| data    | [Event, Address] | True       |
+---------+------------------+------------+

USub
Unsubscribe from a specific subscription id
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| subId   | string | True       |
+---------+--------+------------+

Server

Enums

ErrorCodes (Enum)
+----+---------------------------+
| -1 | "UNKNOWN"                 |
|  0 | "SERVER_NOT_READY"        |
|  1 | "INTERNAL_SERVER_ERROR"   |
|  2 | "INVALID_MSG_SPEC"        |
|  3 | "INVALID_SUB_REQ"         |
|  4 | "DUPLICATE_SUB"           |
|  5 | "UNKNOWN_CLIENT_SUB"      |
|  6 | "INVALID_AUTH"            |
|  7 | "INVALID_SERVER_DATA_KEY" |
+----+---------------------------+

Network (Enum)
+-------+----------------------------------+
| Val   | Def                              |
|-------+----------------------------------|
| eth   | ethereum, chainId=1              |
| arb   | arbitrum, chainId=42161          |
| opt   | optimism, chainId=10             |
| bsc   | binanceSC, chainId=56            |
| rop   | ropsten, chainId=3               |
| ply   | polygon, chainId=137             |
| gno   | gnosis, chainId=100              |
| arbn  | arbitrum_nova, chainId=42170     |
| sep   | sepolia, chainId=11155111        |
| bp0   | BP_ARB_TESTNET, chainId=601574   |
| ava   | avalanche_c_chain, chainId=43114 |
+-------+----------------------------------+

ShutdownCodes (Enum)
+----+---------------+
| -1 | "UNKNOWN"     |
|  0 | "UNSCHEDULED" |
|  1 | "RESTARTING"  |
|  2 | "SCHEDULED"   |
+----+---------------+

Messages

Authenticated
Server response indicating connection has authenticated successfully as user
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| user    | string | True       |
| exp     | string | True       |
+---------+--------+------------+

Error
Server response indicating some sort of error state
+---------+------------+------------+
| field   | type       | required   |
|---------+------------+------------|
| code    | ErrorCodes | True       |
| rMsg    | dict       | True       |
| eMsg    | string     | True       |
+---------+------------+------------+

Init
Initial server response providing server's internal client id
+-----------+--------+------------+
| field     | type   | required   |
|-----------+--------+------------|
| cid       | string | True       |
| deployVer | string | True       |
| msgDefVer | string | True       |
+-----------+--------+------------+

Notify
Notify message for a given subscription id (as provided in the subscription confirmation message), carrying the actual subscription event data
+--------------+--------------+------------+
| field        | type         | required   |
|--------------+--------------+------------|
| subId        | string       | True       |
| ts           | datetime     | True       |
| bn           | integer      | True       |
| nwk          | Network      | True       |
| txHash       | hex string   | True       |
| data         | [list, dict] | True       |
| nInfo        | dict         | False      |
| extraCntInfo | dict         | False      |
+--------------+--------------+------------+

ServerData
Respond to a service data get message with requested data
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| dKey    | string | True       |
| data    | list   | True       |
+---------+--------+------------+

ShutdownNotice
Alert message indicating the server will be disconnecting
+---------+---------------+------------+
| field   | type          | required   |
|---------+---------------+------------|
| ts      | datetime      | True       |
| reason  | ShutdownCodes | True       |
| sMsg    | string        | True       |
+---------+---------------+------------+

SubConfirm
Subscription confirmation with unique subscription id which maps to Subscription Notify messages as well as the confirmed subscription settings
+---------+---------+------------+
| field   | type    | required   |
|---------+---------+------------|
| subId   | string  | True       |
| clSubId | integer | True       |
| data    | list    | True       |
| info    | dict    | False      |
+---------+---------+------------+

SubInitState
Special notify message for a given subscription id containing initial state of the subscription
+--------------+--------------+------------+
| field        | type         | required   |
|--------------+--------------+------------|
| subId        | string       | True       |
| ts           | string       | True       |
| bn           | integer      | True       |
| nwk          | Network      | True       |
| txHash       | string       | True       |
| data         | [list, dict] | True       |
| extraCntInfo | dict         | False      |
+--------------+--------------+------------+

SubPendingState
Special notify message indicating that a given subscription id is still initializing
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| subId   | string | True       |
+---------+--------+------------+

USubConfirm
Unsubscribe confirmation message for supplied subscription id and subscription settings
+---------+--------+------------+
| field   | type   | required   |
|---------+--------+------------|
| subId   | string | True       |
| data    | list   | True       |
+---------+--------+------------+

Setting up Access

The RTCM instances are deployed at 162.243.162.191 and require both SSL and Vizium credentials (please ask Stephen to generate new credentials).

The Ethereum and Arbitrum RTCM instances run on ports 4321 and 5331 respectively.

Interacting with the Server

On client connection, the server sends an initial message informing the client of its client id (cid):

{
    "type": "init",
    "cid": "e8011154-b673-11ee-9e52-a65917587bba"
}

The client can request a list of available contracts:

{
    "type": "serverget",
    "dKey": "availableContracts"
}

The server responds with a list of defined contracts, where each unique definition is a combination of Vizium label, network, and address ([VZ_LABEL, network, address]):

{
    "type": "serverdata",
    "dKey": "availableContracts",
    "data": [
        [
            "USDT",
            "eth",
            "0xdAC17F958D2ee523a2206206994597C13D831ec7"
        ],
        [
            "USDC",
            "eth",
            "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
        ],
        [
            "ChainlinkFeed-ETH/USD",
            "eth",
            "0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419"
        ],
        [
            "ChainlinkFeed-BTC/USD",
            "eth",
            "0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c"
        ],
        [
            "ChainlinkFeed-MATIC/USD",
            "eth",
            "0x7bAC85A8a13A4BcD8abb3eB7d6b4d632c5a57676"
        ],
        [
            "ChainlinkFeed-LINK/USD",
            "eth",
            "0x2c1d072e956AFFC0D435Cb7AC38EF18d24d9127c"
        ],
        [
            "HopBridgeL1-ETH",
            "eth",
            "0xb8901acB165ed027E32754E0FFe830802919727f"
        ]
    ]
}
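
A client will typically want to look contracts up by label once this response arrives. The sketch below is one way to do that, assuming the response has already been parsed from JSON into a dict; the function name group_contracts is hypothetical. It groups by label rather than assuming a label is unique, since uniqueness is only stated for the full [VZ_LABEL, network, address] combination.

```python
from collections import defaultdict


def group_contracts(server_data):
    """Group an 'availableContracts' response by Vizium label (hypothetical helper)."""
    if server_data.get("dKey") != "availableContracts":
        raise ValueError("expected an availableContracts response")
    by_label = defaultdict(list)
    # Each entry is [VZ_LABEL, network, address], as in the response above.
    for label, nwk, addr in server_data["data"]:
        by_label[label].append((nwk, addr))
    return dict(by_label)


response = {
    "type": "serverdata",
    "dKey": "availableContracts",
    "data": [
        ["USDT", "eth", "0xdAC17F958D2ee523a2206206994597C13D831ec7"],
        ["USDC", "eth", "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"],
    ],
}
contracts = group_contracts(response)
print(contracts["USDC"])
```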

The client can subscribe to new EVM Events (Logs) defined with an address and event name (defined by a contract's ABI) with a subscription request message:

{
    "type": "sub",
    "clSubId": 0,
    "data": {
        "sType": "event",
        "eName": "Transfer",
        "addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "nwk": "eth"
    }
}

The client can also subscribe to events using a Vizium label instead of an address:

{
    "type": "sub",
    "clSubId": 0,
    "data": {
        "sType": "event",
        "eName": "Transfer",
        "vzLabel": "USDC",
        "nwk": "eth"
    }
}

The server will confirm a successful subscription by sending a confirm message:

{
    "type": "subconfirm",
    "subId": "a913fdeb-b683-11ee-bf7b-a65917587bba",
    "clSubId": 0,
    "subType": "Event",
    "data": [
        "eth",
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "Transfer"
    ],
    "info": {
        "nwk": "eth",
        "addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "vzLabel": "USDC"
    }
}
  • 'data' corresponds to the unique key definition of the requested subscription

  • 'info' provides Vizium metadata information

The returned confirmed subscription id (subId) corresponds to the supplied client subscription id (clSubId) and can be used to map future Event notifications to the subscription's information.
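
One way to keep that mapping on the client is a small registry keyed by both ids. This is a sketch under the assumption that messages are handled as plain dicts shaped like the examples in this guide; the class name SubRegistry and its methods are hypothetical.

```python
class SubRegistry:
    """Tracks clSubId -> request and subId -> (clSubId, request) (hypothetical helper)."""

    def __init__(self):
        self._pending = {}  # clSubId -> request data awaiting confirmation
        self._active = {}   # subId -> (clSubId, request data)

    def add_request(self, cl_sub_id, data):
        self._pending[cl_sub_id] = data

    def on_subconfirm(self, msg):
        # The confirmation echoes clSubId, which ties the server's subId to our request.
        data = self._pending.pop(msg["clSubId"])
        self._active[msg["subId"]] = (msg["clSubId"], data)

    def lookup(self, sub_id):
        """Return (clSubId, request data) for a subId, or None if unknown."""
        return self._active.get(sub_id)


registry = SubRegistry()
registry.add_request(0, {"sType": "event", "eName": "Transfer", "vzLabel": "USDC", "nwk": "eth"})
registry.on_subconfirm({"type": "subconfirm", "subId": "a913fdeb-b683-11ee-bf7b-a65917587bba", "clSubId": 0})
print(registry.lookup("a913fdeb-b683-11ee-bf7b-a65917587bba"))
```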

Event subscriptions are 'stateful': after confirming the subscription, the server sends the last seen occurrence of the requested Event, after which the client will receive any future Events:

{
    "type": "subinitstate",
    "subId": "e80263b3-b673-11ee-aa80-a65917587bba",
    "data": {
        "nwk": "eth",
        "ts": "2024-01-19 02:38:59",
        "bn": 19037976,
        "txHash": "0x0ad186b88ded27ef7754a8544b7571a1123f6b24d0db21e587c758ac18c27a7a",
        "eTxData": {
            "asset": "USDC",
            "from": "0xC475f5f4Df98D11FaFcF1C50f887e7F6902AC27D",
            "to": "0x22dA1eEdeBC60C1b8c3a0c48f5C81BBE2b943dD9",
            "qty": "1000"
        }
    }
}

If the server is still retrieving the last state for a requested Event subscription, for example shortly after the server starts, it will notify the client that the stateful subscription is pending:

{
    "type": "subpendingstate",
    "subId": "08128cff-b6ae-11ee-ab63-a65917587bba"
}

After receiving the initial state message, the client will begin receiving a notification message whenever a subscribed Event is confirmed in a transaction:

{
    "type": "notify",
    "subId": "c04cbeb8-b676-11ee-b75c-a65917587bba",
    "data": {
        "nwk": "eth",
        "ts": "2024-01-19 02:59:23",
        "bn": 19038077,
        "txHash": "0x14880056532659c5c09461a574e313d51b3526f8ed5e15c8e1ed02a17f47ccde",
        "eTxData": {
            "asset": "USDC",
            "from": "0xce16F69375520ab01377ce7B88f5BA8C48F8D666",
            "to": "0xE743a49F04F2f77eB2D3b753aE3AD599dE8CEA84",
            "qty": "490.504685"
        }
    }
}

Because of chain reorganizations (reorgs), there is no guarantee that an Event will be sent to a client only once.

Any reorg, as observed by Vizium's underlying blockchain node for the respective network, that changes the canonically included blocks will result in the client receiving duplicates of Events that were emitted in the pre-reorg chain. This should be a rare occurrence under normal conditions.
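
Clients that must process each Event at most once can filter duplicates themselves. The following is a minimal sketch, assuming messages are dicts shaped like the notify example above (with txHash and eTxData inside data); the class name NotifyDeduper is hypothetical. The block number is deliberately left out of the key because a reorg can re-include the same transaction in a different block. Note the limitation: two genuinely distinct Events with identical payloads in the same transaction would be treated as one, since the example message carries no log index.

```python
import json
from collections import OrderedDict


class NotifyDeduper:
    """Bounded memory of already-seen notifications (hypothetical helper)."""

    def __init__(self, max_entries=10_000):
        self._seen = OrderedDict()
        self._max = max_entries

    def is_new(self, msg):
        data = msg["data"]
        payload = json.dumps(data.get("eTxData"), sort_keys=True)
        key = (msg["subId"], data["txHash"], payload)
        if key in self._seen:
            self._seen.move_to_end(key)
            return False
        self._seen[key] = True
        if len(self._seen) > self._max:
            self._seen.popitem(last=False)  # evict the oldest entry
        return True


deduper = NotifyDeduper()
note = {
    "type": "notify",
    "subId": "c04cbeb8-b676-11ee-b75c-a65917587bba",
    "data": {
        "nwk": "eth",
        "bn": 19038077,
        "txHash": "0x14880056532659c5c09461a574e313d51b3526f8ed5e15c8e1ed02a17f47ccde",
        "eTxData": {"asset": "USDC", "qty": "490.504685"},
    },
}
first = deduper.is_new(note)
# Same transaction re-delivered after a reorg, now in a different block.
replayed = dict(note, data=dict(note["data"], bn=19038079))
second = deduper.is_new(replayed)
print(first, second)
```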

Important Notes

Vizium only sends responses and data with checksummed addresses (https://eips.ethereum.org/EIPS/eip-55). Thus, any confirmation that includes an address will return it in checksummed form, even if the client did not supply it in that form. We highly recommend that clients use only checksummed addresses, following best practices.
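
Because the server may return an address in a different letter case than the client sent, client code that matches responses to requests should not compare address strings byte for byte. The sketch below shows a case-insensitive comparison; the function name same_address is hypothetical. It only checks that both inputs are well-formed 20-byte hex addresses and does not verify the EIP-55 checksum itself, which requires Keccak-256 (not the same function as hashlib's sha3_256) and therefore a third-party library.

```python
import re

# 0x followed by exactly 40 hex digits (20 bytes)
_ADDR_RE = re.compile(r"0x[0-9a-fA-F]{40}")


def same_address(a, b):
    """Return True if a and b denote the same address, ignoring checksum casing."""
    if not (_ADDR_RE.fullmatch(a) and _ADDR_RE.fullmatch(b)):
        raise ValueError("not a 20-byte hex address")
    return a.lower() == b.lower()


requested = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"  # lowercase, as a client might send
confirmed = "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"  # checksummed, as the server returns
print(same_address(requested, confirmed))
```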

Until it is fully initialized after a restart, the server responds to server data and Event subscription requests with an error message (code 0, SERVER_NOT_READY), upon which the client should sleep and retry:

{
    "type": "error",
    "code": 0,
    "rMsg": {
        "type": "sub",
        "clSubId": 0,
        "data": {
            "sType": "event",
            "addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
            "eName": "Transfer",
            "opts": {}
        }
    },
    "eMsg": "server still initializing, cannot subscribe to Event ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transfer') (vzLabel='USDC')"
}
{
    "type": "error",
    "code": 0,
    "rMsg": {
        "type": "serverget",
        "dKey": "availableContracts"
    },
    "eMsg": "server still initializing, data for 'availableContracts' not ready"
}

Example Server Responses to Common Incorrect Client Behavior

If a requested Event subscription is not defined in the available contracts, or is malformed with an invalid address/name/network:

{
    "type": "error",
    "code": 3,
    "rMsg": {
        "type": "sub",
        "clSubId": 0,
        "data": {
            "sType": "event",
            "addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
            "eName": "Transferf",
            "opts": {}
        }
    },
    "eMsg": "subscription type=Event has unsupported args ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transferf') (vzLabel='USDC')"
}

Sending a malformed/invalid message:

{
    "type": "error",
    "code": 0,
    "rMsg": "[{\"ftype\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"addr\": \"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48\", \"eName\": \"Transfer\"}}, {\"ftype\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"addr\": \"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48\", \"eName\": \"Transfer\"}}]",
    "eMsg": "Object missing required field type - at $[0]"
}
{
    "type": "error",
    "code": 0,
    "rMsg": "[{\"type\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"eName\": \"Transfer\", \"addr\": \"\", \"vzLabel\": \"USDC\", \"nwk\": \"eth\"}}, {\"type\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"eName\": \"Transfer\", \"addr\": \"\", \"vzLabel\": \"USDC\", \"nwk\": \"eth\"}}]",
    "eMsg": "both addr or vzLabel cannot be defined- at$[0].data`"
}

Sending a duplicate subscription request:

{
    "type": "error",
    "code": 1,
    "rMsg": {
        "type": "sub",
        "clSubId": 0,
        "data": {
            "sType": "event",
            "addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
            "eName": "Transfer",
            "opts": {}
        }
    },
    "eMsg": "already subscribed to ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transfer')"
}
  • 'clSubId' is existing client subscription id

Sequence Diagram

Sample Code

The following Python code demonstrates how to connect to the server and perform basic operations. It works for both the RTCM and the state monitoring service (SM).

You could use the example as follows, assuming you have proper credentials:

python3 ws_client.py -n arb -m rtcm --host wss://api.vizium.xyz --port 5331 --authHost https://api.vizium.xyz -a --authCreds demo tt <VIZIUM_AUTH_PASSWORD>
import aiohttp
import argparse
import asyncio
import enum
import importlib
import json
import logging
import ssl
import traceback as tb
import uuid

import msgspec
import pandas as pd
import websockets
from websockets import exceptions as W_E

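class Custom_JSON_Encoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, msgspec.Struct):
            return json.loads(msgspec.json.encode(obj))
        if isinstance(obj, enum.Enum):
            return str(obj)
        # Defer to the base class so unsupported types raise TypeError instead of serializing as null
        return super().default(obj)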

logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')

def pprint(m=None, jm=None):
    if jm is None:
        pOut = m
    else:
        pOut = f'{m}\n{json.dumps(jm, indent=4, cls=Custom_JSON_Encoder)}'
    logging.info(f'{pOut}')

class BaseWebSocketClient:

    def __init__(self, enc, dec, url, logRM=False, logSM=False, authInfo=None):
        self._encoder = enc
        self._decoder = dec
        self._ready = asyncio.Future()
        self._wsUrl = url
        self._ws = None
        self._subIds = {}
        self._subReqCnt = 0
        self._connectSleepSecs = 3
        self._firstConnectFinished = asyncio.Future()
        self._connectFinished = asyncio.Future()
        self._lRMsgs = logRM
        self._lSMsgs = logSM
        self._auth, self._authHost, self._authPort, self._authCreds = (False, '', '', {}) if authInfo is None else authInfo
        self._shouldRun = False
        self.running = None

    @property
    def connected(self):
        return self._ws is not None

    async def _recv(self, msg):
        raise NotImplementedError()

    async def start(self):
        asyncio.ensure_future(self._connect())
        await self.wait_first_connection()

    async def wait_first_connection(self):
        await self._firstConnectFinished

    async def _wait_ready(self):
        return await self._ready

    async def _connect(self):
        self._shouldRun = True
        self.running = asyncio.Future()
        async def _conn_loop():
            try:
                actualUrl = self._wsUrl
                sslContext = None
                if actualUrl.startswith('wss'):
                    sslContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
                if self._auth:
                    authUrl = f'{self._authHost}:{self._authPort}/authTkn'
                    pprint(f'{self} auth required, trying @ {authUrl}')
                    async with aiohttp.ClientSession() as session:
                        # sslCtx = None
                        authResp = await session.request('POST', authUrl, json=self._authCreds, ssl=sslContext)
                        authD = await authResp.json()
                        authErr = authD.get('error')
                        if authErr:
                            raise RuntimeError(f'{self} auth error {authErr}')
                        else:
                            tkn, exp = authD['token'], authD['exp']
                            pprint(m=f'{self} got auth tkn successfully, expires @ {exp}')
                            actualUrl = f'{actualUrl}/?token={tkn}'
                pprint(f'{self} connecting to {actualUrl}')
                async with websockets.connect(actualUrl, ssl=sslContext) as ws:
                    self._ws = ws
                    self._ready.set_result(True)
                    if not self._firstConnectFinished.done():
                        self._firstConnectFinished.set_result(True)
                        pprint(m=f'{self} first connection complete!')
                    while True:
                        msg = await ws.recv()
                        try:
                            await self._recv(self._decoder.decode(msg))
                        except Exception:
                            tb.print_exc()
                            pprint(m=f'{self} error with msg', jm=json.loads(msg))
            except ConnectionRefusedError as e:
                pprint(m=f'{self} connection refused!')
                self._ready.set_result(False)
            except (W_E.InvalidMessage, W_E.ConnectionClosedError) as we:
                pprint(m=f'{self} websocket protocol exception {we}')
            except (W_E.InvalidStatusCode, ) as we:
                pprint(m=f'{self} server responded with Invalid Status Code: {we}')
                self._ready.set_result(False)
            except Exception as e:
                tb.print_exc()
                self._shouldRun = False
                pprint(m=f'{self} {e}')
                if not self._firstConnectFinished.done():
                    self._firstConnectFinished.set_result(True)

            self._connectFinished.set_result(True)

        isReconnect = False
        while self._shouldRun:
            asyncio.ensure_future(_conn_loop())
            if isReconnect:
                readySuccess = await self._wait_ready()
                if readySuccess:
                    asyncio.ensure_future(self._on_reconnect())
            await self._connectFinished
            if self._shouldRun:
                pprint(f'{self} connect loop finished, sleeping {self._connectSleepSecs} seconds and reconnecting')
                await asyncio.sleep(self._connectSleepSecs)
                isReconnect = True
                self._ready = asyncio.Future()
                self._connectFinished = asyncio.Future()

        self.running.set_result(False)

    async def _on_reconnect(self):
        pass

    async def _send(self, msg):
        if self._lSMsgs:
            pprint(m=f'{self} sending msg: ', jm=msg)
        await self._ws.send(self._encoder.encode(msg))

class TestWebSocketClient(BaseWebSocketClient):

    def __init__(self, mode, network, *args, **kwargs):
        # Per-instance state: class-level mutable defaults would be shared by all clients
        self._serverdata_get_fut = None
        self._pending_sub_futs = {}
        self._mode = mode
        self._network = network
        encoder = msgspec.json.Encoder()
        msgspec_defs = importlib.import_module(f'msgspec_defs_{self._mode.upper()}.server')
        decoder = msgspec.json.Decoder(msgspec_defs.Messages)
        super().__init__(encoder, decoder, *args, **kwargs)
        self._subscribed_contract_events = set()
        self._notify_cbs = []

    def add_notify_cb(self, cb):
        self._notify_cbs.append(cb)

    async def _on_reconnect(self):
        match self._mode:
            case 'rtcm':
                for x in self._subscribed_contract_events:
                    await self.subscribe_contract_event(*x)

    async def _recv(self, msg):
        if self._lRMsgs:
            pprint(m=f'{self} recv msg', jm=msg)
        mType = type(msg).__name__.lower()
        match mType:
            case 'init':
                pprint(m=f'{self} init from server {msg}')
            case 'error':
                oMsg = msg.rMsg
                oMsgType = oMsg['type']
                match oMsgType:
                    case 'serverget':
                        self._serverdata_get_fut.set_result((False, msg.eMsg))
                    case 'sub':
                        clSubId = oMsg['clSubId']
                        pSubFut = self._pending_sub_futs[clSubId]
                        pSubFut.set_result((False, msg))
                    case _:
                        pprint(m=f'{self} unhandled error from original client msg type {oMsgType}', jm=msg)
            case 'subpendingstate':
                subId = msg.subId
                pprint(m=f'{self} requested subId={subId} is still initializing and pending')
            case 'subinitstate':
                clSubId, subData = self._subIds[msg.subId]
                pSubFut = self._pending_sub_futs[clSubId]
                pSubFut.set_result((True, msg))
            case 'subconfirm':
                clSubId = msg.clSubId
                pprint(msg)
                self._subIds[msg.subId] = (clSubId, msg.data)
            case 'usubconfirm':
                clSubId, subData = self._subIds[msg.subId]
                del self._subIds[msg.subId]
                pprint(m=f'{self} confirmed unsubscribed clSubId={clSubId} subId={msg.subId} {subData}', jm=msg)
            case 'serverdata':
                if not self._serverdata_get_fut.done():
                    self._serverdata_get_fut.set_result((True, msg.data))
            case 'notify':
                clSubId, subData = self._subIds[msg.subId]
                for cb in self._notify_cbs:
                    cb(msg, clSubId, subData)
            case 'shutdownnotice':
                sTs = msg.ts
                pprint(m=f'{self} received SHUTDOWN notice ts={msg.ts} reason={msg.reason} sMsg={msg.sMsg}')

    async def get_supported_rtcm_contracts(self):
        self._serverdata_get_fut = asyncio.Future()
        msg = {'type': 'serverget', 'dKey': 'availableContracts'}
        await self._send(msg)
        await self._serverdata_get_fut
        success, res = self._serverdata_get_fut.result()
        if success:
            pprint(res)
            return res
        pprint(f'{self} failed to get supported contracts with error: "{res}"')
        return None

    async def subscribe_contract_event(self, vzLabel, eName):
        self._subscribed_contract_events.add((vzLabel, eName))
        subRParams = {'sType': 'event', 'eName': eName, 'vzLabel': vzLabel, 'nwk': self._network}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for Event vzLabel={vzLabel} eName={eName} from {resMsg.ts} bn={resMsg.bn} txHash={resMsg.txHash} with subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to Event vzLabel={vzLabel} eName={eName} with error: "{resMsg.eMsg}"')
        return success

    async def get_supported_sm_contracts(self):
        self._serverdata_get_fut = asyncio.Future()
        msg = {'type': 'serverget', 'dKey': 'supportedContracts'}
        await self._send(msg)
        await self._serverdata_get_fut
        success, res = self._serverdata_get_fut.result()
        if success:
            pprint(res)
            return res
        pprint(f'{self} failed to get supported contracts with error: "{res}"')
        return None

    async def subscribe_sm_state(self, nwk, vzLabel):
        subRParams = {'sType': 'state', 'nwk': nwk, 'vzLabel': vzLabel}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for State Monitoring nwk={nwk} vzLabel={vzLabel} from {resMsg.ts} bn={resMsg.bn} subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to State nwk={nwk} vzLabel={vzLabel} with error: "{resMsg.eMsg}"')
        return success

    async def subscribe_sm_position(self, nwk, vzLabel, userAddr):
        subRParams = {'sType': 'position', 'nwk': nwk, 'vzLabel': vzLabel, 'posAddr': userAddr}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for Position Monitoring nwk={nwk} vzLabel={vzLabel} userAddr={userAddr} from {resMsg.ts} bn={resMsg.bn} subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to Position nwk={nwk} vzLabel={vzLabel} userAddr={userAddr} with error: "{resMsg.eMsg}"')
        return success

    async def _send_sub_msg_single(self, sData):
        sId = self._subReqCnt
        self._pending_sub_futs[sId] = asyncio.Future()
        msg = {'type': 'sub', 'clSubId': sId, 'data': sData}
        self._subReqCnt += 1
        await self._send(msg)
        return self._pending_sub_futs[sId]

    async def _unsub_test(self):
        usId = list(self._subIds.keys())[0]
        await self._send_usub_msg(usId)

    async def _send_usub_msg(self, uSubId):
        msg = {'type': 'usub', 'subId': uSubId}
        await self._send(msg)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return f'TestViziumWebSocketClient-{self._network}'

def log_notify(msg, clSubId, subData):
    pprint(m=f'NOTIFICATION {msg.sType} {subData} @ {msg.ts} bn={msg.bn}', jm=msg)

async def gen_client(mode, nwk, host, port, logRMsgs=False, logSMsgs=False, authInfo=None, sslCreds=None):
    if authInfo is None:
        authInfo = (False, '', '', {})
    # sslCreds is accepted but not forwarded: BaseWebSocketClient.__init__ has no such parameter
    # and builds its SSL context from the URL scheme
    client = TestWebSocketClient(mode, nwk, f'{host}:{port}', logRM=logRMsgs, logSM=logSMsgs, authInfo=authInfo)
    await client.start()
    return client

async def main():
    parser = argparse.ArgumentParser(description="Sample Vizium API Client")
    parser.add_argument('--host', help='vizium server host', default='ws://localhost')
    parser.add_argument('-m', '--mode', help='connect to either realtime chain monitor or state monitoring service', default='rtcm', choices=['rtcm', 'sm'])
    parser.add_argument('-n', '--network', help='target RTCM EVM network type')
    parser.add_argument('--port', help='vizium server port', default=4321)
    parser.add_argument('--logRMsgs', help='log received messages', action='store_true', default=False)
    parser.add_argument('--logSMsgs', help='log sent messages', action='store_true', default=False)
    parser.add_argument('-l', '--listenType', help='selector for RTCM vizium supported contracts', default="USDC", choices=["USDC", "USDT", "Chainlink"])
    parser.add_argument('-a', '--authenticated', help='indicates server requires authentication', action='store_true', default=False)
    parser.add_argument('--authHost', help='vizium auth server host', default='http://localhost')
    parser.add_argument('--authPort', help='vizium auth server port', default=5174)
    parser.add_argument('--authCreds', help='vizium auth credentials in form of "username org password"', nargs='+', default=None)

    args = parser.parse_args()

    if args.mode == 'rtcm' and args.network is None:
        parser.error('network argument must be provided if running in rtcm mode')

    fut = asyncio.Future()

    if args.authCreds is None:
        authCreds = {}
    elif len(args.authCreds) != 3:
        parser.error(f'authCreds must be in form "username org password"')
    else:
        username, org, password = args.authCreds
        authCreds = {'username': username, 'org': org, 'password': password}

    clientAuthInfo = (args.authenticated, args.authHost, args.authPort, authCreds)

    async def test_rtcd1():
        client = await gen_client(args.mode, args.network, args.host, args.port, logRMsgs=args.logRMsgs, logSMsgs=args.logSMsgs, authInfo=clientAuthInfo)
        if client.connected:
            client.add_notify_cb(log_notify)
            cnts = await client.get_supported_rtcm_contracts()
            if not cnts:
                fut.set_result(False)
            else:
                for cnt in cnts:
                    if args.listenType in cnt['vzLabel']:
                        await client.subscribe_contract_event(cnt['vzLabel'], cnt['events'][0])

                # await asyncio.sleep(20)
                # await client._unsub_test()
            await client.running

    async def test_sm1():
        client = await gen_client(args.mode, args.network, args.host, args.port, logRMsgs=args.logRMsgs, logSMsgs=args.logSMsgs, authInfo=clientAuthInfo)
        if client.connected:
            targets = await client.get_supported_sm_contracts()

            testAddr = '0x799fDB02e49c5B77FdDB4B271aDf11bf42DCA586'
            for nwk in ['eth', 'arb']:
                for p in ['AavePoolV3']:
                    await client.subscribe_sm_state(nwk, p)
                    await client.subscribe_sm_position(nwk, p, testAddr)
            await client.running

    match args.mode:
        case 'rtcm':
            await test_rtcd1()
        case 'sm':
            await test_sm1()

if __name__ == '__main__':
    asyncio.run(main())
