In Chapter 2, Managing Lots of Threads – Executors, we presented an example of a client/server application. We implemented a server to search data over the World Development Indicators of the World Bank and a client that makes multiple calls to that server to test the performance of the executor.
In this section, we will extend that example to add to it the following characteristics:
To implement these new characteristics, we have made the following changes to the server:
q;username;priority;codCountry;codIndicator;year
where username
is the name of the user, priority
is the priority of the query, codCountry
is the code of the country, codIndicator
is the code of the indicator, and year
is an optional parameter with the year you want to query.r;username;priority;codIndicator
where username
is the name of the user, priority
is the priority of the query, and codIndicator
is the code of the indicator you want to report.s;username;priority
where username
is the name of the user and priority
is the priority of the query.z;username;priority
where username
is the name of the user, and priority
is the priority of the query.c;username;priority
where username
is the name of the user, and priority
is the priority of the query.ConcurrentServer
and RequestTask
to take into account the new elements of the serverThe rest of the elements of the server (the cache system, the log system, and the DAO
class) are the same, so it won't be described again.
As we mentioned earlier, we have implemented our own executor to execute the tasks of the server. We also have implemented some additional but necessary classes to provide all the functionality. Let's describe these classes.
Our server will calculate the number of tasks that every user executes on it and the total execution time of these tasks. To store this data, we have implemented the ExecutorStatistics
class. It has two attributes to store the information:
public class ExecutorStatistics { private AtomicLong executionTime = new AtomicLong(0L); private AtomicInteger numTasks = new AtomicInteger(0);
These attributes are AtomicVariables
that support atomic operations on single variables. This allows you to use those variables in different threads without using any synchronization mechanisms. Then, it has two methods to increment the number of tasks and the execution time:
public void addExecutionTime(long time) { executionTime.addAndGet(time); } public void addTask() { numTasks.incrementAndGet(); }
Finally, we have added methods to get the value of both attributes, and we have overridden the toString()
method to get the information in a readable way:
@Override public String toString() { return "Executed Tasks: "+getNumTasks()+". Execution Time: "+getExecutionTime(); }
When you create an executor, you can specify a class to manage its rejected tasks. A task is rejected by the executor when you submit it after the shutdown()
or shutdownNow()
methods has been invoked in the executor.
To control this circumstance, we have implemented the RejectedTaskController
class. This class implements the RejectedExecutionHandler
interface and implements the rejectedExecution()
method:
public class RejectedTaskController implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { ConcurrentCommand command=(ConcurrentCommand)task; Socket clientSocket=command.getSocket(); try { PrintWriter out = new PrintWriter(clientSocket.getOutputStream(),true); String message="The server is shutting down." +" Your request can not be served." +" Shutting Down: " +String.valueOf(executor.isShutdown()) +". Terminated: " +String.valueOf(executor.isTerminated()) +". Terminating: " +String.valueOf(executor.isTerminating()); out.println(message); out.close(); clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } }
The rejectedExecution()
method is called once per task that is rejected and receives as parameters the task that has been rejected and the executor that has rejected the task.
When you submit a Runnable
object to an executor, it doesn't execute that Runnable
object directly. It creates a new object, an instance of the FutureTask
class, and it's this task that is executed by the worker thread of the executor.
In our case, to measure the execution time of the tasks, we have implemented our own FutureTask
implementation in the ServerTask
class. It extends the FutureTask
class and implements the Comparable
interface as follows:
public class ServerTask<V> extends FutureTask<V> implements Comparable<ServerTask<V>>{
Internally, it stores the query that is going to execute as a ConcurrentCommand
object:
private ConcurrentCommand command;
In the constructor, it uses the constructor of the FutureTask
class and stores the ConcurrentCommand
object:
public ServerTask(ConcurrentCommand command) { super(command, null); this.command=command; } public ConcurrentCommand getCommand() { return command; } public void setCommand(ConcurrentCommand command) { this.command = command; }
Finally, it implements the compareTo()
operation comparing the commands stored by the two ServerTask
instances to compare. This can be seen in the following code:
@Override public int compareTo(ServerTask<V> other) { return command.compareTo(other.getCommand()); }
Now that we have the auxiliary classes of the executor, we have to implement the executor itself. We have implemented the ServerExecutor
class with this purpose. It extends the ThreadPoolExecutor
class and has some internal attributes, as follows:
startTimes
: This is a ConcurrentHashMap
to store the start date of every task. The key of the class will be the ServerTask
object (a Runnable
object), and the value will be a Date
object.executionStatistics
: This is a ConcurrentHashMap
to store the statistics of use per user. The key will be the username and the value will be a ExecutorStatistics
object.CORE_POOL_SIZE
, MAXIMUM_POOL_SIZE
, and KEEP_ALIVE_TIME
: These are constants to define the characteristics of the executor.REJECTED_TASK_CONTROLLER
: This is a RejectedTaskController
class attribute to control the tasks rejected by the executor.This can be explained by the following code:
public class ServerExecutor extends ThreadPoolExecutor { private ConcurrentHashMap<Runnable, Date> startTimes; private ConcurrentHashMap<String, ExecutorStatistics> executionStatistics; private static int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static int MAXIMUM_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static long KEEP_ALIVE_TIME = 10; private static RejectedTaskController REJECTED_TASK_CONTROLLER = new RejectedTaskController(); public ServerExecutor() { super(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new PriorityBlockingQueue<>(), REJECTED_TASK_CONTROLLER); startTimes = new ConcurrentHashMap<>(); executionStatistics = new ConcurrentHashMap<>(); }
The constructor of the class calls to the parent constructor creating a PriorityBlockingQueue
class to store the tasks that will be executed in the executor. This class orders the elements according to the result of the execution of the compareTo()
method (so the elements stored in it have to implement the Comparable
interface). The utilization of this class will allow us to execute our tasks by priority.
Then, we have overridden some methods of the ThreadPoolExecutor
class. First is the beforeExecute()
method. This method is executed before the execution of every task. It receives the ServerTask
object as a parameter, and the thread that is going to execute the task. In our case, we store the actual date in the ConcurrentHashMap
with the start dates of every task, as follows:
protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); startTimes.put(r, new Date()); }
The next method is the afterExecute()
method. This method is executed after the execution of every task in the executor and receives the ServerTask
object that has been executed as parameter and a Throwable
object. This last parameter will have value only when an exception is thrown during the execution of the task. In our case, we will use this method to:
@Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); ServerTask<?> task=(ServerTask<?>)r; ConcurrentCommand command=task.getCommand(); if (t==null) { if (!task.isCancelled()) { Date startDate = startTimes.remove(r); Date endDate=new Date(); long executionTime= endDate.getTime() - startDate.getTime(); ; ExecutorStatistics statistics = executionStatistics.computeIfAbsent (command.getUsername(), n -> new ExecutorStatistics()); statistics.addExecutionTime(executionTime); statistics.addTask(); ConcurrentServer.finishTask (command.getUsername(), command); } else { String message="The task" + command.hashCode() + "of user" + command.getUsername() + "has been cancelled."; Logger.sendMessage(message); } } else { String message="The exception " +t.getMessage() +" has been thrown."; Logger.sendMessage(message); } }
Finally, we have overridden the newTaskFor()
method. This method will be executed to convert the Runnable
object that we send to the executor, using the submit()
method in the instance of FutureTask
that will be executed by the executor. In our case, we replace the default FutureTask
class with our ServerTask
object:
@Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new ServerTask<T>(runnable); }
We have included an additional method in the executor to write all the statistics stored in the executor in the log system. This method will be called at the end of the execution of the server, as you will see later. We have the following code:
public void writeStatistics() { for(Entry<String, ExecutorStatistics> entry: executionStatistics.entrySet()) { String user = entry.getKey(); ExecutorStatistics stats = entry.getValue(); Logger.sendMessage(user+":"+stats); } }
The command classes execute the different queries you can send to the server. You can send five different queries to our server:
ConcurrentQueryCommand
class.ConcurrentReportCommand
class.ConcurrentStatusCommand
class.ConcurrentCancelCommand
class.ConcurrentStopCommand
class.We also have the ConcurrentErrorCommand
class to manage the situation when an unknown command arrives at the server, and ConcurrentCommand
that is the base class of all the commands.
This is the base class of every command. It includes the behavior common to all the commands that includes the following:
The class extends the Command
class and implements the Comparable
and Runnable
interfaces. In the example of Chapter 2, Managing Lots of Threads – Executors, the commands were simple classes, but in this example, the concurrent commands are Runnable
objects that will be sent to the executor:
public abstract class ConcurrentCommand extends Command implements Comparable<ConcurrentCommand>, Runnable{
It has three attributes:
username
: This is to store the name of the user that sends the query.priority
: This is to store the priority of the query. It will determine the order of execution of the query.socket
: This is the socket used in the communication with the client.The constructor of the class initializes these attributes:
private String username; private byte priority; private Socket socket; public ConcurrentCommand(Socket socket, String[] command) { super(command); username=command[1]; priority=Byte.parseByte(command[2]); this.socket=socket; }
The main functionality of this class is in the abstract execute()
method, which will be implemented by every concrete command to calculate and return the results of the query, and in the run()
method. The run()
method calls the execute()
method, stores the result in the cache, writes the result in the socket, and closes all the resources used in the communication. We have the following:
@Override public abstract String execute(); @Override public void run() { String message="Running a Task: Username: " +username +"; Priority: " +priority; Logger.sendMessage(message); String ret=execute(); ParallelCache cache = ConcurrentServer.getCache(); if (isCacheable()) { cache.put(String.join(";",command), ret); } try { PrintWriter out = new PrintWriter(socket.getOutputStream(),true); out.println(ret); socket.close(); } catch (IOException e) { e.printStackTrace(); } System.out.println(ret); }
Finally, the compareTo()
method uses the priority attribute to determine the order of the tasks. This will be used by the PriorityBlockingQueue
class to order the tasks, so the tasks with a higher priority will be executed first. Take into account that a task has higher priority when the getPriority()
method returns a lower value. If the getPriority()
of a task returns 1
, that task will have a higher priority than a task that getPriority()
method returns 2
:
@Override public int compareTo(ConcurrentCommand o) { return Byte.compare(o.getPriority(), this.getPriority()); }
We have made minor changes in the classes that implement the different commands, and we added a new one implemented by the ConcurrentCancelCommand
class. The main logic of these classes is included in the execute()
method that calculates the response to the query and returns it as a string.
The execute()
method of the new ConcurrentCancelCommand
makes a call to the cancelTasks()
method of the ConcurrentServer
class. This method will stop the execution of all the pending tasks associated with the user passed as a parameter:
@Override public String execute() { ConcurrentServer.cancelTasks(getUsername()); String message = "Tasks of user " +getUsername() +" has been cancelled."; Logger.sendMessage(message); return message; }
The execute()
method of ConcurrentReportCommand
uses the query()
method of the WDIDAO
class to get the data requested by the user. In Chapter 2, Managing Lots of Threads – Executors, you can find the implementation of this method. The implementation is almost the same. The only difference is command array indices as follows:
@Override public String execute() { WDIDAO dao=WDIDAO.getDAO(); if (command.length==5) { return dao.query(command[3], command[4]); } else if (command.length==6) { try { return dao.query(command[3], command[4], Short.parseShort(command[5])); } catch (NumberFormatException e) { return "ERROR;Bad Command"; } } else { return "ERROR;Bad Command"; } }
The execute()
method of ConcurrentQueryCommand
uses the report()
method of the WDIDAO
class to get the data. In Chapter 2, Managing Lots of Threads – Executors, you also can find the implementation of this method. The implementation here is almost the same. The only difference is the command array index:
@Override public String execute() { WDIDAO dao=WDIDAO.getDAO(); return dao.report(command[3]); }
ConcurrentStatusCommand
has an additional parameter in its constructor: the Executor
object, which will execute the commands. This command uses this object to obtain information about the executor and send it as a response to the user. The implementation is almost the same as in Chapter 2, Managing Lots of Threads – Executors. We have used the same methods to get the status of the Executor
object.
The ConcurrentStopCommand
and ConcurrentErrorCommand
are also the same as in Chapter 2, Managing Lots of Threads – Executors, so we haven't included their source code.
The server part receives the queries from the clients of the server and creates the command classes that executes the queries and sends them to the executor. It is implemented by two classes:
ConcurrentServer
class: It includes the main()
method of the server and additional methods to cancel tasks and finish the execution of the systemRequestTask
class: This class creates the commands and sends them to the executorThe main difference with the example of Chapter 2, Managing Lots of Threads – Executors is the role of the RequestTask
class. In the SimpleServer
example, the ConcurrentServer
class creates a RequestTask
object per query and sends them to the executor. In this example, we will only have an instance of the RequestTask
that will be executed as a thread. When the ConcurrentServer
receives a connection, it stores the socket to communicate with the client in a concurrent list of pending connections. The RequestTask
thread reads that socket, processes the data sent by the client, creates the corresponding command, and sends the command to the executor.
The main reason for this change is to leave in the tasks executed by the executor only the code of the queries and leave the preprocessed code outside the executor.
The ConcurrentServer
class needs some internal attributes to work properly:
ParallelCache
instance to use the cache system.ServerSocket
instance to get the connections from the clients.boolean
value to know when it has to stop its execution.LinkedBlockingQueue
to store the sockets of the clients that sends a message to the server. These sockets will be processed by the RequestTask
class.ConcurrentHashMap
to store the Future
objects associated with every task executed in the executor. The key will be the username of the users that sends the queries, and the values will be another Map
whose key will be the ConcurrenCommand
objects, and the value will be the Future
instance associated with that task. We use these Future
instances to cancel the execution of tasks.RequestTask
instance to create the commands and sends them to the executor.Thread
object to execute the RequestTask
object.The code for this is as follows:
public class ConcurrentServer { private static ParallelCache cache; private static volatile boolean stopped=false; private static LinkedBlockingQueue<Socket> pendingConnections; private static ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController; private static Thread requestThread; private static RequestTask task;
The main()
method of this class initializes these objects and opens the ServerSocket
instance to listen to the connections from the clients. In addition, it creates the RequestTask
object and executes it as a thread. It will be in a loop until the shutdown()
method changes the value of the stopped attribute. After this, it waits for the finalization of the Executor
object, using the endTermination()
method of the RequestTask
object, and shuts down the Logger
system and the RequestTask
object with the finishServer()
method:
public static void main(String[] args) { WDIDAO dao=WDIDAO.getDAO(); cache=new ParallelCache(); Logger.initializeLog(); pendingConnections = new LinkedBlockingQueue<Socket>(); taskController = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>>(); task=new RequestTask(pendingConnections, taskController); requestThread=new Thread(task); requestThread.start(); System.out.println("Initialization completed."); serverSocket= new ServerSocket(Constants.CONCURRENT_PORT); do { try { Socket clientSocket = serverSocket.accept(); pendingConnections.put(clientSocket); } catch (Exception e) { e.printStackTrace(); } } while (!stopped); finishServer(); System.out.println("Shutting down cache"); cache.shutdown(); System.out.println("Cache ok" + new Date()); }
It includes two methods to shut down the executor of the server. The shutdown()
method changes the value of the stopped
variable and closes the serverSocket
instance. The finishServer()
method stops the executor, interrupts the thread that executes the RequestTask
object, and shuts downs the Logger
system. We divided this process into two parts to use the Logger
system until the last instruction of the server:
public static void shutdown() { stopped=true; try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } private static void finishServer() { System.out.println("Shutting down the server..."); task.shutdown(); System.out.println("Shutting down Request task"); requestThread.interrupt(); System.out.println("Request task ok"); System.out.println("Closing socket"); System.out.println("Shutting down logger"); Logger.sendMessage("Shutting down the logger"); Logger.shutdown(); System.out.println("Logger ok"); System.out.println("Main server thread ended"); }
The server includes the method that cancels the tasks associated with a user. As we mentioned before, the Server
class uses a nested ConcurrentHashMap
to store all the tasks associated with a user. First, we obtain the Map
with all the tasks of a user and then we process all the Future
objects of those tasks calling to the cancel()
method of the Future
objects. We pass the value true
as a parameter, so if the executor is running a task from that user, it will be interrupted. We have included the necessary code to avoid the cancellation of ConcurrentCancelCommand
:
public static void cancelTasks(String username) { ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username); if (userTasks == null) { return; } int taskNumber = 0; Iterator<ServerTask<?>> it = userTasks.values().iterator(); while(it.hasNext()) { ServerTask<?> task = it.next(); ConcurrentCommand command = task.getCommand(); if(!(command instanceof ConcurrentCancelCommand) && task.cancel(true)) { taskNumber++; Logger.sendMessage("Task with code "+command.hashCode()+"cancelled: "+command.getClass().getSimpleName()); it.remove(); } } String message=taskNumber+" tasks has been cancelled."; Logger.sendMessage(message); }
Finally, we have included a method to eliminate the Future
object associated with tasks from our nested map of ServerTask
objects when that task finishes its execution normally. It's the finishTask()
method:
public static void finishTask(String username, ConcurrentCommand command) { ConcurrentMap<ConcurrentCommand, ServerTask<?>> userTasks = taskController.get(username); userTasks.remove(command); String message = "Task with code "+command.hashCode()+" has finished"; Logger.sendMessage(message); }
The RequestTask
class is the intermediary between the ConcurrentServer
class, which connects to the clients, and the Executor
class, which executes the concurrent tasks. It opens the socket with the client, reads the query data, creates the adequate command, and sends it to the executor.
It uses some internal attributes:
LinkedBlockingQueue
where the ConcurrentServer
class stores the client socketsServerExecutor
to execute the commands as concurrent tasksConcurrentHashMap
to store the Future
objects associated with the tasksThe constructor of the class initializes all these objects:
public class RequestTask implements Runnable { private LinkedBlockingQueue<Socket> pendingConnections; private ServerExecutor executor = new ServerExecutor(); private ConcurrentMap<String, ConcurrentMap<ConcurrentCommand, ServerTask<?>>> taskController; public RequestTask(LinkedBlockingQueue<Socket> pendingConnections, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Future<?>>> taskController) { this.pendingConnections = pendingConnections; this.taskController = taskController; }
The main method of this class is the run()
method. It executes a loop until the thread is interrupted processing the sockets stored in the pendingConnections
object. In this object, the ConcurrentServer
class stores the sockets to communicate with the different clients that sends a query to the server. It opens the socket, reads the data, and creates the corresponding command. This also sends the command to the executor and stores the Future
object in the double ConcurrentHashMap
associated with the hashCode
of the task and with the user that sent the query:
public void run() { try { while (!Thread.currentThread().interrupted()) { try { Socket clientSocket = pendingConnections.take(); BufferedReader in = new BufferedReader(new InputStreamReader (clientSocket.getInputStream())); String line = in.readLine(); Logger.sendMessage(line); ConcurrentCommand command; ParallelCache cache = ConcurrentServer.getCache(); String ret = cache.get(line); if (ret == null) { String[] commandData = line.split(";"); System.out.println("Command: " + commandData[0]); switch (commandData[0]) { case "q": System.out.println("Query"); command = new ConcurrentQueryCommand(clientSocket, commandData); break; case "r": System.out.println("Report"); command = new ConcurrentReportCommand (clientSocket, commandData); break; case "s": System.out.println("Status"); command = new ConcurrentStatusCommand(executor, clientSocket, commandData); break; case "z": System.out.println("Stop"); command = new ConcurrentStopCommand(clientSocket, commandData); break; case "c": System.out.println("Cancel"); command = new ConcurrentCancelCommand (clientSocket, commandData); break; default: System.out.println("Error"); command = new ConcurrentErrorCommand(clientSocket, commandData); break; } ServerTask<?> controller = (ServerTask<?>)executor.submit(command); storeContoller(command.getUsername(), controller, command); } else { PrintWriter out = new PrintWriter (clientSocket.getOutputStream(),true); out.println(ret); clientSocket.close(); } } catch (IOException e) { e.printStackTrace(); } } } catch (InterruptedException e) { // No Action Required } }
The storeController()
method is the one that stores the Future
object in the double ConcurrentHashMap
:
private void storeContoller(String userName, ServerTask<?> controller, ConcurrentCommand command) { taskController.computeIfAbsent(userName, k -> new ConcurrentHashMap<>()).put(command, controller); }
Finally, we have included two methods to manage the execution of the Executor
class, one to call the shutdown()
method for the executor and an other to wait for its finalization. Remember that you must explicitly call the shutdown()
or shutdownNow()
methods to end the execution of an executor. If not, the program won't terminate. Look at the following code:
public void shutdown() { String message="Request Task: " +pendingConnections.size() +" pending connections."; Logger.sendMessage(message); executor.shutdown(); } public void terminate() { try { executor.awaitTermination(1,TimeUnit.DAYS); executor.writeStatistics(); } catch (InterruptedException e) { e.printStackTrace(); } }
Now it's time to test our server. In this case, we won't worry much about the execution time. The main objective of our test is to check whether the new features work well.
We have split the client part into the following two classes:
We have included an executor to execute the concurrent requests to the server to increase the level of concurrency of the client.
In the following image, you can see the results of the cancellation of tasks:
In this case, four tasks of the USER_2 user have been canceled.
The following image shows the final statistics about the number of tasks and the execution time of every user:
18.119.10.194