feat: add root subscription api

This commit is contained in:
Justin Starry
2020-03-27 22:22:53 +08:00
committed by Michael Vines
parent 3379e8cd46
commit a2c6c991b8
4 changed files with 154 additions and 19 deletions

3
web3.js/module.d.ts vendored
View File

@ -129,6 +129,7 @@ declare module '@solana/web3.js' {
signatureResult: SignatureSuccess | TransactionError, signatureResult: SignatureSuccess | TransactionError,
context: Context, context: Context,
) => void; ) => void;
export type RootChangeCallback = (root: number) => void;
export type SignatureSuccess = { export type SignatureSuccess = {
Ok: null; Ok: null;
@ -242,6 +243,8 @@ declare module '@solana/web3.js' {
callback: SignatureResultCallback, callback: SignatureResultCallback,
): number; ): number;
removeSignatureListener(id: number): Promise<void>; removeSignatureListener(id: number): Promise<void>;
onRootChange(callback: RootChangeCallback): number;
removeRootChangeListener(id: number): Promise<void>;
validatorExit(): Promise<boolean>; validatorExit(): Promise<boolean>;
getMinimumBalanceForRentExemption( getMinimumBalanceForRentExemption(
dataLength: number, dataLength: number,

View File

@ -142,6 +142,7 @@ declare module '@solana/web3.js' {
signatureResult: SignatureSuccess | TransactionError, signatureResult: SignatureSuccess | TransactionError,
context: Context, context: Context,
) => void; ) => void;
declare type RootChangeCallback = (root: number) => void;
declare export type SignatureSuccess = {| declare export type SignatureSuccess = {|
Ok: null, Ok: null,
@ -255,6 +256,8 @@ declare module '@solana/web3.js' {
callback: SignatureResultCallback, callback: SignatureResultCallback,
): number; ): number;
removeSignatureListener(id: number): Promise<void>; removeSignatureListener(id: number): Promise<void>;
onRootChange(callback: RootChangeCallback): number;
removeRootChangeListener(id: number): Promise<void>;
validatorExit(): Promise<boolean>; validatorExit(): Promise<boolean>;
getMinimumBalanceForRentExemption( getMinimumBalanceForRentExemption(
dataLength: number, dataLength: number,

View File

@ -372,7 +372,7 @@ const ProgramAccountNotificationResult = struct({
/** /**
* @private * @private
*/ */
const SlotInfo = struct({ const SlotInfoResult = struct({
parent: 'number', parent: 'number',
slot: 'number', slot: 'number',
root: 'number', root: 'number',
@ -383,7 +383,7 @@ const SlotInfo = struct({
*/ */
const SlotNotificationResult = struct({ const SlotNotificationResult = struct({
subscription: 'number', subscription: 'number',
result: SlotInfo, result: SlotInfoResult,
}); });
/** /**
@ -394,6 +394,14 @@ const SignatureNotificationResult = struct({
result: notificationResultAndContext(SignatureStatusResult), result: notificationResultAndContext(SignatureStatusResult),
}); });
/**
* Expected JSON RPC response for the "rootNotification" message
*/
const RootNotificationResult = struct({
subscription: 'number',
result: 'number',
});
/** /**
* Expected JSON RPC response for the "getProgramAccounts" message * Expected JSON RPC response for the "getProgramAccounts" message
*/ */
@ -579,6 +587,20 @@ const RequestAirdropRpcResult = jsonRpcResult('string');
*/ */
const SendTransactionRpcResult = jsonRpcResult('string'); const SendTransactionRpcResult = jsonRpcResult('string');
/**
* Information about the latest slot being processed by a node
*
* @typedef {Object} SlotInfo
* @property {number} slot Currently processing slot
* @property {number} parent Parent of the current slot
* @property {number} root The root block of the current slot's fork
*/
type SlotInfo = {
slot: number,
parent: number,
root: number,
};
/** /**
* Information describing an account * Information describing an account
* *
@ -676,6 +698,19 @@ type SignatureSubscriptionInfo = {
subscriptionId: ?SubscriptionId, // null when there's no current server subscription id subscriptionId: ?SubscriptionId, // null when there's no current server subscription id
}; };
/**
* Callback function for root change notifications
*/
export type RootChangeCallback = (root: number) => void;
/**
* @private
*/
type RootSubscriptionInfo = {
callback: RootChangeCallback,
subscriptionId: ?SubscriptionId, // null when there's no current server subscription id
};
/** /**
* Signature status: Success * Signature status: Success
* *
@ -737,6 +772,10 @@ export class Connection {
[number]: SignatureSubscriptionInfo, [number]: SignatureSubscriptionInfo,
} = {}; } = {};
_signatureSubscriptionCounter: number = 0; _signatureSubscriptionCounter: number = 0;
_rootSubscriptions: {
[number]: RootSubscriptionInfo,
} = {};
_rootSubscriptionCounter: number = 0;
/** /**
* Establish a JSON RPC connection * Establish a JSON RPC connection
@ -784,6 +823,10 @@ export class Connection {
'signatureNotification', 'signatureNotification',
this._wsOnSignatureNotification.bind(this), this._wsOnSignatureNotification.bind(this),
); );
this._rpcWebSocket.on(
'rootNotification',
this._wsOnRootNotification.bind(this),
);
} }
/** /**
@ -1448,11 +1491,13 @@ export class Connection {
).map(Number); ).map(Number);
const slotKeys = Object.keys(this._slotSubscriptions).map(Number); const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number); const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number);
const rootKeys = Object.keys(this._rootSubscriptions).map(Number);
if ( if (
accountKeys.length === 0 && accountKeys.length === 0 &&
programKeys.length === 0 && programKeys.length === 0 &&
slotKeys.length === 0 && slotKeys.length === 0 &&
signatureKeys.length === 0 signatureKeys.length === 0 &&
rootKeys.length === 0
) { ) {
this._rpcWebSocket.close(); this._rpcWebSocket.close();
return; return;
@ -1471,6 +1516,9 @@ export class Connection {
for (let id of signatureKeys) { for (let id of signatureKeys) {
this._signatureSubscriptions[id].subscriptionId = null; this._signatureSubscriptions[id].subscriptionId = null;
} }
for (let id of rootKeys) {
this._rootSubscriptions[id].subscriptionId = null;
}
this._rpcWebSocket.connect(); this._rpcWebSocket.connect();
return; return;
} }
@ -1494,6 +1542,11 @@ export class Connection {
const sub = this._signatureSubscriptions[id]; const sub = this._signatureSubscriptions[id];
this._subscribe(sub, 'signatureSubscribe', [sub.signature]); this._subscribe(sub, 'signatureSubscribe', [sub.signature]);
} }
for (let id of rootKeys) {
const sub = this._rootSubscriptions[id];
this._subscribe(sub, 'rootSubscribe', []);
}
} }
/** /**
@ -1759,4 +1812,56 @@ export class Connection {
throw new Error(`Unknown signature result id: ${id}`); throw new Error(`Unknown signature result id: ${id}`);
} }
} }
/**
* @private
*/
_wsOnRootNotification(notification: Object) {
const res = RootNotificationResult(notification);
if (res.error) {
throw new Error(res.error.message);
}
assert(typeof res.result !== 'undefined');
const root = res.result;
const keys = Object.keys(this._rootSubscriptions).map(Number);
for (let id of keys) {
const sub = this._rootSubscriptions[id];
if (sub.subscriptionId === res.subscription) {
sub.callback(root);
return true;
}
}
}
/**
* Register a callback to be invoked upon root changes
*
* @param callback Function to invoke whenever the root changes
* @return subscription id
*/
onRootChange(callback: RootChangeCallback): number {
const id = ++this._rootSubscriptionCounter;
this._rootSubscriptions[id] = {
callback,
subscriptionId: null,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister a root notification callback
*
* @param id subscription id to deregister
*/
async removeRootChangeListener(id: number): Promise<void> {
if (this._rootSubscriptions[id]) {
const subInfo = this._rootSubscriptions[id];
delete this._rootSubscriptions[id];
await this._unsubscribe(subInfo, 'rootUnsubscribe');
this._updateSubscriptions();
} else {
throw new Error(`Unknown root change id: ${id}`);
}
}
} }

View File

@ -1417,30 +1417,54 @@ test('slot notification', async () => {
const connection = new Connection(url, 'recent'); const connection = new Connection(url, 'recent');
// let notified = false; let notified = false;
const subscriptionId = connection.onSlotChange(slotInfo => { const subscriptionId = connection.onSlotChange(slotInfo => {
expect(slotInfo.parent).toBeDefined(); expect(slotInfo.parent).toBeDefined();
expect(slotInfo.slot).toBeDefined(); expect(slotInfo.slot).toBeDefined();
expect(slotInfo.root).toBeDefined(); expect(slotInfo.root).toBeDefined();
expect(slotInfo.slot).toBeGreaterThan(slotInfo.parent); expect(slotInfo.slot).toBeGreaterThan(slotInfo.parent);
expect(slotInfo.slot).toBeGreaterThanOrEqual(slotInfo.root); expect(slotInfo.slot).toBeGreaterThanOrEqual(slotInfo.root);
// notified = true; notified = true;
}); });
// // Wait for mockCallback to receive a call
// FIXME: enable this check when slotNotification is live let i = 0;
// while (!notified) {
// // Wait for mockCallback to receive a call if (++i === 30) {
// let i = 0; throw new Error('Slot change notification not observed');
// while (!notified) { }
// //for (;;) { // Sleep for a 1/4 of a slot, notifications only occur after a block is
// if (++i === 30) { // processed
// throw new Error('Slot change notification not observed'); await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
// } }
// // Sleep for a 1/4 of a slot, notifications only occur after a block is
// // processed
// await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
// }
await connection.removeSlotChangeListener(subscriptionId); await connection.removeSlotChangeListener(subscriptionId);
}); });
test('root notification', async () => {
if (mockRpcEnabled) {
console.log('non-live test skipped');
return;
}
const connection = new Connection(url, 'recent');
let roots = [];
const subscriptionId = connection.onRootChange(root => {
roots.push(root);
});
// Wait for mockCallback to receive a call
let i = 0;
while (roots.length < 2) {
if (++i === 30) {
throw new Error('Root change notification not observed');
}
// Sleep for a 1/4 of a slot, notifications only occur after a block is
// processed
await sleep((250 * DEFAULT_TICKS_PER_SLOT) / NUM_TICKS_PER_SECOND);
}
expect(roots[1]).toBeGreaterThan(roots[0]);
await connection.removeRootChangeListener(subscriptionId);
});