Actor

The notification actor is more complex and we need more types to implement it. Let's look into them:

use actix::{Actor, ActorContext, AsyncContext, Handler, Recipient, StreamHandler};
use actix_web::ws::{Message, ProtocolError, WebsocketContext};
use crate::repeater::{RepeaterControl, RepeaterUpdate};
use std::time::{Duration, Instant};
use super::State;

First, we use the ws module of the actix_web crate. It contains the WebsocketContext type that we will use as the context in the Actor trait implementation. We also need the Message and ProtocolError types to implement WebSocket stream handling. We imported ActorContext to get the stop method of the context, which breaks the connection with a client, and the AsyncContext trait to get the address of the context and to run a task at regular intervals. One new type that we have not used yet is StreamHandler. It is needed to handle values that are sent from a Stream to an Actor.

You can use either Handler or StreamHandler to handle messages of the same type. Which one is preferable? The rule is simple: if your actor will process a lot of messages, it's better to use StreamHandler and connect the message flow to the Actor as a Stream. The actix runtime has a check, and if it calls the same Handler repeatedly, you may receive warnings.

Add the constants that we will use for sending ping messages to our clients:

const PING_INTERVAL: Duration = Duration::from_secs(20);
const PING_TIMEOUT: Duration = Duration::from_secs(60);

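PING_INTERVAL is how often we send a ping to a client, and PING_TIMEOUT is how long we tolerate a client that doesn't respond before we close the connection.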

We send pings to clients to keep connections alive, because servers and proxies often apply default timeouts to WebSocket connections. For example, nginx closes the connection after 60 seconds if there isn't any activity, so if you use nginx as a proxy with the default configuration, your WebSocket connections can be broken. Browsers don't send pings on their own; they only reply with pongs to incoming pings. The server is therefore responsible for pinging clients connected via browsers so that they are not disconnected by a timeout.

Add the following NotifyActor struct to the code:

pub struct NotifyActor {
    last_ping: Instant,
    repeater: Recipient<RepeaterControl>,
}

This actor has a last_ping of the Instant type to keep the timestamp of the latest ping. Also, the actor holds a Recipient address to send RepeaterControl messages. We will provide the address of RepeaterActor for this field with the constructor:

impl NotifyActor {
    pub fn new(repeater: Recipient<RepeaterControl>) -> Self {
        Self {
            last_ping: Instant::now(),
            repeater,
        }
    }
}

Now, we have to implement the Actor trait for the NotifyActor struct:

impl Actor for NotifyActor {
    type Context = WebsocketContext<Self, State>;

    fn started(&mut self, ctx: &mut Self::Context) {
        let msg = RepeaterControl::Subscribe(ctx.address().recipient());
        self.repeater.do_send(msg).ok();
        ctx.run_interval(PING_INTERVAL, |act, ctx| {
            if Instant::now().duration_since(act.last_ping) > PING_TIMEOUT {
                ctx.stop();
                return;
            }
            ctx.ping("ping");
        });
    }

    fn stopped(&mut self, ctx: &mut Self::Context) {
        let msg = RepeaterControl::Unsubscribe(ctx.address().recipient());
        self.repeater.do_send(msg).ok();
    }
}

This is the first time we need to override the started and stopped methods, which are empty by default. In the started method, we create a Subscribe message and send it to the repeater. We also add a task that runs every PING_INTERVAL and sends a ping message using the ping method of WebsocketContext. If a client never responds to us, the last_ping field won't be updated. Once the time elapsed since last_ping exceeds PING_TIMEOUT, we interrupt the connection using the stop method of the context.
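To see the timeout check in isolation, here is a minimal sketch that uses only the standard library. The Heartbeat type and its touch and is_expired methods are hypothetical names introduced for illustration; they are not part of actix or of this project. The sketch assumes that a response from the client (for example, a pong) is what refreshes the timestamp, as the paragraph above describes for last_ping.

```rust
use std::time::{Duration, Instant};

const PING_INTERVAL: Duration = Duration::from_secs(20);
const PING_TIMEOUT: Duration = Duration::from_secs(60);

/// Hypothetical helper that plays the role of the `last_ping` field.
struct Heartbeat {
    last_seen: Instant,
}

impl Heartbeat {
    fn new(now: Instant) -> Self {
        Self { last_seen: now }
    }

    /// Record that the client responded at `now` (assumed to happen on a pong).
    fn touch(&mut self, now: Instant) {
        self.last_seen = now;
    }

    /// Same comparison as in the interval task: strictly greater than the timeout.
    fn is_expired(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) > PING_TIMEOUT
    }
}
```

With a 20-second interval and a 60-second timeout, a silent client survives the checks at 20, 40, and 60 seconds and is dropped at the 80-second check, because the comparison is strict.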

The stopped method implementation is much simpler: it prepares an Unsubscribe event with the same address of the actor and sends it to RepeaterActor.
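The subscribe and unsubscribe lifecycle can be modeled without actix. The following is only a rough sketch under stated assumptions: Control, Repeater, handle, and broadcast are hypothetical stand-ins, a numeric id replaces the actor address, and a standard-library channel Sender replaces the Recipient. It does not show how RepeaterActor is actually implemented.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the control messages; the real ones carry
/// actor recipients instead of an id and a channel sender.
enum Control {
    Subscribe(u32, Sender<String>),
    Unsubscribe(u32),
}

/// Hypothetical stand-in for the repeater: it remembers who is listening.
#[derive(Default)]
struct Repeater {
    listeners: HashMap<u32, Sender<String>>,
}

impl Repeater {
    /// Register or remove a listener, mirroring what `started` and
    /// `stopped` request on behalf of the actor.
    fn handle(&mut self, msg: Control) {
        match msg {
            Control::Subscribe(id, tx) => {
                self.listeners.insert(id, tx);
            }
            Control::Unsubscribe(id) => {
                self.listeners.remove(&id);
            }
        }
    }

    /// Send an update to every listener and return how many accepted it.
    fn broadcast(&self, update: &str) -> usize {
        self.listeners
            .values()
            .filter(|tx| tx.send(update.to_string()).is_ok())
            .count()
    }
}
```

The point of the pairing is that every subscription made when a connection starts is undone when it stops, so the repeater never keeps sending updates to a connection that has gone away.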

Our actor implementation is ready and now we have to add handlers for messages and a stream.
