createReactiveStoreWithInitialValueAndSlotTracking

function createReactiveStoreWithInitialValueAndSlotTracking<
    TRpcValue,
    TSubscriptionValue,
    TItem,
>(
    config,
): ReactiveStreamStore<
    Readonly<{
        context: Readonly<{
            slot: Slot;
        }>;
        value: TItem;
    }>
>;

Creates a ReactiveStreamStore that combines an initial RPC fetch with an ongoing subscription to keep its state up to date.

The store uses slot-based comparison to ensure that only the most recent value is kept, regardless of whether it came from the initial RPC response or a subscription notification. This prevents stale data from overwriting newer data when the RPC response and subscription notifications arrive out of order.

Things to note:

  • getUnifiedState() starts in status: 'loading' until the first response or notification arrives. Once data arrives it transitions to status: 'loaded' with a SolanaRpcResponse containing the value and the slot context at which it was observed.
  • On error from either source, the store transitions to status: 'error' preserving the last known value. Only the first error per connection window is captured.
  • Calling ReactiveStreamStore.retry | `retry()` while in status: 'error' re-sends the RPC request and re-subscribes to the subscription using a fresh inner abort signal. The store transitions through status: 'retrying' back to loaded/error.
  • Triggering the caller's abort signal disconnects the store permanently; subsequent retry() calls are no-ops.

Type Parameters

Type Parameter
TRpcValue
TSubscriptionValue
TItem

Parameters

ParameterTypeDescription
configCreateReactiveStoreWithInitialValueAndSlotTrackingConfig<TRpcValue, TSubscriptionValue, TItem>-

Returns

ReactiveStreamStore<Readonly<{ context: Readonly<{ slot: Slot; }>; value: TItem; }>>

Example

import {
    address,
    createReactiveStoreWithInitialValueAndSlotTracking,
    createSolanaRpc,
    createSolanaRpcSubscriptions,
} from '@solana/kit';
 
const rpc = createSolanaRpc('http://127.0.0.1:8899');
const rpcSubscriptions = createSolanaRpcSubscriptions('ws://127.0.0.1:8900');
const myAddress = address('FnHyam9w4NZoWR6mKN1CuGBritdsEWZQa4Z4oawLZGxa');
 
const balanceStore = createReactiveStoreWithInitialValueAndSlotTracking({
    abortSignal: AbortSignal.timeout(60_000),
    rpcRequest: rpc.getBalance(myAddress, { commitment: 'confirmed' }),
    rpcValueMapper: lamports => lamports,
    rpcSubscriptionRequest: rpcSubscriptions.accountNotifications(myAddress),
    rpcSubscriptionValueMapper: ({ lamports }) => lamports,
});
 
const unsubscribe = balanceStore.subscribe(() => {
    const state = balanceStore.getUnifiedState();
    if (state.status === 'error') {
        console.error('Error:', state.error);
        balanceStore.retry();
    } else if (state.status === 'loaded') {
        console.log(`Balance at slot ${state.data.context.slot}:`, state.data.value);
    }
});

See

ReactiveStreamStore

On this page