The Burger Problem
tl;dr racing with cancellation and timeout
Malaysians have an interesting queuing mechanism, instead of having one line for counters, there is one line for each counter, each counter have different speeds so people try to queue on multiple counter at once and see who reach first. Let us see the illustration:
counter 1 | . . . . a
counter 2 | . . . . b
counter 3 | . . . . . c
counter 1 | . . . . a
counter 2 | . . . b
counter 3 | . . . . c
counter 1 | . . . . a
counter 2 | . . . b
counter 3 | . . c
counter 1 | . . . a
counter 2 | . b
counter 3 | . . c
counter 1 | . . .
counter 2 | b
counter 3 | . .
A group of 3 (a, b, c) starts queuing on 3 counter (1, 2, 3) to buy burger (vegan I wished), looking at the illustration, they start queuing at the same time but counter 2 is faster so b reached the counter first, the rest of the group then stop queuing.
Of course, the scenario shown above is the happy scenario. Here are the others:
-
One of the three reaches the counter first, so the other two stops waiting at the line.
-
Two or more of them reaches the counter at nearly the same time. Only one of them buys the burgers for everyone. Other exits the line.
-
All of them timed out, since the wait is too long, and goes elsewhere. No burgers. T_T
The predetermined random wait time per “worker” is just a shorthand, since the correct algorithm is probably you have to check if your line moves at every “tick”. Or simulate the whole thing with some lines “faster” than the others.
It’s a classic race for resources. You have just 3 workers instead of yourself to hedge your bets to avoid the worse case scenarios.
I wish to write test but not sure how test can be written. Feel free to contribute. The code will be written step-by-step, feel free to skip to the end for the last source code.
I was thinking of using channels in this case but channel does not seem to
work. Looking at std::sync
documentation, there is
std::sync::CondVar
which seems complicated but may be helpful, it allows blocking a Mutex
until
some event occurred.
Let’s steal the example and do some modification.
use rand::Rng;
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let mut handles = Vec::new();
let pair = Arc::new((Mutex::new(()), Condvar::new()));
for i in 0..3 {
let pair = pair.clone();
handles.push(thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (_, ended) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
println!("Worker {} {:?} {:?}", i, dur, ended.timed_out());
cvar.notify_all();
}));
}
for handle in handles {
handle.join().unwrap();
}
}
For each thread, it creates a random duration to simulate the “wait” and use
CondVar
to notify all threads blocking on CondVar
, this example also
provides a wait timeout in case it takes too long. wait_timeout
on CondVar
waits for notification on a MutexGuard
(can be obtained from a Mutex
) and a
timeout, blocking the thread for a duration.
Take note, there is an issue here where the race is not handled, scenario 1 works here but scenario 2 is broken here such that both worker may reach the counter and both buys food (only one should buy).
Side note, writing this in a functional manner using iterators (Iter
).
use rand::Rng;
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let pair = Arc::new((Mutex::new(()), Condvar::new()));
(0..3)
.map(|i| {
let pair = pair.clone();
thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (_, ended) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
println!("Worker {} {:?} {:?}", i, dur, ended.timed_out());
cvar.notify_all();
})
})
.for_each(|handle| handle.join().unwrap());
}
Later we have also tried using bus
and
std::sync::mpsc
channels, there is this fan-in and fan-out thing but it does not solve scenario
2, the code looks weird so I stop making this method work.
use rand::Rng;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let mut handles = Vec::new();
let pair = Arc::new((Mutex::new(AtomicBool::new(true)), Condvar::new()));
for i in 1..=3 {
let pair = pair.clone();
handles.push(thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
cvar.notify_all();
if ended.swap(false, Ordering::Relaxed) {
println!("Worker {} {:?} done", i, dur);
} else {
println!("Worker {} {:?}", i, dur);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
}
Looking at the documentation, we could make using of
AtomicBool
since we only need one of the first that reaches the counter,
std::sync::atomic
is thread safe since it implements
Sync
so we can use it
to track atomic values between threads, only one shall prevail. This solve
scenario 2 but we still have scenario 3. Hooray!
We track AtomicBool::new(true)
in the Mutex
together with CondVar
, making
sure that only one thread is reading from it any time. With true
indicating
that no worker reached the counter yet, we use
swap
to write false
to the AtomicBool
, letting others know that someone reached
the counter just in case both reached at the same time.
swap
returns the previous value which we could use to check the last value,
with ended.swap(false, Ordering::Relaxed)
it allows us to make sure only the
first worker will reach that condition. Ordering::Relaxed
is the memory
ordering of the operation and is out of scope of this article.
Let us not stop there, improving the code we could have just notify_all
once
for the first worker instead of having multiple workers notify_all
.
use rand::Rng;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let mut handles = Vec::new();
let pair = Arc::new((Mutex::new(AtomicBool::new(true)), Condvar::new()));
for i in 1..=3 {
let pair = pair.clone();
handles.push(thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
if ended.swap(false, Ordering::Relaxed) {
cvar.notify_all();
println!("Worker {} {:?} done", i, dur);
} else {
println!("Worker {} {:?}", i, dur);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
}
How do we do scenario 3 where we wanted the workers to stop in case the counters are too slow (also true in real life, Malaysia counters may be slow/inefficient to the point that customers leave)? Oh, just add another thread to do the time keeping.
use rand::Rng;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let mut handles = Vec::new();
let pair = Arc::new((Mutex::new(AtomicBool::new(true)), Condvar::new()));
for i in 1..=3 {
let pair = pair.clone();
handles.push(thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
if ended.swap(false, Ordering::Relaxed) {
cvar.notify_all();
println!("Worker {} {:?} done", i, dur);
} else {
println!("Worker {} {:?}", i, dur);
}
}));
}
thread::spawn(move || {
let dur = Duration::from_secs(3);
let (ref lock, ref cvar) = *pair;
let (ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
if ended.swap(false, Ordering::Relaxed) {
cvar.notify_all();
println!("Timed out {:?}", dur);
}
});
for handle in handles {
handle.join().unwrap();
}
}
Here the new thread stops all the worker in case they take too long (3 seconds in this case, still rare enough), it also prints out a nice message for timeout in addition to stopping any worker from showing done.
After writing and looking at the documentation for explanation, I just realized
that AtomicBool
was duplicating the job of Mutex
so using plain bool
is
enough, we have to make the MutexGuard
mutable and modify the value in
Mutex
through Deref
(*
). Latest code:
use rand::Rng;
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
fn main() {
let mut handles = Vec::new();
let pair = Arc::new((Mutex::new(true), Condvar::new()));
for i in 1..=3 {
let pair = pair.clone();
handles.push(thread::spawn(move || {
let dur = Duration::from_secs(rand::thread_rng().gen_range(1, 6));
let (ref lock, ref cvar) = *pair;
let (mut ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
if *ended {
*ended = false;
cvar.notify_all();
println!("Worker {} {:?} done", i, dur);
} else {
println!("Worker {} {:?}", i, dur);
}
}));
}
thread::spawn(move || {
let dur = Duration::from_secs(3);
let (ref lock, ref cvar) = *pair;
let (mut ended, _) = cvar.wait_timeout(lock.lock().unwrap(), dur).unwrap();
if *ended {
*ended = false;
cvar.notify_all();
println!("Timed out {:?}", dur);
}
});
for handle in handles {
handle.join().unwrap();
}
}
All three scenarios have been solved up to this point, utilizing std::sync
,
Mutex
and atomics. I found it hard at first to understand CondVar
in this
case but it became easy after playing with it.
We thank Ang for showing the question, it was also implemented in Go and Elixir in our telegram group (lazy to put here).