diff --git a/web3.js/src/connection.ts b/web3.js/src/connection.ts index b289392a36..4ebd699e70 100644 --- a/web3.js/src/connection.ts +++ b/web3.js/src/connection.ts @@ -1556,6 +1556,54 @@ type RootSubscriptionInfo = { subscriptionId: SubscriptionId | null; // null when there's no current server subscription id }; +/** + * @internal + */ +const LogsResult = pick({ + err: TransactionErrorResult, + logs: array(string()), + signature: string(), +}); + +/** + * Logs result. + * + * @typedef {Object} Logs. + */ +export type Logs = { + err: TransactionError | null; + logs: string[]; + signature: string; +}; + +/** + * Expected JSON RPC response for the "logsNotification" message. + */ +const LogsNotificationResult = pick({ + result: notificationResultAndContext(LogsResult), + subscription: number(), +}); + +/** + * Filter for log subscriptions. + */ +export type LogsFilter = PublicKey | 'all' | 'allWithVotes'; + +/** + * Callback function for log notifications. + */ +export type LogsCallback = (logs: Logs, ctx: Context) => void; + +/** + * @private + */ +type LogsSubscriptionInfo = { + callback: LogsCallback; + filter: LogsFilter; + subscriptionId: SubscriptionId | null; // null when there's no current server subscription id + commitment?: Commitment; +}; + /** * Signature result * @@ -1669,6 +1717,11 @@ export class Connection { [id: number]: SlotSubscriptionInfo; } = {}; + /** @internal */ _logsSubscriptionCounter: number = 0; + /** @internal */ _logsSubscriptions: { + [id: number]: LogsSubscriptionInfo; + } = {}; + /** * Establish a JSON RPC connection * @@ -1730,6 +1783,10 @@ export class Connection { 'rootNotification', this._wsOnRootNotification.bind(this), ); + this._rpcWebSocket.on( + 'logsNotification', + this._wsOnLogsNotification.bind(this), + ); } /** @@ -2991,12 +3048,14 @@ export class Connection { const slotKeys = Object.keys(this._slotSubscriptions).map(Number); const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number); const rootKeys = Object.keys(this._rootSubscriptions).map(Number); + const logsKeys = Object.keys(this._logsSubscriptions).map(Number); if ( accountKeys.length === 0 && programKeys.length === 0 && slotKeys.length === 0 && signatureKeys.length === 0 && - rootKeys.length === 0 + rootKeys.length === 0 && + logsKeys.length === 0 ) { if (this._rpcWebSocketConnected) { this._rpcWebSocketConnected = false; @@ -3053,6 +3112,21 @@ export class Connection { const sub = this._rootSubscriptions[id]; this._subscribe(sub, 'rootSubscribe', []); } + + for (let id of logsKeys) { + const sub = this._logsSubscriptions[id]; + let filter; + if (typeof sub.filter === 'object') { + filter = {mentions: [sub.filter.toString()]}; + } else { + filter = sub.filter; + } + this._subscribe( + sub, + 'logsSubscribe', + this._buildArgs([filter], sub.commitment), + ); + } } /** @@ -3169,6 +3243,55 @@ export class Connection { } } + /** + * Registers a callback to be invoked whenever logs are emitted. + */ + onLogs( + filter: LogsFilter, + callback: LogsCallback, + commitment?: Commitment, + ): number { + const id = ++this._logsSubscriptionCounter; + this._logsSubscriptions[id] = { + filter, + callback, + commitment, + subscriptionId: null, + }; + this._updateSubscriptions(); + return id; + } + + /** + * Deregister a logs callback. + * + * @param id subscription id to deregister. + */ + async removeOnLogsListener(id: number): Promise { + if (!this._logsSubscriptions[id]) { + throw new Error(`Unknown logs id: ${id}`); + } + const subInfo = this._logsSubscriptions[id]; + delete this._logsSubscriptions[id]; + await this._unsubscribe(subInfo, 'logsUnsubscribe'); + this._updateSubscriptions(); + } + + /** + * @internal + */ + _wsOnLogsNotification(notification: Object) { + const res = create(notification, LogsNotificationResult); + const keys = Object.keys(this._logsSubscriptions).map(Number); + for (let id of keys) { + const sub = this._logsSubscriptions[id]; + if (sub.subscriptionId === res.subscription) { + sub.callback(res.result.value, res.result.context); + return; + } + } + } + /** * @internal */ diff --git a/web3.js/test/connection.test.ts b/web3.js/test/connection.test.ts index 9ec697ed6e..cab380f948 100644 --- a/web3.js/test/connection.test.ts +++ b/web3.js/test/connection.test.ts @@ -2268,6 +2268,40 @@ describe('Connection', () => { await connection.removeRootChangeListener(subscriptionId); }); + it('logs notification', async () => { + let listener: number | undefined; + const owner = new Account(); + const [logsRes, ctx] = await new Promise(resolve => { + listener = connection.onLogs( + 'all', + (logs, ctx) => { + resolve([logs, ctx]); + }, + 'processed', + ); + // Sleep to allow the subscription time to be setup. + // + // Without this, there's a race condition between setting up the log + // subscription and executing the transaction to trigger the log. + // If the transaction to trigger the log executes before the + // subscription is setup, the log event listener never fires and so the + // promise never resolves. + sleep(1000).then(() => { + // Execute a transaction so that we can pickup its logs. + connection.requestAirdrop(owner.publicKey, 1); + }); + }); + expect(ctx.slot).to.be.greaterThan(0); + expect(logsRes.logs.length).to.eq(2); + expect(logsRes.logs[0]).to.eq( + 'Program 11111111111111111111111111111111 invoke [1]', + ); + expect(logsRes.logs[1]).to.eq( + 'Program 11111111111111111111111111111111 success', + ); + await connection.removeOnLogsListener(listener!); + }); + it('https request', async () => { const connection = new Connection('https://devnet.solana.com'); const version = await connection.getVersion();