
Multi-threading on boats

The producer/consumer problem
This article is intended for beginners who have recently started getting acquainted with the world of multithreading in Java.
Not long ago I did an internship at EPAM, and one of the tasks a young Padawan had to complete was a multithreading assignment. In fairness, nobody was forced to solve it with low-level tools such as wait() and notify() when ArrayBlockingQueue, Semaphore and the rest of the powerful java.util.concurrent API are available. Moreover, reinventing the wheel is considered bad practice when ready-made tools already cover almost any multithreading scenario and hide much of the complexity of concurrency. I implemented both variants: the low-level one and the one using the API. Today I will show the first.
For starters, I suggest that you read the next article.
Link to this project.
The assignment was as follows:
- There are transport ships that sail to a tunnel and then on to the berths, where they are loaded with various goods.
- They pass through a narrow tunnel that can hold only 5 ships at a time. "Sail to the tunnel" implies that the ships must come from somewhere. Their number may be limited, say 10 or 100, or it may be unlimited. We will call this source of ships the ship generator.
- Ships differ in type and capacity, depending on the goods to be loaded. For this assignment I came up with 3 types of ships (Bread, Banana and Clothing) and 3 capacities: 10, 50 and 100 units of goods. 3 types * 3 capacities = 9 different kinds of ships.
- There are also 3 types of berths for loading ships: Bread, Banana and Clothing. Each berth takes the ship it needs and starts loading it. A berth loads 10 units of goods onto a ship per second, so a ship with a capacity of 50 units is loaded in 5 seconds.
The requirements were as follows:
- Split the task correctly into parallel parts.
- Synchronize the threads and maintain data integrity. Restricting thread access to a shared resource is not difficult; making the threads work in concert is much harder.
- The ship generator must not depend on the work of the berths, and vice versa.
- The shared resource must be thread-safe (if the implementation has one).
- Threads should not be active if there are no tasks.
- Threads should not hold the mutex if there are no tasks.
So, let's go!
To start, I drew a diagram to make it clear what is what.

Before solving any problem, it is good practice to sketch it on paper, especially when it comes to a multithreaded application.
Let's break our application into components, in our case into classes. Project structure.

Class - Ship. The class itself does not contain any logic; it is a POJO.
Source code
public class Ship {
    // Number of goods already loaded onto the ship
    private int count;
    private Size size;
    private Type type;

    public Ship(Size size, Type type) {
        this.size = size;
        this.type = type;
    }

    // Loads the given number of goods onto the ship
    public void add(int count) {
        this.count += count;
    }

    // Returns true while the ship still has free capacity
    public boolean countCheck() {
        return count < size.getValue();
    }

    public int getCount() {
        return count;
    }

    public Type getType() {
        return type;
    }

    public Size getSize() {
        return size;
    }
}
Since ships differ in size and type, I created two enums, Size and Type, to describe a ship's properties. Size has predefined values.
The Ship class also has an int counter of how many goods have already been loaded. Once the counter reaches the ship's capacity, the boolean countCheck() method returns false; in other words, the ship is fully loaded.
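The listing of the two enums is not included in the article, so here is a minimal sketch of what they might look like. Only Type.DRESS, Type.BANANA, Type.MEAL and Size.getValue() appear in the listings; the Size constant names (SMALL, MIDDLE, LARGE) and the ShipKindsDemo class are assumptions made for illustration.

```java
class ShipKindsDemo {
    // Number of distinct ship kinds: every type combined with every size.
    static int kinds() {
        return Type.values().length * Size.values().length;
    }

    public static void main(String[] args) {
        System.out.println("Ship kinds: " + kinds());
        for (Size size : Size.values()) {
            // At the 10 units per second stated in the assignment,
            // loading takes value / 10 seconds.
            System.out.println(size + " loads in " + size.getValue() / 10 + " s");
        }
    }
}

// Type constants as used in the Main class of the article.
enum Type {
    DRESS, BANANA, MEAL
}

// Constant names are assumed; the capacities come from the assignment.
enum Size {
    SMALL(10), MIDDLE(50), LARGE(100);

    private final int value;

    Size(int value) {
        this.value = value;
    }

    // Capacity of the ship in units of goods.
    public int getValue() {
        return value;
    }
}
```

Keeping the capacity inside the enum means Ship.countCheck() can compare the counter with size.getValue() without knowing the concrete numbers.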
Class - Tunnel
Source code
import java.util.ArrayList;
import java.util.List;

// Shared resource: every method that touches the list is synchronized
public class Tunnel {
    private List<Ship> store;
    private static final int maxShipsInTunel = 5;
    private static final int minShipsInTunel = 0;
    private int shipsCounter = 0;

    public Tunnel() {
        store = new ArrayList<>();
    }

    // Adds a ship if there is room. If the tunnel is full, the calling thread
    // waits until it is notified and then returns false without adding the ship.
    public synchronized boolean add(Ship element) {
        try {
            if (shipsCounter < maxShipsInTunel) {
                notifyAll();
                store.add(element);
                String info = String.format("%s + The ship arrived in the tunnel: %s %s %s", store.size(), element.getType(), element.getSize(), Thread.currentThread().getName());
                System.out.println(info);
                shipsCounter++;
            } else {
                System.out.println(store.size() + "> There is no place for a ship in the tunnel: " + Thread.currentThread().getName());
                wait();
                return false;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }

    // Removes and returns the first ship of the given type. If there is none,
    // the calling thread waits until it is notified and then returns null.
    public synchronized Ship get(Type shipType) {
        try {
            if (shipsCounter > minShipsInTunel) {
                notifyAll();
                for (Ship ship : store) {
                    if (ship.getType() == shipType) {
                        shipsCounter--;
                        System.out.println(store.size() + "- The ship is taken from the tunnel: " + Thread.currentThread().getName());
                        // Safe inside for-each only because we return right away
                        store.remove(ship);
                        return ship;
                    }
                }
            }
            System.out.println("0 < There are no ships in the tunnel");
            wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}
The tunnel stores ships in a List and has a capacity of 5 ships. We need some way to add ships to the tunnel and take them out; the add() and get() methods do this. The get() method finds a ship of the requested type, removes it from the list and returns it. As you can see, both add() and get() check the number of ships in the list. If the tunnel already holds 5 ships, adding fails; conversely, if it holds 0, taking fails. Tunnel is a shared resource in the context of multithreading. Shared resources in multithreading are evil: most of the problems and complexity of multithreaded programming come precisely from them, so they should be avoided where possible.
What exactly is the problem with our shared resource?
First, adding and retrieving ships must be coordinated between threads. Without coordination there is a high probability of a race condition and data loss. We solve this with the synchronized keyword.
Second, the specification required:
- Threads should not be active if there are no tasks.
- Threads should not hold the mutex if there are no tasks.
For this there is also a solution, in the form of wait() and notifyAll().
For the add() method and the thread that executes it, "no tasks" means the tunnel is full (it already holds 5 ships). In that case the thread must stop its activity and wait. To pause the thread and release the mutex, we call wait(). The get() method follows similar rules, except that for it "no tasks" means the tunnel is empty (0 ships). So the threads wait, but how do we wake them up and tell them to get back to work? This is where notifyAll() comes to our aid. Its job is to wake the threads that are waiting on the same monitor: they leave the WAITING state and, once they reacquire the monitor, become RUNNABLE again. When add() runs and the tunnel holds fewer than 5 ships, it wakes the threads waiting in get(). Conversely, when get() runs and the tunnel holds more than 0 ships, it wakes the thread waiting in add(). Each side keeps waking the other ... But be careful! There is a chance of ending up in a DEADLOCK and waiting forever. (http: //www.quizful.
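The Javadoc for Object.wait() recommends calling it inside a loop that re-checks the condition, because a woken thread has no guarantee that the state it was waiting for still holds. Below is a minimal sketch of that idiom. BoundedStore and GuardedWaitDemo are hypothetical names, the store holds plain integers instead of ships, and this is a variation on the Tunnel approach rather than the code from the project.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class GuardedWaitDemo {
    // Hypothetical stand-in for the tunnel: holds at most `capacity` items.
    static class BoundedStore {
        private final Deque<Integer> items = new ArrayDeque<>();
        private final int capacity;
        private int maxSeen = 0; // largest size ever observed, for the demo

        BoundedStore(int capacity) {
            this.capacity = capacity;
        }

        synchronized void put(int item) throws InterruptedException {
            // Re-check the condition after every wake-up.
            while (items.size() >= capacity) {
                wait(); // releases the monitor while waiting
            }
            items.addLast(item);
            maxSeen = Math.max(maxSeen, items.size());
            notifyAll(); // wake consumers waiting for an item
        }

        synchronized int take() throws InterruptedException {
            while (items.isEmpty()) {
                wait();
            }
            int item = items.removeFirst();
            notifyAll(); // wake producers waiting for free space
            return item;
        }

        synchronized int maxSeen() {
            return maxSeen;
        }
    }

    // Moves `count` items through a store of the given capacity and
    // returns {sum of consumed items, maximum store size observed}.
    static long[] run(int count, int capacity) {
        BoundedStore store = new BoundedStore(capacity);
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) store.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) sum[0] += store.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {sum[0], store.maxSeen()};
    }

    public static void main(String[] args) {
        long[] result = run(100, 5);
        System.out.println("sum=" + result[0] + " maxInStore=" + result[1]);
    }
}
```

With the loop in place, put() never gives up on an item and take() never returns without one, so the callers do not need to handle a "try again" result.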
Let's move on ...
Class - ShipGenerator.
Now we need something that will generate ships, and do so in an independent thread.
Source code
import java.util.Random;

public class ShipGenerator implements Runnable {
    private Tunnel tunnel;
    private int shipCount;
    // One shared Random instead of a new instance on every call
    private final Random random = new Random();

    public ShipGenerator(Tunnel tunnel, int shipCount) {
        this.tunnel = tunnel;
        this.shipCount = shipCount;
    }

    @Override
    public void run() {
        Thread.currentThread().setName(" Generator ship");
        int count = 0;
        while (count < shipCount) {
            count++;
            // add() returns false if the tunnel was full; that ship is not retried
            tunnel.add(new Ship(getRandomSize(), getRandomType()));
            try {
                // Emulates the time a ship needs to sail to the tunnel
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Type getRandomType() {
        return Type.values()[random.nextInt(Type.values().length)];
    }

    private Size getRandomSize() {
        return Size.values()[random.nextInt(Size.values().length)];
    }
}
To turn ship generation into a task for a thread, the class must implement the Runnable interface. The constructor takes 2 parameters: the Tunnel and the number of ships to generate. Next we override the run() method, which is the method the thread will actually execute. That is where the ship generation logic goes, and it is simple: we create ships with a random size and type using Random and put them into the shared Tunnel.
A small digression. I said "a task for a thread" on purpose, because many people mistakenly believe that by implementing the Runnable interface they create a thread. In fact they create a task for a thread. In other words, creating 1000 classes that implement Runnable does not mean creating 1000 threads; it means creating 1000 tasks. The number of threads that execute these tasks depends on the pool you hand them to; in this project the pool is sized by the number of cores on your machine.
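The difference between tasks and threads is easy to check. The sketch below is only an illustration (TasksVsThreadsDemo and distinctThreads are made-up names): it submits many Runnable tasks to a small fixed pool and counts how many distinct threads actually ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TasksVsThreadsDemo {
    // Submits `tasks` Runnables to a pool of `poolSize` threads and returns
    // how many distinct threads actually executed them.
    static int distinctThreads(int tasks, int poolSize) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            // Each Runnable is a task; it records which thread ran it.
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // no new tasks; queued ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        int threads = distinctThreads(1000, 4);
        // Never more than the pool size, no matter how many tasks there are.
        System.out.println("1000 tasks ran on " + threads + " thread(s)");
    }
}
```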
Class - PierLoader
Source code
public class PierLoader implements Runnable {
    private Tunnel tunnel;
    private Type shipType;

    public PierLoader(Tunnel tunnel, Type shipType) {
        this.tunnel = tunnel;
        this.shipType = shipType;
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Loader " + shipType);
        while (true) {
            try {
                // Time to load the goods
                Thread.sleep(500);
                Ship ship = tunnel.get(shipType);
                // get() returns null when no ship of this type was available
                if (ship != null) {
                    // Load 10 units at a time until the ship is full
                    while (ship.countCheck()) {
                        Thread.sleep(100);
                        ship.add(10);
                        System.out.println(ship.getCount() + " Loaded ship. " + Thread.currentThread().getName());
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
The ship has been generated and added to the tunnel; now it must be taken out of the tunnel and loaded with goods. To do this we create the PierLoader class, in other words the berth. As you know, we have 3 types of ships, so we need 3 berths working independently of each other. By analogy with ShipGenerator, we implement the Runnable interface. The constructor takes 2 parameters: the Tunnel and a Type (the type of ship this berth accepts). Instead of add(), we call get() with the specific ship type.
Note that I use sleep() only to emulate the work of the loaders and the generator (loading goods and sailing supposedly take time), not to delay threads so that they line up with other threads. Never do the latter. There are many reasons; the most obvious is this: what happens if the load on thread A grows and it takes much longer (3 seconds rather than the 1 second you expected) than the 1 second you put thread B to sleep for while waiting for thread A? Timing can also vary with the OS and the JVM. Also keep in mind that sleep() does not release a lock the thread holds; it keeps it for the whole sleep, unlike wait(), which suspends the thread and releases the lock.
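The difference between sleep() and wait() with respect to the lock can be observed directly. The sketch below is an illustration under assumed names (SleepVsWaitDemo, enteredWhilePaused): one thread pauses inside a synchronized block, and a second thread records whether it managed to enter the same monitor during that pause.

```java
import java.util.concurrent.CountDownLatch;

class SleepVsWaitDemo {
    // Returns true if a second thread managed to enter the monitor while the
    // first thread was paused inside it.
    static boolean enteredWhilePaused(boolean useWait) {
        Object lock = new Object();
        boolean[] paused = {false};    // guarded by lock
        boolean[] released = {false};  // guarded by lock
        boolean[] sawPaused = {false}; // guarded by lock
        CountDownLatch holderInside = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderInside.countDown();
                paused[0] = true;
                try {
                    if (useWait) {
                        // wait() gives up the monitor until notified.
                        while (!released[0]) {
                            lock.wait();
                        }
                    } else {
                        // sleep() keeps the monitor for the whole pause.
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                paused[0] = false;
            }
        });
        Thread contender = new Thread(() -> {
            synchronized (lock) {
                sawPaused[0] = paused[0]; // was the holder still paused?
                released[0] = true;
                lock.notifyAll();
            }
        });

        holder.start();
        try {
            holderInside.await(); // the holder now owns the monitor
            contender.start();
            holder.join();
            contender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (lock) {
            return sawPaused[0];
        }
    }

    public static void main(String[] args) {
        System.out.println("entered during sleep(): " + enteredWhilePaused(false)); // false
        System.out.println("entered during wait():  " + enteredWhilePaused(true));  // true
    }
}
```

During sleep() the second thread can only get in after the first one leaves the synchronized block, whereas during wait() it gets in immediately.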
Class - Main
Source code
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        System.out.println("Available number of cores: " + Runtime.getRuntime().availableProcessors());
        Tunnel tunnel = new Tunnel();
        ShipGenerator shipGenerator = new ShipGenerator(tunnel, 10);
        PierLoader pierLoader1 = new PierLoader(tunnel, Type.DRESS);
        PierLoader pierLoader2 = new PierLoader(tunnel, Type.BANANA);
        PierLoader pierLoader3 = new PierLoader(tunnel, Type.MEAL);
        // All four tasks are long-running, so the pool needs at least 4 threads
        // even on a machine with fewer cores
        int threads = Math.max(4, Runtime.getRuntime().availableProcessors());
        ExecutorService service = Executors.newFixedThreadPool(threads);
        service.execute(shipGenerator);
        service.execute(pierLoader1);
        service.execute(pierLoader2);
        service.execute(pierLoader3);
        // Stops accepting new tasks; tasks that are already running keep going
        service.shutdown();
    }
}
This one is the simplest. We create the Tunnel object. Next we create the ShipGenerator, passing the Tunnel object and the number of ships to generate to its constructor.
In the same way, we create 3 PierLoader objects for loading the 3 ship types. We pass the constructor our shared resource, the Tunnel, and the type of ship that the PierLoader should handle.
Next, we hand all of this over to an ExecutorService. First we create a thread pool to run the tasks. The number of threads is determined with Runtime.getRuntime().availableProcessors(), which returns the number of processors available to the JVM as an int. There is little point in creating 1000 threads for CPU-bound work if you have only 8 cores, since only 8 of them can run at any one moment. Keep in mind, though, that all four tasks here are long-running, so the pool needs at least four threads for all of them to make progress.
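One practical caveat about the Main class: the PierLoader tasks loop forever, and shutdown() does not interrupt tasks that are already running, so the program keeps running after the last ship is loaded. Below is a minimal sketch of one way to stop such workers. Worker and ShutdownDemo are hypothetical classes, not part of the project; the idea is that the loop checks the interrupt flag and shutdownNow() sets it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    // Hypothetical worker: loops like PierLoader, but exits when interrupted.
    static class Worker implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50); // stands in for real work
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; restore it so the loop ends
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    // Starts `workers` endless tasks, lets them run briefly, then asks the
    // pool to stop. Returns true if every thread finished in time.
    static boolean startAndStop(int workers) {
        ExecutorService service = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            service.execute(new Worker());
        }
        try {
            Thread.sleep(200);
            service.shutdownNow(); // interrupts the running workers
            return service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + startAndStop(3));
    }
}
```

Note that a loop which catches InterruptedException and simply continues, as the listings in this article do, would not stop on shutdownNow(); the worker has to treat the interrupt as a request to finish.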
That's it! The multithreaded application is ready. You can play around with the number of tasks, the ship generation time and the loading time. The result will be different every time.
Conclusion
In conclusion, I would advise you to read "Java Philosophy" by Bruce Eckel (the Russian edition of "Thinking in Java"), the chapter "Parallel execution", and "JAVA. Programming Methods" by N. Blinov, the chapter "Streaming Execution". They describe the basic practices of working with multithreading in Java and the difficulties involved.
Before writing a multithreaded application, draw it on a piece of paper and make a sketch of your project. Avoid shared resources; shared resources are evil. Ideally, threads should know nothing about each other at all. Use a ready-made API rather than doing everything by hand; this will reduce development time and increase the reliability of your application. Test and profile the application. You may well find some other bottleneck that slows it down and keeps it from reaching its full multithreaded potential.
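To give a feel for the ready-made API, here is a minimal sketch of the tunnel built on ArrayBlockingQueue from java.util.concurrent. It is a simplification and not the second variant from the project: ships are plain strings, there is a single berth, and routing ships to berths by type is left out. BlockingQueueTunnelDemo and its run method are made-up names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueTunnelDemo {
    // Passes `ships` ship names through a tunnel of the given capacity and
    // returns how many of them reached the berth.
    static int run(int ships, int capacity) {
        BlockingQueue<String> tunnel = new ArrayBlockingQueue<>(capacity);
        int[] loaded = {0};

        Thread generator = new Thread(() -> {
            try {
                for (int i = 0; i < ships; i++) {
                    tunnel.put("ship-" + i); // blocks while the tunnel is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread berth = new Thread(() -> {
            try {
                for (int i = 0; i < ships; i++) {
                    tunnel.take(); // blocks while the tunnel is empty
                    loaded[0]++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        generator.start();
        berth.start();
        try {
            generator.join();
            berth.join(); // join makes the berth's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loaded[0];
    }

    public static void main(String[] args) {
        System.out.println("Ships loaded: " + run(20, 5));
    }
}
```

There is no synchronized, wait() or notifyAll() in sight: the queue handles the blocking and the wake-ups itself.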