fix: Plumb program-based subscriptions

This commit is contained in:
Tyera Eulberg
2019-03-08 17:02:39 -07:00
committed by Michael Vines
parent 2ce935287a
commit f6afbe1a72
3 changed files with 257 additions and 36 deletions

View File

@ -102,6 +102,19 @@ const AccountNotificationResult = struct({
result: AccountInfoResult,
});
/**
* @private
*/
const ProgramAccountInfoResult = struct(['string', AccountInfoResult]);
/***
* Expected JSON RPC response for the "programNotification" message
*/
const ProgramAccountNotificationResult = struct({
subscription: 'number',
result: ProgramAccountInfoResult,
});
/**
* Expected JSON RPC response for the "confirmTransaction" message
*/
@ -156,6 +169,18 @@ type AccountInfo = {
userdata: Buffer,
};
/**
* Account information identified by pubkey
*
* @typedef {Object} KeyedAccountInfo
* @property {PublicKey} accountId
* @property {AccountInfo} accountInfo
*/
type KeyedAccountInfo = {
accountId: PublicKey,
accountInfo: AccountInfo,
};
/**
* Callback function for account change notifications
*/
@ -170,6 +195,22 @@ type AccountSubscriptionInfo = {
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* Callback function for program account change notifications
*/
export type ProgramAccountChangeCallback = (
keyedAccountInfo: KeyedAccountInfo,
) => void;
/**
* @private
*/
type ProgramAccountSubscriptionInfo = {
programId: string, // PublicKey of the program as a base 58 string
callback: ProgramAccountChangeCallback,
subscriptionId: null | number, // null when there's no current server subscription id
};
/**
* Possible signature status values
*
@ -198,6 +239,10 @@ export class Connection {
_disableBlockhashCaching: boolean = false;
_accountChangeSubscriptions: {[number]: AccountSubscriptionInfo} = {};
_accountChangeSubscriptionCounter: number = 0;
_programAccountChangeSubscriptions: {
[number]: ProgramAccountSubscriptionInfo,
} = {};
_programAccountChangeSubscriptionCounter: number = 0;
/**
* Establish a JSON RPC connection
@ -231,6 +276,10 @@ export class Connection {
'accountNotification',
this._wsOnAccountNotification.bind(this),
);
this._rpcWebSocket.on(
'programNotification',
this._wsOnProgramAccountNotification.bind(this),
);
}
/**
@ -459,6 +508,71 @@ export class Connection {
this._rpcWebSocketConnected = false;
}
/**
* @private
*/
async _updateSubscriptions() {
const accountKeys = Object.keys(this._accountChangeSubscriptions).map(
Number,
);
const programKeys = Object.keys(
this._programAccountChangeSubscriptions,
).map(Number);
if (accountKeys.length === 0 && programKeys.length === 0) {
this._rpcWebSocket.close();
return;
}
if (!this._rpcWebSocketConnected) {
for (let id of accountKeys) {
this._accountChangeSubscriptions[id].subscriptionId = null;
}
for (let id of programKeys) {
this._programAccountChangeSubscriptions[id].subscriptionId = null;
}
this._rpcWebSocket.connect();
return;
}
for (let id of accountKeys) {
const {subscriptionId, publicKey} = this._accountChangeSubscriptions[id];
console.log('pubkey: ' + publicKey);
if (subscriptionId === null) {
try {
this._accountChangeSubscriptions[
id
].subscriptionId = await this._rpcWebSocket.call('accountSubscribe', [
publicKey,
]);
} catch (err) {
console.log(
`accountSubscribe error for ${publicKey}: ${err.message}`,
);
}
}
}
for (let id of programKeys) {
const {
subscriptionId,
programId,
} = this._programAccountChangeSubscriptions[id];
console.log('program-id: ' + programId);
if (subscriptionId === null) {
try {
this._programAccountChangeSubscriptions[
id
].subscriptionId = await this._rpcWebSocket.call('programSubscribe', [
programId,
]);
} catch (err) {
console.log(
`programSubscribe error for ${programId}: ${err.message}`,
);
}
}
}
}
/**
* @private
*/
@ -486,42 +600,6 @@ export class Connection {
}
}
/**
* @private
*/
async _updateSubscriptions() {
const keys = Object.keys(this._accountChangeSubscriptions).map(Number);
if (keys.length === 0) {
this._rpcWebSocket.close();
return;
}
if (!this._rpcWebSocketConnected) {
for (let id of keys) {
this._accountChangeSubscriptions[id].subscriptionId = null;
}
this._rpcWebSocket.connect();
return;
}
for (let id of keys) {
const {subscriptionId, publicKey} = this._accountChangeSubscriptions[id];
if (subscriptionId === null) {
try {
this._accountChangeSubscriptions[
id
].subscriptionId = await this._rpcWebSocket.call('accountSubscribe', [
publicKey,
]);
} catch (err) {
console.log(
`accountSubscribe error for ${publicKey}: ${err.message}`,
);
}
}
}
}
/**
* Register a callback to be invoked whenever the specified account changes
*
@ -564,4 +642,80 @@ export class Connection {
throw new Error(`Unknown account change id: ${id}`);
}
}
/**
* @private
*/
_wsOnProgramAccountNotification(notification: Object) {
const res = ProgramAccountNotificationResult(notification);
if (res.error) {
throw new Error(res.error.message);
}
const keys = Object.keys(this._programAccountChangeSubscriptions).map(
Number,
);
for (let id of keys) {
const sub = this._programAccountChangeSubscriptions[id];
if (sub.subscriptionId === res.subscription) {
const {result} = res;
assert(typeof result !== 'undefined');
sub.callback({
accountId: result[0],
accountInfo: {
executable: result[1].executable,
owner: new PublicKey(result[1].owner),
lamports: result[1].lamports,
userdata: Buffer.from(result[1].userdata),
},
});
return true;
}
}
}
/**
* Register a callback to be invoked whenever accounts owned by the
* specified program change
*
* @param programId Public key of the program to monitor
* @param callback Function to invoke whenever the account is changed
* @return subscription id
*/
onProgramAccountChange(
programId: PublicKey,
callback: ProgramAccountChangeCallback,
): number {
const id = ++this._programAccountChangeSubscriptionCounter;
this._programAccountChangeSubscriptions[id] = {
programId: programId.toBase58(),
callback,
subscriptionId: null,
};
this._updateSubscriptions();
return id;
}
/**
* Deregister an account notification callback
*
* @param id subscription id to deregister
*/
async removeProgramAccountChangeListener(id: number): Promise<void> {
if (this._programAccountChangeSubscriptions[id]) {
const {subscriptionId} = this._programAccountChangeSubscriptions[id];
delete this._programAccountChangeSubscriptions[id];
if (subscriptionId !== null) {
try {
await this._rpcWebSocket.call('programUnsubscribe', [subscriptionId]);
} catch (err) {
console.log('programUnsubscribe error:', err.message);
}
}
this._updateSubscriptions();
} else {
throw new Error(`Unknown account change id: ${id}`);
}
}
}