The auditMap operator for RxJS
RxJS ships with many built-in operators, but I recently ran into a problem that none of them solved directly.
The Problem
I have a page with a back button. On this page the user can change settings, and each change updates, for example, a record in a database through a REST API. The page has no save button, so every change is saved immediately.
The problem is that I have to ensure all API calls have finished before I route to another page. So when the user changes a setting and then clicks the back button, I have to block the routing until the API calls have returned successfully.
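One way to picture the blocking requirement is a small tracker that counts in-flight saves and only lets navigation proceed once that count drops to zero. The sketch below is plain TypeScript and independent of any router. `SaveTracker`, `begin` and `whenIdle` are hypothetical names chosen for illustration; they are not part of RxJS or of any framework.

```typescript
// Hypothetical helper: counts in-flight save calls and defers callbacks until none remain.
class SaveTracker {
  private inFlight = 0;
  private waiters: Array<() => void> = [];

  // Call when a save starts; invoke the returned function when it finishes.
  begin(): () => void {
    this.inFlight++;
    let finished = false;
    return () => {
      if (finished) return; // guard against completing the same save twice
      finished = true;
      this.inFlight--;
      if (this.inFlight === 0) {
        const ready = this.waiters;
        this.waiters = [];
        ready.forEach((resume) => resume());
      }
    };
  }

  // Runs `resume` immediately when idle, otherwise once the last save has finished.
  whenIdle(resume: () => void): void {
    if (this.inFlight === 0) {
      resume();
    } else {
      this.waiters.push(resume);
    }
  }
}

// Usage: the back button asks the tracker before routing.
const tracker = new SaveTracker();
const navigationLog: string[] = [];
const saveDone = tracker.begin();                       // a setting was changed, API call started
tracker.whenIdle(() => navigationLog.push('navigate')); // back button clicked, routing is held
saveDone();                                             // API call returned, routing proceeds
```

This only covers the waiting part. It still lets every single save run, which is the cost the rest of this post tries to avoid.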
So imagine the user changes multiple settings in quick succession while the API is slow:
- He ticks a checkbox => call the API
- He changes an address => call the API
- He unticks the checkbox again => call the API
- He clicks the back button
- Wait until the settings are saved
- Route back to the previous page
So I need an operator that does not emit while further operations are still waiting in the queue. I also don't need every operation to be executed, because each operation contains all the changes of the previous ones. Of the waiting operations, only the last one has to be executed.
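The requirement above can be modelled without RxJS to make the intended behaviour concrete: run the first operation immediately, remember only the most recent operation that arrives while one is running, and report idle once nothing is pending. The following is a minimal sketch under those assumptions. `LatestOnlyRunner` and its callback-based `run` signature are hypothetical and only illustrate the semantics; they are not an RxJS API.

```typescript
// Hypothetical model of the desired semantics: at most one operation runs at a time,
// and only the latest operation queued in the meantime is executed afterwards.
type Operation<T> = (value: T, done: () => void) => void;

class LatestOnlyRunner<T> {
  private running = false;
  // Tuple wrapper so a pending `undefined` value is distinguishable from "nothing pending".
  private pending: [T] | undefined;
  private readonly run: Operation<T>;
  private readonly onIdle: () => void;

  constructor(run: Operation<T>, onIdle: () => void) {
    this.run = run;
    this.onIdle = onIdle;
  }

  push(value: T): void {
    if (this.running) {
      this.pending = [value]; // overwrite: older pending values are dropped
      return;
    }
    this.start(value);
  }

  private start(value: T): void {
    this.running = true;
    this.run(value, () => this.finish());
  }

  private finish(): void {
    if (this.pending !== undefined) {
      const [next] = this.pending;
      this.pending = undefined;
      this.start(next);
    } else {
      this.running = false;
      this.onIdle();
    }
  }
}

// Simulated slow API: records each call and lets us complete it by hand.
const started: string[] = [];
const completions: Array<() => void> = [];
let idleCount = 0;
const runner = new LatestOnlyRunner<string>(
  (value, done) => { started.push(value); completions.push(done); },
  () => { idleCount++; },
);

runner.push('checkbox ticked');   // starts immediately
runner.push('address changed');   // stored as pending
runner.push('checkbox unticked'); // replaces the pending value
completions[0]();                 // first call returns, latest pending value starts
completions[1]();                 // second call returns, runner becomes idle
// started is ['checkbox ticked', 'checkbox unticked'] and idleCount is 1
```

In this run, three changes result in two API calls, and the idle callback fires once, which is the point at which routing could safely continue.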
Using the concatMap Operator
Using the switchMap Operator
Using the audit Operator
Using the throttle Operator
Custom Solution: The auditMap Operator
import { EMPTY, Observable, Observer, OperatorFunction } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

export function auditMap<T, R>(
  project: (value: T) => Observable<R>,
): OperatorFunction<T, R> {
  return function (source$: Observable<T>) {
    // latest source value that arrived while a projection was running;
    // wrapped in a tuple so that an emitted `undefined` still counts as a pending value
    let nextArgs: [T] | undefined;
    let isRunning = false;

    const subscriberFact = (observer: Observer<R>): Observer<R> => ({
      complete: () => {
        // if a value arrived in the meantime, take it and subscribe to its projection
        if (nextArgs !== undefined) {
          const [nextValue] = nextArgs;
          nextArgs = undefined;
          project(nextValue).subscribe(subscriberFact(observer));
        } else {
          isRunning = false;
          observer.complete();
        }
      },
      error: (error: any) => { observer.error(error); },
      next: (value: R) => {
        // suppress results while a newer value is waiting to be projected
        if (nextArgs === undefined) {
          observer.next(value);
        }
      },
    });

    return source$.pipe(
      // mergeMap lets us see every value emitted by the source;
      // while a previous value is still being processed, we store the new value
      // and return EMPTY so that the operator does not emit for it
      mergeMap((value: T) => {
        if (!isRunning) {
          isRunning = true;
          return new Observable<R>((observer) => {
            project(value).subscribe(subscriberFact(observer));
          });
        }
        nextArgs = [value];
        return EMPTY;
      }),
    );
  };
}