From fd54f4da9104850d09b2441d5a778f6a5370b061 Mon Sep 17 00:00:00 2001 From: Elias Fleckenstein Date: Thu, 15 Sep 2022 22:12:08 +0200 Subject: [PATCH] Server: implement login --- Cargo.lock | 13 ++++ Cargo.toml | 2 + src/client.rs | 12 ++-- src/quit.rs | 2 +- src/server.rs | 45 ++++--------- src/server/remote_client.rs | 127 +++++++++++++++++++++++++++--------- 6 files changed, 131 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7755eb..63f7305 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -618,6 +618,17 @@ dependencies = [ "syn", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dispatch" version = "0.2.0" @@ -645,8 +656,10 @@ version = "0.1.0" dependencies = [ "clap", "connect", + "derivative", "env_logger", "log", + "once_cell", "serde", "serde_cbor", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6aa98a2..1b21148 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,5 @@ log = "0.4.17" env_logger = "0.9.0" winit = "0.27.3" wgpu = "0.13.1" +once_cell = "1.14.0" +derivative = "2.2.0" diff --git a/src/client.rs b/src/client.rs index fcfc517..f5dd4cb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,7 +6,7 @@ use log::*; use tokio::sync::Mutex as AsyncMutex; pub struct Client { - pub conn: AsyncMutex, + conn: AsyncMutex, quit: Quit, } @@ -23,7 +23,7 @@ impl Client { } async fn handle(&self, msg: &ConnectDatagram) { - println!("{}", msg.recipient()); + info!("{}", msg.recipient()); } async fn run_loop(self, mut reader: ConnectionReader) { @@ -44,18 +44,18 @@ impl Client { msg = reader.next() => match msg { Some(msg) => self.handle(&msg).await, None => { - trace!("Server closed connection"); + trace!("server closed connection"); break; } }, _ = quit.recv() => { - trace!("Quit signal received"); + trace!("quit signal received"); break; }, - else => unreachable!("Quit channel broke"), + else => unreachable!("quit channel broke"), } } - info!("Disconnected"); + info!("disconnected"); } } diff --git a/src/quit.rs b/src/quit.rs index 86cb9c3..ad03de4 100644 --- a/src/quit.rs +++ b/src/quit.rs @@ -1,7 +1,7 @@ use log::*; use tokio::sync::{broadcast, mpsc}; -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Quit { signal: broadcast::Sender<()>, diff --git a/src/server.rs b/src/server.rs index 5f82b83..071019d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,16 +4,15 @@ mod remote_client; use crate::quit::Quit; use connect::{tcp::TcpListener, StreamExt}; use log::*; -use remote_client::{Client, ClientId}; +use remote_client::Client; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -use tokio::{sync::Mutex as AsyncMutex, task}; +#[derive(Debug)] pub struct ServerData { - clients_by_id: RwLock>>, - clients_by_name: RwLock>>, + players: RwLock>>, } pub struct Server { @@ -28,49 +27,29 @@ impl Server { quit, listener: TcpListener::bind(addr).await.unwrap(), data: Arc::new(ServerData { - clients_by_id: RwLock::new(HashMap::new()), - clients_by_name: RwLock::new(HashMap::new()), + players: RwLock::new(HashMap::new()), }), } } pub async fn run(mut self) { - let mut next_id: ClientId = 0; let mut quit = self.quit.subscribe(); loop { tokio::select! { - conn = self.listener.next() => { - let conn = conn.expect("Listener interrupted"); - - info!("Client from {} assigned id {next_id}", conn.peer_addr()); - - let (reader, writer) = conn.split(); - let client = Arc::new(Client { - id: next_id, - conn: AsyncMutex::new(writer), - server: Arc::downgrade(&self.data), - quit: self.quit.clone(), - }); - - self.data - .clients_by_id - .write() - .unwrap() - .insert(next_id, Arc::clone(&client)); - - next_id += 1; - - task::spawn(async move { (*client).run(reader).await }); - }, + conn = self.listener.next() => Client::create( + conn.expect("listener interrupted"), + Arc::downgrade(&self.data), + self.quit.clone() + ).await, _ = quit.recv() => { - trace!("Quit signal received"); + trace!("quit signal received"); break; }, - else => unreachable!("Quit channel broke"), + else => unreachable!("quit channel broke"), } } - info!("Stopped server"); + info!("stopped server"); } } diff --git a/src/server/remote_client.rs b/src/server/remote_client.rs index 6db6eb0..efe0339 100644 --- a/src/server/remote_client.rs +++ b/src/server/remote_client.rs @@ -1,40 +1,100 @@ -use super::pkts::*; -use super::ServerData; -use crate::pkt; -use crate::Quit; -use connect::{ConnectDatagram, ConnectionReader, ConnectionWriter, StreamExt}; +use crate::{ + pkt, + quit::Quit, + server::{pkts::*, ServerData}, +}; +use connect::{ConnectDatagram, Connection, ConnectionReader, ConnectionWriter, StreamExt}; use log::*; -use std::sync::Weak; -use tokio::sync::Mutex as AsyncMutex; - -pub type ClientId = u64; +use once_cell::sync::OnceCell; +use std::{ + fmt, + net::SocketAddr, + sync::{Arc, Weak}, +}; +use tokio::{sync::Mutex as AsyncMutex, task}; +#[derive(derivative::Derivative)] +#[derivative(Debug)] pub struct Client { - pub id: ClientId, - pub conn: AsyncMutex, - pub server: Weak, - pub quit: Quit, + addr: SocketAddr, + data: OnceCell, + + #[allow(unused)] + #[derivative(Debug = "ignore")] + conn: AsyncMutex, + + #[derivative(Debug = "ignore")] + server: Weak, + + #[derivative(Debug = "ignore")] + circle: OnceCell>, + + #[derivative(Debug = "ignore")] + quit: Quit, +} + +#[derive(Debug)] +pub struct ClientData { + name: String, } impl Client { + pub async fn create(conn: Connection, server: Weak, quit: Quit) { + let addr = conn.peer_addr(); + let (reader, writer) = conn.split(); + + let client = Arc::new(Self { + addr, + server, + quit, + conn: AsyncMutex::new(writer), + data: OnceCell::new(), + circle: OnceCell::new(), + }); + + client + .circle + .set(Arc::downgrade(&client)) + .expect("OnceCell was just created"); + + task::spawn(async move { + (*client).run(reader).await; + }); + } + async fn login(&self, pkt: &Login) { - info!( - "Client {id}: logged in {name} {pwd}", - id = self.id, - name = pkt.name, - pwd = pkt.pwd - ); + if let None = self.data.get() && let Some(server) = self.server.upgrade() { + server + .players + .write() + .expect("deadlock") + .entry(pkt.name.clone()) + .or_insert_with(|| { + self.data.set(ClientData { + name: pkt.name.clone(), + }).expect("OnceCell was verified to be empty above"); + + info!("{self}: logged as {}", pkt.name); + + self.circle + .get() + .expect("OnceCell was initialized in fn create") + .upgrade() + .expect("self can't be dropped") + }); + } } async fn handle(&self, msg: &ConnectDatagram) { match msg.recipient() { LOGIN if let Some(pkt) = pkt::get::(msg) => self.login(&pkt).await, - _ => warn!("Client {id}: Invalid packet with recipient {rep}", - id = self.id, rep = msg.recipient()), + _ => warn!("{self}: invalid packet with recipient {}", msg.recipient()), } } - pub async fn run(&self, mut reader: ConnectionReader) { + async fn run(&self, mut reader: ConnectionReader) { + info!("{self}: connected"); + let mut quit = self.quit.subscribe(); loop { @@ -42,23 +102,30 @@ impl Client { msg = reader.next() => match msg { Some(msg) => self.handle(&msg).await, None => { - trace!("Client {id}: Closed connection", id = self.id); + trace!("{self}: closed connection"); break; } }, _ = quit.recv() => { - trace!("Client {id}: Quit signal received", id = self.id); + trace!("{self}: quit signal received"); break; }, - else => unreachable!("Quit channel broke"), + else => unreachable!("quit channel broke"), } } - if let Some(server) = self.server.upgrade() { - server.clients_by_id.write().unwrap().remove(&self.id); - trace!("Client {id}: Removed from clients", id = self.id); - } + if let Some(data) = self.data.get() && let Some(srv) = self.server.upgrade() { + srv.players.write().expect("deadlock").remove(&data.name); + + trace!("{self}: removed from clients"); + } + + info!("{self}: disconnected"); + } +} - info!("Client {id}: Disconnected", id = self.id); +impl fmt::Display for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.addr) } } -- 2.44.0