diff --git a/web3.js/module.d.ts b/web3.js/module.d.ts index ba04110fe7..ae543541a5 100644 --- a/web3.js/module.d.ts +++ b/web3.js/module.d.ts @@ -129,6 +129,7 @@ declare module '@solana/web3.js' { signatureResult: SignatureSuccess | TransactionError, context: Context, ) => void; + export type RootChangeCallback = (root: number) => void; export type SignatureSuccess = { Ok: null; @@ -242,6 +243,8 @@ declare module '@solana/web3.js' { callback: SignatureResultCallback, ): number; removeSignatureListener(id: number): Promise; + onRootChange(callback: RootChangeCallback): number; + removeRootChangeListener(id: number): Promise; validatorExit(): Promise; getMinimumBalanceForRentExemption( dataLength: number, diff --git a/web3.js/module.flow.js b/web3.js/module.flow.js index b5c707e809..0baa795a22 100644 --- a/web3.js/module.flow.js +++ b/web3.js/module.flow.js @@ -142,6 +142,7 @@ declare module '@solana/web3.js' { signatureResult: SignatureSuccess | TransactionError, context: Context, ) => void; + declare type RootChangeCallback = (root: number) => void; declare export type SignatureSuccess = {| Ok: null, @@ -255,6 +256,8 @@ declare module '@solana/web3.js' { callback: SignatureResultCallback, ): number; removeSignatureListener(id: number): Promise; + onRootChange(callback: RootChangeCallback): number; + removeRootChangeListener(id: number): Promise; validatorExit(): Promise; getMinimumBalanceForRentExemption( dataLength: number, diff --git a/web3.js/src/connection.js b/web3.js/src/connection.js index 8f10b34f58..65b2830bac 100644 --- a/web3.js/src/connection.js +++ b/web3.js/src/connection.js @@ -372,7 +372,7 @@ const ProgramAccountNotificationResult = struct({ /** * @private */ -const SlotInfo = struct({ +const SlotInfoResult = struct({ parent: 'number', slot: 'number', root: 'number', @@ -383,7 +383,7 @@ const SlotInfo = struct({ */ const SlotNotificationResult = struct({ subscription: 'number', - result: SlotInfo, + result: SlotInfoResult, }); /** @@ -394,6 +394,14 @@ const SignatureNotificationResult = struct({ result: notificationResultAndContext(SignatureStatusResult), }); +/** + * Expected JSON RPC response for the "rootNotification" message + */ +const RootNotificationResult = struct({ + subscription: 'number', + result: 'number', +}); + /** * Expected JSON RPC response for the "getProgramAccounts" message */ @@ -579,6 +587,20 @@ const RequestAirdropRpcResult = jsonRpcResult('string'); */ const SendTransactionRpcResult = jsonRpcResult('string'); +/** + * Information about the latest slot being processed by a node + * + * @typedef {Object} SlotInfo + * @property {number} slot Currently processing slot + * @property {number} parent Parent of the current slot + * @property {number} root The root block of the current slot's fork + */ +type SlotInfo = { + slot: number, + parent: number, + root: number, +}; + /** * Information describing an account * @@ -676,6 +698,19 @@ type SignatureSubscriptionInfo = { subscriptionId: ?SubscriptionId, // null when there's no current server subscription id }; +/** + * Callback function for root change notifications + */ +export type RootChangeCallback = (root: number) => void; + +/** + * @private + */ +type RootSubscriptionInfo = { + callback: RootChangeCallback, + subscriptionId: ?SubscriptionId, // null when there's no current server subscription id +}; + /** * Signature status: Success * @@ -737,6 +772,10 @@ export class Connection { [number]: SignatureSubscriptionInfo, } = {}; _signatureSubscriptionCounter: number = 0; + _rootSubscriptions: { + [number]: RootSubscriptionInfo, + } = {}; + _rootSubscriptionCounter: number = 0; /** * Establish a JSON RPC connection @@ -784,6 +823,10 @@ export class Connection { 'signatureNotification', this._wsOnSignatureNotification.bind(this), ); + this._rpcWebSocket.on( + 'rootNotification', + this._wsOnRootNotification.bind(this), + ); } /** @@ -1448,11 +1491,13 @@ export class Connection { ).map(Number); const slotKeys = Object.keys(this._slotSubscriptions).map(Number); const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number); + const rootKeys = Object.keys(this._rootSubscriptions).map(Number); if ( accountKeys.length === 0 && programKeys.length === 0 && slotKeys.length === 0 && - signatureKeys.length === 0 + signatureKeys.length === 0 && + rootKeys.length === 0 ) { this._rpcWebSocket.close(); return; @@ -1471,6 +1516,9 @@ export class Connection { for (let id of signatureKeys) { this._signatureSubscriptions[id].subscriptionId = null; } + for (let id of rootKeys) { + this._rootSubscriptions[id].subscriptionId = null; + } this._rpcWebSocket.connect(); return; } @@ -1494,6 +1542,11 @@ export class Connection { const sub = this._signatureSubscriptions[id]; this._subscribe(sub, 'signatureSubscribe', [sub.signature]); } + + for (let id of rootKeys) { + const sub = this._rootSubscriptions[id]; + this._subscribe(sub, 'rootSubscribe', []); + } } /** @@ -1759,4 +1812,56 @@ export class Connection { throw new Error(`Unknown signature result id: ${id}`); } } + + /** + * @private + */ + _wsOnRootNotification(notification: Object) { + const res = RootNotificationResult(notification); + if (res.error) { + throw new Error(res.error.message); + } + assert(typeof res.result !== 'undefined'); + const root = res.result; + const keys = Object.keys(this._rootSubscriptions).map(Number); + for (let id of keys) { + const sub = this._rootSubscriptions[id]; + if (sub.subscriptionId === res.subscription) { + sub.callback(root); + return true; + } + } + } + + /** + * Register a callback to be invoked upon root changes + * + * @param callback Function to invoke whenever the root changes + * @return subscription id + */ + onRootChange(callback: RootChangeCallback): number { + const id = ++this._rootSubscriptionCounter; + this._rootSubscriptions[id] = { + callback, + subscriptionId: null, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister a root notification callback + * + * @param id subscription id to deregister + */ + async removeRootChangeListener(id: number): Promise { + if (this._rootSubscriptions[id]) { + const subInfo = this._rootSubscriptions[id]; + delete this._rootSubscriptions[id]; + await this._unsubscribe(subInfo, 'rootUnsubscribe'); + this._updateSubscriptions(); + } else { + throw new Error(`Unknown root change id: ${id}`); + } + } } diff --git a/web3.js/test/connection.test.js b/web3.js/test/connection.test.js index d40231f915..96d41d4ed3 100644 --- a/web3.js/test/connection.test.js +++ b/web3.js/test/connection.test.js @@ -1417,30 +1417,54 @@ test('slot notification', async () => { const connection = new Connection(url, 'recent'); - // let notified = false; + let notified = false; const subscriptionId = connection.onSlotChange(slotInfo => { expect(slotInfo.parent).toBeDefined(); expect(slotInfo.slot).toBeDefined(); expect(slotInfo.root).toBeDefined(); expect(slotInfo.slot).toBeGreaterThan(slotInfo.parent); expect(slotInfo.slot).toBeGreaterThanOrEqual(slotInfo.root); - // notified = true; + notified = true; }); - // - // FIXME: enable this check when slotNotification is live - // - // // Wait for mockCallback to receive a call - // let i = 0; - // while (!notified) { - // //for (;;) { - // if (++i === 30) { - // throw new Error('Slot change notification not observed'); - // } - // // Sleep for a 1/4 of a slot, notifications only occur after a block is - // // processed - // await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND); - // } + // Wait for mockCallback to receive a call + let i = 0; + while (!notified) { + if (++i === 30) { + throw new Error('Slot change notification not observed'); + } + // Sleep for a 1/4 of a slot, notifications only occur after a block is + // processed + await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND); + } await connection.removeSlotChangeListener(subscriptionId); }); + +test('root notification', async () => { + if (mockRpcEnabled) { + console.log('non-live test skipped'); + return; + } + + const connection = new Connection(url, 'recent'); + + let roots = []; + const subscriptionId = connection.onRootChange(root => { + roots.push(root); + }); + + // Wait for mockCallback to receive a call + let i = 0; + while (roots.length < 2) { + if (++i === 30) { + throw new Error('Root change notification not observed'); + } + // Sleep for a 1/4 of a slot, notifications only occur after a block is + // processed + await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND); + } + + expect(roots[1]).toBeGreaterThan(roots[0]); + await connection.removeRootChangeListener(subscriptionId); +});