Chuyển tới nội dung chính

🚀 Tokio: Async Runtime Phổ Biến

🎯 Mục Tiêu Bài Học

Sau khi hoàn thành bài học này, bạn sẽ:

  • ✅ Setup Tokio trong project
  • ✅ Sử dụng #[tokio::main]
  • ✅ Tokio tasks và spawning
  • ✅ Async I/O với Tokio
  • ✅ Timers và intervals
  • ✅ Synchronization primitives

🤔 Tokio Là Gì?

Tokioasync runtime phổ biến nhất cho Rust:

🎭 Async Runtime:

  • Scheduler để chạy async tasks
  • I/O event loop
  • Timers và utilities
  • Multi-threaded work stealing

🦀 Tokio Features:

  • Fast và reliable
  • Nhiều utilities built-in
  • Production-ready
  • Được nhiều companies sử dụng

📦 Setup Tokio

Cargo.toml

[dependencies]
tokio = { version = "1", features = ["full"] }

Features Phổ Biến

# Full - tất cả features
tokio = { version = "1", features = ["full"] }

# Minimal - chỉ runtime
tokio = { version = "1", features = ["rt"] }

# Với macros và time
tokio = { version = "1", features = ["macros", "time"] }

# Multi-threaded runtime
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

🎯 #[tokio::main]

Basic Usage

#[tokio::main]
async fn main() {
println!("Hello from Tokio!");
}

Flavor Options

// Single-threaded (default cho async fn main)
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Single-threaded runtime");
}

// Multi-threaded
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
println!("Multi-threaded with 4 workers");
}

Manual Runtime

use tokio::runtime::Runtime;

fn main() {
let rt = Runtime::new().unwrap();

rt.block_on(async {
println!("Running async code");
});
}

🧵 Tokio Tasks

tokio::spawn

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
sleep(Duration::from_secs(1)).await;
println!("Task completed!");
42
});

let result = handle.await.unwrap();
println!("Result: {}", result);
}

Multiple Tasks

use tokio::time::{sleep, Duration};

async fn worker(id: u32, delay: u64) -> String {
sleep(Duration::from_millis(delay)).await;
format!("Worker {} done", id)
}

#[tokio::main]
async fn main() {
let handles: Vec<_> = (0..5)
.map(|i| tokio::spawn(worker(i, i * 100)))
.collect();

for handle in handles {
let result = handle.await.unwrap();
println!("{}", result);
}
}

Task Cancellation

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
loop {
println!("Working...");
sleep(Duration::from_secs(1)).await;
}
});

sleep(Duration::from_secs(3)).await;

handle.abort();
println!("Task aborted");
}

⏱️ Time và Timers

tokio::time::sleep

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
println!("Start");
sleep(Duration::from_secs(2)).await;
println!("2 seconds later");
}

tokio::time::interval

use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
let mut interval = interval(Duration::from_secs(1));

for i in 0..5 {
interval.tick().await;
println!("Tick {}", i);
}
}

timeout

use tokio::time::{sleep, timeout, Duration};

async fn slow_task() -> String {
sleep(Duration::from_secs(5)).await;
String::from("Done")
}

#[tokio::main]
async fn main() {
match timeout(Duration::from_secs(2), slow_task()).await {
Ok(result) => println!("Result: {}", result),
Err(_) => println!("Timeout!"),
}
}

Instant và elapsed

use tokio::time::Instant;

#[tokio::main]
async fn main() {
let start = Instant::now();

// Do some async work
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

println!("Elapsed: {:?}", start.elapsed());
}

📡 Async I/O

Reading Files

use tokio::fs::File;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
let mut file = File::open("example.txt").await?;

let mut contents = String::new();
file.read_to_string(&mut contents).await?;

println!("File contents: {}", contents);

Ok(())
}

Writing Files

use tokio::fs::File;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
let mut file = File::create("output.txt").await?;

file.write_all(b"Hello, Tokio!").await?;

println!("File written");

Ok(())
}

Copy Files

use tokio::fs;

#[tokio::main]
async fn main() -> std::io::Result<()> {
fs::copy("source.txt", "destination.txt").await?;

println!("File copied");

Ok(())
}

🔀 Synchronization

tokio::sync::Mutex

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let counter = Arc::new(Mutex::new(0));

let mut handles = vec![];

for _ in 0..10 {
let counter = Arc::clone(&counter);

let handle = tokio::spawn(async move {
let mut num = counter.lock().await;
*num += 1;
});

handles.push(handle);
}

