use crate::client::ClientInterest;
use crate::game::{Game, DealerAction, ValidatedUserAction};
use crate::server::{ActionStatus, ServerState};
+use crate::util::dedup::DedupReadyExt;
pub struct Dealer {
server: ServerState,
}
pub async fn start(mut self, update_stream: Receiver<ClientInterest>) {
- let mut update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
+ let mut update_stream = update_stream.dedup_ready();
while let Some(_) = update_stream.next().await {
match self.retrieve_updates().await {
Ok(Termination::Continue) => continue,
use crate::dealer::Dealer;
use crate::game::GameList;
use crate::server::{ClientInterestFromMsgError, ClientInterestSender, Server, ServerState};
+use crate::util::dedup::DedupReadyExt;
pub async fn handle_client_interest(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) -> Result<(), std::io::Error> {
#[derive(Debug)]
pub async fn handle_websocket_request(request: Request<Server>, stream: WebSocketConnection) -> Result<(), Error> {
let (server_state, update_stream): (ServerState, Receiver<_>) = request.state().new_state().await;
let mut client = ConnectionState::new(server_state);
- let update_stream = update_stream.ready_chunks(ClientInterest::STREAM_COALESCE_MAX);
+ let update_stream = update_stream.dedup_ready();
let mut combined = stream::select(stream.clone().map(Either::Left), update_stream.map(Either::Right));
while let Some(message) = combined.next().await {
- let message: Either<Result<Message, tide_websockets::Error>, Vec<ClientInterest>> = message;
+ let message: Either<Result<Message, tide_websockets::Error>, ClientInterest> = message;
match message {
Either::Left(Ok(Message::Text(input))) => {
let response = match serde_json::from_str(&input) {
};
stream.send_json(&response).await?;
}
- Either::Right(mut updates) => {
- updates.dedup();
- for update in updates {
- debug!("client received update: {:?}", update);
- let mut responses = client.retrieve_updates(update).await;
- while let Some(response) = responses.next().await {
- stream.send_json(&response).await?
- }
+ Either::Right(update) => {
+ debug!("client received update: {:?}", update);
+ let mut responses = client.retrieve_updates(update).await;
+ while let Some(response) = responses.next().await {
+ stream.send_json(&response).await?
}
}
Either::Left(Ok(Message::Close(_))) => {
--- /dev/null
+use std::mem::swap;
+use std::pin::Pin;
+
+use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
+use futures::task::{Context, Poll};
+use pin_project::pin_project;
+
+#[pin_project]
+#[derive(Debug)]
+#[must_use = "streams do nothing unless polled"]
+pub struct DedupReady<S: Stream> {
+ #[pin]
+ stream: Fuse<S>,
+ item: Option<S::Item>,
+}
+
+impl <S: Stream> DedupReady<S> {
+ fn new(stream: S) -> Self {
+ Self {
+ stream: stream.fuse(),
+ item: None,
+ }
+ }
+}
+
+impl<S: Stream> Stream for DedupReady<S>
+where
+ S::Item: PartialEq,
+{
+ type Item = S::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut this = self.project();
+ loop {
+ match this.item {
+ Some(item) => match this.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(next)) if &next == &*item => continue,
+ Poll::Ready(Some(mut next)) => {
+ swap(item, &mut next);
+ return Poll::Ready(Some(next));
+ }
+ Poll::Ready(None) | Poll::Pending => return Poll::Ready(this.item.take()),
+ }
+ None => match this.stream.as_mut().poll_next(cx) {
+ Poll::Ready(Some(item)) => *this.item = Some(item),
+ Poll::Ready(None) => return Poll::Ready(None),
+ Poll::Pending => return Poll::Pending,
+ }
+ }
+ }
+ }
+}
+
+impl<S: Stream> FusedStream for DedupReady<S>
+where
+ S::Item: PartialEq,
+{
+ fn is_terminated(&self) -> bool {
+ self.item.is_none() && self.stream.is_terminated()
+ }
+}
+
+pub trait DedupReadyExt : Stream {
+ fn dedup_ready(self) -> DedupReady<Self>
+ where
+ Self::Item: PartialEq,
+ Self: Sized,
+ {
+ DedupReady::new(self)
+ }
+}
+
+impl<S: Stream> DedupReadyExt for S {}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use futures::{channel::mpsc::channel, pin_mut, stream::{empty, once, iter}, SinkExt};
+
+ #[async_std::test]
+ async fn empty_stream() {
+ let mut stream = empty::<()>().dedup_ready();
+ assert!(stream.next().await.is_none());
+ }
+
+ #[async_std::test]
+ async fn single_item() {
+ let stream = once(async { 12 }).dedup_ready();
+ pin_mut!(stream);
+ assert_eq!(Some(12), stream.next().await);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[async_std::test]
+ async fn dedup_list() {
+ let mut stream = iter(vec![1, 1, 2, 1, 3, 4, 4]).dedup_ready();
+ assert_eq!(Some(1), stream.next().await);
+ assert_eq!(Some(2), stream.next().await);
+ assert_eq!(Some(1), stream.next().await);
+ assert_eq!(Some(3), stream.next().await);
+ assert_eq!(Some(4), stream.next().await);
+ assert!(stream.next().await.is_none());
+ }
+
+ #[async_std::test]
+ async fn does_not_dedup_pending() {
+ let (mut sender, receiver) = channel(10);
+ let mut receiver = receiver.dedup_ready();
+ sender.send(1).await.unwrap();
+ sender.send(1).await.unwrap();
+ assert_eq!(Some(1), receiver.next().await);
+ sender.send(2).await.unwrap();
+ assert_eq!(Some(2), receiver.next().await);
+ sender.send(2).await.unwrap();
+ assert_eq!(Some(2), receiver.next().await);
+ drop(sender);
+ assert_eq!(None, receiver.next().await);
+ }
+}