- Create a Rust project to work on during this chapter with cargo new futures.
- Navigate into the newly-created futures folder. For the rest of this chapter, we will assume that your command line is within this directory.
- Inside the src folder, create a new folder called bin.
- Delete the generated lib.rs file, as we are not creating a library.
- Open the Cargo.toml file that has been generated.
- Under [dependencies], add the following lines:
futures = "0.2.0-beta"
futures-util = "0.2.0-beta"
- In the src/bin folder, create a file called pool.rs.
- Add the following code and run it with cargo run --bin pool:
1 extern crate futures; 2 3 use futures::prelude::*; 4 use futures::task::Context; 5 use futures::channel::oneshot; 6 use futures::future::{FutureResult, lazy, ok}; 7 use futures::executor::{block_on, Executor, LocalPool,
ThreadPoolBuilder}; 8 9 use std::cell::Cell; 10 use std::rc::Rc; 11 use std::sync::mpsc; 12 use std::thread; 13 use std::time::Duration;
Let's add our constants, enums, structures, and trait implementations:
15 #[derive(Clone, Copy, Debug)] 16 enum Status { 17 Loading, 18 FetchingData, 19 Loaded, 20 } 21 22 #[derive(Clone, Copy, Debug)] 23 struct Container { 24 name: &'static str, 25 status: Status, 26 ticks: u64, 27 } 28 29 impl Container { 30 fn new(name: &'static str) -> Self { 31 Container { 32 name: name, 33 status: Status::Loading, 34 ticks: 3, 35 } 36 } 37 38 // simulate ourselves retreiving a score from a remote
database 39 fn pull_score(&mut self) -> FutureResult<u32, Never> { 40 self.status = Status::Loaded; 41 thread::sleep(Duration::from_secs(self.ticks)); 42 ok(100) 43 } 44 } 45 46 impl Future for Container { 47 type Item = (); 48 type Error = Never; 49 50 fn poll(&mut self, _cx: &mut Context) -> Poll<Self::Item,
Self::Error> { 51 Ok(Async::Ready(())) 52 } 53 } 55 const FINISHED: Result<(), Never> = Ok(()); 56 57 fn new_status(unit: &'static str, status: Status) { 58 println!("{}: new status: {:?}", unit, status); 59 }
Let's add our first local threaded function:
61 fn local_until() { 62 let mut container = Container::new("acme"); 63 64 // setup our green thread pool 65 let mut pool = LocalPool::new(); 66 let mut exec = pool.executor(); 67 68 // lazy will only execute the closure once the future has
been polled 69 // we will simulate the poll by returning using the
future::ok method 70 71 // typically, we perform some heavy computational process
within this closure 72 // such as loading graphic assets, sound, other parts of our
framework/library/etc. 73 let f = lazy(move |_| -> FutureResult<Container, Never> { 74 container.status = Status::FetchingData; 75 ok(container) 76 }); 77 78 println!("container's current status: {:?}",
container.status); 79 80 container = pool.run_until(f, &mut exec).unwrap(); 81 new_status("local_until", container.status); 82 83 // just to demonstrate a simulation of "fetching data over a
network" 84 println!("Fetching our container's score..."); 85 let score = block_on(container.pull_score()).unwrap(); 86 println!("Our container's score is: {:?}", score); 87 88 // see if our status has changed since we fetched our score 89 new_status("local_until", container.status); 90 }
And now for our locally-spawned threading examples:
92 fn local_spawns_completed() { 93 let (tx, rx) = oneshot::channel(); 94 let mut container = Container::new("acme"); 95 96 let mut pool = LocalPool::new(); 97 let mut exec = pool.executor(); 98 99 // change our container's status and then send it to our
oneshot channel 100 exec.spawn_local(lazy(move |_| { 101 container.status = Status::Loaded; 102 tx.send(container).unwrap(); 103 FINISHED 104 })) 105 .unwrap(); 106 107 container = pool.run_until(rx, &mut exec).unwrap(); 108 new_status("local_spanws_completed", container.status); 109 } 110 111 fn local_nested() { 112 let mut container = Container::new("acme"); 114 // we will need Rc (reference counts) since
we are referencing multiple owners 115 // and we are not using Arc (atomic reference counts)
since we are only using 116 // a local pool which is on the same thread technically 117 let cnt = Rc::new(Cell::new(container)); 118 let cnt_2 = cnt.clone(); 119 120 let mut pool = LocalPool::new(); 121 let mut exec = pool.executor(); 122 let mut exec_2 = pool.executor(); 123 124 let _ = exec.spawn_local(lazy(move |_| { 125 exec_2.spawn_local(lazy(move |_| { 126 let mut container = cnt_2.get(); 127 container.status = Status::Loaded; 128 129 cnt_2.set(container); 130 FINISHED 131 })) 132 .unwrap(); 133 FINISHED 134 })); 135 136 let _ = pool.run(&mut exec); 137 138 container = cnt.get(); 139 new_status("local_nested", container.status); 140 }
And now for our thread pool example:
142 fn thread_pool() { 143 let (tx, rx) = mpsc::sync_channel(2); 144 let tx_2 = tx.clone(); 145 146 // there are various thread builder options which are
referenced at 147 // https://docs.rs/futures/0.2.0-
beta/futures/executor/struct.ThreadPoolBuilder.html 148 let mut cpu_pool = ThreadPoolBuilder::new() 149 .pool_size(2) // default is the number of cpus 150 .create(); 151 152 // We need to box this part since we need the Send +'static trait 153 // in order to safely send information across threads 154 let _ = cpu_pool.spawn(Box::new(lazy(move |_| { 155 tx.send(1).unwrap(); 156 FINISHED 157 }))); 158 159 let f = lazy(move |_| { 160 tx_2.send(1).unwrap(); 161 FINISHED 162 }); 163 164 let _ = cpu_pool.run(f); 165 166 let cnt = rx.into_iter().count(); 167 println!("Count should be 2: {:?}", cnt); 168 }
And lastly, our main function:
170 fn main() { 171 println!("local_until():"); 172 local_until(); 173 174 println!(" local_spawns_completed():"); 175 local_spawns_completed(); 176 177 println!(" local_nested():"); 178 local_nested(); 179 180 println!(" thread_pool():"); 181 thread_pool(); 182 }