💬 Real-time Chat Application
🎯 Mục Tiêu Dự Án
Xây dựng một real-time chat application với WebSocket protocol:
- 🔌 WebSocket server và client handling
- 📡 Message broadcasting to multiple clients
- 🏠 Multiple chat rooms support
- 👤 User management (join, leave, nicknames)
- 📨 Private messages (DMs)
- 📜 Message history per room
- 👥 Online users list
- ⚡ High-performance async với Tokio
Bạn Sẽ Học Được
- ✅ WebSocket protocol implementation
- ✅ Async programming với Tokio
- ✅ Channels cho message passing (
mpsc,broadcast) - ✅ Concurrent data structures (Arc, Mutex, RwLock)
- ✅ Actor pattern cho state management
- ✅ Broadcasting messages to multiple clients
- ✅ Real-time communication patterns
- ✅ Error handling trong async context
📦 Bước 1: Setup Project
cargo new chat_app
cd chat_app
Thêm dependencies vào Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.21"
futures-util = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4"] }
chrono = "0.4"
dashmap = "5.5"
🎮 Bước 2: Version 1 - Basic WebSocket Echo Server
Tạo src/main.rs:
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use futures_util::{StreamExt, SinkExt};
async fn handle_connection(stream: TcpStream) {
let addr = stream.peer_addr().unwrap();
println!("📥 New connection from: {}", addr);
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
eprintln!("❌ WebSocket handshake error: {}", e);
return;
}
};
println!("✅ WebSocket connection established: {}", addr);
let (mut write, mut read) = ws_stream.split();
// Echo back all messages
while let Some(msg) = read.next().await {
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
eprintln!("❌ Error receiving message: {}", e);
break;
}
};
if msg.is_text() || msg.is_binary() {
println!("📨 Received: {:?}", msg);
if write.send(msg).await.is_err() {
eprintln!("❌ Error sending message");
break;
}
} else if msg.is_close() {
println!("👋 Connection closed: {}", addr);
break;
}
}
}
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080";
let listener = TcpListener::bind(addr).await.unwrap();
println!("🚀 WebSocket server running on ws://{}", addr);
while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_connection(stream));
}
}
Test với HTML client (client.html):
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Echo Test</title>
</head>
<body>
<h1>WebSocket Echo Client</h1>
<input id="message" type="text" placeholder="Type a message...">
<button onclick="send()">Send</button>
<div id="output"></div>
<script>
const ws = new WebSocket('ws://localhost:8080');
const output = document.getElementById('output');
ws.onopen = () => {
output.innerHTML += '<p style="color: green">Connected!</p>';
};
ws.onmessage = (event) => {
output.innerHTML += `<p>Echo: ${event.data}</p>`;
};
ws.onerror = (error) => {
output.innerHTML += `<p style="color: red">Error: ${error}</p>`;
};
ws.onclose = () => {
output.innerHTML += '<p style="color: red">Disconnected</p>';
};
function send() {
const msg = document.getElementById('message').value;
ws.send(msg);
document.getElementById('message').value = '';
}
</script>
</body>
</html>
Chạy:
cargo run
# Mở client.html trong browser
📖 Giải Thích Code
1. Tokio Async Runtime
#[tokio::main]
async fn main() {
// Tokio tự động setup async runtime
listener.accept().await; // Async operation
}
#[tokio::main]: Macro để setup Tokio runtimeasync/await: Non-blocking I/O operationstokio::spawn(): Spawn new async task
2. WebSocket Handshake
let ws_stream = accept_async(stream).await?;
accept_async(): Thực hiện WebSocket handshake- Upgrade từ HTTP sang WebSocket protocol
3. Split Stream
let (mut write, mut read) = ws_stream.split();
- Split thành read half và write half
- Cho phép concurrent read và write
🎨 Bước 3: Version 2 - Multi-user Chat với Broadcasting
Tạo src/models.rs:
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClientMessage {
Join { username: String },
Message { content: String },
Leave,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ServerMessage {
Welcome {
user_id: String,
message: String,
},
UserJoined {
user_id: String,
username: String,
},
UserLeft {
user_id: String,
username: String,
},
Message {
user_id: String,
username: String,
content: String,
timestamp: DateTime<Utc>,
},
UserList {
users: Vec<UserInfo>,
},
Error {
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserInfo {
pub user_id: String,
pub username: String,
}
Tạo src/server.rs:
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use std::collections::HashMap;
use crate::models::{ServerMessage, UserInfo};
pub type Tx = mpsc::UnboundedSender<ServerMessage>;
pub type Rx = mpsc::UnboundedReceiver<ServerMessage>;
#[derive(Clone)]
pub struct User {
pub id: String,
pub username: String,
pub tx: Tx,
}
pub struct ChatServer {
users: Arc<RwLock<HashMap<String, User>>>,
}
impl ChatServer {
pub fn new() -> Self {
ChatServer {
users: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn add_user(&self, user_id: String, username: String, tx: Tx) {
let user = User {
id: user_id.clone(),
username: username.clone(),
tx,
};
self.users.write().await.insert(user_id.clone(), user);
// Broadcast user joined
let msg = ServerMessage::UserJoined {
user_id: user_id.clone(),
username: username.clone(),
};
self.broadcast_message(msg, Some(&user_id)).await;
// Send user list to new user
let users_list = self.get_users_list().await;
let _ = self.send_to_user(&user_id, ServerMessage::UserList { users: users_list }).await;
}
pub async fn remove_user(&self, user_id: &str) {
if let Some(user) = self.users.write().await.remove(user_id) {
let msg = ServerMessage::UserLeft {
user_id: user.id.clone(),
username: user.username.clone(),
};
self.broadcast_message(msg, None).await;
}
}
pub async fn broadcast_message(&self, msg: ServerMessage, exclude_user: Option<&str>) {
let users = self.users.read().await;
for (user_id, user) in users.iter() {
if let Some(exclude) = exclude_user {
if user_id == exclude {
continue;
}
}
let _ = user.tx.send(msg.clone());
}
}
pub async fn send_message(&self, from_user_id: &str, content: String) {
let users = self.users.read().await;
if let Some(user) = users.get(from_user_id) {
let msg = ServerMessage::Message {
user_id: user.id.clone(),
username: user.username.clone(),
content,
timestamp: chrono::Utc::now(),
};
drop(users); // Release lock before broadcasting
self.broadcast_message(msg, None).await;
}
}
pub async fn send_to_user(&self, user_id: &str, msg: ServerMessage) -> Result<(), String> {
let users = self.users.read().await;
if let Some(user) = users.get(user_id) {
user.tx.send(msg).map_err(|e| e.to_string())?;
Ok(())
} else {
Err("User not found".to_string())
}
}
pub async fn get_users_list(&self) -> Vec<UserInfo> {
let users = self.users.read().await;
users
.values()
.map(|u| UserInfo {
user_id: u.id.clone(),
username: u.username.clone(),
})
.collect()
}
}
Cập nhật src/main.rs:
mod models;
mod server;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use futures_util::{StreamExt, SinkExt};
use server::{ChatServer, Tx};
use models::{ClientMessage, ServerMessage};
use std::sync::Arc;
async fn handle_connection(stream: TcpStream, server: Arc<ChatServer>) {
let addr = stream.peer_addr().unwrap();
println!("📥 New connection from: {}", addr);
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
eprintln!("❌ WebSocket error: {}", e);
return;
}
};
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let (tx, mut rx): (Tx, _) = tokio::sync::mpsc::unbounded_channel();
let user_id = uuid::Uuid::new_v4().to_string();
let mut username: Option<String> = None;
// Send welcome message
let welcome = ServerMessage::Welcome {
user_id: user_id.clone(),
message: "Welcome! Please send a Join message with your username.".to_string(),
};
if let Ok(json) = serde_json::to_string(&welcome) {
let _ = ws_sender.send(Message::Text(json)).await;
}
// Spawn task to forward messages from rx to WebSocket
let mut send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
if ws_sender.send(Message::Text(json)).await.is_err() {
break;
}
}
}
});
// Handle incoming messages
let server_clone = Arc::clone(&server);
let user_id_clone = user_id.clone();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = ws_receiver.next().await {
if let Message::Text(text) = msg {
match serde_json::from_str::<ClientMessage>(&text) {
Ok(ClientMessage::Join { username: uname }) => {
if username.is_none() {
username = Some(uname.clone());
server_clone.add_user(user_id_clone.clone(), uname, tx.clone()).await;
println!("👤 User joined: {} ({})", username.as_ref().unwrap(), user_id_clone);
}
},
Ok(ClientMessage::Message { content }) => {
if username.is_some() {
server_clone.send_message(&user_id_clone, content).await;
}
},
Ok(ClientMessage::Leave) => {
break;
},
Err(e) => {
eprintln!("❌ Invalid message format: {}", e);
}
}
} else if msg.is_close() {
break;
}
}
user_id_clone
});
// Wait for either task to finish
tokio::select! {
user_id = &mut recv_task => {
send_task.abort();
if let Ok(uid) = user_id {
server.remove_user(&uid).await;
println!("👋 User left: {}", uid);
}
}
_ = &mut send_task => {
recv_task.abort();
}
}
}
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080";
let listener = TcpListener::bind(addr).await.unwrap();
let server = Arc::new(ChatServer::new());
println!("🚀 Chat server running on ws://{}", addr);
while let Ok((stream, _)) = listener.accept().await {
let server_clone = Arc::clone(&server);
tokio::spawn(handle_connection(stream, server_clone));
}
}
Advanced HTML Client (chat_client.html):
<!DOCTYPE html>
<html>
<head>
<title>Rust Chat</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; }
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; margin: 10px 0; }
.message { margin: 5px 0; padding: 5px; border-radius: 3px; }
.system { background: #f0f0f0; color: #666; }
.user { background: #e3f2fd; }
.me { background: #c8e6c9; text-align: right; }
#userList { border: 1px solid #ccc; padding: 10px; margin: 10px 0; }
input { width: 70%; padding: 10px; }
button { padding: 10px 20px; }
</style>
</head>
<body>
<h1>💬 Rust Chat Room</h1>
<div>
<input id="username" type="text" placeholder="Enter username...">
<button onclick="join()">Join</button>
<button onclick="leave()" id="leaveBtn" disabled>Leave</button>
</div>
<div id="userList">
<h3>👥 Online Users</h3>
<div id="users"></div>
</div>
<div id="messages"></div>
<div>
<input id="messageInput" type="text" placeholder="Type a message..." disabled>
<button onclick="sendMessage()" id="sendBtn" disabled>Send</button>
</div>
<script>
let ws = null;
let myUserId = null;
function connect() {
ws = new WebSocket('ws://localhost:8080');
ws.onopen = () => {
addMessage('Connected to server', 'system');
};
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
handleMessage(msg);
};
ws.onerror = (error) => {
addMessage('Error: ' + error, 'system');
};
ws.onclose = () => {
addMessage('Disconnected from server', 'system');
document.getElementById('messageInput').disabled = true;
document.getElementById('sendBtn').disabled = true;
};
}
function handleMessage(msg) {
switch(msg.type) {
case 'Welcome':
myUserId = msg.user_id;
addMessage(msg.message, 'system');
break;
case 'UserJoined':
addMessage(`${msg.username} joined the chat`, 'system');
break;
case 'UserLeft':
addMessage(`${msg.username} left the chat`, 'system');
break;
case 'Message':
const isMe = msg.user_id === myUserId;
const className = isMe ? 'me' : 'user';
addMessage(`${msg.username}: ${msg.content}`, className);
break;
case 'UserList':
updateUserList(msg.users);
break;
case 'Error':
addMessage(`Error: ${msg.message}`, 'system');
break;
}
}
function addMessage(text, className) {
const div = document.createElement('div');
div.className = 'message ' + className;
div.textContent = text;
document.getElementById('messages').appendChild(div);
div.scrollIntoView();
}
function updateUserList(users) {
const userDiv = document.getElementById('users');
userDiv.innerHTML = users.map(u => `<div>${u.username}</div>`).join('');
}
function join() {
const username = document.getElementById('username').value.trim();
if (!username) {
alert('Please enter a username');
return;
}
if (!ws) {
connect();
}
setTimeout(() => {
ws.send(JSON.stringify({
type: 'Join',
username: username
}));
document.getElementById('messageInput').disabled = false;
document.getElementById('sendBtn').disabled = false;
document.getElementById('leaveBtn').disabled = false;
document.getElementById('username').disabled = true;
}, 500);
}
function sendMessage() {
const input = document.getElementById('messageInput');
const message = input.value.trim();
if (message && ws) {
ws.send(JSON.stringify({
type: 'Message',
content: message
}));
input.value = '';
}
}
function leave() {
if (ws) {
ws.send(JSON.stringify({ type: 'Leave' }));
ws.close();
ws = null;
}
}
document.getElementById('messageInput').addEventListener('keypress', (e) => {
if (e.key === 'Enter') {
sendMessage();
}
});
</script>
</body>
</html>
🎨 Bước 4: Version 3 - Multiple Rooms và Private Messages
Tạo src/room.rs:
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::models::{ServerMessage, UserInfo};
use crate::server::User;
pub struct Room {
pub id: String,
pub name: String,
pub users: HashMap<String, User>,
pub message_history: Vec<ServerMessage>,
}
impl Room {
pub fn new(id: String, name: String) -> Self {
Room {
id,
name,
users: HashMap::new(),
message_history: Vec::new(),
}
}
pub fn add_user(&mut self, user: User) {
self.users.insert(user.id.clone(), user);
}
pub fn remove_user(&mut self, user_id: &str) -> Option<User> {
self.users.remove(user_id)
}
pub fn broadcast(&self, msg: ServerMessage, exclude_user: Option<&str>) {
for (user_id, user) in &self.users {
if let Some(exclude) = exclude_user {
if user_id == exclude {
continue;
}
}
let _ = user.tx.send(msg.clone());
}
}
pub fn add_to_history(&mut self, msg: ServerMessage) {
// Keep only last 100 messages
if self.message_history.len() >= 100 {
self.message_history.remove(0);
}
self.message_history.push(msg);
}
pub fn get_users_list(&self) -> Vec<UserInfo> {
self.users
.values()
.map(|u| UserInfo {
user_id: u.id.clone(),
username: u.username.clone(),
})
.collect()
}
}
pub struct RoomManager {
rooms: Arc<RwLock<HashMap<String, Room>>>,
}
impl RoomManager {
pub fn new() -> Self {
let mut rooms = HashMap::new();
// Create default room
let default_room = Room::new("general".to_string(), "General".to_string());
rooms.insert("general".to_string(), default_room);
RoomManager {
rooms: Arc::new(RwLock::new(rooms)),
}
}
pub async fn create_room(&self, room_id: String, name: String) -> Result<(), String> {
let mut rooms = self.rooms.write().await;
if rooms.contains_key(&room_id) {
return Err("Room already exists".to_string());
}
rooms.insert(room_id.clone(), Room::new(room_id, name));
Ok(())
}
pub async fn join_room(&self, room_id: &str, user: User) -> Result<Vec<ServerMessage>, String> {
let mut rooms = self.rooms.write().await;
let room = rooms
.get_mut(room_id)
.ok_or_else(|| "Room not found".to_string())?;
// Broadcast join message
let join_msg = ServerMessage::UserJoined {
user_id: user.id.clone(),
username: user.username.clone(),
};
room.broadcast(join_msg.clone(), Some(&user.id));
room.add_user(user);
// Return message history
Ok(room.message_history.clone())
}
pub async fn leave_room(&self, room_id: &str, user_id: &str) -> Result<(), String> {
let mut rooms = self.rooms.write().await;
let room = rooms
.get_mut(room_id)
.ok_or_else(|| "Room not found".to_string())?;
if let Some(user) = room.remove_user(user_id) {
let leave_msg = ServerMessage::UserLeft {
user_id: user.id.clone(),
username: user.username.clone(),
};
room.broadcast(leave_msg, None);
}
Ok(())
}
pub async fn send_message(&self, room_id: &str, user_id: &str, content: String) -> Result<(), String> {
let mut rooms = self.rooms.write().await;
let room = rooms
.get_mut(room_id)
.ok_or_else(|| "Room not found".to_string())?;
let user = room
.users
.get(user_id)
.ok_or_else(|| "User not in room".to_string())?;
let msg = ServerMessage::Message {
user_id: user.id.clone(),
username: user.username.clone(),
content,
timestamp: chrono::Utc::now(),
};
room.add_to_history(msg.clone());
room.broadcast(msg, None);
Ok(())
}
pub async fn get_room_list(&self) -> Vec<(String, String, usize)> {
let rooms = self.rooms.read().await;
rooms
.values()
.map(|r| (r.id.clone(), r.name.clone(), r.users.len()))
.collect()
}
}
Cập nhật src/models.rs với room messages:
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClientMessage {
Join { username: String },
JoinRoom { room_id: String },
LeaveRoom { room_id: String },
Message { room_id: String, content: String },
PrivateMessage { to_user_id: String, content: String },
ListRooms,
Leave,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ServerMessage {
Welcome {
user_id: String,
message: String,
},
RoomJoined {
room_id: String,
room_name: String,
history: Vec<ServerMessage>,
},
UserJoined {
user_id: String,
username: String,
},
UserLeft {
user_id: String,
username: String,
},
Message {
user_id: String,
username: String,
content: String,
timestamp: DateTime<Utc>,
},
PrivateMessage {
from_user_id: String,
from_username: String,
content: String,
timestamp: DateTime<Utc>,
},
RoomList {
rooms: Vec<RoomInfo>,
},
UserList {
users: Vec<UserInfo>,
},
Error {
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoomInfo {
pub room_id: String,
pub name: String,
pub user_count: usize,
}