Implementing Polling With RxJS
Learn how to implement polling with RxJS for efficient, repetitive API calls, and optimize performance with exponential backoff and advanced RxJS operators.
In most front-end applications, fetching data from the back-end is a common requirement to provide up-to-date information to the user. Typically, this process is straightforward: know the API endpoint, create a function, call it, process the response, and display it on the page. However, there are scenarios, often rare or specific to certain business needs, where client-server communication must be more sophisticated. In this article, we’ll explore one such case: implementing repetitive API calls (polling) using RxJS.
RxJS Refresher
Before diving into polling, let’s take a moment to refresh our understanding of RxJS. For those new to it, RxJS (Reactive Extensions for JavaScript) is a library that brings reactive programming concepts into the JavaScript ecosystem. It’s particularly useful for managing asynchronous operations, offering powerful tools like Observables, Observers, Subjects, and Operators.
In short, RxJS is perfect for situations where you need to handle complex asynchronous tasks. Polling is one such scenario, making RxJS an ideal candidate for implementing this kind of functionality.
Repetitive API Calls
Why would we need repetitive API calls? There are times when a single API call won’t cut it. Examples include waiting in an online queue (e.g., buying concert tickets where the queue position updates frequently) or checking for order status changes at regular intervals without reloading the page.
While you could use setInterval to achieve this, it’s not a reliable solution on its own. It keeps firing whether or not the previous request has finished and has no built-in stop condition, which can lead to issues like endless API calls, excessive server load, or unpredictable behavior in your application. Instead, we can use RxJS to implement a more robust, controlled polling mechanism.
Code Breakdown
Let’s break down the code that does this:
import { MonoTypeOperatorFunction, timer } from 'rxjs';
import { last, scan, switchMapTo, takeWhile, tap } from 'rxjs/operators';

// Returns a guard that throws once the attempt count exceeds maxAttempts.
function attemptsGuardFactory(maxAttempts: number) {
  return (attemptsCount: number) => {
    if (attemptsCount > maxAttempts) {
      throw new Error('Exceeded maxAttempts');
    }
  };
}

export function pollWhile<T>(
  pollInterval: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false,
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    const poll$ = timer(0, pollInterval).pipe(
      // Count attempts: 1, 2, 3, ...
      scan((attempts) => attempts + 1, 0),
      // Error out once the count goes past maxAttempts.
      tap(attemptsGuardFactory(maxAttempts)),
      // Resubscribe to the source on every tick, canceling any pending request.
      switchMapTo(source$),
      // Stop when isPollingActive returns false; `true` also emits that final value.
      takeWhile(isPollingActive, true),
    );
    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}
Here we can see two functions: attemptsGuardFactory and pollWhile. The first one is a helper function, and the second one is the actual implementation of polling. The pollWhile function returns an RxJS operator and accepts some parameters to modify your polling settings:
pollInterval: The interval (in milliseconds) at which the polling will occur
isPollingActive: A function that determines if polling should continue
maxAttempts: Limits the maximum number of polling attempts
emitOnlyLast: Should the function emit a value on each "tick"? If true, only the last value will be emitted.
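To make the isPollingActive parameter more concrete, here is a minimal sketch of a predicate for a richer response than a plain status string. The OrderStatusResponse shape, the status values, and the PollSettings object are hypothetical and only mirror the parameters listed above; they are not part of pollWhile itself.

```typescript
// Hypothetical response shape, assumed for illustration only.
type OrderStatus = 'pending' | 'processing' | 'complete' | 'failed';

interface OrderStatusResponse {
  orderId: string;
  status: OrderStatus;
}

// Keep polling only while the order is still in flight. Both 'complete'
// and 'failed' are terminal, so either one stops the polling.
const isOrderInFlight = (res: OrderStatusResponse): boolean =>
  res.status === 'pending' || res.status === 'processing';

// Hypothetical settings object that mirrors pollWhile's parameters.
interface PollSettings<T> {
  pollInterval: number; // milliseconds between attempts
  isPollingActive: (res: T) => boolean;
  maxAttempts: number;
  emitOnlyLast: boolean;
}

const orderPolling: PollSettings<OrderStatusResponse> = {
  pollInterval: 2000,
  isPollingActive: isOrderInFlight,
  maxAttempts: 10,
  emitOnlyLast: true,
};
```

Treating a failure status as terminal is a design choice: without it, a failed order would keep being polled until maxAttempts is exceeded.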
We use timer(0, pollInterval) to create an observable that emits at the specified interval. The first value is emitted immediately, and subsequent values are emitted after each pollInterval. The scan operator acts similarly to reduce, accumulating the number of polling attempts. Unlike reduce, though, scan emits intermediate values, which is useful in our case as we want to track polling attempts over time.
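The difference between scan and reduce can be sketched with plain arrays. The scanArray helper below is a hypothetical array analogue written for this illustration, not an RxJS function; it only mimics how scan hands every intermediate count downstream.

```typescript
// Plain-array analogue of the RxJS `scan` operator (hypothetical helper):
// returns every intermediate accumulator value, not just the final one.
function scanArray<T, A>(
  values: T[],
  accumulate: (acc: A, value: T) => A,
  seed: A,
): A[] {
  const results: A[] = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulate(acc, value);
    results.push(acc);
  }
  return results;
}

// timer(0, pollInterval) emits 0, 1, 2, ... (one value per tick).
const ticks: number[] = [0, 1, 2, 3];

// Same accumulator as in pollWhile: ignore the tick value, count attempts.
const countAttempts = (attempts: number): number => attempts + 1;

// scan-like: one count per tick, available as soon as the tick happens.
const scanned: number[] = scanArray(ticks, countAttempts, 0); // [1, 2, 3, 4]

// reduce: a single value, available only after the input has ended.
const reduced: number = ticks.reduce(countAttempts, 0); // 4
```

Because the timer never completes on its own, a reduce-style operator would never emit, which is why the running count from scan is the one that fits here.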
Next, the tap operator allows us to perform side effects. In this case, we use it to check the number of attempts via attemptsGuardFactory. If the count exceeds maxAttempts, we throw an error, which terminates the stream and stops further polling. The switchMapTo operator is key here: on every tick it subscribes to the source$ observable and unsubscribes from the previous inner subscription if it is still running. This ensures that if the polling interval triggers again before the previous request completes, the earlier request is canceled, preventing overlapping calls. switchMapTo is similar to switchMap, but it takes an observable instead of a callback. Recent RxJS versions deprecate switchMapTo in favor of switchMap(() => source$), which behaves the same way here.
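The guard can also be exercised on its own, without any observable, to see exactly which attempt triggers the error. In the sketch below, attemptsGuardFactory is copied from the code above, while firstRejectedAttempt is a hypothetical helper added only for this check.

```typescript
// Copied from the article's code: throws once the count exceeds maxAttempts.
function attemptsGuardFactory(maxAttempts: number) {
  return (attemptsCount: number): void => {
    if (attemptsCount > maxAttempts) {
      throw new Error('Exceeded maxAttempts');
    }
  };
}

// Hypothetical helper: feeds attempts 1..ticks through the guard and returns
// the first attempt number that is rejected, or null if none is.
function firstRejectedAttempt(maxAttempts: number, ticks: number): number | null {
  const guard = attemptsGuardFactory(maxAttempts);
  for (let attempt = 1; attempt <= ticks; attempt++) {
    try {
      guard(attempt);
    } catch (_error) {
      return attempt;
    }
  }
  return null;
}

// With maxAttempts = 3, attempts 1 to 3 pass and the 4th tick throws.
const rejectedAt = firstRejectedAttempt(3, 5); // 4

// With the default of Infinity, the guard never throws.
const neverRejected = firstRejectedAttempt(Infinity, 5); // null
```

In other words, maxAttempts requests are actually sent; the error surfaces on the tick that would have started request number maxAttempts + 1.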
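Finally, takeWhile ensures polling continues only while isPollingActive returns true. Passing true as the second argument makes it inclusive, so the response that ends polling is emitted as well. Depending on the emitOnlyLast flag, the operator emits either only that last value or every value.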
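The effect of the inclusive flag is easiest to see on a fixed list of responses. The takeWhileArray helper below is a hypothetical plain-array analogue of takeWhile(predicate, inclusive), written for this illustration only.

```typescript
// Plain-array analogue of takeWhile(predicate, inclusive) (hypothetical helper).
function takeWhileArray<T>(
  values: T[],
  predicate: (value: T) => boolean,
  inclusive: boolean = false,
): T[] {
  const taken: T[] = [];
  for (const value of values) {
    if (predicate(value)) {
      taken.push(value);
      continue;
    }
    // First value that fails the predicate: keep it only when inclusive.
    if (inclusive) {
      taken.push(value);
    }
    break;
  }
  return taken;
}

const responses: string[] = ['pending', 'pending', 'complete', 'never requested'];
const isPollingActive = (status: string): boolean => status !== 'complete';

// Without the flag, the terminal 'complete' response is dropped.
const withoutFinal = takeWhileArray(responses, isPollingActive);
// → ['pending', 'pending']

// With the flag (as in pollWhile), the terminal response is kept.
const withFinal = takeWhileArray(responses, isPollingActive, true);
// → ['pending', 'pending', 'complete']

// emitOnlyLast = true corresponds to keeping just the final element.
const lastOnly = withFinal[withFinal.length - 1]; // 'complete'
```

Without the inclusive flag, a subscriber using emitOnlyLast would receive the last 'pending' response instead of the 'complete' one it is actually waiting for.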
Here’s a simple example of how you can use this polling mechanism in your code:
import { of } from 'rxjs';
import { delay, map } from 'rxjs/operators';

// Simulate a mock API call that returns the process status
function mockApiCall() {
  const statuses = ['pending', 'pending', 'pending', 'complete'];
  let index = 0;
  return of(null).pipe(
    // Simulate a 1-second delay for the API response
    delay(1000),
    // Each new subscription (one per poll) reads the next status
    map(() => {
      const status = statuses[index];
      index++;
      return status;
    })
  );
}

const isPollingActive = (status: string) => status !== 'complete';

mockApiCall().pipe(
  pollWhile<string>(
    2000, // pollInterval
    isPollingActive,
    5, // maxAttempts
    true // emitOnlyLast
  )
).subscribe({
  next: (status) => {
    // Runs once, with the final status ("complete")
  },
  error: (err) => {
    // Runs instead if the maximum number of polling attempts is exceeded
  },
});
It's as simple as that! We can apply this operator to any observable we want. But let’s take this a step further. In some cases, like when a server is overloaded, we may want to introduce a delay that grows over time to ease the load. This technique, known as exponential backoff, can be easily added to our RxJS operator.
Here’s the modified version of pollWhile:
import { MonoTypeOperatorFunction, of, timer } from 'rxjs';
import { expand, last, map, switchMapTo, takeWhile, tap } from 'rxjs/operators';

export function pollWhile<T>(
  pollInterval: number,
  growthFactor: number,
  isPollingActive: (res: T) => boolean,
  maxAttempts: number = Infinity,
  emitOnlyLast: boolean = false,
): MonoTypeOperatorFunction<T> {
  return (source$) => {
    // Attempt 1 starts immediately; expand then schedules attempt n + 1
    // after pollInterval * growthFactor^(n - 1) milliseconds.
    const poll$ = of(1).pipe(
      expand((attempts) =>
        timer(pollInterval * Math.pow(growthFactor, attempts - 1)).pipe(
          // Carry the attempt counter forward so the delay keeps growing.
          map(() => attempts + 1),
        ),
      ),
      // The guard sits after expand so that every attempt is checked.
      tap(attemptsGuardFactory(maxAttempts)),
      switchMapTo(source$),
      takeWhile(isPollingActive, true),
    );
    return emitOnlyLast ? poll$.pipe(last()) : poll$;
  };
}
In this version, we introduce a new parameter, growthFactor, which controls how much the delay grows after each polling attempt. We use the expand operator to schedule each subsequent attempt recursively, multiplying the delay by the growth factor every time, so the wait increases after each attempt. This is particularly useful in scenarios where you want to avoid overloading a server with too many requests in quick succession.
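To get a feel for how quickly the waits grow, the schedule can be computed as plain arithmetic. This is a sketch under one assumption: the wait after attempt n is pollInterval * growthFactor^(n - 1), so the first wait equals pollInterval. The backoffDelay and backoffSchedule helpers are hypothetical names used only for this illustration.

```typescript
// Wait (in ms) after the given 1-based attempt, assuming the schedule
// pollInterval * growthFactor^(attempt - 1).
function backoffDelay(pollInterval: number, growthFactor: number, attempt: number): number {
  return pollInterval * Math.pow(growthFactor, attempt - 1);
}

// Waits that follow attempts 1..attempts, in order.
function backoffSchedule(pollInterval: number, growthFactor: number, attempts: number): number[] {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= attempts; attempt++) {
    delays.push(backoffDelay(pollInterval, growthFactor, attempt));
  }
  return delays;
}

// Doubling from a 1-second base interval:
const schedule = backoffSchedule(1000, 2, 5); // [1000, 2000, 4000, 8000, 16000]

// Total time spent waiting across those five gaps:
const totalWait = schedule.reduce((sum, delayMs) => sum + delayMs, 0); // 31000

// A growth factor of 1 degenerates to fixed-interval polling:
const fixed = backoffSchedule(1000, 1, 3); // [1000, 1000, 1000]
```

Since the waits grow geometrically, even a modest growthFactor leads to long gaps after a handful of attempts, so it is worth choosing maxAttempts with the total wait in mind.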
Conclusion
As you can see, polling with RxJS offers a robust and flexible way to handle repetitive API calls. With a handful of operators, we can build a customizable polling mechanism that stops on a condition, limits the number of attempts, and cancels requests that are still in flight. On top of that, we can add exponential backoff to reduce server load in situations where response times are inconsistent or the server might be overwhelmed. Whether you're dealing with long-running processes, status updates, or any other use case that requires periodic server checks, RxJS offers a powerful toolset for implementing polling. Give it a try in your next project, and take full advantage of reactive programming capabilities!