The Sender

Now that we've covered the Receiver, all that remains is the Sender. It's defined in src/sender.rs. The most important function in the Sender is send. The Sender follows the same disk/memory mode idea that Receiver uses but is more complicated in its operation. Let's dig in:

        let mut back_guard = self.mem_buffer.lock_back();
        if (*back_guard).inner.total_disk_writes == 0 {
            // in-memory mode
            let placed_event = private::Placement::Memory(event);
            match self.mem_buffer.push_back(placed_event, &mut 
back_guard) { Ok(must_wake_receiver) => { if must_wake_receiver { let front_guard = self.mem_buffer.lock_front(); self.mem_buffer.notify_not_empty(&front_guard); drop(front_guard); } } Err(deque::Error::Full(placed_event)) => { self.write_to_disk(placed_event.extract().unwrap(),
&mut back_guard)?; (*back_guard).inner.total_disk_writes += 1; } }

Recall that the deque allows the holder of the back guard to smuggle state for coordination through it. We're seeing that pay off here. The Sender's internal state is called SenderSync and is defined as follows:

pub struct SenderSync {
    pub sender_fp: Option<BufWriter<fs::File>>,
    pub bytes_written: usize,
    pub sender_seq_num: usize,
    pub total_disk_writes: usize,
    pub path: PathBuf, // active fp filename
}

Every sender thread has to be able to write to the current disk queue file, which sender_fp points to. Likewise, bytes_written tracks how many bytes have been, well, written to disk. The Sender must keep track of this value in order to correctly roll queue files over when they grow too large. sender_seq_num defines the name of the current writable queue file, being as they are named sequentially from zero on up. The key field for us is total_disk_writes. Notice that a memory write—self.mem_buffer.push_back(placed_event, &mut back_guard)—might fail with a Full error. In that case, self.write_to_disk is called to write the T to disk, increasing the total number of disk writes. This write mode was prefixed with a check into the cross-thread SenderSync to determine if there were outstanding disk writes. Remember, at this point, the Receiver has no way to determine that there has been an additional write go to disk; the sole communication channel with the Receiver is through the in-memory deque. To that end, the next Sender thread will flip into a different write mode:

        } else {
            // disk mode
            self.write_to_disk(event, &mut back_guard)?;
            (*back_guard).inner.total_disk_writes += 1;
            assert!((*back_guard).inner.sender_fp.is_some());
            if let Some(ref mut fp) = (*back_guard).inner.sender_fp {
                fp.flush().expect("unable to flush");
            } else {
                unreachable!()
            }
            if let Ok(must_wake_receiver) = self.mem_buffer.push_back(
                private::Placement::Disk(
(*back_guard).inner.total_disk_writes
), &mut back_guard, ) { (*back_guard).inner.total_disk_writes = 0; if must_wake_receiver { let front_guard = self.mem_buffer.lock_front(); self.mem_buffer.notify_not_empty(&front_guard); drop(front_guard); } } }

Once there's a single write to disk, the Sender flips into disk preferential write mode. At the start of this branch the T goes to disk, is flushed. Then, the Sender attempts to push a Placement::Disk onto the in-memory deque, which may fail if the Receiver is slow or unlucky in its scheduling assignment. Should it succeed, however, the total_disk_writes is set to zero—there are no longer any outstanding disk writes—and the Receiver is woken if need be to read its new events. The next time a Sender thread rolls through it may or may not have space in the in-memory deque to perform a memory placement but that's the next thread's concern.

That's the heart of Sender. While there is another large function in-module, write_to_disk, we won't list it here. The implementation is primarily book-keeping inside a mutex, a topic that has been covered in detail in this and the previous chapter, plus filesystem manipulation. That said, the curious reader is warmly encouraged to read through the code.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.144.38.92