create dedup_ready method for streams instead of abusing ready_chunks
authorGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 10:44:04 +0000 (10:44 +0000)
committerGeoffrey Allott <geoffrey@allott.email>
Sat, 6 Mar 2021 13:32:12 +0000 (13:32 +0000)
Cargo.lock
Cargo.toml
src/client.rs
src/dealer.rs
src/main.rs
src/util/dedup.rs [new file with mode: 0644]
src/util/mod.rs

index 22b1ea6b9e70facbbeca3930f24a3d2bca265a64..e575ad0499b03ab35825239c1673cb7d4f84e6ac 100644 (file)
@@ -1084,6 +1084,7 @@ dependencies = [
  "hex",
  "itertools",
  "log",
+ "pin-project",
  "rand 0.8.3",
  "rand_chacha 0.3.0",
  "redis",
@@ -1353,18 +1354,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
 
 [[package]]
 name = "serde"
-version = "1.0.123"
+version = "1.0.124"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "92d5161132722baa40d802cc70b15262b98258453e85e5d1d365c757c73869ae"
+checksum = "bd761ff957cb2a45fbb9ab3da6512de9de55872866160b23c25f1a841e99d29f"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.123"
+version = "1.0.124"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9391c295d64fc0abb2c556bad848f33cb8296276b1ad2677d1ae1ace4f258f31"
+checksum = "1800f7693e94e186f5e25a28291ae1570da908aff7d97a095dec1e56ff99069b"
 dependencies = [
  "proc-macro2",
  "quote",
index 0609d2389cf6d2b0544cc4fd330f6fe5ed5d6b7e..6b29ef557e6041d09bcdca222f4b4f8d613d1f25 100644 (file)
@@ -12,6 +12,7 @@ futures = "0.3"
 getrandom = "0.2"
 hex = { version = "0.4", features = ["serde"] }
 itertools = "0.10"
+pin-project = "1.0"
 rand = "0.8"
 rand_chacha = "0.3"
 redis = { version = "0.20", features = ["async-std-comp"] }
index ac1cec4a52f964d8d7e47769e16fd0c700c3525b..2ff9f91c4d57a2bf00ba65356b24ce4836977e24 100644 (file)
@@ -30,10 +30,6 @@ pub enum ClientInterest {
     Timeout { id: i64 },
 }
 
-impl ClientInterest {
-    pub const STREAM_COALESCE_MAX: usize = 16;
-}
-
 #[derive(Debug, Clone)]
 pub enum LoggedInState {
     Idle,
index 8cb276eea8f0d488ade0b03823ed8b90626a2eba..12e1babb12b3d826d5783523295383f07d225db7 100644 (file)
@@ -6,6 +6,7 @@ use redis::{ErrorKind, RedisError, RedisResult};
 use crate::client::ClientInterest;
 use crate::game::{Game, DealerAction, ValidatedUserAction};
 use crate::server::{ActionStatus, ServerState};
+use crate::util::dedup::DedupReadyExt;
 
 pub struct Dealer {
     server: ServerState,
@@ -37,7 +38,7 @@ impl Dealer {
     }
 
     pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
-        let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
+        let mut update_stream = update_stream.dedup_ready();
         while let Some(_) = update_stream.next().await {
             match self.retrieve_updates().await {
                 Ok(Termination::Continue) => continue,
index 83d0fc52235bf7bd30beccc523c3cf49429c2244..9f3e20aa84a17b2e57cf5b4d4b674d916cc7bd2e 100644 (file)
@@ -32,6 +32,7 @@ use crate::client::{ClientInterest, ConnectionState};
 use crate::dealer::Dealer;
 use crate::game::GameList;
 use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
+use crate::util::dedup::DedupReadyExt;
 
 pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
     #[derive(Debug)]
@@ -193,10 +194,10 @@ async fn handle_new_games(server: Server) -> Result<(), std::io::Error> {
 pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
     let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
     let mut client = ConnectionState::new(server_state);
-    let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
+    let update_stream = update_stream.dedup_ready();
     let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
     while let Some(message) = combined.next().await {
-        let message: Either<Result<Message, tide_websockets::Error>, Vec<ClientInterest>> = message;
+        let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
         match message {
             Either::Left(Ok(Message::Text(input))) => {
                 let response = match serde_json::from_str(&input) {
@@ -205,14 +206,11 @@ pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocke
                 };
                 stream.send_json(&response).await?;
             }
-            Either::Right(mut updates) => {
-                updates.dedup();
-                for update in updates {
-                    debug!("client received update: {:?}", update);
-                    let mut responses = client.retrieve_updates(update).await;
-                    while let Some(response) = responses.next().await {
-                        stream.send_json(&response).await?
-                    }
+            Either::Right(update) => {
+                debug!("client received update: {:?}", update);
+                let mut responses = client.retrieve_updates(update).await;
+                while let Some(response) = responses.next().await {
+                    stream.send_json(&response).await?
                 }
             }
             Either::Left(Ok(Message::Close(_))) => {
diff --git a/src/util/dedup.rs b/src/util/dedup.rs
new file mode 100644 (file)
index 0000000..e68d5ce
--- /dev/null
@@ -0,0 +1,120 @@
+use std::mem::swap;
+use std::pin::Pin;
+
+use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_project::pin_project;
+
+#[pin_project]
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct DedupReady<S: Stream> {
+    #[pin]
+    stream: Fuse<S>,
+    item: Option<S::Item>,
+}
+
+impl <S: Stream> DedupReady<S> {
+    fn new(stream: S) -> Self {
+        Self {
+            stream: stream.fuse(),
+            item: None,
+        }
+    }
+}
+
+impl<S: Stream> Stream for DedupReady<S>
+where 
+    S::Item: PartialEq,
+{
+    type Item = S::Item;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            match this.item {
+                Some(item) => match this.stream.as_mut().poll_next(cx) {
+                    Poll::Ready(Some(next)) if &next == &*item => continue,
+                    Poll::Ready(Some(mut next)) => {
+                        swap(item, &mut next);
+                        return Poll::Ready(Some(next));
+                    }
+                    Poll::Ready(None) | Poll::Pending => return Poll::Ready(this.item.take()),
+                }
+                None => match this.stream.as_mut().poll_next(cx) {
+                    Poll::Ready(Some(item)) => *this.item = Some(item),
+                    Poll::Ready(None) => return Poll::Ready(None),
+                    Poll::Pending => return Poll::Pending,
+                }
+            }
+        }
+    }
+}
+
+impl<S: Stream> FusedStream for DedupReady<S>
+where
+    S::Item: PartialEq,
+{
+    fn is_terminated(&self) -> bool {
+        self.item.is_none() && self.stream.is_terminated()
+    }
+}
+
+pub trait DedupReadyExt : Stream {
+    fn dedup_ready(self) -> DedupReady<Self>
+    where
+        Self::Item: PartialEq,
+        Self: Sized,
+    {
+        DedupReady::new(self)
+    }
+}
+
+impl<S: Stream> DedupReadyExt for S {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use futures::{channel::mpsc::channel, pin_mut, stream::{empty, once, iter}, SinkExt};
+
+    #[async_std::test]
+    async fn empty_stream() {
+        let mut stream = empty::<()>().dedup_ready();
+        assert!(stream.next().await.is_none());
+    }
+
+    #[async_std::test]
+    async fn single_item() {
+        let stream = once(async { 12 }).dedup_ready();
+        pin_mut!(stream);
+        assert_eq!(Some(12), stream.next().await);
+        assert!(stream.next().await.is_none());
+    }
+
+    #[async_std::test]
+    async fn dedup_list() {
+        let mut stream = iter(vec![1, 1, 2, 1, 3, 4, 4]).dedup_ready();
+        assert_eq!(Some(1), stream.next().await);
+        assert_eq!(Some(2), stream.next().await);
+        assert_eq!(Some(1), stream.next().await);
+        assert_eq!(Some(3), stream.next().await);
+        assert_eq!(Some(4), stream.next().await);
+        assert!(stream.next().await.is_none());
+    }
+
+    #[async_std::test]
+    async fn does_not_dedup_pending() {
+        let (mut sender, receiver) = channel(10);
+        let mut receiver = receiver.dedup_ready();
+        sender.send(1).await.unwrap();
+        sender.send(1).await.unwrap();
+        assert_eq!(Some(1), receiver.next().await);
+        sender.send(2).await.unwrap();
+        assert_eq!(Some(2), receiver.next().await);
+        sender.send(2).await.unwrap();
+        assert_eq!(Some(2), receiver.next().await);
+        drop(sender);
+        assert_eq!(None, receiver.next().await);
+    }
+}
index 34aac28e1472ebe6901c38fe286f273dedc71c07..1a4b2720dfd954fc9a5fc8183b9d840c41953a07 100644 (file)
@@ -1,2 +1,3 @@
+pub mod dedup;
 pub mod max;
 pub mod timestamp;