createReactiveStoreFromDataPublisherFactory

function createReactiveStoreFromDataPublisherFactory<TData>(
    config,
): ReactiveStreamStore<TData>;

Returns a ReactiveStreamStore that wires itself to a fresh DataPublisher on construction and on every `retry()`.

Unlike createReactiveStoreFromDataPublisher, this variant accepts a createDataPublisher factory rather than a ready-made publisher. That lets the store tear down a broken stream and open a new one without losing subscribers or the last known value.

Things to note:

  • getUnifiedState() reports status: 'loading' until the first notification arrives.
  • On error, the store transitions to status: 'error' while preserving the last known value. Only the first error per connection window is captured; a subsequent retry() resets that window.
  • retry() is a no-op unless the store is currently in status: 'error'. When it fires, the store transitions to status: 'retrying' (preserving stale data), invokes createDataPublisher(), and wires up a fresh connection. If the factory rejects, the store transitions to status: 'error' with the rejection reason.
  • Triggering the caller's abortSignal disconnects the store permanently; subsequent retry() calls are no-ops.
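The transitions listed above can be modelled as a small pure state machine. The following is only an illustrative sketch, not the library's implementation: the names `ModelState`, `StoreEvent`, `initialState`, and `step` are hypothetical, and two behaviours are assumptions the notes above do not specify (data arriving while in 'error' is ignored, and the stored error is kept while 'retrying').

```typescript
// Hypothetical model of the documented status transitions.
// None of these names are part of the library.
type Status = 'loading' | 'loaded' | 'error' | 'retrying';

interface Snapshot<T> {
    status: Status;
    data: T | undefined;
    error: unknown;
}

interface ModelState<T> {
    snapshot: Snapshot<T>;
    aborted: boolean;
}

type StoreEvent<T> =
    | { type: 'data'; value: T }
    | { type: 'error'; reason: unknown } // stream error or factory rejection
    | { type: 'retry' }
    | { type: 'abort' };

function initialState<T>(): ModelState<T> {
    return { snapshot: { status: 'loading', data: undefined, error: undefined }, aborted: false };
}

function step<T>(state: ModelState<T>, event: StoreEvent<T>): ModelState<T> {
    const { snapshot, aborted } = state;
    // After the abort signal fires, every event (including retry) is a no-op.
    if (aborted) return state;
    switch (event.type) {
        case 'data':
            // Assumption: the broken connection delivers nothing further.
            if (snapshot.status === 'error') return state;
            return { aborted, snapshot: { status: 'loaded', data: event.value, error: undefined } };
        case 'error':
            // Only the first error per connection window is captured.
            if (snapshot.status === 'error') return state;
            return { aborted, snapshot: { status: 'error', data: snapshot.data, error: event.reason } };
        case 'retry':
            // retry() does nothing unless the store is in 'error'.
            if (snapshot.status !== 'error') return state;
            // Stale data is preserved; keeping `error` here is an assumption.
            return { aborted, snapshot: { ...snapshot, status: 'retrying' } };
        case 'abort':
            return { snapshot, aborted: true };
    }
}

// Walk through one failure and recovery.
let state = initialState<number>();
const events: StoreEvent<number>[] = [
    { type: 'data', value: 1 },
    { type: 'error', reason: 'socket closed' },
    { type: 'retry' },
    { type: 'data', value: 2 },
];
for (const event of events) {
    state = step(state, event);
    console.log(event.type, '->', state.snapshot.status);
}
```

Modelling the store as a reducer makes the two no-op rules (retry outside 'error', anything after abort) easy to check in isolation.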

Type Parameters

Type Parameter
TData

Parameters

Parameter    Type             Description
config       FactoryConfig    -

Returns

ReactiveStreamStore<TData>

Example

const store = createReactiveStoreFromDataPublisherFactory({
    abortSignal,
    async createDataPublisher() {
        return getDataPublisherFromEventEmitter(new WebSocket(url));
    },
    dataChannelName: 'message',
    errorChannelName: 'error',
});
const unsubscribe = store.subscribe(() => {
    const snapshot = store.getUnifiedState();
    if (snapshot.status === 'error') console.error('Connection failed:', snapshot.error);
    else if (snapshot.status === 'loaded') console.log('Latest:', snapshot.data);
});
// Call `store.retry()` to recover after an error — e.g. from a user-triggered "Retry" button.
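Instead of a manual button, retries could also be driven automatically. The sketch below is one possible approach under stated assumptions: `retryWithBackoff` and `RetryableStore` are hypothetical names, not library exports, and the interface only mirrors the three store methods used in the example above (`subscribe`, `getUnifiedState`, `retry`). The `schedule` parameter is an illustrative injection point that defaults to `setTimeout`.

```typescript
// Structural subset of the store used by this sketch. The method names come
// from the example above; the exact library types are not reproduced here.
interface RetryableStore {
    getUnifiedState(): { status: string };
    retry(): void;
    subscribe(listener: () => void): () => void;
}

// Hypothetical helper: calls store.retry() with exponential backoff,
// giving up after `maxAttempts` consecutive failures.
function retryWithBackoff(
    store: RetryableStore,
    maxAttempts = 3,
    baseDelayMs = 500,
    schedule: (fn: () => void, ms: number) => void = (fn, ms) => {
        setTimeout(fn, ms);
    },
): () => void {
    let attempts = 0;
    // Returns the unsubscribe function so callers can stop auto-retrying.
    return store.subscribe(() => {
        const { status } = store.getUnifiedState();
        if (status === 'loaded') {
            attempts = 0; // connection is healthy again, so reset the budget
        } else if (status === 'error' && attempts < maxAttempts) {
            const delayMs = baseDelayMs * 2 ** attempts; // 500, 1000, 2000, ...
            attempts += 1;
            schedule(() => store.retry(), delayMs);
        }
    });
}
```

With the store from the example, this might be used as `const stopAutoRetry = retryWithBackoff(store);`. Because retry() is a no-op unless the store is in status: 'error', a delayed call that fires after the store has already recovered should be harmless.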
