diff --git a/web3.js/module.flow.js b/web3.js/module.flow.js index dbf7adf624..2f81acbbc2 100644 --- a/web3.js/module.flow.js +++ b/web3.js/module.flow.js @@ -96,10 +96,17 @@ declare module '@solana/web3.js' { commission: number, }; + declare export type SlotInfo = { + parent: 'number', + slot: 'number', + root: 'number', + }; + declare type AccountChangeCallback = (accountInfo: AccountInfo) => void; declare type ProgramAccountChangeCallback = ( keyedAccountInfo: KeyedAccountInfo, ) => void; + declare type SlotChangeCallback = (slotInfo: SlotInfo) => void; declare export type SignatureSuccess = {| Ok: null, @@ -191,6 +198,7 @@ declare module '@solana/web3.js' { programId: PublicKey, callback: ProgramAccountChangeCallback, ): number; + onSlotChange(callback: SlotChangeCallback): number; removeProgramAccountChangeListener(id: number): Promise; validatorExit(): Promise; } diff --git a/web3.js/src/connection.js b/web3.js/src/connection.js index e8bc79488e..57e4ade906 100644 --- a/web3.js/src/connection.js +++ b/web3.js/src/connection.js @@ -318,6 +318,23 @@ const ProgramAccountNotificationResult = struct({ result: ProgramAccountInfoResult, }); +/** + * @private + */ +const SlotInfo = struct({ + parent: 'number', + slot: 'number', + root: 'number', +}); + +/*** + * Expected JSON RPC response for the "slotNotification" message + */ +const SlotNotificationResult = struct({ + subscription: 'number', + result: SlotInfo, +}); + /** * Expected JSON RPC response for the "getProgramAccounts" message */ @@ -569,6 +586,19 @@ type ProgramAccountSubscriptionInfo = { subscriptionId: null | number, // null when there's no current server subscription id }; +/** + * @private + */ +type SlotSubscriptionInfo = { + callback: SlotChangeCallback, + subscriptionId: null | number, // null when there's no current server subscription id +}; + +/** + * Callback function for slot change notifications + */ +export type SlotChangeCallback = (slotInfo: SlotInfo) => void; + /** * Signature status: Success * @@ -618,6 +648,10 @@ export class Connection { [number]: ProgramAccountSubscriptionInfo, } = {}; _programAccountChangeSubscriptionCounter: number = 0; + _slotSubscriptions: { + [number]: SlotSubscriptionInfo, + } = {}; + _slotSubscriptionCounter: number = 0; /** * Establish a JSON RPC connection @@ -657,6 +691,10 @@ export class Connection { 'programNotification', this._wsOnProgramAccountNotification.bind(this), ); + this._rpcWebSocket.on( + 'slotNotification', + this._wsOnSlotNotification.bind(this), + ); } /** @@ -1256,7 +1294,12 @@ export class Connection { const programKeys = Object.keys( this._programAccountChangeSubscriptions, ).map(Number); - if (accountKeys.length === 0 && programKeys.length === 0) { + const slotKeys = Object.keys(this._slotSubscriptions).map(Number); + if ( + accountKeys.length === 0 && + programKeys.length === 0 && + slotKeys.length === 0 + ) { this._rpcWebSocket.close(); return; } @@ -1268,6 +1311,9 @@ export class Connection { for (let id of programKeys) { this._programAccountChangeSubscriptions[id].subscriptionId = null; } + for (let id of slotKeys) { + this._slotSubscriptions[id].subscriptionId = null; + } this._rpcWebSocket.connect(); return; } @@ -1307,6 +1353,18 @@ export class Connection { } } } + for (let id of slotKeys) { + const {subscriptionId} = this._slotSubscriptions[id]; + if (subscriptionId === null) { + try { + this._slotSubscriptions[ + id + ].subscriptionId = await this._rpcWebSocket.call('slotSubscribe', []); + } catch (err) { + console.log(`slotSubscribe error: ${err.message}`); + } + } + } } /** @@ -1455,6 +1513,69 @@ export class Connection { } } + /** + * @private + */ + _wsOnSlotNotification(notification: Object) { + const res = SlotNotificationResult(notification); + if (res.error) { + throw new Error(res.error.message); + } + assert(typeof res.result !== 'undefined'); + const {parent, slot, root} = res.result; + + const keys = Object.keys(this._slotSubscriptions).map(Number); + for (let id of keys) { + const sub = this._slotSubscriptions[id]; + if (sub.subscriptionId === res.subscription) { + sub.callback({ + parent, + slot, + root, + }); + return true; + } + } + } + + /** + * Register a callback to be invoked upon slot changes + * + * @param callback Function to invoke whenever the slot changes + * @return subscription id + */ + onSlotChange(callback: SlotChangeCallback): number { + const id = ++this._slotSubscriptionCounter; + this._slotSubscriptions[id] = { + callback, + subscriptionId: id, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister a slot notification callback + * + * @param id subscription id to deregister + */ + async removeSlotChangeListener(id: number): Promise { + if (this._slotSubscriptions[id]) { + const {subscriptionId} = this._slotSubscriptions[id]; + delete this._slotSubscriptions[id]; + if (subscriptionId !== null) { + try { + await this._rpcWebSocket.call('slotUnsubscribe', [subscriptionId]); + } catch (err) { + console.log('slotUnsubscribe error:', err.message); + } + } + this._updateSubscriptions(); + } else { + throw new Error(`Unknown slot change id: ${id}`); + } + } + _argsWithCommitment(args: Array, override: ?Commitment): Array { const commitment = override || this._commitment; if (commitment) { diff --git a/web3.js/test/connection.test.js b/web3.js/test/connection.test.js index 1728f53d68..f562793bd8 100644 --- a/web3.js/test/connection.test.js +++ b/web3.js/test/connection.test.js @@ -931,3 +931,39 @@ test('program account change notification', async () => { await connection.removeProgramAccountChangeListener(subscriptionId); }); + +test('slot notification', async () => { + if (mockRpcEnabled) { + console.log('non-live test skipped'); + return; + } + + const connection = new Connection(url, 'recent'); + + // let notified = false; + const subscriptionId = connection.onSlotChange(slotInfo => { + expect(slotInfo.parent).toBeDefined(); + expect(slotInfo.slot).toBeDefined(); + expect(slotInfo.root).toBeDefined(); + expect(slotInfo.slot).toBeGreaterThan(slotInfo.parent); + expect(slotInfo.slot).toBeGreaterThanOrEqual(slotInfo.root); + // notified = true; + }); + + // + // FIXME: enable this check when slotNotification is live + // + // // Wait for mockCallback to receive a call + // let i = 0; + // while (!notified) { + // //for (;;) { + // if (++i === 30) { + // throw new Error('Slot change notification not observed'); + // } + // // Sleep for a 1/4 of a slot, notifications only occur after a block is + // // processed + // await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND); + // } + + await connection.removeSlotChangeListener(subscriptionId); +});