The notification actor is more complex, and we need more types to implement it. Let's look at them:
use actix::{Actor, ActorContext, AsyncContext, Handler, Recipient, StreamHandler};
use actix_web::ws::{Message, ProtocolError, WebsocketContext};
use crate::repeater::{RepeaterControl, RepeaterUpdate};
use std::time::{Duration, Instant};
use super::State;
First, we use the ws module of the actix_web crate. It contains the WebsocketContext type that we will use as the context in the Actor trait implementation. We also need the Message and ProtocolError types to implement WebSocket stream handling. We imported the ActorContext trait to call the stop method of the context and break the connection with a client, and the AsyncContext trait to get the address of the context and to run a task at regular intervals. One type that we have not used yet is StreamHandler. It is needed to handle values that a Stream sends to an Actor.
Add the constants that we will use for sending ping messages to our clients:
const PING_INTERVAL: Duration = Duration::from_secs(20);
const PING_TIMEOUT: Duration = Duration::from_secs(60);
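PING_INTERVAL sets how often we ping a client (every 20 seconds), and PING_TIMEOUT sets how long we tolerate silence from a client (60 seconds) before closing the connection.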
Add the following NotifyActor struct to the code:
pub struct NotifyActor {
    last_ping: Instant,
    repeater: Recipient<RepeaterControl>,
}
This actor has a last_ping field of the Instant type that keeps the timestamp of the latest ping. The actor also holds a Recipient address for sending RepeaterControl messages. We provide the address of RepeaterActor for this field through the constructor:
impl NotifyActor {
    pub fn new(repeater: Recipient<RepeaterControl>) -> Self {
        Self {
            last_ping: Instant::now(),
            repeater,
        }
    }
}
Now, we have to implement the Actor trait for the NotifyActor struct:
impl Actor for NotifyActor {
    type Context = WebsocketContext<Self, State>;

    fn started(&mut self, ctx: &mut Self::Context) {
        // Subscribe this actor to updates from the repeater.
        let msg = RepeaterControl::Subscribe(ctx.address().recipient());
        self.repeater.do_send(msg).ok();
        // Ping the client periodically and drop silent connections.
        ctx.run_interval(PING_INTERVAL, |act, ctx| {
            if Instant::now().duration_since(act.last_ping) > PING_TIMEOUT {
                ctx.stop();
                return;
            }
            ctx.ping("ping");
        });
    }

    fn stopped(&mut self, ctx: &mut Self::Context) {
        // Unsubscribe using the same address that was used to subscribe.
        let msg = RepeaterControl::Unsubscribe(ctx.address().recipient());
        self.repeater.do_send(msg).ok();
    }
}
This is the first time we need to override the started and stopped methods, which are empty by default. In the started method, we create a Subscribe message and send it to the repeater. We also add a task that runs every PING_INTERVAL and sends a ping message using the ping method of WebsocketContext. If a client never responds, the last_ping field won't be updated. Once the time elapsed since last_ping exceeds PING_TIMEOUT, we break the connection using the stop method of the context.
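To see how the two constants interact, the timeout check can be pulled out of the actor and exercised with the standard library alone. The following is a minimal sketch, not part of the actor code: is_timed_out and first_failing_tick are hypothetical helpers introduced only for illustration, and the simulation assumes that last_ping is recorded at the moment the interval timer starts and that the client never responds.

```rust
use std::time::{Duration, Instant};

const PING_INTERVAL: Duration = Duration::from_secs(20);
const PING_TIMEOUT: Duration = Duration::from_secs(60);

/// Hypothetical helper: the same comparison the interval closure performs,
/// with `now` passed in so it can be checked without waiting.
fn is_timed_out(last_ping: Instant, now: Instant) -> bool {
    now.duration_since(last_ping) > PING_TIMEOUT
}

/// Hypothetical helper: the number of the first interval tick at which the
/// check fires for a client that never responds. Assumes `last_ping` is
/// taken exactly when the timer starts.
fn first_failing_tick() -> u32 {
    let start = Instant::now();
    let mut tick = 1;
    // Tick k happens k * PING_INTERVAL after the start.
    while !is_timed_out(start, start + PING_INTERVAL * tick) {
        tick += 1;
    }
    tick
}
```

Because the comparison is strict, a silence of exactly 60 seconds does not count as a timeout, so under these assumptions the check first fires on the fourth tick, about 80 seconds after the start. In the actor itself, last_ping is set in the constructor slightly before the timer starts, so the exact tick may differ.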
The stopped method implementation is much simpler: it prepares an Unsubscribe message with the same address of the actor and sends it to RepeaterActor.
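The pairing of Subscribe in started and Unsubscribe in stopped can be illustrated without actix. The sketch below is an assumption about how a receiver of such control messages might track its listeners; it is not the RepeaterActor implementation. Control, Registry, and SubscriberId are hypothetical names, and a plain integer stands in for the Recipient address that the real messages carry.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for an actor address; the real messages carry a
/// Recipient rather than an integer.
type SubscriberId = u32;

/// Hypothetical control message shaped like Subscribe / Unsubscribe.
enum Control {
    Subscribe(SubscriberId),
    Unsubscribe(SubscriberId),
}

/// Hypothetical registry that keeps the set of current listeners.
#[derive(Default)]
struct Registry {
    listeners: HashSet<SubscriberId>,
}

impl Registry {
    /// Adds or removes a listener depending on the control message.
    fn handle(&mut self, msg: Control) {
        match msg {
            Control::Subscribe(id) => {
                self.listeners.insert(id);
            }
            Control::Unsubscribe(id) => {
                self.listeners.remove(&id);
            }
        }
    }
}
```

Sending Unsubscribe with the same identity that was used for Subscribe is what lets the receiver find and remove the right listener, which is why the actor derives both messages from the same context address.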
Our actor implementation is ready and now we have to add handlers for messages and a stream.