From: Geoffrey Allott Date: Sat, 6 Mar 2021 10:44:04 +0000 (+0000) Subject: create dedup_ready method for streams instead of abusing ready_chunks X-Git-Url: https://git.pointlesshacks.com/?a=commitdiff_plain;h=17794e910b8a8e78c72cc9741604c1e7bea98302;p=pokerwave.git create dedup_ready method for streams instead of abusing ready_chunks --- diff --git a/Cargo.lock b/Cargo.lock index 22b1ea6..e575ad0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1084,6 +1084,7 @@ dependencies = [ "hex", "itertools", "log", + "pin-project", "rand 0.8.3", "rand_chacha 0.3.0", "redis", @@ -1353,18 +1354,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92d5161132722baa40d802cc70b15262b98258453e85e5d1d365c757c73869ae" +checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.123" +version = "1.0.124" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9391c295d64fc0abb2c556bad848f33cb8296276b1ad2677d1ae1ace4f258f31" +checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 0609d23..6b29ef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ futures = "0.3" getrandom = "0.2" hex = { version = "0.4", features = ["serde"] } itertools = "0.10" +pin-project = "1.0" rand = "0.8" rand_chacha = "0.3" redis = { version = "0.20", features = ["async-std-comp"] } diff --git a/src/client.rs b/src/client.rs index ac1cec4..2ff9f91 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,10 +30,6 @@ pub enum ClientInterest { Timeout { id: i64 }, } -impl ClientInterest { - pub const STREAM_COALESCE_MAX: usize = 16; -} - #[derive(Debug, Clone)] pub enum LoggedInState { Idle, diff --git a/src/dealer.rs b/src/dealer.rs index 8cb276e..12e1bab 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -6,6 +6,7 @@ use redis::{ErrorKind, RedisError, RedisResult}; use crate::client::ClientInterest; use crate::game::{Game, DealerAction, ValidatedUserAction}; use crate::server::{ActionStatus, ServerState}; +use crate::util::dedup::DedupReadyExt; pub struct Dealer { server: ServerState, @@ -37,7 +38,7 @@ impl Dealer { } pub async fn start(mut self, update_stream: Receiver) { - let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX); + let mut update_stream = update_stream.dedup_ready(); while let Some(_) = update_stream.next().await { match self.retrieve_updates().await { Ok(Termination::Continue) => continue, diff --git a/src/main.rs b/src/main.rs index 83d0fc5..9f3e20a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,7 @@ use crate::client::{ClientInterest, ConnectionState}; use crate::dealer::Dealer; use crate::game::GameList; use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState}; +use crate::util::dedup::DedupReadyExt; pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver) -> Result<(), std::io::Error> { #[derive(Debug)] @@ -193,10 +194,10 @@ async fn handle_new_games(server: Server) -> Result<(), std::io::Error> { pub async fn handle_websocket_request(request: Request, stream: WebSocketConnection) -> Result<(), Error> { let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await; let mut client = ConnectionState::new(server_state); - let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX); + let update_stream = update_stream.dedup_ready(); let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right)); while let Some(message) = combined.next().await { - let message: Either, Vec> = message; + let message: Either, ClientInterest> = message; match message { Either::Left(Ok(Message::Text(input))) => { let response = match serde_json::from_str(&input) { @@ -205,14 +206,11 @@ pub async fn handle_websocket_request(request: Request, stream: WebSocke }; stream.send_json(&response).await?; } - Either::Right(mut updates) => { - updates.dedup(); - for update in updates { - debug!("client received update: {:?}", update); - let mut responses = client.retrieve_updates(update).await; - while let Some(response) = responses.next().await { - stream.send_json(&response).await? - } + Either::Right(update) => { + debug!("client received update: {:?}", update); + let mut responses = client.retrieve_updates(update).await; + while let Some(response) = responses.next().await { + stream.send_json(&response).await? } } Either::Left(Ok(Message::Close(_))) => { diff --git a/src/util/dedup.rs b/src/util/dedup.rs new file mode 100644 index 0000000..e68d5ce --- /dev/null +++ b/src/util/dedup.rs @@ -0,0 +1,120 @@ +use std::mem::swap; +use std::pin::Pin; + +use futures::stream::{Fuse, FusedStream, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_project::pin_project; + +#[pin_project] +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct DedupReady { + #[pin] + stream: Fuse, + item: Option, +} + +impl DedupReady { + fn new(stream: S) -> Self { + Self { + stream: stream.fuse(), + item: None, + } + } +} + +impl Stream for DedupReady +where + S::Item: PartialEq, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + match this.item { + Some(item) => match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(next)) if &next == &*item => continue, + Poll::Ready(Some(mut next)) => { + swap(item, &mut next); + return Poll::Ready(Some(next)); + } + Poll::Ready(None) | Poll::Pending => return Poll::Ready(this.item.take()), + } + None => match this.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(item)) => *this.item = Some(item), + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + } +} + +impl FusedStream for DedupReady +where + S::Item: PartialEq, +{ + fn is_terminated(&self) -> bool { + self.item.is_none() && self.stream.is_terminated() + } +} + +pub trait DedupReadyExt : Stream { + fn dedup_ready(self) -> DedupReady + where + Self::Item: PartialEq, + Self: Sized, + { + DedupReady::new(self) + } +} + +impl DedupReadyExt for S {} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::{channel::mpsc::channel, pin_mut, stream::{empty, once, iter}, SinkExt}; + + #[async_std::test] + async fn empty_stream() { + let mut stream = empty::<()>().dedup_ready(); + assert!(stream.next().await.is_none()); + } + + #[async_std::test] + async fn single_item() { + let stream = once(async { 12 }).dedup_ready(); + pin_mut!(stream); + assert_eq!(Some(12), stream.next().await); + assert!(stream.next().await.is_none()); + } + + #[async_std::test] + async fn dedup_list() { + let mut stream = iter(vec![1, 1, 2, 1, 3, 4, 4]).dedup_ready(); + assert_eq!(Some(1), stream.next().await); + assert_eq!(Some(2), stream.next().await); + assert_eq!(Some(1), stream.next().await); + assert_eq!(Some(3), stream.next().await); + assert_eq!(Some(4), stream.next().await); + assert!(stream.next().await.is_none()); + } + + #[async_std::test] + async fn does_not_dedup_pending() { + let (mut sender, receiver) = channel(10); + let mut receiver = receiver.dedup_ready(); + sender.send(1).await.unwrap(); + sender.send(1).await.unwrap(); + assert_eq!(Some(1), receiver.next().await); + sender.send(2).await.unwrap(); + assert_eq!(Some(2), receiver.next().await); + sender.send(2).await.unwrap(); + assert_eq!(Some(2), receiver.next().await); + drop(sender); + assert_eq!(None, receiver.next().await); + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs index 34aac28..1a4b272 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,2 +1,3 @@ +pub mod dedup; pub mod max; pub mod timestamp;