feat: use pubsub to confirm transactions (#12095)
This commit is contained in:
@ -14,6 +14,7 @@ import {MS_PER_SLOT} from './timing';
|
||||
import {Transaction} from './transaction';
|
||||
import {Message} from './message';
|
||||
import {sleep} from './util/sleep';
|
||||
import {promiseTimeout} from './util/promise-timeout';
|
||||
import {toBuffer} from './util/to-buffer';
|
||||
import type {Blockhash} from './blockhash';
|
||||
import type {FeeCalculator} from './fee-calculator';
|
||||
@ -57,11 +58,11 @@ export type SendOptions = {
|
||||
*
|
||||
* @typedef {Object} ConfirmOptions
|
||||
* @property {boolean | undefined} skipPreflight disable transaction verification step
|
||||
* @property {number | undefined} confirmations desired number of cluster confirmations
|
||||
* @property {Commitment | undefined} commitment desired commitment level
|
||||
*/
|
||||
export type ConfirmOptions = {
|
||||
skipPreflight?: boolean,
|
||||
confirmations?: number,
|
||||
commitment?: Commitment,
|
||||
};
|
||||
|
||||
/**
|
||||
@ -1951,39 +1952,74 @@ export class Connection {
|
||||
/**
|
||||
* Confirm the transaction identified by the specified signature.
|
||||
*
|
||||
* If `confirmations` count is not specified, wait for transaction to be finalized.
|
||||
*
|
||||
* If `commitment` is not specified, default to 'max'.
|
||||
*/
|
||||
async confirmTransaction(
|
||||
signature: TransactionSignature,
|
||||
confirmations: ?number,
|
||||
): Promise<RpcResponseAndContext<SignatureStatus | null>> {
|
||||
const start = Date.now();
|
||||
const WAIT_TIMEOUT_MS = 60 * 1000;
|
||||
|
||||
let statusResponse = await this.getSignatureStatus(signature);
|
||||
for (;;) {
|
||||
const status = statusResponse.value;
|
||||
if (status) {
|
||||
// 'status.confirmations === null' implies that the tx has been finalized
|
||||
if (
|
||||
status.err ||
|
||||
status.confirmations === null ||
|
||||
(typeof confirmations === 'number' &&
|
||||
status.confirmations >= confirmations)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
} else if (Date.now() - start >= WAIT_TIMEOUT_MS) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Sleep for approximately one slot
|
||||
await sleep(MS_PER_SLOT);
|
||||
statusResponse = await this.getSignatureStatus(signature);
|
||||
commitment: ?Commitment,
|
||||
): Promise<RpcResponseAndContext<SignatureResult>> {
|
||||
let decodedSignature;
|
||||
try {
|
||||
decodedSignature = bs58.decode(signature);
|
||||
} catch (err) {
|
||||
throw new Error('signature must be base58 encoded: ' + signature);
|
||||
}
|
||||
|
||||
return statusResponse;
|
||||
assert(decodedSignature.length === 64, 'signature has invalid length');
|
||||
|
||||
const start = Date.now();
|
||||
const subscriptionCommitment: Commitment = commitment || 'max';
|
||||
|
||||
let subscriptionId;
|
||||
let response: RpcResponseAndContext<SignatureResult> | null = null;
|
||||
const confirmPromise = new Promise((resolve, reject) => {
|
||||
try {
|
||||
subscriptionId = this.onSignature(
|
||||
signature,
|
||||
(result, context) => {
|
||||
subscriptionId = undefined;
|
||||
response = {
|
||||
context,
|
||||
value: result,
|
||||
};
|
||||
resolve();
|
||||
},
|
||||
subscriptionCommitment,
|
||||
);
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
let timeoutMs = 60 * 1000;
|
||||
switch (subscriptionCommitment) {
|
||||
case 'recent':
|
||||
case 'single':
|
||||
case 'singleGossip': {
|
||||
timeoutMs = 10 * 1000;
|
||||
break;
|
||||
}
|
||||
// exhaust enums to ensure full coverage
|
||||
case 'max':
|
||||
case 'root':
|
||||
}
|
||||
|
||||
try {
|
||||
await promiseTimeout(confirmPromise, timeoutMs);
|
||||
} finally {
|
||||
if (subscriptionId) {
|
||||
this.removeSignatureListener(subscriptionId);
|
||||
}
|
||||
}
|
||||
|
||||
if (response === null) {
|
||||
const duration = (Date.now() - start) / 1000;
|
||||
throw new Error(
|
||||
`Transaction was not confirmed in ${duration.toFixed(2)} seconds`,
|
||||
);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,7 +4,6 @@ import * as BufferLayout from 'buffer-layout';
|
||||
|
||||
import {Account} from './account';
|
||||
import {PublicKey} from './publickey';
|
||||
import {NUM_TICKS_PER_SECOND} from './timing';
|
||||
import {Transaction, PACKET_DATA_SIZE} from './transaction';
|
||||
import {SYSVAR_RENT_PUBKEY} from './sysvar';
|
||||
import {sendAndConfirmTransaction} from './util/send-and-confirm-transaction';
|
||||
@ -70,7 +69,7 @@ export class Loader {
|
||||
transaction,
|
||||
[payer, program],
|
||||
{
|
||||
confirmations: 1,
|
||||
commitment: 'single',
|
||||
skipPreflight: true,
|
||||
},
|
||||
);
|
||||
@ -111,17 +110,17 @@ export class Loader {
|
||||
});
|
||||
transactions.push(
|
||||
sendAndConfirmTransaction(connection, transaction, [payer, program], {
|
||||
confirmations: 1,
|
||||
commitment: 'single',
|
||||
skipPreflight: true,
|
||||
}),
|
||||
);
|
||||
|
||||
// Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors
|
||||
// since all the write transactions modify the same program account
|
||||
await sleep(1000 / NUM_TICKS_PER_SECOND);
|
||||
// Delay between sends in an attempt to reduce rate limit errors
|
||||
const REQUESTS_PER_SECOND = 4;
|
||||
await sleep(1000 / REQUESTS_PER_SECOND);
|
||||
|
||||
// Run up to 8 Loads in parallel to prevent too many parallel transactions from
|
||||
// getting rejected with AccountInUse.
|
||||
// getting retried due to AccountInUse errors.
|
||||
//
|
||||
// TODO: 8 was selected empirically and should probably be revisited
|
||||
if (transactions.length === 8) {
|
||||
@ -159,7 +158,7 @@ export class Loader {
|
||||
transaction,
|
||||
[payer, program],
|
||||
{
|
||||
confirmations: 1,
|
||||
commitment: 'single',
|
||||
skipPreflight: true,
|
||||
},
|
||||
);
|
||||
|
16
web3.js/src/util/promise-timeout.js
Normal file
16
web3.js/src/util/promise-timeout.js
Normal file
@ -0,0 +1,16 @@
|
||||
// @flow
|
||||
|
||||
export function promiseTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
): Promise<T | null> {
|
||||
let timeoutId: TimeoutID;
|
||||
const timeoutPromise = new Promise(resolve => {
|
||||
timeoutId = setTimeout(() => resolve(null), timeoutMs);
|
||||
});
|
||||
|
||||
return Promise.race([promise, timeoutPromise]).then(result => {
|
||||
clearTimeout(timeoutId);
|
||||
return result;
|
||||
});
|
||||
}
|
@ -7,7 +7,7 @@ import type {ConfirmOptions} from '../connection';
|
||||
/**
|
||||
* Send and confirm a raw transaction
|
||||
*
|
||||
* If `confirmations` count is not specified, wait for transaction to be finalized.
|
||||
* If `commitment` option is not specified, defaults to 'max' commitment.
|
||||
*
|
||||
* @param {Connection} connection
|
||||
* @param {Buffer} rawTransaction
|
||||
@ -19,31 +19,23 @@ export async function sendAndConfirmRawTransaction(
|
||||
rawTransaction: Buffer,
|
||||
options?: ConfirmOptions,
|
||||
): Promise<TransactionSignature> {
|
||||
const start = Date.now();
|
||||
const signature = await connection.sendRawTransaction(
|
||||
rawTransaction,
|
||||
options,
|
||||
);
|
||||
|
||||
const status = (
|
||||
await connection.confirmTransaction(
|
||||
signature,
|
||||
options && options.confirmations,
|
||||
options && options.commitment,
|
||||
)
|
||||
).value;
|
||||
|
||||
if (status) {
|
||||
if (status.err) {
|
||||
throw new Error(
|
||||
`Raw transaction ${signature} failed (${JSON.stringify(status)})`,
|
||||
);
|
||||
}
|
||||
return signature;
|
||||
if (status.err) {
|
||||
throw new Error(
|
||||
`Raw transaction ${signature} failed (${JSON.stringify(status)})`,
|
||||
);
|
||||
}
|
||||
|
||||
const duration = (Date.now() - start) / 1000;
|
||||
throw new Error(
|
||||
`Raw transaction '${signature}' was not confirmed in ${duration.toFixed(
|
||||
2,
|
||||
)} seconds`,
|
||||
);
|
||||
return signature;
|
||||
}
|
||||
|
@ -9,7 +9,7 @@ import type {TransactionSignature} from '../transaction';
|
||||
/**
|
||||
* Sign, send and confirm a transaction.
|
||||
*
|
||||
* If `confirmations` count is not specified, wait for transaction to be finalized.
|
||||
* If `commitment` option is not specified, defaults to 'max' commitment.
|
||||
*
|
||||
* @param {Connection} connection
|
||||
* @param {Transaction} transaction
|
||||
@ -23,32 +23,24 @@ export async function sendAndConfirmTransaction(
|
||||
signers: Array<Account>,
|
||||
options?: ConfirmOptions,
|
||||
): Promise<TransactionSignature> {
|
||||
const start = Date.now();
|
||||
const signature = await connection.sendTransaction(
|
||||
transaction,
|
||||
signers,
|
||||
options,
|
||||
);
|
||||
|
||||
const status = (
|
||||
await connection.confirmTransaction(
|
||||
signature,
|
||||
options && options.confirmations,
|
||||
options && options.commitment,
|
||||
)
|
||||
).value;
|
||||
|
||||
if (status) {
|
||||
if (status.err) {
|
||||
throw new Error(
|
||||
`Transaction ${signature} failed (${JSON.stringify(status)})`,
|
||||
);
|
||||
}
|
||||
return signature;
|
||||
if (status.err) {
|
||||
throw new Error(
|
||||
`Transaction ${signature} failed (${JSON.stringify(status)})`,
|
||||
);
|
||||
}
|
||||
|
||||
const duration = (Date.now() - start) / 1000;
|
||||
throw new Error(
|
||||
`Transaction was not confirmed in ${duration.toFixed(
|
||||
2,
|
||||
)} seconds (${JSON.stringify(status)})`,
|
||||
);
|
||||
return signature;
|
||||
}
|
||||
|
Reference in New Issue
Block a user