}
pub async fn handle_client_interest_subscriptions(mut connection: PubSub, mut new_clients: Receiver<ClientInterestSender>) {
- debug!("handle_client_interest: init");
+ debug!("init");
let mut clients: Vec<Client> = Vec::new();
loop {
enum Action {
action
} {
Action::AddClient { sender } => {
- debug!("handle_client_interest: Action::AddClient {{ clients[{}] }}", clients.len());
+ debug!("Action::AddClient {{ clients[{}] }}", clients.len());
clients.push(Client { sender, interests: HashSet::new() });
}
Action::RegisterInterest { index, mut client_interests } => {
- debug!("handle_client_interest: Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
+ debug!("Action::RegisterInterest{{ client_interests: {:?}, index: {:?} }}", client_interests, index);
let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
swap(&mut clients[index].interests, &mut client_interests);
let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
for interest in &interests_after - &interests_prior {
- debug!("handle_client_interest: Subscribing to {:?}", interest);
+ debug!("Subscribing to {:?}", interest);
if let Err(err) = connection.subscribe(interest).await {
- error!("handle_client_interest: Subscribe failed: {}", err);
+ error!("Subscribe failed: {}", err);
}
}
for interest in &interests_prior - &interests_after {
- debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ debug!("Unsubscribing from {:?}", interest);
if let Err(err) = connection.unsubscribe(interest).await {
- error!("handle_client_interest: Unsubscribe failed: {}", err);
+ error!("Unsubscribe failed: {}", err);
}
}
let client = &mut clients[index];
let sender = &mut client.sender;
for interest in &client.interests - &client_interests {
- debug!("handle_client_interest: Sending initial interest for new interest {:?} to clients[{}]", interest, index);
+ debug!("Sending initial interest for new interest {:?} to clients[{}]", interest, index);
if let Err(err) = sender.interest.send(interest.clone()).await {
- error!("handle_client_interest: Send failed: {}", err);
+ error!("Send failed: {}", err);
}
}
}
Action::SendInterest { interest } => {
- debug!("handle_client_interest: Action::SendInterest {{ interest: {:?} }}", interest);
+ debug!("Action::SendInterest {{ interest: {:?} }}", interest);
match TryFrom::try_from(interest) {
Ok(interest) => {
for (index, Client { sender, interests }) in clients.iter_mut().enumerate() {
if interests.contains(&interest) {
- debug!("handle_client_interest: Sending {:?} to clients[{}]", interest, index);
+ debug!("Sending {:?} to clients[{}]", interest, index);
if let Err(err) = sender.interest.send(interest.clone()).await {
- error!("handle_client_interest: Send failed: {}", err);
+ error!("Send failed: {}", err);
}
}
}
}
Err(ClientInterestFromMsgError::TimeoutMessageWasNotExpired) => {}
Err(ClientInterestFromMsgError::InvalidChannelName { channel_name }) => {
- error!("handle_client_interest: Failed to interest {} to ClientInterest", channel_name);
+ error!("Failed to interest {} to ClientInterest", channel_name);
}
}
}
Action::RemoveClient { index } => {
- debug!("handle_client_interest: Action::RemoveClient {{ index: {:?} }}", index);
+ debug!("Action::RemoveClient {{ index: {:?} }}", index);
let interests_prior: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
clients.remove(index);
let interests_after: HashSet<ClientInterest> = clients.iter().map(|Client { ref interests, .. }| interests.clone()).flatten().collect();
for interest in &interests_prior - &interests_after {
- debug!("handle_client_interest: Unsubscribing from {:?}", interest);
+ debug!("Unsubscribing from {:?}", interest);
if let Err(err) = connection.unsubscribe(interest).await {
- error!("handle_client_interest: Unsubscribe failed: {}", err);
+ error!("Unsubscribe failed: {}", err);
}
}
}
Action::NoNewClients => {
- debug!("handle_client_interest: Action::NoNewClients - current clients: {:?}", clients);
+ debug!("Action::NoNewClients - current clients: {:?}", clients);
if clients.is_empty() {
return;
}
}
Action::ConnectionClosed => {
- debug!("handle_client_interest: Action::ConnectionClosed - current clients: {:?}", clients);
+ debug!("Action::ConnectionClosed - current clients: {:?}", clients);
return;
}
}