The auditMap operator for RxJS

RxJs has lots of custom operators but there was recently a problem I could not solve with them.

The Problem

I have a page with a back button. Within the page you can change settings, which e.g. updates something in a database using a REST API. The page has no save button, so every change is saved instantly.

The problem is now that I have to ensure that the API calls are finished before I route to another page. So when the user changes a setting and clicks the back button, I have to block the routing until the api calls returned successfully.

So imagine the user is changing multiple settings very quickly and the API call is slow:

  • He ticks a checkbox => Call the API
  • He changes an address => Call the API
  • He unticks again => Call the API
  • He clicks on the back button
  • Wait until the settings are saved
  • Route to the back page

So I need an operator which does not emit when there are "next" operations in the queue. I don't need all operations to be executed as each operation has all changes of the previous operation. Only the last operation must be executed.


Using ConcatMap Operator

The concatMap operator ensures that all operations are done sequentially in order but emits for each operation. So I will not know if all pending operations are completed because it emits each time.

Using SwitchMap Operator

The switchMap throws the current operation away when a new emit comes. This ensures that always the last operation is taken. But in my case this can lead to strange behaviours when e.g. the operation before takes longer then the next operation. This comes close to what I want, but I want to be sure that the next operation should be taken when the current operation is finished.



Using Audit Operator

The audit operator ignores all pending operations while processing and then emits. So also not a solution as I don't want to emit if there any "next" operations.


Using Throttle Operator

The throttle operator emits a value from the source obervable and ignores subsequent values. But it has a second parameter which can be used to define which values should be emitted. I go with the configuration throttle(op, {leading: false, trailing: true}). I want that the last operation should emit.



This comes close to what we want but throttle also does not wait if there are any subsequent "next operations". In this example I put 1 and the 2 and 3. After processing of "1" is finished it takes "3" and emits directly without waiting "3" to be processed.


Custom Solution: AuditMap Operator

The custom auditMap RxJs operator only emits if there is no "pending" next operation.


import { EMPTY, Observable, Observer, OperatorFunction } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

export function auditMap<T, R>(
    project: (value: T) => Observable<R>,
): OperatorFunction<T, R> {

    return function (source$: Observable<T>) {
        let nextArgs: [T] | undefined;
        let isRunning = false;

        const subscriberFact = (observer: Observer<R>): Observer<R> => ({
            complete: () => {
                // if we have next emits then take it and subscribe
                if (nextArgs !== undefined) {
                    const [nextValue] = nextArgs;
                    nextArgs = undefined;

                    project(nextValue).subscribe(subscriberFact(observer));
                } else {
                    isRunning = false;
                    observer.complete();
                }
            },
            error: (error: any) => { observer.error(error); },
            next: (value: R) => {
                if (!nextArgs) {
                    observer.next(value);
                }
            },
        });

        return source$.pipe(
             // with mergeMap we can subscribe to all values emitted from the source
             // if we currently processing a previous value we store the next value and emit EMPTY
             // so that the operator does not emit
            mergeMap((value: T) => {
                if (!isRunning) {
                    isRunning = true;
                    return Observable.create((observer: Observer<R>) => {
                        project(value).subscribe(subscriberFact(observer));
                    });
                }

                nextArgs = [value];
                return EMPTY;
            }),
        );
    };
}


Try it out on StackBlitz



Comments

Popular posts from this blog

What is Base, Local & Remote in Git merge

Asynchronous Nunjucks with Filters and Extensions

Debug Azure Function locally with https on a custom domain