From 740b7a3b23d75751dabcdf8148364d3876802a6e Mon Sep 17 00:00:00 2001 From: Justin Starry Date: Mon, 3 Feb 2020 23:22:11 +0800 Subject: [PATCH] feat: add onSignature pub sub api --- web3.js/module.d.ts | 11 ++- web3.js/module.flow.js | 16 +++- web3.js/src/connection.js | 149 +++++++++++++++++++++++++++++++++++--- 3 files changed, 159 insertions(+), 17 deletions(-) diff --git a/web3.js/module.d.ts b/web3.js/module.d.ts index 4c2fae5f28..781039980b 100644 --- a/web3.js/module.d.ts +++ b/web3.js/module.d.ts @@ -114,6 +114,9 @@ declare module '@solana/web3.js' { keyedAccountInfo: KeyedAccountInfo, ) => void; export type SlotChangeCallback = (slotInfo: SlotInfo) => void; + export type SignatureResultCallback = ( + signatureResult: SignatureStatusResult, + ) => void; export type SignatureSuccess = { Ok: null; @@ -213,8 +216,14 @@ declare module '@solana/web3.js' { programId: PublicKey, callback: ProgramAccountChangeCallback, ): number; - onSlotChange(callback: SlotChangeCallback): number; removeProgramAccountChangeListener(id: number): Promise; + onSlotChange(callback: SlotChangeCallback): number; + removeSlotChangeListener(id: number): Promise; + onSignature( + signature: TransactionSignature, + callback: SignatureResultCallback, + ): number; + removeSignatureListener(id: number): Promise; validatorExit(): Promise; getMinimumBalanceForRentExemption( dataLength: number, diff --git a/web3.js/module.flow.js b/web3.js/module.flow.js index 647eb9ff19..6fa03f1168 100644 --- a/web3.js/module.flow.js +++ b/web3.js/module.flow.js @@ -58,8 +58,7 @@ declare module '@solana/web3.js' { declare export type SignatureStatusResult = | SignatureSuccess - | TransactionError - | null; + | TransactionError; declare export type BlockhashAndFeeCalculator = { blockhash: Blockhash, @@ -96,7 +95,7 @@ declare module '@solana/web3.js' { fee: number, preBalances: Array, postBalances: Array, - status: SignatureStatusResult, + status: ?SignatureStatusResult, }, }>, }; @@ -128,6 +127,9 @@ declare module '@solana/web3.js' { keyedAccountInfo: KeyedAccountInfo, ) => void; declare type SlotChangeCallback = (slotInfo: SlotInfo) => void; + declare type SignatureResultCallback = ( + signatureResult: SignatureStatusResult, + ) => void; declare export type SignatureSuccess = {| Ok: null, @@ -227,8 +229,14 @@ declare module '@solana/web3.js' { programId: PublicKey, callback: ProgramAccountChangeCallback, ): number; - onSlotChange(callback: SlotChangeCallback): number; removeProgramAccountChangeListener(id: number): Promise; + onSlotChange(callback: SlotChangeCallback): number; + removeSlotChangeListener(id: number): Promise; + onSignature( + signature: TransactionSignature, + callback: SignatureResultCallback, + ): number; + removeSignatureListener(id: number): Promise; validatorExit(): Promise; getMinimumBalanceForRentExemption( dataLength: number, diff --git a/web3.js/src/connection.js b/web3.js/src/connection.js index 835666d48a..c025c88a67 100644 --- a/web3.js/src/connection.js +++ b/web3.js/src/connection.js @@ -175,6 +175,14 @@ const GetEpochScheduleResult = struct({ firstNormalSlot: 'number', }); +/** + * Signature status for a transaction + */ +const SignatureStatusResult = struct.union([ + struct({Ok: 'null'}), + struct({Err: 'object'}), +]); + /** * Version info for a node * @@ -204,7 +212,7 @@ type ConfirmedBlock = { fee: number, preBalances: Array, postBalances: Array, - status: SignatureStatusResult, + status?: SignatureStatusResult, }, }>, }; @@ -337,7 +345,7 @@ const SlotInfo = struct({ root: 'number', }); -/*** +/** * Expected JSON RPC response for the "slotNotification" message */ const SlotNotificationResult = struct({ @@ -345,6 +353,14 @@ const SlotNotificationResult = struct({ result: SlotInfo, }); +/** + * Expected JSON RPC response for the "signatureNotification" message + */ +const SignatureNotificationResult = struct({ + subscription: 'number', + result: SignatureStatusResult, +}); + /** * Expected JSON RPC response for the "getProgramAccounts" message */ @@ -419,15 +435,12 @@ const GetVoteAccounts = jsonRpcResult( }), ); -const SignatureStatusResult = struct.union([ - 'null', - struct.union([struct({Ok: 'null'}), struct({Err: 'object'})]), -]); - /** * Expected JSON RPC response for the "getSignatureStatus" message */ -const GetSignatureStatusRpcResult = jsonRpcResult(SignatureStatusResult); +const GetSignatureStatusRpcResult = jsonRpcResult( + struct.union(['null', SignatureStatusResult]), +); /** * Expected JSON RPC response for the "getTransactionCount" message @@ -481,7 +494,7 @@ export const GetConfirmedBlockRpcResult = jsonRpcResult( meta: struct.union([ 'null', struct({ - status: SignatureStatusResult, + status: struct.union(['null', SignatureStatusResult]), fee: 'number', preBalances: struct.array(['number']), postBalances: struct.array(['number']), @@ -578,6 +591,11 @@ type ProgramAccountSubscriptionInfo = { subscriptionId: null | number, // null when there's no current server subscription id }; +/** + * Callback function for slot change notifications + */ +export type SlotChangeCallback = (slotInfo: SlotInfo) => void; + /** * @private */ @@ -587,9 +605,20 @@ type SlotSubscriptionInfo = { }; /** - * Callback function for slot change notifications + * Callback function for signature notifications */ -export type SlotChangeCallback = (slotInfo: SlotInfo) => void; +export type SignatureResultCallback = ( + signatureResult: SignatureStatusResult, +) => void; + +/** + * @private + */ +type SignatureSubscriptionInfo = { + signature: TransactionSignature, // TransactionSignature as a base 58 string + callback: SignatureResultCallback, + subscriptionId: null | number, // null when there's no current server subscription id +}; /** * Signature status: Success @@ -650,6 +679,10 @@ export class Connection { [number]: SlotSubscriptionInfo, } = {}; _slotSubscriptionCounter: number = 0; + _signatureSubscriptions: { + [number]: SignatureSubscriptionInfo, + } = {}; + _signatureSubscriptionCounter: number = 0; /** * Establish a JSON RPC connection @@ -693,6 +726,10 @@ export class Connection { 'slotNotification', this._wsOnSlotNotification.bind(this), ); + this._rpcWebSocket.on( + 'signatureNotification', + this._wsOnSignatureNotification.bind(this), + ); } /** @@ -1289,10 +1326,12 @@ export class Connection { this._programAccountChangeSubscriptions, ).map(Number); const slotKeys = Object.keys(this._slotSubscriptions).map(Number); + const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number); if ( accountKeys.length === 0 && programKeys.length === 0 && - slotKeys.length === 0 + slotKeys.length === 0 && + signatureKeys.length === 0 ) { this._rpcWebSocket.close(); return; @@ -1308,6 +1347,9 @@ export class Connection { for (let id of slotKeys) { this._slotSubscriptions[id].subscriptionId = null; } + for (let id of signatureKeys) { + this._signatureSubscriptions[id].subscriptionId = null; + } this._rpcWebSocket.connect(); return; } @@ -1359,6 +1401,21 @@ export class Connection { } } } + for (let id of signatureKeys) { + const subscription = this._signatureSubscriptions[id]; + if (subscription.subscriptionId === null) { + subscription.subscriptionId = -1; // indicates subscribing + try { + const id = await this._rpcWebSocket.call('signatureSubscribe', [ + subscription.signature, + ]); + subscription.subscriptionId = id; + } catch (err) { + subscription.subscriptionId = null; + console.log(`signatureSubscribe error: ${err.message}`); + } + } + } } /** @@ -1577,4 +1634,72 @@ export class Connection { } return args; } + + /** + * @private + */ + _wsOnSignatureNotification(notification: Object) { + const res = SignatureNotificationResult(notification); + if (res.error) { + throw new Error(res.error.message); + } + assert(typeof res.result !== 'undefined'); + + const keys = Object.keys(this._signatureSubscriptions).map(Number); + for (let id of keys) { + const sub = this._signatureSubscriptions[id]; + if (sub.subscriptionId === res.subscription) { + // Signatures subscriptions are auto-removed by the RPC service so + // no need to explicitly send an unsubscribe message + delete this._signatureSubscriptions[id]; + this._updateSubscriptions(); + sub.callback(res.result); + return; + } + } + } + + /** + * Register a callback to be invoked upon signature updates + * + * @param callback Function to invoke on signature notifications + * @return subscription id + */ + onSignature( + signature: TransactionSignature, + callback: SignatureResultCallback, + ): number { + const id = ++this._signatureSubscriptionCounter; + this._signatureSubscriptions[id] = { + signature, + callback, + subscriptionId: null, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister a signature notification callback + * + * @param id subscription id to deregister + */ + async removeSignatureListener(id: number): Promise { + if (this._signatureSubscriptions[id]) { + const {subscriptionId} = this._signatureSubscriptions[id]; + delete this._signatureSubscriptions[id]; + if (subscriptionId !== null) { + try { + await this._rpcWebSocket.call('signatureUnsubscribe', [ + subscriptionId, + ]); + } catch (err) { + console.log('signatureUnsubscribe error:', err.message); + } + } + this._updateSubscriptions(); + } else { + throw new Error(`Unknown signature change id: ${id}`); + } + } }