feat: add account change notifications
This commit is contained in:
@ -1,9 +1,14 @@
|
||||
// @flow
|
||||
|
||||
import assert from 'assert';
|
||||
import {
|
||||
parse as urlParse,
|
||||
format as urlFormat,
|
||||
} from 'url';
|
||||
import fetch from 'node-fetch';
|
||||
import jayson from 'jayson/lib/client/browser';
|
||||
import {struct} from 'superstruct';
|
||||
import {Client as RpcWebSocketClient} from 'rpc-websockets';
|
||||
|
||||
import {Transaction} from './transaction';
|
||||
import {PublicKey} from './publickey';
|
||||
@ -11,6 +16,7 @@ import {sleep} from './util/sleep';
|
||||
import type {Account} from './account';
|
||||
import type {TransactionSignature, TransactionId} from './transaction';
|
||||
|
||||
|
||||
type RpcRequest = (methodName: string, args: Array<any>) => any;
|
||||
|
||||
function createRpcRequest(url): RpcRequest {
|
||||
@ -79,11 +85,10 @@ function jsonRpcResult(resultDescription: any) {
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Expected JSON RPC response for the "getAccountInfo" message
|
||||
* @private
|
||||
*/
|
||||
const GetAccountInfoRpcResult = jsonRpcResult({
|
||||
const AccountInfoResult = struct({
|
||||
executable: 'boolean',
|
||||
loader_program_id: 'array',
|
||||
program_id: 'array',
|
||||
@ -91,6 +96,18 @@ const GetAccountInfoRpcResult = jsonRpcResult({
|
||||
userdata: 'array',
|
||||
});
|
||||
|
||||
/**
|
||||
* Expected JSON RPC response for the "getAccountInfo" message
|
||||
*/
|
||||
const GetAccountInfoRpcResult = jsonRpcResult(AccountInfoResult);
|
||||
|
||||
/***
|
||||
* Expected JSON RPC response for the "accountNotification" message
|
||||
*/
|
||||
const AccountNotificationResult = struct({
|
||||
subscription: 'number',
|
||||
result: AccountInfoResult,
|
||||
});
|
||||
|
||||
/**
|
||||
* Expected JSON RPC response for the "confirmTransaction" message
|
||||
@ -148,6 +165,20 @@ type AccountInfo = {
|
||||
userdata: Buffer,
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback function for account change notifications
|
||||
*/
|
||||
export type AccountChangeCallback = (accountInfo: AccountInfo) => void;
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
type AccountSubscriptionInfo = {
|
||||
publicKey: string; // PublicKey of the account as a base 58 string
|
||||
callback: AccountChangeCallback,
|
||||
subscriptionId: null | number; // null when there's no current server subscription id
|
||||
}
|
||||
|
||||
/**
|
||||
* Possible signature status values
|
||||
*
|
||||
@ -164,6 +195,8 @@ export type SignatureStatus = 'Confirmed'
|
||||
*/
|
||||
export class Connection {
|
||||
_rpcRequest: RpcRequest;
|
||||
_rpcWebSocket: RpcWebSocketClient;
|
||||
_rpcWebSocketConnected: boolean = false;
|
||||
|
||||
_lastIdInfo: {
|
||||
lastId: TransactionId | null,
|
||||
@ -171,6 +204,8 @@ export class Connection {
|
||||
transactionSignatures: Array<string>,
|
||||
};
|
||||
_disableLastIdCaching: boolean = false
|
||||
_accountChangeSubscriptions: {[number]: AccountSubscriptionInfo} = {};
|
||||
_accountChangeSubscriptionCounter: number = 0;
|
||||
|
||||
/**
|
||||
* Establish a JSON RPC connection
|
||||
@ -178,15 +213,29 @@ export class Connection {
|
||||
* @param endpoint URL to the fullnode JSON RPC endpoint
|
||||
*/
|
||||
constructor(endpoint: string) {
|
||||
if (typeof endpoint !== 'string') {
|
||||
throw new Error('Connection endpoint not specified');
|
||||
}
|
||||
this._rpcRequest = createRpcRequest(endpoint);
|
||||
let url = urlParse(endpoint);
|
||||
|
||||
this._rpcRequest = createRpcRequest(url.href);
|
||||
this._lastIdInfo = {
|
||||
lastId: null,
|
||||
seconds: -1,
|
||||
transactionSignatures: [],
|
||||
};
|
||||
|
||||
url.protocol = 'ws';
|
||||
url.host = '';
|
||||
url.port = String(Number(url.port) + 1);
|
||||
this._rpcWebSocket = new RpcWebSocketClient(
|
||||
urlFormat(url),
|
||||
{
|
||||
autoconnect: false,
|
||||
max_reconnects: Infinity,
|
||||
}
|
||||
);
|
||||
this._rpcWebSocket.on('open', this._wsOnOpen.bind(this));
|
||||
this._rpcWebSocket.on('error', this._wsOnError.bind(this));
|
||||
this._rpcWebSocket.on('close', this._wsOnClose.bind(this));
|
||||
this._rpcWebSocket.on('accountNotification', this._wsOnAccountNotification.bind(this));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -371,4 +420,133 @@ export class Connection {
|
||||
assert(res.result);
|
||||
return res.result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
_wsOnOpen() {
|
||||
this._rpcWebSocketConnected = true;
|
||||
this._updateSubscriptions();
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
_wsOnError(err: Error) {
|
||||
console.log('ws error:', err.message);
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
_wsOnClose(code: number, message: string) {
|
||||
// 1000 means _rpcWebSocket.close() was called explicitly
|
||||
if (code !== 1000) {
|
||||
console.log('ws close:', code, message);
|
||||
}
|
||||
this._rpcWebSocketConnected = false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
_wsOnAccountNotification(notification: Object) {
|
||||
const res = AccountNotificationResult(notification);
|
||||
if (res.error) {
|
||||
throw new Error(res.error.message);
|
||||
}
|
||||
|
||||
const keys = Object.keys(this._accountChangeSubscriptions).map(Number);
|
||||
for (let id of keys) {
|
||||
const sub = this._accountChangeSubscriptions[id];
|
||||
if (sub.subscriptionId === res.subscription) {
|
||||
const {result} = res;
|
||||
assert(typeof result !== 'undefined');
|
||||
|
||||
sub.callback({
|
||||
executable: result.executable,
|
||||
tokens: result.tokens,
|
||||
programId: new PublicKey(result.program_id),
|
||||
loaderProgramId: new PublicKey(result.loader_program_id),
|
||||
userdata: Buffer.from(result.userdata),
|
||||
});
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
*/
|
||||
async _updateSubscriptions() {
|
||||
const keys = Object.keys(this._accountChangeSubscriptions).map(Number);
|
||||
if (keys.length === 0) {
|
||||
this._rpcWebSocket.close();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this._rpcWebSocketConnected) {
|
||||
for (let id of keys) {
|
||||
this._accountChangeSubscriptions[id].subscriptionId = null;
|
||||
}
|
||||
this._rpcWebSocket.connect();
|
||||
return;
|
||||
}
|
||||
|
||||
for (let id of keys) {
|
||||
const {subscriptionId, publicKey} = this._accountChangeSubscriptions[id];
|
||||
if (subscriptionId === null) {
|
||||
try {
|
||||
this._accountChangeSubscriptions[id].subscriptionId =
|
||||
await this._rpcWebSocket.call(
|
||||
'accountSubscribe',
|
||||
[publicKey]
|
||||
);
|
||||
} catch (err) {
|
||||
console.log(`accountSubscribe error for ${publicKey}: ${err.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a callback to be invoked whenever the specified account changes
|
||||
*
|
||||
* @param publickey Public key of the account to monitor
|
||||
* @param callback Function to invoke whenever the account is changed
|
||||
* @return subscription id
|
||||
*/
|
||||
onAccountChange(publicKey: PublicKey, callback: AccountChangeCallback): number {
|
||||
const id = ++this._accountChangeSubscriptionCounter;
|
||||
this._accountChangeSubscriptions[id] = {
|
||||
publicKey: publicKey.toBase58(),
|
||||
callback,
|
||||
subscriptionId: null
|
||||
};
|
||||
this._updateSubscriptions();
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deregister an account notification callback
|
||||
*
|
||||
* @param id subscription id to deregister
|
||||
*/
|
||||
async removeAccountChangeListener(id: number): Promise<void> {
|
||||
if (this._accountChangeSubscriptions[id]) {
|
||||
const {subscriptionId} = this._accountChangeSubscriptions[id];
|
||||
delete this._accountChangeSubscriptions[id];
|
||||
if (subscriptionId !== null) {
|
||||
try {
|
||||
await this._rpcWebSocket.call('accountUnsubscribe', [subscriptionId]);
|
||||
} catch (err) {
|
||||
console.log('accountUnsubscribe error:', err.message);
|
||||
}
|
||||
}
|
||||
this._updateSubscriptions();
|
||||
} else {
|
||||
throw new Error(`Unknown account change id: ${id}`);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user