Async Programming Exercises
Master Rust's async/await with these exercises covering async functions, futures, Tokio runtime, error handling, and real-world async patterns.
Prerequisites
Add these dependencies to your Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
Exercise 1: Basic Async Function
Difficulty: Easy
Problem: Create and execute a simple async function.
Requirements:
- Define an async function
- Use `.await` to call it
- Set up the Tokio runtime
- Return values from async functions
Example:
async fn hello() -> String {
"Hello, async!".to_string()
}
#[tokio::main]
async fn main() {
let msg = hello().await;
println!("{}", msg);
}
Hints:
- Use `async fn` syntax
- Call async functions with `.await`
- Use `#[tokio::main]` for an async main
- Async functions return a `Future<Output = T>`
Solution
use tokio;
async fn greet(name: &str) -> String {
format!("Hello, {}!", name)
}
async fn calculate_sum(a: i32, b: i32) -> i32 {
a + b
}
async fn delayed_greeting(name: &str, delay_ms: u64) -> String {
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
format!("Hello after {}ms, {}!", delay_ms, name)
}
#[tokio::main]
async fn main() {
let greeting = greet("World").await;
println!("{}", greeting);
let sum = calculate_sum(5, 7).await;
println!("Sum: {}", sum);
let delayed = delayed_greeting("Alice", 100).await;
println!("{}", delayed);
}
Learning Points:
- Defining async functions
- Using `.await` to get results
- The `#[tokio::main]` macro
- Async functions and the `Future` trait
Exercise 2: Concurrent Async Tasks
Difficulty: Easy
Problem: Run multiple async tasks concurrently.
Requirements:
- Spawn multiple async tasks
- Use `tokio::spawn`
- Wait for tasks to complete
- Collect results using `JoinHandle`
Example:
let handle = tokio::spawn(async {
// async work
});
let result = handle.await?;
Hints:
- Use `tokio::spawn()` to create tasks
- Tasks run concurrently
- Use the `join!` macro for multiple futures
- Awaiting a `JoinHandle` returns a `Result`
Solution
use tokio;
use tokio::time::{sleep, Duration};
async fn task(id: u32, duration_ms: u64) -> u32 {
println!("Task {} starting", id);
sleep(Duration::from_millis(duration_ms)).await;
println!("Task {} completed", id);
id * 2
}
#[tokio::main]
async fn main() {
// Spawn multiple tasks
let handle1 = tokio::spawn(task(1, 100));
let handle2 = tokio::spawn(task(2, 200));
let handle3 = tokio::spawn(task(3, 150));
// Wait for all tasks
let result1 = handle1.await.unwrap();
let result2 = handle2.await.unwrap();
let result3 = handle3.await.unwrap();
println!("Results: {}, {}, {}", result1, result2, result3);
// Using join! macro
let (r1, r2, r3) = tokio::join!(
task(4, 100),
task(5, 200),
task(6, 150)
);
println!("Join results: {}, {}, {}", r1, r2, r3);
}
Learning Points:
- Spawning tasks with `tokio::spawn`
- Concurrent task execution
- Awaiting `JoinHandle`s
- The `join!` macro for running futures concurrently
Exercise 3: Async Error Handling
Difficulty: Medium
Problem: Handle errors in async functions properly.
Requirements:
- Return `Result` from async functions
- Use the `?` operator in an async context
- Propagate errors correctly
- Handle errors from spawned tasks
Example:
async fn fallible_operation() -> Result<i32, String> {
// might fail
}
Hints:
- Async functions can return `Result<T, E>`
- Use the `?` operator as in sync code
- Awaiting a `JoinHandle` returns `Result<T, JoinError>`
- Chain error handling
Solution
use tokio;
#[derive(Debug)]
enum AppError {
InvalidInput(String),
Timeout,
NetworkError(String),
}
impl std::fmt::Display for AppError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
AppError::InvalidInput(msg) => write!(f, "Invalid input: {}", msg),
AppError::Timeout => write!(f, "Operation timed out"),
AppError::NetworkError(msg) => write!(f, "Network error: {}", msg),
}
}
}
impl std::error::Error for AppError {}
async fn parse_number(s: &str) -> Result<i32, AppError> {
s.parse::<i32>()
.map_err(|e| AppError::InvalidInput(format!("Parse error: {}", e)))
}
async fn validate_positive(n: i32) -> Result<i32, AppError> {
if n > 0 {
Ok(n)
} else {
Err(AppError::InvalidInput(format!("{} is not positive", n)))
}
}
async fn process_with_timeout(s: &str) -> Result<i32, AppError> {
let num = parse_number(s).await?;
let validated = validate_positive(num).await?;
tokio::time::timeout(
tokio::time::Duration::from_secs(1),
async { Ok(validated * 2) }
)
.await
.map_err(|_| AppError::Timeout)?
}
async fn fetch_data(id: u32) -> Result<String, AppError> {
if id == 0 {
return Err(AppError::InvalidInput("ID cannot be zero".to_string()));
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
if id > 10 {
Err(AppError::NetworkError(format!("Failed to fetch ID {}", id)))
} else {
Ok(format!("Data for ID {}", id))
}
}
#[tokio::main]
async fn main() {
match process_with_timeout("42").await {
Ok(result) => println!("Success: {}", result),
Err(e) => println!("Error: {}", e),
}
match process_with_timeout("-5").await {
Ok(result) => println!("Success: {}", result),
Err(e) => println!("Error: {}", e),
}
// Handling errors from spawned tasks
let handles: Vec<_> = (0..5)
.map(|id| tokio::spawn(fetch_data(id)))
.collect();
for (i, handle) in handles.into_iter().enumerate() {
match handle.await {
Ok(Ok(data)) => println!("Task {}: {}", i, data),
Ok(Err(e)) => println!("Task {} error: {}", i, e),
Err(e) => println!("Task {} panicked: {}", i, e),
}
}
}
Learning Points:
- Async functions with Result
- Error propagation with `?`
- Handling task join errors
- Timeout handling
- Custom error types in async
Exercise 4: Select and Race
Difficulty: Medium
Problem: Use select! and try_join! to handle multiple futures.
Requirements:
- Use `tokio::select!` to wait on multiple futures
- The first future to complete wins
- Handle different completion branches
- Use `try_join!` for error handling
Example:
tokio::select! {
result1 = future1 => { /* handle */ },
result2 = future2 => { /* handle */ },
}
Hints:
- `select!` races multiple futures
- Only one branch executes
- Use `try_join!` when all must succeed
- Pattern match on results
Solution
use tokio;
use tokio::time::{sleep, Duration};
async fn task_a() -> &'static str {
sleep(Duration::from_millis(100)).await;
"Task A completed"
}
async fn task_b() -> &'static str {
sleep(Duration::from_millis(200)).await;
"Task B completed"
}
async fn select_example() {
tokio::select! {
result = task_a() => {
println!("First to complete: {}", result);
}
result = task_b() => {
println!("First to complete: {}", result);
}
}
}
async fn fetch_from_primary() -> Result<String, String> {
sleep(Duration::from_millis(150)).await;
Ok("Data from primary".to_string())
}
async fn fetch_from_backup() -> Result<String, String> {
sleep(Duration::from_millis(100)).await;
Ok("Data from backup".to_string())
}
async fn fetch_with_fallback() -> Result<String, String> {
tokio::select! {
result = fetch_from_primary() => result,
result = fetch_from_backup() => {
println!("Using backup");
result
}
}
}
async fn download(url: &str) -> Result<String, String> {
sleep(Duration::from_millis(100)).await;
if url.is_empty() {
Err("Empty URL".to_string())
} else {
Ok(format!("Downloaded from {}", url))
}
}
async fn parallel_downloads() -> Result<(), String> {
let urls = vec!["url1", "url2", "url3"];
let results = tokio::try_join!(
download(urls[0]),
download(urls[1]),
download(urls[2])
)?;
println!("All downloads complete:");
println!(" {}", results.0);
println!(" {}", results.1);
println!(" {}", results.2);
Ok(())
}
async fn timeout_example() {
let operation = async {
sleep(Duration::from_secs(2)).await;
"Completed"
};
tokio::select! {
result = operation => {
println!("Operation completed: {}", result);
}
_ = sleep(Duration::from_millis(500)) => {
println!("Operation timed out");
}
}
}
#[tokio::main]
async fn main() {
println!("=== Select Example ===");
select_example().await;
println!("\n=== Fetch with Fallback ===");
match fetch_with_fallback().await {
Ok(data) => println!("Got: {}", data),
Err(e) => println!("Error: {}", e),
}
println!("\n=== Parallel Downloads ===");
parallel_downloads().await.ok();
println!("\n=== Timeout Example ===");
timeout_example().await;
}
Learning Points:
- `select!` for racing futures
- First-to-complete semantics
- `try_join!` for parallel execution with error handling
- Implementing timeouts with `select!`
- Fallback patterns
Exercise 5: Channels in Async
Difficulty: Medium
Problem: Use async channels for communication between tasks.
Requirements:
- Create async channels (mpsc, oneshot)
- Send and receive messages asynchronously
- Handle channel closure
- Implement producer-consumer pattern
Example:
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
tx.send(data).await.unwrap();
});
let msg = rx.recv().await;
Hints:
- Use `tokio::sync::mpsc` for multiple producers
- Use `tokio::sync::oneshot` for a single message
- Channels are async-aware
- Use `.await` on send/recv
Solution
use tokio;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
async fn mpsc_example() {
let (tx, mut rx) = mpsc::channel(32);
// Spawn producers
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
for j in 0..3 {
let msg = format!("Producer {} - Message {}", i, j);
tx.send(msg).await.unwrap();
sleep(Duration::from_millis(50)).await;
}
});
}
drop(tx); // Drop original sender
// Receive messages
while let Some(msg) = rx.recv().await {
println!("Received: {}", msg);
}
}
async fn oneshot_example() {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
tx.send("Result from spawned task").unwrap();
});
match rx.await {
Ok(result) => println!("Got: {}", result),
Err(_) => println!("Sender dropped"),
}
}
async fn worker_pool() {
    let (job_tx, job_rx) = mpsc::channel::<u32>(10);
    let (result_tx, mut result_rx) = mpsc::channel::<u32>(10);
    // mpsc::Receiver cannot be cloned, so the workers share it behind a Mutex
    let job_rx = std::sync::Arc::new(tokio::sync::Mutex::new(job_rx));
    // Spawn workers
    for worker_id in 0..3 {
        let job_rx = std::sync::Arc::clone(&job_rx);
        let result_tx = result_tx.clone();
        tokio::spawn(async move {
            loop {
                // The guard is a temporary, so the lock is released
                // as soon as recv() returns
                let job = job_rx.lock().await.recv().await;
                match job {
                    Some(job) => {
                        println!("Worker {} processing job {}", worker_id, job);
                        sleep(Duration::from_millis(100)).await;
                        result_tx.send(job * 2).await.unwrap();
                    }
                    None => break, // channel closed: no more jobs
                }
            }
        });
    }
    drop(result_tx); // keep only the workers' clones alive
    // Send jobs
    tokio::spawn(async move {
        for i in 0..9 {
            job_tx.send(i).await.unwrap();
        }
    });
    // Collect results
    let mut results = vec![];
    while let Some(result) = result_rx.recv().await {
        results.push(result);
    }
    println!("Results: {:?}", results);
}
#[tokio::main]
async fn main() {
println!("=== MPSC Example ===");
mpsc_example().await;
println!("\n=== Oneshot Example ===");
oneshot_example().await;
println!("\n=== Worker Pool ===");
worker_pool().await;
}
Learning Points:
- Async channels: mpsc and oneshot
- Channel capacity and backpressure
- Async send and receive
- Channel-based task communication
Exercise 6: Shared State in Async
Difficulty: Medium
Problem: Share state between async tasks safely.
Requirements:
- Use `Arc<Mutex<T>>` or `Arc<RwLock<T>>`
- Access shared data from multiple tasks
- Handle async lock acquisition
- Avoid holding locks across await points
Example:
let data = Arc::new(Mutex::new(vec![]));
// Share between tasks
Hints:
- Use `tokio::sync::Mutex` for async locks
- Don't hold a `MutexGuard` across `.await`
- Consider using channels instead
- Use `RwLock` for read-heavy workloads
Solution
use tokio;
use tokio::sync::{Mutex, RwLock};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
async fn shared_counter() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..5 {
let counter = Arc::clone(&counter);
let handle = tokio::spawn(async move {
for _ in 0..10 {
let mut num = counter.lock().await;
*num += 1;
println!("Task {} incremented to {}", i, *num);
drop(num); // Release lock before sleeping
sleep(Duration::from_millis(10)).await;
}
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
println!("Final count: {}", *counter.lock().await);
}
async fn shared_cache() {
use std::collections::HashMap;
let cache = Arc::new(RwLock::new(HashMap::new()));
// Writer task
let cache_writer = Arc::clone(&cache);
let writer = tokio::spawn(async move {
for i in 0..5 {
sleep(Duration::from_millis(100)).await;
let mut cache = cache_writer.write().await;
cache.insert(format!("key{}", i), i * 10);
println!("Writer added key{}", i);
}
});
// Reader tasks
let mut readers = vec![];
for reader_id in 0..3 {
let cache = Arc::clone(&cache);
let reader = tokio::spawn(async move {
for i in 0..5 {
sleep(Duration::from_millis(120)).await;
let cache = cache.read().await;
if let Some(value) = cache.get(&format!("key{}", i)) {
println!("Reader {} read key{} = {}", reader_id, i, value);
} else {
println!("Reader {} key{} not found", reader_id, i);
}
}
});
readers.push(reader);
}
writer.await.unwrap();
for reader in readers {
reader.await.unwrap();
}
}
// Demonstrating the danger of holding lock across await
async fn bad_example() {
let data = Arc::new(Mutex::new(0));
let data_clone = Arc::clone(&data);
tokio::spawn(async move {
let mut num = data_clone.lock().await;
*num += 1;
// BAD: Holding lock across await
// sleep(Duration::from_secs(1)).await;
// This would block other tasks unnecessarily
});
// Good: Release lock before await
let data_clone = Arc::clone(&data);
tokio::spawn(async move {
{
let mut num = data_clone.lock().await;
*num += 1;
} // Lock released here
sleep(Duration::from_secs(1)).await; // Now safe
});
}
#[tokio::main]
async fn main() {
println!("=== Shared Counter ===");
shared_counter().await;
println!("\n=== Shared Cache ===");
shared_cache().await;
}
Learning Points:
- `tokio::sync::Mutex` for async-aware locking
- `RwLock` for concurrent readers
- Avoiding deadlocks in async code
- Not holding locks across await points
Exercise 7: Stream Processing
Difficulty: Medium
Problem: Work with async streams.
Requirements:
- Create and consume streams
- Use StreamExt trait methods
- Transform streams with map, filter
- Collect stream results
Example:
use futures::stream::{self, StreamExt};
let stream = stream::iter(vec![1, 2, 3]);
Hints:
- Use the `futures` crate for the `Stream` trait
- `StreamExt` provides combinators
- Streams are like iterators, but async
- Use `while let Some(item) = stream.next().await`
Solution
use tokio;
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
async fn basic_stream() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
stream
.for_each(|item| async move {
println!("Item: {}", item);
})
.await;
}
async fn transform_stream() {
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let doubled: Vec<_> = stream
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x > 5))
.collect()
.await;
println!("Filtered and doubled: {:?}", doubled);
}
async fn async_stream() {
let stream = stream::iter(0..5)
.then(|i| async move {
sleep(Duration::from_millis(100)).await;
i * 2
});
stream
.for_each(|item| async move {
println!("Processed: {}", item);
})
.await;
}
async fn buffered_stream() {
use futures::stream;
let requests = stream::iter(0..10)
.map(|i| async move {
sleep(Duration::from_millis(100)).await;
println!("Processing request {}", i);
i * 2
})
.buffered(3); // Process up to 3 concurrently
let results: Vec<_> = requests.collect().await;
println!("Results: {:?}", results);
}
async fn channel_as_stream() {
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
let (tx, rx) = mpsc::channel(10);
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
sleep(Duration::from_millis(50)).await;
}
});
let mut stream = ReceiverStream::new(rx);
while let Some(item) = stream.next().await {
println!("From channel stream: {}", item);
}
}
#[tokio::main]
async fn main() {
println!("=== Basic Stream ===");
basic_stream().await;
println!("\n=== Transform Stream ===");
transform_stream().await;
println!("\n=== Async Stream ===");
async_stream().await;
println!("\n=== Buffered Stream ===");
buffered_stream().await;
println!("\n=== Channel as Stream ===");
channel_as_stream().await;
}
Add to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
futures = "0.3"
tokio-stream = "0.1"
Learning Points:
- Stream trait for async sequences
- Stream combinators (map, filter, etc.)
- Buffered processing
- Converting channels to streams
Exercise 8: Graceful Shutdown
Difficulty: Hard
Problem: Implement graceful shutdown for async tasks.
Requirements:
- Signal shutdown to all tasks
- Wait for tasks to complete
- Use cancellation tokens or channels
- Clean up resources
Example:
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
// Tasks listen for shutdown signal
Hints:
- Use `tokio::sync::broadcast` for the shutdown signal
- Use `select!` to listen for shutdown
- Join all task handles
- Use `CancellationToken` from tokio-util
Solution
use tokio;
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};
async fn worker(id: u32, mut shutdown: broadcast::Receiver<()>) {
println!("Worker {} started", id);
loop {
tokio::select! {
_ = sleep(Duration::from_millis(100)) => {
println!("Worker {} working...", id);
}
_ = shutdown.recv() => {
println!("Worker {} received shutdown signal", id);
break;
}
}
}
println!("Worker {} performing cleanup", id);
sleep(Duration::from_millis(50)).await;
println!("Worker {} shutdown complete", id);
}
async fn graceful_shutdown_example() {
let (shutdown_tx, _) = broadcast::channel(1);
let mut handles = vec![];
// Spawn workers
for i in 0..3 {
let shutdown_rx = shutdown_tx.subscribe();
let handle = tokio::spawn(worker(i, shutdown_rx));
handles.push(handle);
}
// Let workers run for a bit
sleep(Duration::from_millis(500)).await;
// Send shutdown signal
println!("Sending shutdown signal");
shutdown_tx.send(()).unwrap();
// Wait for all workers to finish
for handle in handles {
handle.await.unwrap();
}
println!("All workers shut down");
}
// Using CancellationToken (requires tokio-util)
async fn worker_with_token(id: u32, token: tokio_util::sync::CancellationToken) {
println!("Worker {} started", id);
loop {
tokio::select! {
_ = sleep(Duration::from_millis(100)) => {
println!("Worker {} working...", id);
}
_ = token.cancelled() => {
println!("Worker {} cancelled", id);
break;
}
}
}
println!("Worker {} cleanup done", id);
}
async fn cancellation_token_example() {
let token = tokio_util::sync::CancellationToken::new();
let mut handles = vec![];
for i in 0..3 {
let token = token.clone();
let handle = tokio::spawn(worker_with_token(i, token));
handles.push(handle);
}
sleep(Duration::from_millis(500)).await;
println!("Cancelling all workers");
token.cancel();
for handle in handles {
handle.await.unwrap();
}
println!("All workers cancelled");
}
#[tokio::main]
async fn main() {
println!("=== Graceful Shutdown with Broadcast ===");
graceful_shutdown_example().await;
println!("\n=== Cancellation Token ===");
cancellation_token_example().await;
}
Add to Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = "0.7"
Learning Points:
- Shutdown signaling patterns
- The `broadcast` channel for notifications
- `CancellationToken` for cancellation
- Graceful cleanup
- Coordinating task termination
Exercise 9: Rate Limiting
Difficulty: Medium
Problem: Implement rate limiting for async operations.
Requirements:
- Limit requests per time period
- Use `tokio::time::interval`
- Implement a token bucket or similar
- Handle backpressure
Example:
let mut interval = interval(Duration::from_millis(100));
interval.tick().await; // Wait for next tick
Hints:
- Use `tokio::time::interval`
- Combine with channels for backpressure
- Consider semaphores for concurrency limits
- Use the `governor` crate for advanced rate limiting
Solution
use tokio;
use tokio::time::{interval, sleep, Duration, Instant};
async fn simple_rate_limit() {
let mut interval = interval(Duration::from_millis(200));
for i in 0..5 {
interval.tick().await;
println!("Request {} at {:?}", i, Instant::now());
}
}
async fn process_with_rate_limit(id: u32) {
println!("Processing request {}", id);
sleep(Duration::from_millis(50)).await;
}
async fn rate_limited_worker() {
let mut interval = interval(Duration::from_millis(100));
let requests = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
for request in requests {
interval.tick().await;
tokio::spawn(process_with_rate_limit(request));
}
sleep(Duration::from_millis(200)).await;
}
// Using Semaphore for concurrency limiting
async fn concurrent_requests(id: u32) {
println!("Request {} starting", id);
sleep(Duration::from_millis(100)).await;
println!("Request {} complete", id);
}
async fn semaphore_example() {
use tokio::sync::Semaphore;
use std::sync::Arc;
let semaphore = Arc::new(Semaphore::new(3)); // Max 3 concurrent
let mut handles = vec![];
for i in 0..10 {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let handle = tokio::spawn(async move {
concurrent_requests(i).await;
drop(permit); // Release permit
});
handles.push(handle);
}
for handle in handles {
handle.await.unwrap();
}
}
// Token bucket implementation
struct TokenBucket {
tokens: tokio::sync::Mutex<f64>,
capacity: f64,
refill_rate: f64, // tokens per second
last_refill: tokio::sync::Mutex<Instant>,
}
impl TokenBucket {
fn new(capacity: f64, refill_rate: f64) -> Self {
TokenBucket {
tokens: tokio::sync::Mutex::new(capacity),
capacity,
refill_rate,
last_refill: tokio::sync::Mutex::new(Instant::now()),
}
}
async fn acquire(&self) -> bool {
self.refill().await;
let mut tokens = self.tokens.lock().await;
if *tokens >= 1.0 {
*tokens -= 1.0;
true
} else {
false
}
}
async fn refill(&self) {
let now = Instant::now();
let mut last_refill = self.last_refill.lock().await;
let elapsed = now.duration_since(*last_refill).as_secs_f64();
let mut tokens = self.tokens.lock().await;
*tokens = (*tokens + elapsed * self.refill_rate).min(self.capacity);
*last_refill = now;
}
}
async fn token_bucket_example() {
use std::sync::Arc;
let bucket = Arc::new(TokenBucket::new(5.0, 2.0)); // 5 tokens, refill 2/sec
for i in 0..10 {
if bucket.acquire().await {
println!("Request {} accepted", i);
} else {
println!("Request {} rate limited", i);
}
sleep(Duration::from_millis(100)).await;
}
}
#[tokio::main]
async fn main() {
println!("=== Simple Rate Limit ===");
simple_rate_limit().await;
println!("\n=== Rate Limited Worker ===");
rate_limited_worker().await;
println!("\n=== Semaphore Example ===");
semaphore_example().await;
println!("\n=== Token Bucket ===");
token_bucket_example().await;
}
Learning Points:
- Rate limiting with intervals
- Concurrency limiting with Semaphore
- Token bucket algorithm
- Backpressure handling
Exercise 10: Retry Logic
Difficulty: Medium
Problem: Implement retry logic with exponential backoff.
Requirements:
- Retry failed operations
- Implement exponential backoff
- Set maximum retry attempts
- Handle permanent vs temporary failures
Example:
async fn with_retry<F, Fut, T>(f: F, max_attempts: u32) -> Result<T, Error>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, Error>>,
Hints:
- Use loop for retry attempts
- Increase delay exponentially
- Use `tokio::time::sleep`
- Consider using the `tokio-retry` crate
Solution
use tokio;
use tokio::time::{sleep, Duration, Instant};
#[derive(Debug, Clone)]
enum AppError {
Temporary(String),
Permanent(String),
}
async fn unreliable_operation(attempt: u32) -> Result<String, AppError> {
println!("Attempt {}", attempt);
if attempt < 3 {
Err(AppError::Temporary(format!("Failed on attempt {}", attempt)))
} else {
Ok(format!("Success on attempt {}", attempt))
}
}
async fn retry_with_backoff<F, Fut, T>(
mut operation: F,
max_attempts: u32,
) -> Result<T, AppError>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, AppError>>,
{
let mut attempt = 0;
let mut delay = Duration::from_millis(100);
loop {
attempt += 1;
match operation().await {
Ok(result) => return Ok(result),
Err(AppError::Permanent(msg)) => {
return Err(AppError::Permanent(msg));
}
Err(AppError::Temporary(msg)) if attempt >= max_attempts => {
return Err(AppError::Temporary(format!(
"Max attempts reached. Last error: {}",
msg
)));
}
Err(AppError::Temporary(msg)) => {
println!("Temporary error: {}. Retrying in {:?}...", msg, delay);
sleep(delay).await;
delay *= 2; // Exponential backoff
}
}
}
}
async fn fetch_data(url: &str, attempt: &mut u32) -> Result<String, AppError> {
*attempt += 1;
println!("Fetching {} (attempt {})", url, *attempt);
sleep(Duration::from_millis(50)).await;
if *attempt < 3 {
Err(AppError::Temporary("Network timeout".to_string()))
} else {
Ok(format!("Data from {}", url))
}
}
async fn retry_example() {
    // Track the attempt number in a Cell so the closure can bump it on
    // every call; a plain captured integer would stay at 0 forever
    let attempt = std::cell::Cell::new(0u32);
    let result = retry_with_backoff(
        || {
            attempt.set(attempt.get() + 1);
            unreliable_operation(attempt.get())
        },
        5,
    )
    .await;
    match result {
        Ok(data) => println!("Success: {}", data),
        Err(e) => println!("Failed: {:?}", e),
    }
}
// Circuit breaker pattern
enum CircuitState {
Closed,
Open(Instant),
HalfOpen,
}
struct CircuitBreaker {
state: tokio::sync::Mutex<CircuitState>,
failure_threshold: u32,
failures: tokio::sync::Mutex<u32>,
timeout: Duration,
}
impl CircuitBreaker {
fn new(failure_threshold: u32, timeout: Duration) -> Self {
CircuitBreaker {
state: tokio::sync::Mutex::new(CircuitState::Closed),
failure_threshold,
failures: tokio::sync::Mutex::new(0),
timeout,
}
}
async fn call<F, Fut, T>(&self, operation: F) -> Result<T, AppError>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, AppError>>,
{
use tokio::time::Instant;
let mut state = self.state.lock().await;
match *state {
CircuitState::Open(opened_at) => {
if opened_at.elapsed() > self.timeout {
*state = CircuitState::HalfOpen;
println!("Circuit: Closed -> HalfOpen");
} else {
return Err(AppError::Temporary("Circuit is open".to_string()));
}
}
_ => {}
}
drop(state);
match operation().await {
Ok(result) => {
let mut failures = self.failures.lock().await;
*failures = 0;
let mut state = self.state.lock().await;
*state = CircuitState::Closed;
Ok(result)
}
Err(e) => {
let mut failures = self.failures.lock().await;
*failures += 1;
if *failures >= self.failure_threshold {
let mut state = self.state.lock().await;
*state = CircuitState::Open(Instant::now());
println!("Circuit opened due to failures");
}
Err(e)
}
}
}
}
#[tokio::main]
async fn main() {
use tokio::time::Instant;
println!("=== Retry Example ===");
retry_example().await;
println!("\n=== Circuit Breaker ===");
let breaker = CircuitBreaker::new(3, Duration::from_secs(2));
for i in 0..10 {
let result = breaker
.call(|| async {
if i < 4 {
Err(AppError::Temporary("Simulated failure".to_string()))
} else {
Ok(format!("Success {}", i))
}
})
.await;
println!("Request {}: {:?}", i, result);
sleep(Duration::from_millis(300)).await;
}
}
Learning Points:
- Retry logic with exponential backoff
- Distinguishing temporary vs permanent errors
- Circuit breaker pattern
- Graceful degradation
Async Best Practices
1. Don't Block the Runtime
// Bad: Blocking operation
std::thread::sleep(Duration::from_secs(1));
// Good: Async sleep
tokio::time::sleep(Duration::from_secs(1)).await;
2. Use Structured Concurrency
// Good: Use join or select
tokio::join!(task1(), task2(), task3());
// Or spawn with tracking
let handles = vec![
tokio::spawn(task1()),
tokio::spawn(task2()),
];
3. Avoid Holding Locks Across Await
// Bad
let guard = mutex.lock().await;
some_async_operation().await; // Still holding lock!
// Good
{
let guard = mutex.lock().await;
// Use guard
} // Lock released
some_async_operation().await;
4. Choose the Right Runtime
// For most applications
#[tokio::main]
async fn main() { }
// For libraries, let users choose
pub async fn my_function() { }
Common Patterns
Request-Response
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let result = process().await;
tx.send(result).unwrap();
});
let response = rx.await?;
Fan-out Fan-in
let futures: Vec<_> = items.iter()
.map(|item| process(item))
.collect();
let results = futures::future::join_all(futures).await;
Timeout Pattern
match tokio::time::timeout(Duration::from_secs(5), operation()).await {
    Ok(result) => { /* operation completed */ }
    Err(_) => { /* timed out */ }
}
Performance Tips
- Batch operations when possible
- Use buffered streams for parallel processing
- Avoid excessive task spawning - use streams or channels
- Profile async code with tokio-console
- Set appropriate channel buffer sizes
Next Steps
- Study the Tokio documentation
- Learn about async traits (RPITIT)
- Explore `async-std` as an alternative runtime
- Read about async Drop and cancellation safety
- Build real async applications (web servers, clients)
- Learn async stream processing with tokio-stream
Recommended Crates
- `tokio` - The async runtime
- `async-trait` - Async methods in traits
- `futures` - Async utilities
- `tokio-util` - Additional Tokio utilities
- `tower` - Async service composition
- `hyper` - Async HTTP
tokio- The async runtimeasync-trait- Async methods in traitsfutures- Async utilitiestokio-util- Additional Tokio utilitiestower- Async service compositionhyper- Async HTTP