Lab Multithreading in Java: Parallel Copy

  • Tutorial
Good laboratory multithreading (simple, understandable, non-trivial and useful in the national economy) is a rarity. I offer you one condition and four laboratory work on elementary multithreading in Java.

I also teach the Scala for Java Developers course on the online education platform udemy.com (similar to Coursera / EdX).

Conditions


This is an implementation of a single-threaded byte copier from InputStream to OutputStream. Copying occurs in the thread calling the copy (...) method
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
    public static void copy(InputStream src, OutputStream dst)throws IOException{
        try (InputStream src0 = src; OutputStream dst0 = dst) {
            int b;
            while ((b = src.read()) != -1) {
                dst.write(b);
            }
        }
    }
}


This is a single-threaded copier implementation with arrays from InputStream to OutputStream. Copying occurs in the thread calling the copy (...) method
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
    public static void copy(InputStream src, OutputStream dst)throws IOException{
        byte[] buff = new byte[128];
        try (InputStream src0 = src; OutputStream dst0 = dst) {
            int count;
            while ((count = src.read(buff)) != -1) {
                dst.write(buff, 0, count);
            }
        }
    }
}


This is an implementation of a multi-threaded copier arrays from InputStream to OutputStream. We start reading and writing on a separate new stream and connect them with a blocking limited queue for transferring data from the reader to the writer
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
public class CopyUtil {
    public static void copy(final InputStream src, final OutputStream dst) throws IOException {
        // reader-to-writer byte[]-channel
        final BlockingQueue buffer = new ArrayBlockingQueue<>(64);
        // exception-channel from reader/writer threads?
        final AtomicReference ex = new AtomicReference<>();
        final ThreadGroup group = new ThreadGroup("read-write") {
            public void uncaughtException(Thread t, Throwable e) {ex.set(e);}
        };
        // reader from 'src'
        Thread reader = new Thread(group, () -> {
            try (InputStream src0 = src) {              // 'src0' for auto-closing
                while (true) {
                    byte[] data = new byte[128];        // new data buffer
                    int count = src.read(data, 1, 127); // read up to 127 bytes
                    data[0] = (byte) count;             // 0-byte is length-field
                    buffer.put(data);                   // send to writer
                    if (count == -1) {break;}           // src empty
                }
            } catch (Exception e) {group.interrupt();}  // interrupt writer
        });
        reader.start();
        // writer to 'dst'
        Thread writer = new Thread(group, () -> {
            try (OutputStream dst0 = dst) {      // 'dst0' for auto-closing
                while (true) {
                    byte[] data = buffer.take(); // get new data from reader
                    if (data[0] == -1) {break;}  // its last data
                    dst.write(data, 1, data[0]); // 
                }
            } catch (Exception e) {group.interrupt();}  // interrupt writer
        });
        writer.start();
        // wait to complete read/write operations
        try {
            reader.join(); // wait for reader
            writer.join(); // wait for writer
        } catch (InterruptedException e) {throw new IOException(e);}
        if (ex.get() != null) {throw new IOException(ex.get());}
    }
}


The following test can be used to verify that the copy is correct.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
public class Test {
    public static void main(String[] args) throws IOException {
        Random rnd = new Random(0);
        byte[] testData = new byte[64 * 1024];
        rnd.nextBytes(testData);
        ByteArrayOutputStream dst = new ByteArrayOutputStream();
        CopyUtil.copy(new ByteArrayInputStream(testData), dst);
        if (!Arrays.equals(testData, dst.toByteArray())) {
            throw new AssertionError("Lab decision wrong!");
        } else {
            System.out.println("OK!");
        }
    }
}


Exercise 1


In the last two-threaded solution, we start two streams - for reading and for writing. Rewrite the code so that reading is carried out in a new stream, and recording is performed by the stream that called copy (...). By the way, then it will be possible to get rid of a couple of join-s, since the stream at the receiving end of the buffer knows when the data ended.

Task # 2


In the last two-threaded solution, the reader constantly creates new byte [] - buffers, passes them to the writer, and he sends them to the GC. Create a separate reverse queue of empty buffers from writer to reader.

Task # 3


In all three code examples, we implemented data transfer from one reader to one writer. Implement a multi-threaded solution for transferring data from one reader to many writers. All writers receive identical data. The reader and writers each work in their own separate thread. Do not create separate copies of the data for each writer - let the writers read from one of all the buffers, but store these buffers simultaneously in different queues (a separate queue runs from the reader to each writer).
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public class CopyUtil {
    public static void copy(InputStream src, OutputStream ... dst) throws IOException {
        // some code
    }
}


Task # 4


Do the previous task # 3 but do not form a star topology, where in the center the reader and rays emanate to the writers, and the ring topology. In which the reader and writers line up in a circle and pass the buffer in a circle. The reader is the first writer, the first writer is the second, ... the last writer is the reader. And then the reader can reuse the buffer.

Contacts


I am developing a Java Core programming course (online course) .
email: GolovachCourses@gmail.com
skype: GolovachCourses

Also popular now: