A common real-life example of parallel processing is calling several independent web services, aggregating their responses, and returning the result to the caller. One way to handle this is to create a thread per task, store each response in a shared data object, and aggregate the responses once all threads have finished processing.
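
A minimal sketch of the thread-per-task approach just described, with two simulated service calls. The names serviceA/serviceB and their responses are hypothetical. Each thread writes into a shared ConcurrentHashMap, and the caller joins every thread before aggregating:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadPerTaskDemo {

    /** Starts one thread per simulated call, joins them all, and returns the aggregated responses. */
    public static Map<String, String> callAll() {
        Map<String, String> shared = new ConcurrentHashMap<String, String>(); // shared data object
        // Hypothetical "service calls": each thread just stores a fixed response.
        Thread serviceA = new Thread(() -> shared.put("serviceA", "response from A"));
        Thread serviceB = new Thread(() -> shared.put("serviceB", "response from B"));
        serviceA.start();
        serviceB.start();
        try {
            serviceA.join(); // wait (without a timeout) for each thread to finish
            serviceB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return shared; // aggregate only after every thread is done
    }

    public static void main(String[] args) {
        System.out.println(callAll());
    }
}
```

Nothing in this version reports whether a call failed or how long it took. Those gaps are what the following sections are about.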

Disadvantages of the Thread (Runnable) model

  • Creating and tearing down a thread takes system resources and time, so a high request rate causes frequent thread creation and tear-down.
  • Creating too many threads increases memory use, since every thread has its own stack.
  • There is a limit on the number of threads per JVM. It can be defined by
  • There is no life-cycle management for the work a thread performs.

Disadvantages of run() and join()

  • run() returns void and cannot throw a checked exception. So, if the application hits a problem, the shared data object that the thread updates may contain bad data, and the caller has no direct way to find out.
  • It is not clear whether thread1.join(100) returned because the thread finished successfully or because the timeout expired.
  • With multiple threads, it is difficult to know which thread timed out or completed unsuccessfully.
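
Thread.join(long millis) returns void, so the usual workaround for the join() problem is to call isAlive() afterwards. This sketch simulates the work with a sleep, and the durations are arbitrary:

```java
public class JoinTimeoutDemo {

    /** Returns true if the worker finished within timeoutMillis, false if join() timed out. */
    public static boolean finishedWithin(long workMillis, long timeoutMillis) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(workMillis); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive if we give up waiting
        worker.start();
        try {
            worker.join(timeoutMillis); // returns on completion OR on timeout, with no indication which
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive(); // the only way to tell the two cases apart
    }

    public static void main(String[] args) {
        System.out.println(finishedWithin(10, 1000));  // worker should finish in time
        System.out.println(finishedWithin(1000, 100)); // join should time out first
    }
}
```

Even then, isAlive() only separates "still running" from "finished". It says nothing about whether the work succeeded.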

Executor framework

The Java concurrency package (java.util.concurrent) defines the Executor framework, which decouples the producer (the thread that submits a task) from the consumer (the thread that executes it). This is a great advantage, because we can now manage the task life cycle.
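
Here is a small illustration of that decoupling using Executors.newFixedThreadPool. The pool size and task bodies are arbitrary. The submitting loop only hands over Runnables, and shutdown()/awaitTermination() provide the life-cycle control that a bare Thread lacks:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorDemo {

    /** Submits taskCount tasks to a 2-thread pool, shuts it down, and returns what the tasks recorded. */
    public static List<String> runOnPool(int taskCount) {
        List<String> results = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            // Producer side: hand the task to the pool and move on; a worker thread runs it later.
            pool.execute(() -> results.add("task-" + id + " ran on " + Thread.currentThread().getName()));
        }
        pool.shutdown(); // life cycle: stop accepting new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // still busy after 5 s: interrupt the workers
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return results;
    }

    public static void main(String[] args) {
        runOnPool(4).forEach(System.out::println);
    }
}
```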

Callable and Future

Callable is an interface that describes a task, like Runnable, but its call() method returns a value and can throw a checked Exception.
Callable.java
public interface Callable<V>{
      V call() throws Exception;
}
Runnable.java
public interface Runnable{
      public abstract void run();
}
The Future interface represents the life cycle of a task and provides methods to:
  • get the result
  • get the result with a timeout
  • cancel the task
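
The three Future operations listed above, plus the ExecutionException that carries a failure out of call(), fit in one short sketch. The sleep-based task and the outcome strings are made up for illustration. What matters is that success, timeout and failure now arrive as distinct outcomes, which addresses the join() ambiguity discussed earlier:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {

    /** Runs a Callable that sleeps workMillis (and optionally fails), waiting at most timeoutMillis. */
    public static String outcome(long workMillis, long timeoutMillis, boolean fail) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Unlike Runnable.run(), call() returns a value and may throw a checked exception.
        Callable<String> task = () -> {
            Thread.sleep(workMillis);
            if (fail) {
                throw new IOException("simulated failure");
            }
            return "done";
        };
        Future<String> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // get result with timeout
        } catch (TimeoutException e) {
            future.cancel(true); // cancel the task: interrupts the worker thread
            return "timed out";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause(); // the exception thrown inside call()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(10, 1000, false)); // prints "done"
        System.out.println(outcome(1000, 100, false)); // prints "timed out"
        System.out.println(outcome(10, 1000, true));   // prints "failed: java.io.IOException: simulated failure"
    }
}
```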

Example

Let's look at some code. I will make a web service call and a database call in parallel. Both calls take a customer's name and return all of that customer's mailing addresses. The code will handle the following alternative paths:
  • The web service call should return within 2 seconds; if it does not, the system should return the result of the DB query.
  • The DB call has a timeout of 1 second; if it fails, the system should stop processing the web service call (if it is still running).
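
Before the full framework, here is a compact sketch of just these two rules using plain Futures. It rests on several assumptions. fakeCall is a hypothetical back end that sleeps and returns one address. The sketch checks the DB rule first. The 2-second web service budget is measured from the moment both calls are submitted. It is not the TaskManager implementation that follows:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FallbackDemo {

    static final long DB_TIMEOUT_MS = 1000; // DB call has a timeout of 1 second
    static final long WS_BUDGET_MS = 2000;  // web service should return within 2 seconds

    // Hypothetical stand-in for a real back-end call: sleeps, then returns one address.
    static List<String> fakeCall(long millis, String source) throws InterruptedException {
        Thread.sleep(millis);
        return Collections.singletonList(source + ": 1234 Main St, Oakland CA");
    }

    /** Returns the WS result if it arrives within budget, else the DB result; throws if the DB fails. */
    public static List<String> lookup(long wsMillis, long dbMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<List<String>> wsCall = () -> fakeCall(wsMillis, "WS");
        Callable<List<String>> dbCall = () -> fakeCall(dbMillis, "DB");
        long start = System.nanoTime();
        Future<List<String>> ws = pool.submit(wsCall); // both calls run in parallel
        Future<List<String>> db = pool.submit(dbCall);
        try {
            List<String> dbResult;
            try {
                dbResult = db.get(DB_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                ws.cancel(true); // rule 2: DB failed, so stop the web service call too
                throw new IllegalStateException("DB lookup failed", e);
            }
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            try {
                // rule 1: wait only for what is left of the 2 s budget
                return ws.get(Math.max(0, WS_BUDGET_MS - elapsed), TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                return dbResult; // WS too slow or failed: fall back to the DB result
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts anything still running, e.g. a slow WS call
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup(100, 100));  // WS answers in time
        System.out.println(lookup(3000, 100)); // WS too slow: DB result after about 2 s
    }
}
```

For example, lookup(50, 3000) should throw after about one second. By then it has cancelled the web service call, which is a no-op if that call already finished.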

Request and Response are two abstract classes that hold the request to, and the response from, a task. I am using the Abstract Factory pattern to create tasks (WS1Task and DBQueryTask) from the concrete WS1Request and DBQueryRequest classes. This lets me keep adding new tasks without many code changes. Task implements Callable and returns a Response object; the DB and WS calls are executed from call().

package parallel.test.message;

import parallel.test.task.Task;

public abstract class Request {
    protected int timeOutInSeconds = Integer.MAX_VALUE;
    public REQUEST_TYPE requestType;

    public enum REQUEST_TYPE {
        WS1, WS2, DB_UPDATE, DB_SIMPLE_QUERY
    }

    public int getTimeOutInSeconds() {
        return timeOutInSeconds;
    }

    public void setTimeOutInSeconds(int timeOutInSeconds) {
        this.timeOutInSeconds = timeOutInSeconds;
    }

    public abstract REQUEST_TYPE getRequestType();

    // Factory method: each concrete request creates the Task that serves it.
    public abstract Task createTask();

    // Requests are used as Map keys in TaskManager, so subclasses must define equality.
    public abstract boolean equals(Object otherRequest);

    public abstract int hashCode();
}
package parallel.test.message;

import parallel.test.task.Task;
import parallel.test.task.WS1Task;

public class WS1Request extends Request {
    private final String requestId;

    public WS1Request(String requestId) {
        this.requestId = requestId;
    }

    @Override
    public REQUEST_TYPE getRequestType() {
        return REQUEST_TYPE.WS1;
    }

    @Override
    public Task createTask() {
        WS1Task task = new WS1Task();
        task.setRequest(this);
        return task;
    }

    @Override
    public boolean equals(Object otherRequest) {
        if (otherRequest == this) return true;
        if (otherRequest == null || !otherRequest.getClass().equals(this.getClass())) return false;
        // Compare requestIds; casting the argument itself to String would throw ClassCastException.
        String otherRequestId = ((WS1Request) otherRequest).requestId;
        return otherRequestId.equals(requestId);
    }

    @Override
    public int hashCode() {
        int result = 7;
        result = 37 * result + requestId.hashCode();
        return result;
    }
}
package parallel.test.message;

import parallel.test.task.DBQueryTask;
import parallel.test.task.Task;

public class DBQueryRequest extends Request {
    private final String requestId;

    public DBQueryRequest(String requestId) {
        this.requestId = requestId;
    }

    @Override
    public REQUEST_TYPE getRequestType() {
        return REQUEST_TYPE.DB_SIMPLE_QUERY;
    }

    @Override
    public Task createTask() {
        DBQueryTask task = new DBQueryTask();
        task.setRequest(this);
        return task;
    }

    @Override
    public boolean equals(Object otherRequest) {
        if (otherRequest == this) return true;
        if (otherRequest == null || !otherRequest.getClass().equals(this.getClass())) return false;
        // Compare requestIds; casting the argument itself to String would throw ClassCastException.
        String otherRequestId = ((DBQueryRequest) otherRequest).requestId;
        return otherRequestId.equals(requestId);
    }

    @Override
    public int hashCode() {
        int result = 7;
        result = 37 * result + requestId.hashCode();
        return result;
    }
}
package parallel.test.message;

public abstract class Response {

    public RESPONSE_TYPE responseType;
    protected Exception exception;

    public enum RESPONSE_TYPE {
        WS1, WS2, DB_UPDATE, DB_SIMPLE_QUERY, FAULT
    }

    // Copies the subclass's type into the public responseType field.
    public void setResponseType() {
        this.responseType = getResponseType();
    }

    public abstract RESPONSE_TYPE getResponseType();

    public Exception getException() {
        return exception;
    }

    public void setException(Exception e) {
        this.exception = e;
    }
}
package parallel.test.message;

import java.util.List;

public class WS1Response extends Response {
    private List<String> allAddresses;

    @Override
    public RESPONSE_TYPE getResponseType() {
        return RESPONSE_TYPE.WS1;
    }

    public List<String> getAllAddresses() {
        return allAddresses;
    }

    public void setAllAddresses(List<String> allAddresses) {
        this.allAddresses = allAddresses;
    }
}
package parallel.test.message;

import java.util.List;

public class DBQueryResponse extends Response {
    private List<String> allAddresses;

    @Override
    public RESPONSE_TYPE getResponseType() {
        return RESPONSE_TYPE.DB_SIMPLE_QUERY;
    }

    public List<String> getAllAddresses() {
        return allAddresses;
    }

    public void setAllAddresses(List<String> allAddresses) {
        this.allAddresses = allAddresses;
    }
}
package parallel.test.message;

public class FaultResponse extends Response{

    public FaultResponse(){}

	public FaultResponse(Exception s){
    	this.exception = s;
    }
	public RESPONSE_TYPE getResponseType(){
        return RESPONSE_TYPE.FAULT;
    }
}
package parallel.test.task;

import java.util.concurrent.Callable;

import parallel.test.message.Request;
import parallel.test.message.Response;

// A Task is a Callable that produces a Response, so it can be submitted to an ExecutorService.
public abstract class Task implements Callable<Response> {

    public abstract void setConectionParam(String connUrl, String userName, String password);

    public abstract void setRequest(Request request);

    public abstract Request getRequest();

    @Override
    public abstract Response call() throws Exception;
}
package parallel.test.task;

import java.util.ArrayList;
import java.util.List;

import parallel.test.message.Request;
import parallel.test.message.Response;
import parallel.test.message.WS1Request;
import parallel.test.message.WS1Response;

public class WS1Task extends Task {

    private WS1Request request;
    private String endPointUrl;
    private String user;
    private String pass;

    @Override
    public void setConectionParam(String endPointURL, String user, String pass) {
        this.endPointUrl = endPointURL;
        this.user = user;
        this.pass = pass;
    }

    @Override
    public void setRequest(Request request) {
        this.request = (WS1Request) request;
    }

    @Override
    public Request getRequest() {
        return request;
    }

    // Runs on a pool thread; an exception thrown here reaches the caller
    // as an ExecutionException from Future.get().
    @Override
    public Response call() throws TaskExecutionException {
        WS1Response response = new WS1Response();
        try {
            // call web service
            // get response
            // fake response for now
            List<String> all = new ArrayList<String>();
            all.add("1234 Main St, Oakland CA 433322");
            all.add("1234 Washington Ave, San Francisco CA 33333");
            response.setAllAddresses(all);
            return response;
        } catch (Exception e) {
            // wrap the failure, keeping the original exception as the cause
            throw new TaskExecutionException(e.getMessage(), e);
        }
    }
}
package parallel.test.task;

import java.util.ArrayList;
import java.util.List;

import parallel.test.message.DBQueryRequest;
import parallel.test.message.DBQueryResponse;
import parallel.test.message.Request;
import parallel.test.message.Response;

public class DBQueryTask extends Task {

    private DBQueryRequest request;
    private String dbUrl;
    private String user;
    private String pass;

    @Override
    public void setConectionParam(String dbURL, String user, String pass) {
        this.dbUrl = dbURL;
        this.user = user;
        this.pass = pass;
    }

    @Override
    public void setRequest(Request request) {
        this.request = (DBQueryRequest) request;
    }

    @Override
    public Request getRequest() {
        return request;
    }

    // Runs on a pool thread; an exception thrown here reaches the caller
    // as an ExecutionException from Future.get().
    @Override
    public Response call() throws TaskExecutionException {
        DBQueryResponse response = new DBQueryResponse();
        try {
            // call DB
            // get response
            // fake response for now
            List<String> all = new ArrayList<String>();
            all.add("1234 Main St, Oakland CA 433322");
            all.add("1234 Washington Ave, San Francisco CA 33333");
            all.add("8765 NotA Ave, City QQ 77777");
            response.setAllAddresses(all);
            return response;
        } catch (Exception e) {
            // wrap the failure, keeping the original exception as the cause
            throw new TaskExecutionException(e.getMessage(), e);
        }
    }
}
package parallel.test.task;

public class TaskExecutionException extends Exception{

	private static final long serialVersionUID = 1L;

	public TaskExecutionException(){}

    public TaskExecutionException(String message){
        super(message);
    }

    public TaskExecutionException(String message, Throwable throwable){
        super(message,throwable);
    }

    public TaskExecutionException(Throwable throwable){
        super(throwable);
    }

}

TaskManager is the main class that manages the workflow. It creates ThreadPoolExecutors for the web service and DB query calls. The submit method of ThreadPoolExecutor takes a Callable as a parameter and returns a Future representing the pending result of the task. We can use the Future to cancel the task if needed.
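
Before the code, it is worth seeing how ThreadPoolExecutor's sizing parameters interact, since TaskManager sets all of them. Per the ThreadPoolExecutor documentation, threads beyond corePoolSize are only created when the work queue refuses a task. Tasks are rejected once maximumPoolSize threads are busy and the queue is full. This sketch uses a hypothetical class name and tasks that simply block on a latch. It counts how many tasks a pool accepts before rejecting one:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingDemo {

    /** Submits blocking tasks until the pool rejects one; returns how many were accepted. */
    public static int acceptedBeforeRejection(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        // The default RejectedExecutionHandler is AbortPolicy: submit() throws when saturated.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 10, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        int accepted = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                // Each task blocks until released, so no worker frees up during the loop.
                pool.submit(() -> {
                    release.await();
                    return null;
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // all `max` threads are busy and the queue is full
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // core 10, max 15, queue 10 mirrors the TaskManager constants
        System.out.println(acceptedBeforeRejection(10, 15, 10, 40)); // prints 25
    }
}
```

With a bounded queue, the count comes out as maximumPoolSize plus the queue capacity. An unbounded queue such as PriorityBlockingQueue never refuses a task, so a pool built on one never grows past corePoolSize.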

package parallel.test.broker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import parallel.test.message.FaultResponse;
import parallel.test.message.Request;
import parallel.test.message.Response;
import parallel.test.pool.WSThreadFactory;
import parallel.test.pool.WSThreadPoolExecutor;
import parallel.test.task.Task;

public class TaskManager {

    private static final int CORE_POOL_SIZE          = 10;
    private static final int MAX_POOL_SIZE           = 15;
    private static final int KEEP_ALIVE_TIME         = 10;
    private static final int BLOCKING_QUEUE_CAPACITY = 10;

    // The pools are created once and shared by all requests (re-creating them per call leaks threads).
    // A bounded ArrayBlockingQueue is used: FutureTask is not Comparable, so a PriorityBlockingQueue
    // throws ClassCastException when a task is queued, and an unbounded queue would keep the pool
    // from ever growing past CORE_POOL_SIZE.
    private static final ThreadPoolExecutor webServiceExecutor = new WSThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY), new WSThreadFactory("WSThreadPool"));
    private static final ThreadPoolExecutor dbExecutor = new WSThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(BLOCKING_QUEUE_CAPACITY), new WSThreadFactory("DBThreadPool"));

    public Map<Request, Response> processRequest(List<Request> requests) {
        if (requests == null || requests.isEmpty()) throw new IllegalArgumentException("request is null or empty.");
        List<Task> taskList = new ArrayList<>();
        for (Request request : requests) {
            // factory method: every request creates the task that serves it
            taskList.add(request.createTask());
        }
        return processInParallel(taskList);
    }

    Map<Request, Response> processInParallel(List<Task> listOfTask) {
        // 1. Submit every task first, so that they all run concurrently.
        Map<Request, Future<Response>> futures = new HashMap<>();
        for (Task task : listOfTask) {
            Request req = task.getRequest();
            ThreadPoolExecutor threadPoolExecutor = getExecutor(req.getRequestType());
            if (threadPoolExecutor == null)
                throw new IllegalStateException("Could not get proper ThreadPoolExecutor, aborting the tasks..");
            // submit(task) returns a Future for the pending Response
            futures.put(req, threadPoolExecutor.submit(task));
        }
        // 2. Collect the results, each with its own timeout.
        Map<Request, Response> responseMap = new HashMap<>();
        for (Map.Entry<Request, Future<Response>> entry : futures.entrySet()) {
            Request request = entry.getKey();
            Future<Response> response = entry.getValue();
            try {
                // get with timeout on Future
                responseMap.put(request, response.get(request.getTimeOutInSeconds(), TimeUnit.SECONDS));
            } catch (ExecutionException e) {
                // call() threw; e.getCause() holds the original exception
                createFault(responseMap, request, e);
                // cancel all other tasks if one fails (optional)
                for (Future<Response> other : futures.values()) {
                    other.cancel(true);
                }
            } catch (TimeoutException | CancellationException e) {
                createFault(responseMap, request, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                createFault(responseMap, request, e);
            } finally {
                // no-op if the task already completed; stops it if get() timed out
                response.cancel(true);
            }
        }
        return responseMap;
    }

    private void createFault(Map<Request, Response> responseMap, Request request, Exception e) {
        FaultResponse faultResponse = new FaultResponse();
        faultResponse.setException(e);
        responseMap.put(request, faultResponse);
    }

    private ThreadPoolExecutor getExecutor(Request.REQUEST_TYPE requestType) {
        switch (requestType) {
            case WS1:
            case WS2:
                return webServiceExecutor;
            case DB_SIMPLE_QUERY:
            case DB_UPDATE:
                return dbExecutor;
            default:
                return null;
        }
    }

    // Pool threads are non-daemon, so call this once when the application shuts down.
    public static void shutdown() {
        webServiceExecutor.shutdown();
        dbExecutor.shutdown();
    }
}
package parallel.test.pool;
import java.util.concurrent.atomic.AtomicInteger;
public class WSThread extends Thread{

    public static final String NAME = "WSThread";
    private static final AtomicInteger created = new AtomicInteger();

    public WSThread(Runnable r, String name){
        super(r,name+"-"+created.incrementAndGet());
    }

    public void run(){
        super.run();
    }

}
package parallel.test.pool;
import java.util.concurrent.ThreadFactory;
public class WSThreadFactory implements ThreadFactory {
    private final String poolName;

    public WSThreadFactory(String poolName){
        this.poolName = poolName;
    }

    public Thread newThread(Runnable r) {
        return new WSThread(r,poolName);
    }
}
package parallel.test.pool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// ThreadPoolExecutor that logs each task and tracks the average execution time
// through the beforeExecute/afterExecute/terminated hooks.
public class WSThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadLocal<Long> startTime = new ThreadLocal<>();
    private final AtomicLong totalTime = new AtomicLong();
    private final AtomicLong numberOfTasks = new AtomicLong();

    public WSThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public WSThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public WSThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public WSThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        System.out.println("WSThreadPoolExecutor init....");
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        System.out.println("Thread :" + t + " start " + r);
        startTime.set(System.nanoTime());
    }

    // Must be named afterExecute to override the hook. It runs on the worker thread; t is the
    // uncaught exception, if any (always null for submit()ted tasks, whose exceptions go to the Future).
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long taskTime = System.nanoTime() - startTime.get();
            numberOfTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            System.out.println("Thread :" + Thread.currentThread() + " end " + r + " , total time taken=" + taskTime);
        } finally {
            super.afterExecute(r, t);
        }
    }

    @Override
    protected void terminated() {
        try {
            long tasks = numberOfTasks.get();
            // avoid division by zero when the pool never ran a task
            System.out.println("Terminated: average time=" + (tasks == 0 ? 0 : totalTime.get() / tasks));
        } finally {
            super.terminated();
        }
    }
}

Limitations of task parallelism

Executing tasks in parallel pays off when all tasks are independent and have similar execution times.
If one task takes much longer than the others, parallelizing will not give a significant performance improvement, because the overall response time can never be shorter than the slowest task.
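
A rough way to see this limitation: when independent tasks run in parallel, the wall-clock time cannot drop below the duration of the slowest task. This sketch simulates task durations with sleeps (the values are arbitrary). It uses ExecutorService.invokeAll, which waits for all tasks to complete:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SlowestTaskDemo {

    /** Runs one simulated task per duration, all in parallel, and returns the wall-clock time in ms. */
    public static long parallelMillis(long... durationsMs) {
        ExecutorService pool = Executors.newFixedThreadPool(durationsMs.length);
        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
        for (long d : durationsMs) {
            tasks.add(() -> {
                Thread.sleep(d); // simulated work of d milliseconds
                return null;
            });
        }
        long start = System.nanoTime();
        try {
            pool.invokeAll(tasks); // returns only when every task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Sequentially this would take about 300 + 300 + 300 + 600 = 1500 ms;
        // in parallel the total is governed by the 600 ms task.
        System.out.println(parallelMillis(300, 300, 300, 600) + " ms");
    }
}
```

Speeding up the three short tasks would not change the result here. Only the 600 ms task matters.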

Reference: Java Concurrency in Practice.