for handle in handles {
handle.await.unwrap();
}

println!("Counter: {}", *counter.lock().await);
}

tokio::sync::RwLock

use tokio::sync::RwLock;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));

// Multiple readers
let data1 = Arc::clone(&data);
let reader1 = tokio::spawn(async move {
let r = data1.read().await;
println!("Reader 1: {:?}", *r);
});

let data2 = Arc::clone(&data);
let reader2 = tokio::spawn(async move {
let r = data2.read().await;
println!("Reader 2: {:?}", *r);
});

// Writer
let data3 = Arc::clone(&data);
let writer = tokio::spawn(async move {
let mut w = data3.write().await;
w.push(4);
});

reader1.await.unwrap();
reader2.await.unwrap();
writer.await.unwrap();

println!("Final: {:?}", *data.read().await);
}

tokio::sync::mpsc

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);

tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});

while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}

tokio::sync::oneshot

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
let (tx, rx) = oneshot::channel();

tokio::spawn(async move {
tx.send("Hello from task!").unwrap();
});

let result = rx.await.unwrap();
println!("Received: {}", result);
}

🎯 Ví Dụ Thực Tế

Ví Dụ 1: HTTP Server (Conceptual)

use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server listening on port 8080");

loop {
let (mut socket, addr) = listener.accept().await?;
println!("New connection from: {}", addr);

tokio::spawn(async move {
let mut buf = [0; 1024];

loop {
let n = match socket.read(&mut buf).await {
Ok(n) if n == 0 => return,
Ok(n) => n,
Err(_) => return,
};

if socket.write_all(&buf[0..n]).await.is_err() {
return;
}
}
});
}
}

Ví Dụ 2: Parallel File Processing

use tokio::fs;

async fn process_file(path: &str) -> std::io::Result<usize> {
let contents = fs::read_to_string(path).await?;
Ok(contents.len())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
let files = vec!["file1.txt", "file2.txt", "file3.txt"];

let tasks: Vec<_> = files
.into_iter()
.map(|file| tokio::spawn(async move {
process_file(file).await
}))
.collect();

for task in tasks {
match task.await {
Ok(Ok(size)) => println!("File size: {}", size),
Ok(Err(e)) => println!("Error: {}", e),
Err(e) => println!("Task error: {}", e),
}
}

Ok(())
}

Ví Dụ 3: Rate Limiter

use tokio::time::{interval, Duration};

struct RateLimiter {
interval: tokio::time::Interval,
}

impl RateLimiter {
fn new(rate_per_second: u64) -> Self {
let duration = Duration::from_millis(1000 / rate_per_second);
RateLimiter {
interval: interval(duration),
}
}

async fn acquire(&mut self) {
self.interval.tick().await;
}
}

#[tokio::main]
async fn main() {
let mut limiter = RateLimiter::new(5); // 5 requests per second

for i in 0..10 {
limiter.acquire().await;
println!("Request {} sent", i);
}
}

Ví Dụ 4: Worker Pool

use tokio::sync::mpsc;

async fn worker(id: u32, mut rx: mpsc::Receiver<u32>) {
while let Some(job) = rx.recv().await {
println!("Worker {} processing job {}", id, job);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}

#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(32);

// Spawn 3 workers sharing the receiver
for i in 0..3 {
let rx_clone = rx.clone();
tokio::spawn(worker(i, rx_clone));
}
drop(rx); // Drop original

// Send jobs
for job in 0..10 {
tx.send(job).await.unwrap();
}

drop(tx); // Close channel

tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}

Wait, receivers can't be cloned in mpsc. Let me fix this:

use tokio::sync::mpsc;
use std::sync::Arc;

async fn worker(id: u32, rx: Arc<tokio::sync::Mutex<mpsc::Receiver<u32>>>) {
loop {
let job = {
let mut rx = rx.lock().await;
rx.recv().await
};

match job {
Some(job) => {
println!("Worker {} processing job {}", id, job);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
None => break,
}
}
}

#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(32);
let rx = Arc::new(tokio::sync::Mutex::new(rx));

// Spawn 3 workers
let mut handles = vec![];
for i in 0..3 {
let rx = Arc::clone(&rx);
handles.push(tokio::spawn(worker(i, rx)));
}

// Send jobs
for job in 0..10 {
tx.send(job).await.unwrap();
}

drop(tx); // Close channel

for handle in handles {
handle.await.unwrap();
}
}

