How to do it...

  1. Create a Rust project to work on during this chapter with cargo new futures.
  2. Navigate into the newly-created futures folder. For the rest of this chapter, we will assume that your command line is within this directory.
  3. Inside the src folder, create a new folder called bin.
  4. Delete the generated lib.rs file, as we are not creating a library.
  5. Open the Cargo.toml file that has been generated.
  6. Under [dependencies], add the following lines:
futures = "0.2.0-beta"
futures-util = "0.2.0-beta"
  1. In the src/bin folder, create a file called pool.rs.
  2. Add the following code and run it with cargo run —bin pool:
1   extern crate futures;
2   
3   use futures::prelude::*;
4   use futures::task::Context;
5   use futures::channel::oneshot;
6   use futures::future::{FutureResult, lazy, ok};
7   use futures::executor::{block_on, Executor, LocalPool, 
ThreadPoolBuilder}; 8 9 use std::cell::Cell; 10 use std::rc::Rc; 11 use std::sync::mpsc; 12 use std::thread; 13 use std::time::Duration;

Let's add our constants, enums, structures, and trait implementations:

15  #[derive(Clone, Copy, Debug)]
16  enum Status {
17    Loading,
18    FetchingData,
19    Loaded,
20  }
21  
22  #[derive(Clone, Copy, Debug)]
23  struct Container {
24    name: &'static str,
25    status: Status,
26    ticks: u64,
27  }
28  
29  impl Container {
30    fn new(name: &'static str) -> Self {
31      Container {
32        name: name,
33        status: Status::Loading,
34        ticks: 3,
35      }
36    }
37  
38    // simulate ourselves retreiving a score from a remote  
database 39 fn pull_score(&mut self) -> FutureResult<u32, Never> { 40 self.status = Status::Loaded; 41 thread::sleep(Duration::from_secs(self.ticks)); 42 ok(100) 43 } 44 } 45 46 impl Future for Container { 47 type Item = (); 48 type Error = Never; 49 50 fn poll(&mut self, _cx: &mut Context) -> Poll<Self::Item,
Self::Error> { 51 Ok(Async::Ready(())) 52 } 53 } 55 const FINISHED: Result<(), Never> = Ok(()); 56 57 fn new_status(unit: &'static str, status: Status) { 58 println!("{}: new status: {:?}", unit, status); 59 }

Let's add our first local threaded function:

61  fn local_until() {
62    let mut container = Container::new("acme");
63  
64    // setup our green thread pool
65    let mut pool = LocalPool::new();
66    let mut exec = pool.executor();
67  
68    // lazy will only execute the closure once the future has
been polled 69 // we will simulate the poll by returning using the
future::ok method 70 71 // typically, we perform some heavy computational process
within this closure 72 // such as loading graphic assets, sound, other parts of our
framework/library/etc. 73 let f = lazy(move |_| -> FutureResult<Container, Never> { 74 container.status = Status::FetchingData; 75 ok(container) 76 }); 77 78 println!("container's current status: {:?}",
container.status); 79 80 container = pool.run_until(f, &mut exec).unwrap(); 81 new_status("local_until", container.status); 82 83 // just to demonstrate a simulation of "fetching data over a
network" 84 println!("Fetching our container's score..."); 85 let score = block_on(container.pull_score()).unwrap(); 86 println!("Our container's score is: {:?}", score); 87 88 // see if our status has changed since we fetched our score 89 new_status("local_until", container.status); 90 }

And now for our locally-spawned threading examples:

92  fn local_spawns_completed() {
93    let (tx, rx) = oneshot::channel();
94    let mut container = Container::new("acme");
95  
96    let mut pool = LocalPool::new();
97    let mut exec = pool.executor();
98  
99    // change our container's status and then send it to our  
oneshot channel 100 exec.spawn_local(lazy(move |_| { 101 container.status = Status::Loaded; 102 tx.send(container).unwrap(); 103 FINISHED 104 })) 105 .unwrap(); 106 107 container = pool.run_until(rx, &mut exec).unwrap(); 108 new_status("local_spanws_completed", container.status); 109 } 110 111 fn local_nested() { 112 let mut container = Container::new("acme"); 114 // we will need Rc (reference counts) since
we are referencing multiple owners 115 // and we are not using Arc (atomic reference counts)
since we are only using 116 // a local pool which is on the same thread technically 117 let cnt = Rc::new(Cell::new(container)); 118 let cnt_2 = cnt.clone(); 119 120 let mut pool = LocalPool::new(); 121 let mut exec = pool.executor(); 122 let mut exec_2 = pool.executor(); 123 124 let _ = exec.spawn_local(lazy(move |_| { 125 exec_2.spawn_local(lazy(move |_| { 126 let mut container = cnt_2.get(); 127 container.status = Status::Loaded; 128 129 cnt_2.set(container); 130 FINISHED 131 })) 132 .unwrap(); 133 FINISHED 134 })); 135 136 let _ = pool.run(&mut exec); 137 138 container = cnt.get(); 139 new_status("local_nested", container.status); 140 }

And now for our thread pool example:

142 fn thread_pool() {
143   let (tx, rx) = mpsc::sync_channel(2);
144   let tx_2 = tx.clone();
145 
146   // there are various thread builder options which are 
referenced at 147 // https://docs.rs/futures/0.2.0-
beta/futures/executor/struct.ThreadPoolBuilder.html 148 let mut cpu_pool = ThreadPoolBuilder::new() 149 .pool_size(2) // default is the number of cpus 150 .create(); 151 152 // We need to box this part since we need the Send +'static trait 153 // in order to safely send information across threads 154 let _ = cpu_pool.spawn(Box::new(lazy(move |_| { 155 tx.send(1).unwrap(); 156 FINISHED 157 }))); 158 159 let f = lazy(move |_| { 160 tx_2.send(1).unwrap(); 161 FINISHED 162 }); 163 164 let _ = cpu_pool.run(f); 165 166 let cnt = rx.into_iter().count(); 167 println!("Count should be 2: {:?}", cnt); 168 }

And lastly, our main function:

170 fn main() {
171   println!("local_until():");
172   local_until();
173 
174   println!("
local_spawns_completed():");
175   local_spawns_completed();
176 
177   println!("
local_nested():");
178   local_nested();
179 
180   println!("
thread_pool():");
181   thread_pool();
182 }
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.227.52.7