Vizium Realtime Chain Monitor API
RTCM API Documentation and guide
API Definitions
Client
Enums
AddressSubDir
+--------+--------+
| "both" | "BOTH" |
| "from" | "FROM" |
| "to" | "TO" |
+--------+--------+
Network (Enum)
+-------+----------------------------------+
| Val | Def |
|-------+----------------------------------|
| eth | ethereum, chainId=1 |
| arb | arbitrum, chainId=42161 |
| opt | optimism, chainId=10 |
| bsc | binanceSC, chainId=56 |
| rop | ropsten, chainId=3 |
| ply | polygon, chainId=137 |
| gno | gnosis, chainId=100 |
| arbn | arbitrum_nova, chainId=42170 |
| sep | sepolia, chainId=11155111 |
| bp0 | BP_ARB_TESTNET, chainId=601574 |
| ava | avalanche_c_chain, chainId=43114 |
+-------+----------------------------------+
Messages
Address
Address monitoring subscription request. dir selects whether to monitor activity to the given address, from it, or in both directions.
+---------+---------------+------------+
| field | type | required |
|---------+---------------+------------|
| nwk | string | True |
| addr | string | True |
| dir | AddressSubDir | True |
| opts | dict | False |
+---------+---------------+------------+
Event
Event monitoring subscription request. addr is the address of the contract and eName is the ABI-defined event name.
+---------+------------+------------+
| field | type | required |
|---------+------------+------------|
| nwk | Network | True |
| eName | string | True |
| addr | hex string | False |
| vzLabel | string | False |
| opts | dict | False |
+---------+------------+------------+
Additional Constraints:
`addr` and `vzLabel` cannot both be defined
either `addr` or `vzLabel` is required
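The two constraints above can be checked on the client before a request is sent. The following is a minimal sketch, not part of the Vizium API: `validate_event_sub` is a hypothetical helper name, and it assumes that a field counts as "defined" whenever its key is present with a non-null value (an empty string included), which is how the server appears to behave in the error samples later in this guide.

```python
def validate_event_sub(data: dict) -> list:
    """Return a list of constraint violations for an Event subscription payload.

    Assumption: a field is "defined" if it is present and not None,
    so an empty string still counts as defined.
    """
    problems = []
    # nwk and eName are marked required in the Event table
    for field in ("nwk", "eName"):
        if data.get(field) is None:
            problems.append(f"missing required field '{field}'")
    has_addr = data.get("addr") is not None
    has_label = data.get("vzLabel") is not None
    if has_addr and has_label:
        problems.append("both 'addr' and 'vzLabel' cannot be defined")
    elif not has_addr and not has_label:
        problems.append("either 'addr' or 'vzLabel' is required")
    return problems
```

An empty list means the payload satisfies the documented constraints; the server remains the authority on whether the address, label, and event name are actually supported.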
ServerGet
Get service data based on key
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| dKey | string | True |
+---------+--------+------------+
Sub
New Subscription Request
+---------+------------------+------------+
| field | type | required |
|---------+------------------+------------|
| clSubId | integer | True |
| data | [Event, Address] | True |
+---------+------------------+------------+
USub
Unsubscribe from a specific subscription id
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| subId | string | True |
+---------+--------+------------+
Server
Enums
ErrorCodes (Enum)
+----+---------------------------+
| -1 | "UNKNOWN" |
| 0 | "SERVER_NOT_READY" |
| 1 | "INTERNAL_SERVER_ERROR" |
| 2 | "INVALID_MSG_SPEC" |
| 3 | "INVALID_SUB_REQ" |
| 4 | "DUPLICATE_SUB" |
| 5 | "UNKNOWN_CLIENT_SUB" |
| 6 | "INVALID_AUTH" |
| 7 | "INVALID_SERVER_DATA_KEY" |
+----+---------------------------+
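On the client side, the numeric codes in the table above can be mirrored in an enum so that error messages are logged by name. This is a sketch under the assumption that the client handles messages as plain dicts; `describe_error` is a hypothetical helper, and unknown codes are mapped to UNKNOWN as a client-side convention rather than documented server behavior.

```python
import enum


class ErrorCodes(enum.IntEnum):
    """Client-side mirror of the server ErrorCodes table."""
    UNKNOWN = -1
    SERVER_NOT_READY = 0
    INTERNAL_SERVER_ERROR = 1
    INVALID_MSG_SPEC = 2
    INVALID_SUB_REQ = 3
    DUPLICATE_SUB = 4
    UNKNOWN_CLIENT_SUB = 5
    INVALID_AUTH = 6
    INVALID_SERVER_DATA_KEY = 7


def describe_error(msg: dict) -> str:
    """Format an 'error' message as '<CODE_NAME>: <eMsg>'."""
    try:
        code = ErrorCodes(msg["code"])
    except ValueError:
        # Assumption: treat codes missing from the table as UNKNOWN
        code = ErrorCodes.UNKNOWN
    return f"{code.name}: {msg.get('eMsg', '')}"
```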
Network (Enum)
+-------+----------------------------------+
| Val | Def |
|-------+----------------------------------|
| eth | ethereum, chainId=1 |
| arb | arbitrum, chainId=42161 |
| opt | optimism, chainId=10 |
| bsc | binanceSC, chainId=56 |
| rop | ropsten, chainId=3 |
| ply | polygon, chainId=137 |
| gno | gnosis, chainId=100 |
| arbn | arbitrum_nova, chainId=42170 |
| sep | sepolia, chainId=11155111 |
| bp0 | BP_ARB_TESTNET, chainId=601574 |
| ava | avalanche_c_chain, chainId=43114 |
+-------+----------------------------------+
ShutdownCodes (Enum)
+----+---------------+
| -1 | "UNKNOWN" |
| 0 | "UNSCHEDULED" |
| 1 | "RESTARTING" |
| 2 | "SCHEDULED" |
+----+---------------+
Messages
Authenticated
Server response indicating connection has authenticated successfully as user
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| user | string | True |
| exp | string | True |
+---------+--------+------------+
Error
Server response indicating some sort of error state
+---------+------------+------------+
| field | type | required |
|---------+------------+------------|
| code | ErrorCodes | True |
| rMsg | dict | True |
| eMsg | string | True |
+---------+------------+------------+
Init
Initial server response providing server's internal client id
+-----------+--------+------------+
| field | type | required |
|-----------+--------+------------|
| cid | string | True |
| deployVer | string | True |
| msgDefVer | string | True |
+-----------+--------+------------+
Notify
Notify message carrying the subscription event data for a given subscription id (the id provided in the subscription confirmation message)
+--------------+--------------+------------+
| field | type | required |
|--------------+--------------+------------|
| subId | string | True |
| ts | datetime | True |
| bn | integer | True |
| nwk | Network | True |
| txHash | hex string | True |
| data | [list, dict] | True |
| nInfo | dict | False |
| extraCntInfo | dict | False |
+--------------+--------------+------------+
ServerData
Respond to a service data get message with requested data
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| dKey | string | True |
| data | list | True |
+---------+--------+------------+
ShutdownNotice
Alert message indicating the server will be disconnecting
+---------+---------------+------------+
| field | type | required |
|---------+---------------+------------|
| ts | d | True |
| reason | ShutdownCodes | True |
| sMsg | string | True |
+---------+---------------+------------+
SubConfirm
Subscription confirmation containing the unique subscription id, which maps to subsequent Notify messages, along with the confirmed subscription settings
+---------+---------+------------+
| field | type | required |
|---------+---------+------------|
| subId | string | True |
| clSubId | integer | True |
| data | list | True |
| info | dict | False |
+---------+---------+------------+
SubInitState
Special notify message for a given subscription id containing initial state of the subscription
+--------------+--------------+------------+
| field | type | required |
|--------------+--------------+------------|
| subId | string | True |
| ts | string | True |
| bn | integer | True |
| nwk | Network | True |
| txHash | string | True |
| data | [list, dict] | True |
| extraCntInfo | dict | False |
+--------------+--------------+------------+
SubPendingState
Special notify message indicating that the given subscription id is still initializing
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| subId | string | True |
+---------+--------+------------+
USubConfirm
Unsubscribe confirmation message for supplied subscription id and subscription settings
+---------+--------+------------+
| field | type | required |
|---------+--------+------------|
| subId | string | True |
| data | list | True |
+---------+--------+------------+
Setting up Access
The RTCM instances are deployed at 162.243.162.191 and require both SSL and Vizium credentials (please ask Stephen to generate new credentials).
The Ethereum and Arbitrum RTCM instances run on ports 4321 and 5331 respectively.
Interacting with the Server
On client connection, the server sends an initial message informing the client of its client id (cid):
{
"type": "init",
"cid": "e8011154-b673-11ee-9e52-a65917587bba"
}
The client can request a list of available contracts: {'type': 'serverget', 'dKey': 'availableContracts'}
The server responds with a list of defined contracts, where each unique definition is a combination of Vizium label, network, and address ([VZ_LABEL, network, address]):
{
"type": "serverdata",
"dKey": "availableContracts",
"data": [
[
"USDT",
"eth",
"0xdAC17F958D2ee523a2206206994597C13D831ec7"
],
[
"USDC",
"eth",
"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"
],
[
"ChainlinkFeed-ETH/USD",
"eth",
"0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419"
],
[
"ChainlinkFeed-BTC/USD",
"eth",
"0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c"
],
[
"ChainlinkFeed-MATIC/USD",
"eth",
"0x7bAC85A8a13A4BcD8abb3eB7d6b4d632c5a57676"
],
[
"ChainlinkFeed-LINK/USD",
"eth",
"0x2c1d072e956AFFC0D435Cb7AC38EF18d24d9127c"
],
[
"HopBridgeL1-ETH",
"eth",
"0xb8901acB165ed027E32754E0FFe830802919727f"
]
]
}
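Since each entry in the response is a [VZ_LABEL, network, address] triple, a client may find it convenient to index the list for lookups. A minimal sketch, assuming the message has already been decoded into a dict shaped like the sample above; `index_contracts` is a hypothetical helper name.

```python
def index_contracts(server_data: dict) -> dict:
    """Map (vzLabel, network) -> address from a decoded 'serverdata' message."""
    index = {}
    for label, network, address in server_data["data"]:
        index[(label, network)] = address
    return index
```

With the sample response, `index_contracts(msg)[("USDC", "eth")]` would return the USDC contract address.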
The client can subscribe to new EVM Events (Logs) defined with an address and event name (defined by a contract's ABI) with a subscription request message:
{
"type": "sub",
"clSubId": 0,
"data": {
"sType": "event",
"eName": "Transfer",
"addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"nwk": "eth"
}
}
The client can also subscribe to events by using a Vizium label instead of an address:
{
"type": "sub",
"clSubId": 0,
"data": {
"sType": "event",
"eName": "Transfer",
"vzLabel": "USDC",
"nwk": "eth"
}
}
The server will confirm a successful subscription by sending a confirm message:
{
"type": "subconfirm",
"subId": "a913fdeb-b683-11ee-bf7b-a65917587bba",
"clSubId": 0,
"subType": "Event",
"data": [
"eth",
"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"Transfer"
],
"info": {
"nwk": "eth",
"addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"vzLabel": "USDC"
}
}
'data' corresponds to the unique key definition of the requested subscription
'info' provides Vizium metadata information
The returned confirmed subscription id corresponds to the supplied client subscription id and can be used to map future Event notifications to their subscription information.
Event subscriptions are 'stateful': after confirming the subscription, the server sends the last seen instance of the requested Event, and from then on the client receives any future Events:
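One way to keep that mapping on the client is a small registry keyed by both ids. This is an illustrative sketch only: `SubscriptionRegistry` and its method names are hypothetical, and messages are assumed to be plain dicts shaped like the samples in this guide.

```python
class SubscriptionRegistry:
    """Tracks clSubId -> request data and server subId -> clSubId."""

    def __init__(self):
        self._requests = {}   # clSubId -> the 'data' dict that was sent
        self._by_sub_id = {}  # server subId -> clSubId

    def register_request(self, cl_sub_id: int, data: dict) -> None:
        # Call this when the 'sub' message is sent
        self._requests[cl_sub_id] = data

    def on_subconfirm(self, msg: dict) -> None:
        # The confirmation echoes clSubId and assigns the server-side subId
        self._by_sub_id[msg["subId"]] = msg["clSubId"]

    def lookup(self, sub_id: str):
        """Return (clSubId, request data) for a subId, or None if unknown."""
        cl_sub_id = self._by_sub_id.get(sub_id)
        if cl_sub_id is None:
            return None
        return cl_sub_id, self._requests.get(cl_sub_id)
```

A later message carrying a subId can then be resolved with `lookup(msg["subId"])` to recover the original request.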
{
"type": "subinitstate",
"subId": "e80263b3-b673-11ee-aa80-a65917587bba",
"data": {
"nwk": "eth",
"ts": "2024-01-19 02:38:59",
"bn": 19037976,
"txHash": "0x0ad186b88ded27ef7754a8544b7571a1123f6b24d0db21e587c758ac18c27a7a",
"eTxData": {
"asset": "USDC",
"from": "0xC475f5f4Df98D11FaFcF1C50f887e7F6902AC27D",
"to": "0x22dA1eEdeBC60C1b8c3a0c48f5C81BBE2b943dD9",
"qty": "1000"
}
}
}
If the server is still getting the last state for a requested Event subscription, such as whenever the server starts, it will notify the client that the stateful subscription is pending:
{
"type": "subpendingstate",
"subId": "08128cff-b6ae-11ee-ab63-a65917587bba"
}
After receiving the initial state message, the client will begin receiving a notification message for each subscribed Event confirmed in a transaction:
{
"type": "notify",
"subId": "c04cbeb8-b676-11ee-b75c-a65917587bba",
"data": {
"nwk": "eth",
"ts": "2024-01-19 02:59:23",
"bn": 19038077,
"txHash": "0x14880056532659c5c09461a574e313d51b3526f8ed5e15c8e1ed02a17f47ccde",
"eTxData": {
"asset": "USDC",
"from": "0xce16F69375520ab01377ce7B88f5BA8C48F8D666",
"to": "0xE743a49F04F2f77eB2D3b753aE3AD599dE8CEA84",
"qty": "490.504685"
}
}
}
There is no guarantee that an Event will be sent to a client only once, because of chain reorganizations (reorgs).
Any reorg observed by Vizium's underlying blockchain node for the network that changes the canonical blocks will cause the client to receive duplicates of Events that were emitted in the pre-reorg chain. This should be rare under normal conditions.
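A client that must not process the same Event twice can keep a bounded set of recently seen notifications. The sketch below is one possible approach, not a Vizium recommendation. It assumes the notify shape from the sample above (txHash and eTxData nested under data) and keys on subscription id, transaction hash, and event payload, while ignoring the block number because a reorg may re-include the transaction in a different block. Note the trade-off: two genuinely identical Events emitted in the same transaction would also be collapsed. `NotifyDeduplicator` is a hypothetical name.

```python
import json
from collections import OrderedDict


class NotifyDeduplicator:
    """Remembers the most recent notifications and flags repeats."""

    def __init__(self, max_entries: int = 10_000):
        self._seen = OrderedDict()
        self._max = max_entries

    def is_duplicate(self, msg: dict) -> bool:
        body = msg["data"]
        # Serialize the payload with sorted keys so key order does not matter
        payload = json.dumps(body.get("eTxData"), sort_keys=True)
        key = (msg["subId"], body["txHash"], payload)
        if key in self._seen:
            self._seen.move_to_end(key)
            return True
        self._seen[key] = None
        if len(self._seen) > self._max:
            # Evict the oldest entry to keep memory bounded
            self._seen.popitem(last=False)
        return False
```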
Important Notes
Vizium will only send responses and data with check-summed addresses (https://eips.ethereum.org/EIPS/eip-55). Thus, any confirmation defined with an address will be returned check-summed even if not requested by the client in checksum form. We highly recommend the client only uses check-summed addresses to follow best practices.
Until it is fully initialized after a restart, the server responds to server data and Event subscription requests with an error message (code 0, SERVER_NOT_READY). On receiving it, the client should sleep and retry:
{
"type": "error",
"code": 0,
"rMsg": {
"type": "sub",
"clSubId": 0,
"data": {
"sType": "event",
"addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"eName": "Transfer",
"opts": {}
}
},
"eMsg": "server still initializing, cannot subscribe to Event ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transfer') (vzLabel='USDC')"
}
{
"type": "error",
"code": 0,
"rMsg": {
"type": "serverget",
"dKey": "availableContracts"
},
"eMsg": "server still initializing, data for 'availableContracts' not ready"
}
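The sleep-and-retry behavior can be wrapped in a small helper. This is a sketch under stated assumptions: `send_and_wait` is a hypothetical coroutine, supplied by the caller, that sends one message and returns the server's decoded reply as a dict; the retry count and delays are arbitrary illustrative defaults.

```python
import asyncio

SERVER_NOT_READY = 0  # ErrorCodes value for a server that is still initializing


async def request_with_retry(send_and_wait, msg, *, retries=5, delay_secs=1.0, backoff=2.0):
    """Resend msg while the server answers with a SERVER_NOT_READY error.

    Returns the first reply that is not a SERVER_NOT_READY error, or the
    last error reply if all attempts are used up.
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")
    for attempt in range(retries):
        reply = await send_and_wait(msg)
        not_ready = reply.get("type") == "error" and reply.get("code") == SERVER_NOT_READY
        if not not_ready:
            return reply
        if attempt < retries - 1:
            # Sleep before the next attempt, increasing the delay each time
            await asyncio.sleep(delay_secs)
            delay_secs *= backoff
    return reply
```

Bounding the number of attempts avoids retrying forever if the server never becomes ready.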
Example Server Responses to Common Incorrect Client Behavior
If a requested Event subscription is not defined in the available contracts or malformed with an invalid address/name/network:
{
"type": "error",
"code": 3,
"rMsg": {
"type": "sub",
"clSubId": 0,
"data": {
"sType": "event",
"addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"eName": "Transferf",
"opts": {}
}
},
"eMsg": "subscription type=Event has unsupported args ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transferf') (vzLabel='USDC')"
}
Sending a malformed/invalid message:
{
"type": "error",
"code": 0,
"rMsg": "[{\"ftype\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"addr\": \"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48\", \"eName\": \"Transfer\"}}, {\"ftype\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"addr\": \"0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48\", \"eName\": \"Transfer\"}}]",
"eMsg": "Object missing required field type - at $[0]"
}
{
"type": "error",
"code": 0,
"rMsg": "[{\"type\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"eName\": \"Transfer\", \"addr\": \"\", \"vzLabel\": \"USDC\", \"nwk\": \"eth\"}}, {\"type\": \"sub\", \"clSubId\": 0, \"data\": {\"sType\": \"event\", \"eName\": \"Transfer\", \"addr\": \"\", \"vzLabel\": \"USDC\", \"nwk\": \"eth\"}}]",
"eMsg": "both addr or vzLabel cannot be defined- at$[0].data`"
}
Sending a duplicate subscription request:
{
"type": "error",
"code": 1,
"rMsg": {
"type": "sub",
"clSubId": 0,
"data": {
"sType": "event",
"addr": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"eName": "Transfer",
"opts": {}
}
},
"eMsg": "already subscribed to ('0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48', 'Transfer')"
}
'clSubId' is the existing client subscription id
Sequence Diagram
Sample Code
The following python code demonstrates how to connect to the server and do basic operations. It works for both the RTCM and SM.
You could use the example as follows, assuming you have proper credentials:
python3 ws_client.py -n arb -m rtcm --host wss://api.vizium.xyz --port 5331 --authHost https://api.vizium.xyz -a --authCreds demo tt <VIZIUM_AUTH_PASSWORD>
import aiohttp
import argparse
import asyncio
import enum
import importlib
import json
import logging
import ssl
import traceback as tb
import uuid
import msgspec
import pandas as pd
import websockets
from websockets import exceptions as W_E
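

class Custom_JSON_Encoder(json.JSONEncoder):
    """JSON encoder that can also serialize msgspec Structs and Enums."""

    def default(self, obj):
        if isinstance(obj, msgspec.Struct):
            return json.loads(msgspec.json.encode(obj))
        if isinstance(obj, enum.Enum):
            return str(obj)
        # Defer to the base class, which raises TypeError for unsupported types
        return super().default(obj)


logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')


def pprint(m=None, jm=None):
    """Log message m, followed by jm pretty-printed as JSON when given."""
    if jm is None:
        pOut = m
    else:
        pOut = f'{m}\n{json.dumps(jm, indent=4, cls=Custom_JSON_Encoder)}'
    logging.info(f'{pOut}')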


class BaseWebSocketClient:
    """Websocket client with optional token auth and automatic reconnects."""

    def __init__(self, enc, dec, url, logRM=False, logSM=False, authInfo=None):
        self._encoder = enc
        self._decoder = dec
        self._ready = asyncio.Future()
        self._wsUrl = url
        self._ws = None
        self._subIds = {}
        self._subReqCnt = 0
        self._connectSleepSecs = 3
        self._firstConnectFinished = asyncio.Future()
        self._connectFinished = asyncio.Future()
        self._lRMsgs = logRM
        self._lSMsgs = logSM
        self._auth, self._authHost, self._authPort, self._authCreds = (False, '', '', {}) if authInfo is None else authInfo
        self._shouldRun = False
        self.running = None

    @property
    def connected(self):
        return self._ws is not None

    async def _recv(self, msg):
        raise NotImplementedError()

    async def start(self):
        asyncio.ensure_future(self._connect())
        await self.wait_first_connection()

    async def wait_first_connection(self):
        await self._firstConnectFinished

    async def _wait_ready(self):
        return await self._ready

    async def _connect(self):
        self._shouldRun = True
        self.running = asyncio.Future()

        async def _conn_loop():
            try:
                actualUrl = self._wsUrl
                sslContext = None
                if actualUrl.startswith('wss'):
                    sslContext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
                if self._auth:
                    # Fetch an auth token first and pass it as a query parameter
                    authUrl = f'{self._authHost}:{self._authPort}/authTkn'
                    pprint(f'{self} auth required, trying @ {authUrl}')
                    async with aiohttp.ClientSession() as session:
                        # sslCtx = None
                        authResp = await session.request('POST', authUrl, json=self._authCreds, ssl=sslContext)
                        authD = await authResp.json()
                        authErr = authD.get('error')
                        if authErr:
                            raise RuntimeError(f'{self} auth error {authErr}')
                        else:
                            tkn, exp = authD['token'], authD['exp']
                            pprint(m=f'{self} got auth tkn successfully, expires @ {exp}')
                            actualUrl = f'{actualUrl}/?token={tkn}'
                pprint(f'{self} connecting to {actualUrl}')
                async with websockets.connect(actualUrl, ssl=sslContext) as ws:
                    self._ws = ws
                    self._ready.set_result(True)
                    if not self._firstConnectFinished.done():
                        self._firstConnectFinished.set_result(True)
                        pprint(m=f'{self} first connection complete!')
                    # Receive loop: decode each message and hand it to the subclass
                    while True:
                        msg = await ws.recv()
                        try:
                            await self._recv(self._decoder.decode(msg))
                        except Exception:
                            tb.print_exc()
                            pprint(m=f'{self} error with msg', jm=json.loads(msg))
            except ConnectionRefusedError as e:
                pprint(m=f'{self} connection refused!')
                self._ready.set_result(False)
            except (W_E.InvalidMessage, W_E.ConnectionClosedError) as we:
                pprint(m=f'{self} websocket protocol exception {we}')
            except (W_E.InvalidStatusCode, ) as we:
                pprint(m=f'{self} server responded with Invalid Status Code: {we}')
                self._ready.set_result(False)
            except Exception as e:
                # Any other error is treated as fatal: stop reconnecting
                tb.print_exc()
                self._shouldRun = False
                pprint(m=f'{self} {e}')
            if not self._firstConnectFinished.done():
                self._firstConnectFinished.set_result(True)
            self._connectFinished.set_result(True)

        isReconnect = False
        while self._shouldRun:
            asyncio.ensure_future(_conn_loop())
            if isReconnect:
                readySuccess = await self._wait_ready()
                if readySuccess:
                    asyncio.ensure_future(self._on_reconnect())
            await self._connectFinished
            if self._shouldRun:
                pprint(f'{self} connect loop finished, sleeping {self._connectSleepSecs} seconds and reconnecting')
                await asyncio.sleep(self._connectSleepSecs)
                isReconnect = True
                self._ready = asyncio.Future()
                self._connectFinished = asyncio.Future()
        self.running.set_result(False)

    async def _on_reconnect(self):
        pass

    async def _send(self, msg):
        if self._lSMsgs:
            pprint(m=f'{self} sending msg: ', jm=msg)
        await self._ws.send(self._encoder.encode(msg))


class TestWebSocketClient(BaseWebSocketClient):
    """Sample client for the RTCM ('rtcm') and state monitoring ('sm') services."""

    _serverdata_get_fut = None
    _pending_sub_futs = {}

    def __init__(self, mode, network, *args, **kwargs):
        self._mode = mode
        self._network = network
        encoder = msgspec.json.Encoder()
        msgspec_defs = importlib.import_module(f'msgspec_defs_{self._mode.upper()}.server')
        decoder = msgspec.json.Decoder(msgspec_defs.Messages)
        super().__init__(encoder, decoder, *args, **kwargs)
        self._subscribed_contract_events = set()
        self._notify_cbs = []

    def add_notify_cb(self, cb):
        self._notify_cbs.append(cb)

    async def _on_reconnect(self):
        # Re-issue all Event subscriptions after a reconnect
        match self._mode:
            case 'rtcm':
                for x in self._subscribed_contract_events:
                    await self.subscribe_contract_event(*x)

    async def _recv(self, msg):
        if self._lRMsgs:
            pprint(m=f'{self} recv msg', jm=msg)
        # Dispatch on the decoded message's class name
        mType = type(msg).__name__.lower()
        match mType:
            case 'init':
                pprint(m=f'{self} init from server {msg}')
            case 'error':
                oMsg = msg.rMsg
                oMsgType = oMsg['type']
                match oMsgType:
                    case 'serverget':
                        self._serverdata_get_fut.set_result((False, msg.eMsg))
                    case 'sub':
                        clSubId = oMsg['clSubId']
                        pSubFut = self._pending_sub_futs[clSubId]
                        pSubFut.set_result((False, msg))
                    case _:
                        pprint(m=f'{self} unhandled error from original client msg type {oMsgType}', jm=msg)
            case 'subpendingstate':
                subId = msg.subId
                pprint(m=f'{self} requested subId={subId} is still initializing and pending')
            case 'subinitstate':
                clSubId, subData = self._subIds[msg.subId]
                pSubFut = self._pending_sub_futs[clSubId]
                pSubFut.set_result((True, msg))
            case 'subconfirm':
                clSubId = msg.clSubId
                pprint(msg)
                self._subIds[msg.subId] = (clSubId, msg.data)
            case 'usubconfirm':
                clSubId, subData = self._subIds[msg.subId]
                del self._subIds[msg.subId]
                pprint(m=f'{self} confirmed unsubscribed clSubId={clSubId} subId={msg.subId} {subData}', jm=msg)
            case 'serverdata':
                if not self._serverdata_get_fut.done():
                    self._serverdata_get_fut.set_result((True, msg.data))
            case 'notify':
                clSubId, subData = self._subIds[msg.subId]
                for cb in self._notify_cbs:
                    cb(msg, clSubId, subData)
            case 'shutdownnotice':
                sTs = msg.ts
                pprint(m=f'{self} received SHUTDOWN notice ts={msg.ts} reason={msg.reason} sMsg={msg.sMsg}')

    async def get_supported_rtcm_contracts(self):
        self._serverdata_get_fut = asyncio.Future()
        msg = {'type': 'serverget', 'dKey': 'availableContracts'}
        await self._send(msg)
        await self._serverdata_get_fut
        success, res = self._serverdata_get_fut.result()
        if success:
            pprint(res)
            return res
        pprint(f'{self} failed to get supported contracts with error: "{res}"')
        return None

    async def subscribe_contract_event(self, vzLabel, eName):
        self._subscribed_contract_events.add((vzLabel, eName))
        subRParams = {'sType': 'event', 'eName': eName, 'vzLabel': vzLabel, 'nwk': self._network}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for Event vzLabel={vzLabel} eName={eName} from {resMsg.ts} bn={resMsg.bn} txHash={resMsg.txHash} with subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to Event vzLabel={vzLabel} eName={eName} with error: "{resMsg.eMsg}"')
        return success

    async def get_supported_sm_contracts(self):
        self._serverdata_get_fut = asyncio.Future()
        msg = {'type': 'serverget', 'dKey': 'supportedContracts'}
        await self._send(msg)
        await self._serverdata_get_fut
        success, res = self._serverdata_get_fut.result()
        if success:
            pprint(res)
            return res
        pprint(f'{self} failed to get supported contracts with error: "{res}"')
        return None

    async def subscribe_sm_state(self, nwk, vzLabel):
        subRParams = {'sType': 'state', 'nwk': nwk, 'vzLabel': vzLabel}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for State Monitoring nwk={nwk} vzLabel={vzLabel} from {resMsg.ts} bn={resMsg.bn} subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to State nwk={nwk} vzLabel={vzLabel} with error: "{resMsg.eMsg}"')
        return success

    async def subscribe_sm_position(self, nwk, vzLabel, userAddr):
        subRParams = {'sType': 'position', 'nwk': nwk, 'vzLabel': vzLabel, 'posAddr': userAddr}
        pFut = await self._send_sub_msg_single(subRParams)
        await pFut
        success, resMsg = pFut.result()
        if success:
            pprint(m=f'{self} successfully subscribed and init state received for Position Monitoring nwk={nwk} vzLabel={vzLabel} userAddr={userAddr} from {resMsg.ts} bn={resMsg.bn} subId={resMsg.subId} data:', jm=resMsg.data)
        else:
            pprint(f'{self} failed to subscribe to Position nwk={nwk} vzLabel={vzLabel} userAddr={userAddr} with error: "{resMsg.eMsg}"')
        return success

    async def _send_sub_msg_single(self, sData):
        # Each request gets a fresh client subscription id and a future
        # that _recv resolves on 'subinitstate' or on a 'sub' error
        sId = self._subReqCnt
        self._pending_sub_futs[sId] = asyncio.Future()
        msg = {'type': 'sub', 'clSubId': sId, 'data': sData}
        self._subReqCnt += 1
        await self._send(msg)
        return self._pending_sub_futs[sId]

    async def _unsub_test(self):
        usId = list(self._subIds.keys())[0]
        await self._send_usub_msg(usId)

    async def _send_usub_msg(self, uSubId):
        msg = {'type': 'usub', 'subId': uSubId}
        await self._send(msg)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return f'TestViziumWebSocketClient-{self._network}'


def log_notify(msg, clSubId, subData):
    # sType is read defensively because the Notify definition above does not list it
    pprint(m=f'NOTIFICATION {getattr(msg, "sType", "")} {subData} @ {msg.ts} bn={msg.bn}', jm=msg)


async def gen_client(mode, nwk, host, port, logRMsgs=False, logSMsgs=False, authInfo=None, sslCreds=None):
    if authInfo is None:
        authInfo = (False, '', '', {})
    # sslCreds is not forwarded: BaseWebSocketClient.__init__ does not accept it
    client = TestWebSocketClient(mode, nwk, f'{host}:{port}', logRM=logRMsgs, logSM=logSMsgs, authInfo=authInfo)
    await client.start()
    return client


async def main():
    parser = argparse.ArgumentParser(description="Sample Vizium API Client")
    parser.add_argument('--host', help='vizium server host', default='ws://localhost')
    parser.add_argument('-m', '--mode', help='connect to either realtime chain monitor or state monitoring service', default='rtcm', choices=['rtcm', 'sm'])
    parser.add_argument('-n', '--network', help='target RTCM EVM network type')
    parser.add_argument('--port', help='vizium server port', default=4321)
    parser.add_argument('--logRMsgs', help='log received messages', action='store_true', default=False)
    parser.add_argument('--logSMsgs', help='log sent messages', action='store_true', default=False)
    parser.add_argument('-l', '--listenType', help='selector for RTCM vizium supported contracts', default="USDC", choices=["USDC", "USDT", "Chainlink"])
    parser.add_argument('-a', '--authenticated', help='indicates server requires authentication', action='store_true', default=False)
    parser.add_argument('--authHost', help='vizium auth server host', default='http://localhost')
    parser.add_argument('--authPort', help='vizium auth server port', default=5174)
    parser.add_argument('--authCreds', help='vizium auth credentials in form of "username org password"', nargs='+', default=None)
    args = parser.parse_args()
    if args.mode == 'rtcm' and args.network is None:
        parser.error('network argument must be provided if running in rtcm mode')
    fut = asyncio.Future()
    if args.authCreds is None:
        authCreds = {}
    elif len(args.authCreds) != 3:
        parser.error('authCreds must be in form "username org password"')
    else:
        username, org, password = args.authCreds
        authCreds = {'username': username, 'org': org, 'password': password}
    clientAuthInfo = (args.authenticated, args.authHost, args.authPort, authCreds)

    async def test_rtcd1():
        client = await gen_client(args.mode, args.network, args.host, args.port, logRMsgs=args.logRMsgs, logSMsgs=args.logSMsgs, authInfo=clientAuthInfo)
        if client.connected:
            client.add_notify_cb(log_notify)
            cnts = await client.get_supported_rtcm_contracts()
            if not cnts:
                fut.set_result(False)
            else:
                # NOTE: this loop expects each entry to be a dict with 'vzLabel' and 'events' keys
                for cnt in cnts:
                    if args.listenType in cnt['vzLabel']:
                        await client.subscribe_contract_event(cnt['vzLabel'], cnt['events'][0])
                # await asyncio.sleep(20)
                # await client._unsub_test()
                await client.running

    async def test_sm1():
        client = await gen_client(args.mode, args.network, args.host, args.port, logRMsgs=args.logRMsgs, logSMsgs=args.logSMsgs, authInfo=clientAuthInfo)
        if client.connected:
            targets = await client.get_supported_sm_contracts()
            testAddr = '0x799fDB02e49c5B77FdDB4B271aDf11bf42DCA586'
            for nwk in ['eth', 'arb']:
                for p in ['AavePoolV3']:
                    await client.subscribe_sm_state(nwk, p)
                    await client.subscribe_sm_position(nwk, p, testAddr)
            await client.running

    match args.mode:
        case 'rtcm':
            await test_rtcd1()
        case 'sm':
            await test_sm1()


if __name__ == '__main__':
    asyncio.run(main())