Spawning a thread for image processing

In our first example, we'll create a microservice that expects a request with an image, loads it into memory completely, sends it to a thread for resizing, and waits for the result. Let's start by creating a thread that expects image data and sends responses. To receive requests, we'll use a channel created with mpsc::channel, and we'll use oneshot::channel for responses, because multiple clients can send requests and we only expect one response per image. For the requests, we'll use the following struct:

struct WorkerRequest {
    buffer: Vec<u8>,
    width: u16,
    height: u16,
    tx: oneshot::Sender<WorkerResponse>,
}

WorkerRequest contains the buffer field for binary image data, the desired width and height of the resized image, and a tx sender of the oneshot::Sender type for sending a WorkerResponse response.

The response is represented by a type alias to the Result type, which holds either the binary data of the resized image on success or an error:

type WorkerResponse = Result<Vec<u8>, Error>;

We can now create a thread that supports these messages and carries out resizing:

fn start_worker() -> mpsc::Sender<WorkerRequest> {
    let (tx, rx) = mpsc::channel::<WorkerRequest>(1);
    thread::spawn(move || {
        let requests = rx.wait();
        for req in requests {
            if let Ok(req) = req {
                let resp = convert(req.buffer, req.width, req.height).map_err(other);
                req.tx.send(resp).ok();
            }
        }
    });
    tx
}

Since we use a single thread for all resizing requests, we can use the blocking wait method of the Sender and Receiver to interact with clients. The preceding code creates a channel from the mpsc module that can keep one message in a buffer. We don't need more space in the buffer, because resizing takes a long time and we only need the next message to be ready for the receiver while we're processing an image.

We use the thread::spawn function to spawn a new thread with a processing function. The Receiver::wait method converts a Receiver into a blocking iterator over the incoming messages. We use a simple loop to iterate over all the requests, so the reactor isn't needed here. If a message is received successfully, we process the request. To convert the image, we use the convert function, which is described later in this section. We send the result through oneshot::Sender, which doesn't have a wait method; all we need to do is call the send method, which returns a Result. This operation won't block and doesn't need a reactor, because it uses UnsafeCell internally to provide a value for the Receiver that implements the Future trait.
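The request and response flow described above can be sketched with the standard library alone. This is not the chapter's code: it swaps the futures channels for std::sync::mpsc, uses an ordinary channel as a stand-in for oneshot::Sender, and replaces the real resizing with a hypothetical fake_convert placeholder. The resize_blocking helper and the String error type are also assumptions made for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins for the chapter's types, built on std channels only.
type WorkerResponse = Result<Vec<u8>, String>;

struct WorkerRequest {
    buffer: Vec<u8>,
    width: u16,
    height: u16,
    // A std channel that is used once plays the role of oneshot::Sender here.
    tx: mpsc::Sender<WorkerResponse>,
}

// Placeholder for the real resizing step: it only truncates the buffer.
fn fake_convert(data: Vec<u8>, width: u16, height: u16) -> WorkerResponse {
    if width == 0 || height == 0 {
        return Err("width and height must be non-zero".to_string());
    }
    let len = (width as usize * height as usize).min(data.len());
    Ok(data[..len].to_vec())
}

fn start_worker() -> mpsc::SyncSender<WorkerRequest> {
    // Bounded channel: one request may wait while another is being processed.
    let (tx, rx) = mpsc::sync_channel::<WorkerRequest>(1);
    thread::spawn(move || {
        // Iterating over the receiver blocks until a request arrives and
        // ends once every sender has been dropped.
        for req in rx {
            let resp = fake_convert(req.buffer, req.width, req.height);
            // The client may have gone away; ignore a failed send.
            req.tx.send(resp).ok();
        }
    });
    tx
}

// Sends one request and blocks until the worker replies.
fn resize_blocking(
    worker: &mpsc::SyncSender<WorkerRequest>,
    buffer: Vec<u8>,
    width: u16,
    height: u16,
) -> WorkerResponse {
    let (tx, rx) = mpsc::channel();
    let req = WorkerRequest { buffer, width, height, tx };
    worker.send(req).map_err(|_| "worker is gone".to_string())?;
    rx.recv().map_err(|_| "worker dropped the reply".to_string())?
}
```

A caller would create the worker once with start_worker() and then call resize_blocking(&worker, data, width, height) for each image. Unlike the futures-based version, this call blocks the calling thread, so it only illustrates the message flow and is not suitable for use inside an asynchronous handler.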

To resize the image, we use the image crate. It contains a rich set of methods for image transformation and supports multiple image formats. Take a look at the implementation of the convert function:

fn convert(data: Vec<u8>, width: u16, height: u16) -> ImageResult<Vec<u8>> {
    let format = image::guess_format(&data)?;
    let img = image::load_from_memory(&data)?;
    let scaled = img.resize(width as u32, height as u32, FilterType::Lanczos3);
    let mut result = Vec::new();
    scaled.write_to(&mut result, format)?;
    Ok(result)
}

The function expects the binary data of an image, along with the desired width and height. The convert function returns an ImageResult, which is a type alias for Result with ImageError as the error type. We use this error type because some of the functions called inside convert can return errors of this type.

The first line of the implementation tries to guess the format of the incoming data with the guess_format function. We keep this format value so that we can write the output image in the same format. After that, we use the load_from_memory function to read an image from the data vector. This call reads the data and actually doubles the amount of memory consumed for the image; be aware of this if you want to process multiple images simultaneously. After resizing, we write the scaled image to a vector and return it as a Result. The scaled image also consumes some memory, meaning we're almost tripling the consumption. It's better to add limits for the size of the incoming message, the width, and the height to avoid running out of memory.
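As a minimal sketch of such limits, the following check could run before a request is handed to the worker. The constants MAX_INPUT_BYTES and MAX_SIDE, the LimitError type, and both function names are hypothetical; the source gives no concrete values, so these numbers are placeholders. The output estimate assumes 4 bytes per pixel (8-bit RGBA), which is an assumption about the decoded pixel format.

```rust
// Hypothetical limits; the source gives no concrete values, so tune them per deployment.
const MAX_INPUT_BYTES: usize = 10 * 1024 * 1024;
const MAX_SIDE: u16 = 4096;

#[derive(Debug, PartialEq)]
enum LimitError {
    InputTooLarge(usize),
    BadDimensions(u16, u16),
}

// Rejects oversized uploads and unreasonable target dimensions.
fn check_limits(input_len: usize, width: u16, height: u16) -> Result<(), LimitError> {
    if input_len > MAX_INPUT_BYTES {
        return Err(LimitError::InputTooLarge(input_len));
    }
    if width == 0 || height == 0 || width > MAX_SIDE || height > MAX_SIDE {
        return Err(LimitError::BadDimensions(width, height));
    }
    Ok(())
}

// Upper bound for the decoded, scaled pixel buffer,
// assuming 4 bytes per pixel (8-bit RGBA).
fn max_scaled_bytes(width: u16, height: u16) -> usize {
    width as usize * height as usize * 4
}
```

With these placeholder values, a request for an 800 by 600 output would be accepted and would need at most 1,920,000 bytes for the scaled pixels, in addition to the input buffer and the decoded source image.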

We can now implement the main function, which spawns a worker thread and starts a server instance:

fn main() {
    let addr = ([127, 0, 0, 1], 8080).into();
    let builder = Server::bind(&addr);
    let tx = start_worker();
    let server = builder.serve(move || {
        let tx = tx.clone();
        service_fn(move |req| microservice_handler(tx.clone(), req))
    });
    let server = server.map_err(drop);
    hyper::rt::run(server);
}

The only difference here from the main function of the previous chapter is that we call the start_worker function and use the returned Sender as a parameter for the handler function along with a request.
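The Sender is cloned twice in the main function: once for every new connection and once for every request. The reason can be illustrated with a small standard-library sketch that has the same closure shape. It is not hyper code; handler, make_service_factory, and the String message type are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc;

// Stand-in for the per-request handler: it forwards the request to the worker.
fn handler(tx: mpsc::Sender<String>, req: &str) {
    tx.send(req.to_string()).ok();
}

// Mirrors the shape of the closure passed to `serve`: the outer closure runs
// once per connection, and the inner one runs once per request.
fn make_service_factory(tx: mpsc::Sender<String>) -> impl Fn() -> Box<dyn Fn(&str)> {
    move || -> Box<dyn Fn(&str)> {
        // First clone: each connection gets its own sender, so the factory
        // keeps the original and can be called again.
        let tx = tx.clone();
        // Second clone: the handler takes the sender by value on every call.
        Box::new(move |req: &str| handler(tx.clone(), req))
    }
}
```

Without the first clone, the outer closure would give away its only Sender on the first call and could not be called again. Without the second, the inner closure could handle only one request.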

Let's look at an implementation of microservice_handler and learn how it interacts with a worker.
