How to Call Multiple Microservices in Parallel

Written by vaibhavtyagi | Published 2021/08/21
Tech Story Tags: microservices | microservice-architecture | containers | tutorial | coding-skills | microservices-in-practice | microservices-guide | learn-to-code

TLDR The problem is solving using Java 8’s CompletableFeatures & Spring Boot 2.x. We need to fetch all the records for the list provided and filter records from the list based on some attributes of the objects. The records can be fetched from another microservice (B) which takes id as an argument and returns the JSON of that record. The advantage of approach # 1 is that the processing time of fetching each record will be reduced to the max time taken by any of the service calls.via the TL;DR App

Problem Statement

We have a list of objects consisting of the id of records, and we need to fetch all the records for the list provided and filter records from the list based on some attributes of the objects. The records can be fetched from another microservice (B) which takes id as an argument and returns the JSON of that record.

Approach -1

Either we can call micro-service (B) sequentially for each record in the list and store results in another list that can be used for filtering later.

Approach - 2

We can make parallel requests and call micro-service (B) in parallel, storing results in the form of feature objects in one more list and then combine the result of all features.

The advantage of approach # 2 is that the processing time of fetching each record will be reduced to the max time taken by any of the service calls, so overall computation time will be reduced.

Sample Code using Java 8’s CompletableFeatures & Spring Boot 2.x

import java.util.List;
import java.util.Set;

public interface IApiFilterService {
public Set<Long> callSerciceApi(List<Long> id);
}

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class ServiceResponse {

private String customerId;
private int customerType;
private boolean isVipCustomer;
private boolean exceptionOccured;

}

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.log4j.Log4j2;

@Component
@Log4j2
public class ApiCaller {


@Autowired
private RestTemplate restTemplate;

Value("${service.url:https://127.0.0.1:8080/api/v1/customer/{id}")
private String apiUrl;

 

@Async("apiExecutor")
public CompletableFuture<ServiceResponse> callService(Long id) {
	try {
		
		JsonNode responseObj =	restTemplate.getForObject(apiUrl, JsonNode.class,id);
		
		ServiceResponse serviceResponse = new ServiceResponse(
                              responseObj.get("customerId")asText("-1"),
                               responseObj.get("customerType").asInt(0), 
                               responseObj.get("isVipCustomer").asBoolean(false), 
                                responseObj.get("exceptionOccured").asBoolean(false));
		return CompletableFuture.completedFuture(serviceResponse);
	
	}
	catch(Exception ex) {
        ServiceResponse serviceResponse = new ServiceResponse("-1",0,false,false);
                              
		return CompletableFuture.completedFuture(serviceResponse);
		
	}

  }

}


import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import lombok.extern.log4j.Log4j2;

@Service
@Log4j2
public class TApiFilterService implements IApiFilterService {



@Value("${executor.corepool.size:1}")
private int corePoolSize;

@Value("${executor.maxpool.size:3}")
private int maxPoolSize;

@Value("${executor.queue.size:100}")
private int queueCapacity;

@Value("${thread.timeout}")
private int threadTimeout;

@Autowired
ApiCaller thirdPartyApiCaller;



@Bean("apiExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
	
	ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
	threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
	threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
	threadPoolTaskExecutor.setQueueCapacity(queueCapacity);
	threadPoolTaskExecutor.setKeepAliveSeconds(threadTimeout);
	threadPoolTaskExecutor.setThreadNamePrefix("threadpool-");
	threadPoolMonitor.setPool(threadPoolTaskExecutor);
	return threadPoolTaskExecutor;
}

@Override
public Set<Long> callSerciceApi(List<Long> id) {

	long startTime = System.currentTimeMillis();
	List<CompletableFuture<ServiceResponse>> futures = numbers.stream()
			.map(id -> thirdPartyApiCaller.callService(id)).collect(Collectors.toList());

	Set<Long> result = futures.stream().map(CompletableFuture::join).filter(customer-> customer.isVipCustomer()())
			.map(ServiceResponse::getCustomerId).collect(Collectors.toSet());
  
    long endTime = System.currentTimeMillis();
	log.info("Total time to complete  [{}] , (endTime-startTime));

	return (result);
 }

}

That’s all, folks!


Published by HackerNoon on 2021/08/21