Ví Dụ 5: Retry Pattern

use tokio::time::{sleep, Duration};

async fn unreliable_operation() -> Result<String, String> {
// Simulate 50% failure rate
if rand::random() {
Ok(String::from("Success!"))
} else {
Err(String::from("Failed"))
}
}

async fn retry<F, Fut, T>(mut f: F, max_attempts: u32) -> Result<T, String>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, String>>,
{
for attempt in 1..=max_attempts {
match f().await {
Ok(result) => return Ok(result),
Err(e) if attempt == max_attempts => return Err(e),
Err(_) => {
println!("Attempt {} failed, retrying...", attempt);
sleep(Duration::from_secs(1)).await;
}
}
}
unreachable!()
}

#[tokio::main]
async fn main() {
match retry(unreliable_operation, 5).await {
Ok(result) => println!("Result: {}", result),
Err(e) => println!("All attempts failed: {}", e),
}
}

Ví Dụ 6: Fan-out / Fan-in

use tokio::sync::mpsc;

async fn producer(id: u32, tx: mpsc::Sender<(u32, String)>) {
for i in 0..3 {
let msg = format!("Producer {} - Message {}", id, i);
tx.send((id, msg)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);

// Fan-out: spawn multiple producers
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(producer(i, tx));
}
drop(tx); // Drop original

// Fan-in: collect all messages
let mut messages = vec![];
while let Some((id, msg)) = rx.recv().await {
println!("Received from producer {}: {}", id, msg);
messages.push((id, msg));
}

println!("Total messages: {}", messages.len());
}

🛡️ Error Handling

Result Propagation

use tokio::fs;

async fn read_file(path: &str) -> std::io::Result<String> {
let contents = fs::read_to_string(path).await?;
Ok(contents)
}

#[tokio::main]
async fn main() {
match read_file("example.txt").await {
Ok(contents) => println!("Contents: {}", contents),
Err(e) => eprintln!("Error: {}", e),
}
}

Join Error Handling

use tokio::task::JoinError;

async fn fallible_task(should_fail: bool) -> Result<i32, String> {
if should_fail {
Err(String::from("Task failed!"))
} else {
Ok(42)
}
}

#[tokio::main]
async fn main() {
let handle = tokio::spawn(fallible_task(false));

match handle.await {
Ok(Ok(value)) => println!("Success: {}", value),
Ok(Err(e)) => println!("Task error: {}", e),
Err(e) => println!("Join error: {}", e),
}
}

💻 Bài Tập Thực Hành

Bài 1: Basic Task

#[tokio::main]
async fn main() {
// TODO: Spawn 3 tasks, mỗi task sleep i seconds và return i
// TODO: Await tất cả và print results
}
💡 Gợi ý
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
let handles: Vec<_> = (1..=3)
.map(|i| {
tokio::spawn(async move {
sleep(Duration::from_secs(i)).await;
i
})
})
.collect();

for handle in handles {
let result = handle.await.unwrap();
println!("Result: {}", result);
}
}

Bài 2: Interval Timer

use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
// TODO: Tạo interval 500ms
// TODO: Print "Tick" 5 times
}
💡 Gợi ý
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
let mut interval = interval(Duration::from_millis(500));

for _ in 0..5 {
interval.tick().await;
println!("Tick");
}
}

Bài 3: Channel Communication

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
// TODO: Tạo channel
// TODO: Spawn task gửi 5 messages
// TODO: Receive và print messages
}
💡 Gợi ý
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);

tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});

while let Some(value) = rx.recv().await {
println!("Received: {}", value);
}
}

🎯 Tóm Tắt

ModuleMô Tả
tokio::taskTask spawning và management
tokio::timeTimers, intervals, timeouts
tokio::syncAsync synchronization primitives
tokio::fsAsync file operations
tokio::netAsync networking
tokio::ioAsync I/O traits

Quy tắc vàng:

  • ✅ Tokio cho production async Rust
  • ✅ #[tokio::main] để setup runtime
  • ✅ tokio::spawn cho concurrent tasks
  • ✅ Dùng tokio::sync cho async synchronization
  • ✅ interval cho periodic tasks
  • ✅ timeout để tránh hang forever
  • ✅ Multi-threaded runtime cho CPU-bound work

🔗 Liên Kết Hữu Ích


Bài tiếp theo: Unit Testing →

Trong bài tiếp theo, chúng ta sẽ tìm hiểu về Testing trong Rust!

Loading comments...