Intercepting Concurrent Requests in NestJS to Avoid Race Conditions

When working with APIs that handle a high volume of concurrent requests, there is a risk of race conditions. These occur when multiple requests try to access or modify the same data simultaneously, resulting in inconsistent or incorrect outcomes. In NestJS, one effective strategy to prevent such race conditions is to intercept requests and throttle them based on specific criteria, such as the uniqueness of the request input.
This article walks through how you can intercept concurrent GraphQL requests in NestJS and prevent race conditions using a custom interceptor. Let’s take a closer look at how this is done.
Understanding Race Conditions
Race conditions happen when two or more concurrent processes access shared data and their behavior depends on the timing of the operations. For example, if two users try to modify the same resource at the same time, the final state may be unpredictable. This is especially problematic in distributed systems where data consistency is critical.
In the context of a GraphQL API, a user might send multiple requests with identical or conflicting data. If both requests are processed at the same time, they could lead to data integrity issues.
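To make the problem concrete, here is a minimal sketch of a lost update. It is not taken from any real API: the in-memory profile object, the loginCount field, and the 10 ms delay are all assumptions standing in for a database row and its I/O latency.

```typescript
// Simulated I/O delay so that the read and the write are separated in time.
const delay = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function simulateConcurrentRequests(): Promise<number> {
  // Hypothetical record; stands in for a row in a real data store.
  const profile = { loginCount: 0 };

  // Read-modify-write with no coordination between callers.
  const increment = async (): Promise<void> => {
    const current = profile.loginCount; // read
    await delay(10);                    // the other request runs during this gap
    profile.loginCount = current + 1;   // write based on a stale read
  };

  // Both calls read 0 before either one writes.
  await Promise.all([increment(), increment()]);
  return profile.loginCount;
}

simulateConcurrentRequests().then(count => {
  console.log(`loginCount after two concurrent increments: ${count}`); // 1, not 2
});
```

Two increments were requested, but one of them is silently lost because both started from the same stale value.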
Why Intercepting Concurrent Requests is Necessary
In an API, and especially in a distributed system, race conditions surface as inconsistent data, failed transactions, and unexpected behavior.
In NestJS, race conditions can be mitigated by intercepting requests and ensuring that no two concurrent requests with identical or conflicting data are processed simultaneously. This is especially important in GraphQL APIs, where input data plays a crucial role in determining the uniqueness of a request. By throttling requests, we can ensure that only one request with the same input is handled at a time, thereby maintaining data integrity.
Without this mechanism, concurrent requests could:
1. Overwrite data: Two requests might modify the same resource, so that one update is lost.
2. Produce inconsistent states: One request might read stale data while another is updating it.
3. Degrade performance: Processing conflicting requests at the same time could put unnecessary load on the system.
By implementing an interceptor, you ensure that requests are processed in a controlled manner, preventing race conditions and maintaining the overall reliability of your system.
Test Script for Concurrent Requests
The following Bash script simulates concurrent requests by sending the same GraphQL mutation ten times in parallel:
#!/bin/bash
# Number of concurrent requests
request_count=10
# Function to send a GraphQL mutation request
send_graphql_request() {
  curl -X POST \
    -H "accept: application/json" \
    -H "content-type: application/json" \
    -H "application-id: abc123" \
    -H "x-correlation-id: $(uuidgen)" \
    -H "authorization: Bearer your_auth_token" \
    http://localhost:3000/graphql \
    -d '{
      "operationName": "UpdateProfile",
      "variables": {
        "input": {
          "email": "testuser@example.com",
          "firstName": "John",
          "lastName": "Doe"
        }
      },
      "query": "mutation UpdateProfile($input: UpdateProfileInput!) {\n updateProfile(input: $input) {\n id\n firstName\n lastName\n email\n }\n}"
    }'
}
# Send multiple requests in parallel
for i in $(seq 1 "$request_count"); do
  send_graphql_request & # Execute in the background
done
# Wait for all requests to complete
wait
echo "All $request_count requests have been sent."
Explanation of the Test Script
• Parallel Requests: The for loop sends multiple concurrent requests (10 in this case) using the & operator, which allows each request to run in the background.
• Random Correlation ID: The x-correlation-id header uses uuidgen to generate a unique identifier for each request. This is useful for tracking and debugging in systems that log correlation IDs.
• GraphQL Mutation: The request simulates an update operation (e.g., updating user profile data) via a GraphQL mutation, which tests the ability of the interceptor to throttle requests with identical input data.
• Wait for Completion: The wait command ensures that the script waits for all background requests to complete before printing the final message.
This script can be adjusted to test different mutation operations or input data to ensure that the ThrottleInterceptor is working correctly and effectively mitigating race conditions.
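The curl script prints raw responses, which makes it hard to see at a glance how many requests were accepted. As a rough sketch, the same experiment can be driven from TypeScript with a tally of outcomes. The names fireConcurrently, Outcome, and makeStubSend are hypothetical, and the stub only imitates a throttled server; in a real run, send would post the UpdateProfile mutation and classify the response itself.

```typescript
type Outcome = 'ok' | 'rejected';

// Calls `send` `count` times without awaiting in between, then tallies results.
async function fireConcurrently(
  count: number,
  send: (index: number) => Promise<Outcome>,
): Promise<Record<Outcome, number>> {
  const outcomes = await Promise.all(
    Array.from({ length: count }, (_, index) => send(index)),
  );
  const tally: Record<Outcome, number> = { ok: 0, rejected: 0 };
  for (const outcome of outcomes) {
    tally[outcome] += 1;
  }
  return tally;
}

// Hypothetical stand-in for the real HTTP call: the first caller is accepted
// and every later caller is rejected, as a throttled endpoint would behave.
function makeStubSend(): () => Promise<Outcome> {
  let taken = false;
  return async () => {
    if (taken) return 'rejected';
    taken = true;
    return 'ok';
  };
}

fireConcurrently(10, makeStubSend()).then(tally => {
  console.log(tally); // { ok: 1, rejected: 9 }
});
```

With the interceptor in place, a run against the real endpoint would be expected to show a similar split: one accepted request and the rest rejected.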
Introducing ThrottleInterceptor
The goal of our ThrottleInterceptor is to temporarily reject any concurrent request that carries the same input data as a request already accepted. This prevents multiple requests with the same input from being processed simultaneously.
Here’s the code that implements this strategy:
import {
  CallHandler,
  ConflictException,
  ExecutionContext,
  Injectable,
  NestInterceptor,
} from '@nestjs/common';
import { GqlExecutionContext } from '@nestjs/graphql';
import { Observable } from 'rxjs';
import * as crypto from 'crypto';
@Injectable()
export class ThrottleInterceptor implements NestInterceptor {
  // Maps each request key to the timer that will expire it
  private storage = new Map<string, NodeJS.Timeout>();
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const ctx = GqlExecutionContext.create(context);
    const request = ctx.getContext();
    const input = request?.req?.body?.variables?.input;
    const key = this.generateKey(input);
    const ttl = 5000; // 5 seconds in milliseconds
    // Reject the request if the same key was stored within the TTL window
    if (this.storage.has(key)) {
      throw new ConflictException('Another request with the same input is currently being processed. Please try again later.');
    }
    // Store the key and schedule its removal after the TTL (5 seconds)
    this.storage.set(
      key,
      setTimeout(() => this.storage.delete(key), ttl)
    );
    // The key is not cleared when the request completes; it expires with the TTL
    return next.handle();
  }
  private generateKey(input: any): string {
    // Create a key from the request input; `?? null` avoids passing
    // undefined to hash.update() when the request has no `input` variable
    const hash = crypto.createHash('sha256');
    hash.update(JSON.stringify(input ?? null));
    return hash.digest('hex');
  }
}
How the Interceptor Works
1. Context Extraction:
The GqlExecutionContext is used to extract the GraphQL request context. This allows us to retrieve the request body and access the input variables sent in the GraphQL mutation or query.
2. Key Generation:
The generateKey method creates a SHA-256 hash of the JSON-serialized request input. Requests with the same serialized input generate the same key, allowing the interceptor to identify duplicate or conflicting requests.
3. Concurrency Check:
Before allowing the request to proceed, the interceptor checks if a key already exists in the storage Map. If a key is found, it means that a request with the same input was accepted within the TTL window and may still be in progress, so an error is thrown to prevent the new request from proceeding.
4. TTL-Based Throttling:
When a new request is processed, a key is stored in the Map with a TTL (Time to Live) of 5 seconds. After this time, the key is automatically deleted using setTimeout, allowing future requests with the same input to be processed after the throttle period.
5. Request Handling:
If no key is found, the request is allowed to proceed. The next.handle() method continues the request-processing pipeline. The key is not removed when the request completes, so requests with the same input remain throttled until the TTL expires.
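One detail of the key generation step is worth illustrating: JSON.stringify serializes properties in the order they appear, so two inputs with the same fields in a different order produce different hashes. The sketch below shows one possible way to make the key order-independent. The helpers canonicalize and generateStableKey are hypothetical names introduced here, not part of the interceptor above.

```typescript
import { createHash } from 'crypto';

// Recursively rebuild the value with object keys in sorted order so that
// logically equal inputs serialize to the same string.
function canonicalize(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(canonicalize);
  }
  if (value !== null && typeof value === 'object') {
    const source = value as Record<string, unknown>;
    const sorted: Record<string, unknown> = {};
    for (const key of Object.keys(source).sort()) {
      sorted[key] = canonicalize(source[key]);
    }
    return sorted;
  }
  return value;
}

function generateStableKey(input: unknown): string {
  // `?? null` keeps JSON.stringify from returning undefined for a missing input.
  const serialized = JSON.stringify(canonicalize(input ?? null));
  return createHash('sha256').update(serialized).digest('hex');
}

const first = generateStableKey({ email: 'testuser@example.com', firstName: 'John' });
const second = generateStableKey({ firstName: 'John', email: 'testuser@example.com' });
console.log(first === second); // true
```

Whether this is needed depends on the clients: if every client builds its variables the same way, the plain JSON.stringify version behaves identically.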
Key Advantages
• Preventing Race Conditions:
By rejecting identical requests that arrive within the TTL window, this interceptor prevents conflicts that could lead to race conditions, provided the first request finishes before the TTL expires.
• Simple Throttling Logic:
The 5-second throttle creates a small window of time in which only one request with a given input is accepted. This is especially useful for operations that involve updates to shared resources. Requests with different inputs are not throttled, even if they target the same resource.
• Customizable TTL:
The TTL (5 seconds in this case) can be adjusted based on your application’s needs. If you need to allow requests sooner or throttle for a longer period, you can simply modify the ttl value.
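As a sketch of how the TTL could be made configurable, the bookkeeping can be pulled into a small class that takes the TTL as a constructor argument. This variant also lets the caller release a key as soon as its request finishes, keeping the timer only as a fallback. InFlightRegistry, tryAcquire, and release are hypothetical names; inside an interceptor, release could be called from an RxJS finalize operator on next.handle().

```typescript
class InFlightRegistry {
  private readonly timers = new Map<string, ReturnType<typeof setTimeout>>();
  private readonly ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  // Returns true if the key was free and is now held, false if it is already held.
  tryAcquire(key: string): boolean {
    if (this.timers.has(key)) {
      return false;
    }
    // Fallback expiry in case release() is never called.
    const timer = setTimeout(() => this.timers.delete(key), this.ttlMs);
    this.timers.set(key, timer);
    return true;
  }

  // Frees the key immediately and cancels its fallback timer.
  release(key: string): void {
    const timer = this.timers.get(key);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.timers.delete(key);
    }
  }
}

const registry = new InFlightRegistry(5000);
console.log(registry.tryAcquire('profile-key')); // true
console.log(registry.tryAcquire('profile-key')); // false
registry.release('profile-key');
console.log(registry.tryAcquire('profile-key')); // true
registry.release('profile-key');
```

The trade-off is that releasing on completion stops blocking duplicates once the first request is done, whereas the fixed TTL keeps rejecting them for the full window.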
Handling Errors Gracefully
The implementation above throws a ConflictException when a duplicate request is detected. This is more descriptive than a generic server error, because it tells the client that the failure is a concurrency conflict rather than a bug. In a production system, you may want to go further and return a more user-friendly response that informs the client about the concurrency issue and when it can retry.
The relevant check looks like this:
if (this.storage.has(key)) {
  throw new ConflictException('Another request with the same input is currently being processed. Please try again later.');
}
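Since the error message asks the client to try again later, a client might wrap its call in a small retry helper. The following is only a sketch under assumptions: retryOnConflict is a hypothetical helper, and the caller supplies isConflict because how a conflict appears in the response depends on the client library and the server's error formatting.

```typescript
// Retries `operation` when `isConflict` recognises the error, waiting a little
// longer before each new attempt. Other errors are rethrown immediately.
async function retryOnConflict<T>(
  operation: () => Promise<T>,
  isConflict: (error: unknown) => boolean,
  maxAttempts: number,
  baseDelayMs: number,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      if (!isConflict(error) || attempt >= maxAttempts) {
        throw error;
      }
      // Linear backoff: baseDelayMs, then 2 * baseDelayMs, and so on.
      await new Promise<void>(resolve => setTimeout(resolve, baseDelayMs * attempt));
    }
  }
}
```

The delay should be chosen with the interceptor's TTL in mind: with a 5-second TTL, retries that fire sooner than that will keep hitting the same conflict.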
Use Cases
This pattern is particularly useful in scenarios where:
• Concurrency is high, and the API needs to prevent conflicting operations.
• Transactions on shared resources need to be serialized to ensure data consistency.
• Debounced or delayed operations are necessary to prevent overloading systems with duplicate or unnecessary requests.
Conclusion
The ThrottleInterceptor in NestJS provides a straightforward way to manage concurrent requests and reduce race conditions by throttling duplicate requests. This approach is simple and easily customizable, making it a useful tool for protecting data consistency in applications with high request volume. Because the keys live in an in-process Map, it only coordinates requests handled by the same application instance.
By understanding and implementing strategies like these, you can build more robust APIs that handle concurrency gracefully, ensuring that data remains consistent and that users experience fewer issues when sending requests in parallel.