A Rusty Tale of Shared Memories and Passed Messages Part 1: Channels

Justin Wernick <>
Curly Tail, Curly Braces
2017-06-19

Abstract

Writing multithreaded code can be challenging. Luckily, the Rust programming language aims to make it easier by baking information about multithreading directly into the language's type system. This post explores the first way that you can have multiple threads communicate with each other in Rust: message passing using channels.

Multithreading

Have you ever written multithreaded code? In other words, code that has multiple paths of execution, which are running at the same time. This is generally thought of as a hard problem, not because it's actually difficult to tell your program to spawn a second thread, but because it becomes exponentially harder to reason about your program once you have two things happening at once.

Unfortunately, multithreading is one of those things that is absolutely necessary for making effective use of computing resources. Imagine if you bought a shiny 4 core processor, but games only used one of those cores! Or maybe you're paying for a high end web server, but your website is only able to make use of a fraction of the computational power available.

It turns out that having multiple threads is relatively painless most of the time, but becomes a massive pain as soon as the threads need to share resources or communicate with each other. Web servers can put each request on a different thread easily because two simultaneous requests to a web server do not need to interact with each other. The programmer then only needs to write 'single threaded' programs that handle those requests. Let the database handle any race conditions between two requests!

Rust, as a systems programming language, has multithreading as part of its standard library. It's aiming to be the type of language you might use to write that web framework, or that database server. While you still need to think carefully about how you're handling these threads, Rust's type system is carefully engineered to make it less scary than in other languages.

It's just two programs running at once. What could go wrong?

Suppose you're writing a program that calculates a long list of numbers and aggregates the results. Say you're marking test papers for a massive online course, and you want to know the average mark. As part of it, you write the following code:

struct Total {
    total: f32,
    count: u32
}
impl Total {
    fn average(&self) -> f32 {
        self.total / self.count as f32
    }
    fn add_next(&mut self, next: f32) {
        let new_total = self.total + next;
        let new_count = self.count + 1;
      
        self.total = new_total;
        self.count = new_count;
    }
}

Further into the project, you're looking for ways to improve the performance. Marking the tests is computationally expensive, so you add multithreading. Test marks don't depend on each other, so you can mark two tests in parallel without any problems and get them marked twice as fast. Now, while aggregating your results, you might have two threads calling add_next for the same total at the same time. What would happen?

Well, adding a number is fast. It's only four instructions. If marking the next test is a heavy operation, the chance of multiple threads hitting add_next at exactly the same time are small, and the program will appear to be running correctly. Except for those times that, by random chance, it isn't.

This is easiest to show with an example of things going wrong. Let's say we have threads A and B, which both have access to the same total object. To start with, the total has two numbers that have already been added to it.

//Total in both threads
{
    total: 10,
    count: 2
}

A new number, 5, is being added by thread A. It completes the first two instructions in add_next, but hasn't yet done the last two.

//Total in both threads
{
    total: 10,
    count: 2
}
//Thread A specific:
{
    new_total: 15,
    new_count: 3
}

At the same time, thread B is adding 7. Again, let's suppose it's completed the first two instructions in add_next, but hasn't yet done the last two.

//Total in both threads
{
    total: 10,
    count: 2
}
//Thread B specific:
{
    new_total: 17,
    new_count: 3
}

After both threads complete, the total will be either 15 or 17, the count will be 3, and both threads will think that they've processed their number. One thread's total will just be overwritten by the other thread.

The program produces incorrect results, but only some of the time so you may never encounter this situation in testing.

Rust to the rescue!

As per usual with Rust, the solution that they've settled on for this problem is to have the compiler refuse to proceed if it thinks you're on the wrong path.

This is done using Rust's trait system and ownership model to audit how you share resources between threads. The race condition example above would not have been possible using Rust's standard threading library, because it would not have let you have two mutable references to the same total object. If you needed both threads to have mutable access, they would be forced to wrap the object in something that is thread safe and allows runtime mutability checks, like Mutex or RwLock.

For example, this is a simple multithreaded program that prints hello world a bunch of times.

use std::thread;
use std::time::Duration;

let child_thread = thread::spawn(move || {
    for _ in 0..5 {
        println!("Hello world from a different thread");
        thread::sleep(Duration::from_millis(500));
    }
});

for _ in 0..5 {
    println!("Hello world from the parent thread");
    thread::sleep(Duration::from_secs(1));
}

// join makes the parent thread wait for the child. Otherwise, the
// program may exit before the other thread is done.
child_thread.join();

At that point where I created a new thread with a closure, I used the move keyword. This allows the new closure to take ownership of any variables that it references (there are none in this example). If those variables don't implement the correct Rust traits to prove that they are safe to send to the new thread, it will not compile.

If I tried to add a call count using Rust's reference counted pointer type, which does not do the checks required for safe multithreading, it would refuse to compile because I can't pass it to the new thread.

use std::thread;
use std::time::Duration;
use std::rc::Rc;
use std::cell::RefCell;

// Rc is a reference counted pointer
//
// RefCell allows the restriction on only one mutable reference at a
// time to be done at runtime, rather than compile time.
//
// Neither do the checks necessary to support multithreading
let count = Rc::new(RefCell::new(0));

let child_thread = thread::spawn(move || {
    for _ in 0..5 {
        println!("Hello world from a different thread");
        *count.borrow_mut() += 1;
        thread::sleep(Duration::from_millis(500));
    }
});

for _ in 0..5 {
    println!("Hello world from the parent thread");
    *count.borrow_mut() += 1;
    thread::sleep(Duration::from_secs(1));
}

child_thread.join();
println!("Total hello world count: {}", *count.borrow());
error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<i32>>: std::marker::Send` is not satisfied in `[closure@/tmp/babel-1243kBA/rust-1243w1k:15:34: 21:2 count:std::rc::Rc<std::cell::RefCell<i32>>]`
  --> /tmp/babel-1243kBA/rust-1243w1k:15:20
   |
15 | let child_thread = thread::spawn(move || {
   |                    ^^^^^^^^^^^^^ within `[closure@/tmp/babel-1243kBA/rust-1243w1k:15:34: 21:2 count:std::rc::Rc<std::cell::RefCell<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::cell::RefCell<i32>>`
   |
   = note: `std::rc::Rc<std::cell::RefCell<i32>>` cannot be sent between threads safely
   = note: required because it appears within the type `[closure@/tmp/babel-1243kBA/rust-1243w1k:15:34: 21:2 count:std::rc::Rc<std::cell::RefCell<i32>>]`
   = note: required by `std::thread::spawn`

error: aborting due to previous error

The error message is nasty, but not nearly as nasty as trying to debug the strange errors that this code could cause.

I'll go into more detail on sharing memory between threads in my next post, but for now I want to demonstrate a different technique for multithreaded programming: message passing.

Message passing

Why is multithreading hard, but calling a public API isn't? Both technically have two threads running at the same time. I would theorize that it's because the public API's execution is logically separated from your program's execution. The two only communicate through a well defined interface, avoiding any form of synchronization by sending messages. When your program calls that API, your request is put into a queue on the receiver so that it can handle it when it's ready for it.

We call this approach "message passing". The one thread creates a message and passes it to the mailbox of another thread. When the other thread is ready, it can check its mailbox.

If you think back to our example above, we had problems with the two threads writing data back to a running total at the same time. Our multithreading problem could be solved if, instead of threads A and B both having a reference to the totals, each only had a way of passing the next result that they'd calculated as a message to a third thread that does the totalling.

In Rust's standard library, you can do this using a Channel. Using a channel, we might do our hello world program like so:

use std::thread;
use std::time::Duration;
use std::sync::mpsc::channel;
// MPSC stands for 'multiple producers, single consumer'. You can
// clone the sender side, but there may only be one receiver side.

// When you call 'channel' it returns a tuple, so you can handle the
// sender and receiver separately.
let (sender, receiver) = channel();

let sender_a = sender.clone();
let child_thread_a = thread::spawn(move || {
    for _ in 0.. {
        // because of the 'move' above, this thread now has ownership
        // of sender_a.
        sender_a.send("Hello world from Thread A");
        thread::sleep(Duration::from_millis(500));
    }
});

let sender_b = sender.clone();
let child_thread_b = thread::spawn(move || {
    for _ in 0.. {
        // sender_b is another copy of the sender. Both it and
        // sender_a are both sending messages to the same receiver.
        sender_b.send("Hello world from Thread B");
        thread::sleep(Duration::from_millis(500));
    }
});

for _ in 0..10 {
    // this will wait for a result from either thread
    let message = receiver.recv();
    match message {
        Ok(msg) => {
            println!("{}", msg);
        },
        Err(_) => {}
    }
}

// Those two threads are in an infinite loop. Don't call .join() on
// them, or you will be waiting forever.

Some important points here, we make a copy of the sender for each thread. The thread can then call send to pass a message to the receiver.

The receiver, wherever it is and when it is ready, can then call recv to wait for the next message from the sender. If you want to check if something has been sent but not wait, you can use try_recv instead.

Show me a Bigger Example!

One classic example of where multithreading is important is in programs with a user interface. You want the interface to remain responsive to user input, even if it has some serious number crunching to do in the background.

For example, if you have a game, you don't want to stop processing frames if you need to do a heavy AI calculation. Nobody will care how smart your AI is if the entire world stops every time they need to make a move. To emphasize this point, I've put together an example using Piston, a game engine written in Rust. The full example is available at the bottom of this post.

In this example, every 300th frame in the game (about every 5 seconds) we want to update the colours being used. Unfortunately, because our colour calculation algorithm is absolutely terrible, calculating our new colours takes a few seconds. This is what the update function in our game loop looks like (there's a bit more in the full code listing to make the spinning square respond to the arrow keys).

fn update(&mut self, args: &UpdateArgs) {
    self.frame_count += 1;
    if self.frame_count % 300 == 0 {
        let (color, background) = do_expensive_work(self.color);
        self.color = color;
        self.background = background;
    }
}

fn do_expensive_work(current_color: [f32; 4]) -> ([f32; 4], [f32; 4]) {
    //rather than doing work immediately, let's take a nap
    thread::sleep(time::Duration::from_secs(2));
    let b = current_color[2] + 1.0;
    let g = current_color[1] + if b > 1.0 { 1.0 } else { 0.0 };
    let r = current_color[0] + if g > 1.0 { 1.0 } else { 0.0 };
    ([r%2.0, g%2.0, b%2.0, 1.0], [(r+1.0)%2.0, (g+1.0)%2.0, (b+1.0)%2.0, 1.0])
}

This is the end result. As you can see, the world stops every time we want to calculate the next colour.

The version that uses a worker thread doesn't change much. Instead of calling do_expensive_work and waiting for a result, it passes in the sender for a channel. Inside do_expensive_work, it then spawns a new thread for the calculations. When it's done, it sends its response on the channel.

In our update function, we need to check the receiver occasionally (being careful not to wait for it if there's nothing yet), and use the result when it comes back.

fn update(&mut self, args: &UpdateArgs) {
    self.frame_count += 1;
    if self.frame_count % 300 == 0 {
        do_expensive_work(self.color, self.color_sender.clone());
    }

    // if let is a nifty way of doing a pattern match where you only
    // care about the one branch. In this case, it will go into the if
    // block only if try_recv returns an Ok.
    if let Ok((color, background)) = self.color_receiver.try_recv() {
        self.color = color;
        self.background = background;
    }
}

fn do_expensive_work(current_color: [f32; 4],
                     color_sender: mpsc::Sender<([f32; 4], [f32; 4])>) {
    thread::spawn(move || {
        //rather than doing work immediately, let's take a nap
        thread::sleep(time::Duration::from_secs(2));
        let b = current_color[2] + 1.0;
        let g = current_color[1] + if b > 1.0 { 1.0 } else { 0.0 };
        let r = current_color[0] + if g > 1.0 { 1.0 } else { 0.0 };
        color_sender.send(
            ([r%2.0, g%2.0, b%2.0, 1.0],
             [(r+1.0)%2.0, (g+1.0)%2.0, (b+1.0)%2.0, 1.0])
        );
    });
}

Even though this is a small code change, it's worlds apart from an end user's point of view.

Fearless Concurrency

Even though multithreading adds inherent complexity to a program, sometimes it is necessary. With Rust, the compiler is able to help you to avoid some of the nastier pitfalls. Message passing using Rust's channels are an excellent way to manage additional threads in your program.

In my next post, I will be discussing the other major approach to multithreaded programming: shared memory.

Full code listing of example

Credit is due to TyOverby and Nikita Pekin for writing a getting started tutorial for the Piston game engine. This demo borrows heavily from that tutorial. The original tutorial, along with more information about Piston, can be found here.

Cargo.toml

[package]
name = "piston-channel-demo"
version = "0.1.0"
authors = ["Justin Worthe"]

[dependencies]
piston = "0.32.0"
piston2d-graphics = "0.21.1"
pistoncore-glutin_window = "0.37.0"
piston2d-opengl_graphics = "0.43.0"

[[bin]]
name = "no-thread"
path = "src/main.rs"

[[bin]]
name = "channels"
path = "src/main-with-channels.rs"

src/main.rs (this is the one without multithreading)

extern crate piston;
extern crate graphics;
extern crate glutin_window;
extern crate opengl_graphics;

use piston::window::WindowSettings;
use piston::event_loop::*;
use piston::input::*;
use opengl_graphics::{ GlGraphics, OpenGL };
use glutin_window::GlutinWindow as Window;

use std::thread;
use std::time;

pub struct App {
    gl: GlGraphics, // OpenGL drawing backend.
    color: [f32; 4],
    background: [f32; 4],
    rotation: f64,   // Rotation for the square.
    position: (f64, f64),
    velocity: (f64, f64),
    frame_count: i32
}

impl App {
    fn move_up(&mut self) {
        self.velocity.1 = -1.0;
    }
    fn move_down(&mut self) {
        self.velocity.1 = 1.0;
    }
    fn move_left(&mut self) {
        self.velocity.0 = -1.0;
    }
    fn move_right(&mut self) {
        self.velocity.0 = 1.0;
    }
    fn stop_vertical(&mut self) {
        self.velocity.1 = 0.0;
    }
    fn stop_horizontal(&mut self) {
        self.velocity.0 = 0.0;
    }
  
    fn render(&mut self, args: &RenderArgs) {
        use graphics::*;
      
        let square = rectangle::square(0.0, 0.0, 50.0);
        let rotation = self.rotation;
        let (x, y) = self.position;

        let color = self.color;
        let background = self.background;
      
        self.gl.draw(args.viewport(), |c, gl| {
            // Clear the screen.
            clear(background, gl);

            let transform = c.transform.trans(x, y)
                .rot_rad(rotation)
                .trans(-25.0, -25.0);

            // Draw a box rotating around the middle of the screen.
            rectangle(color, square, transform, gl);
        });
    }

    fn update(&mut self, args: &UpdateArgs) {
        self.frame_count += 1;
        // Rotate 2 radians per second.
        self.rotation += 2.0 * args.dt;
        self.position.0 += self.velocity.0;
        self.position.1 += self.velocity.1;
      
        if self.frame_count % 300 == 0 {
            let (color, background) = do_expensive_work(self.color);
            self.color = color;
            self.background = background;
        }
    }
}

fn do_expensive_work(current_color: [f32; 4]) -> ([f32; 4], [f32; 4]) {
    //rather than doing work immediately, let's take a nap
    thread::sleep(time::Duration::from_secs(2));
    let b = current_color[2] + 1.0;
    let g = current_color[1] + if b > 1.0 { 1.0 } else { 0.0 };
    let r = current_color[0] + if g > 1.0 { 1.0 } else { 0.0 };
    ([r%2.0, g%2.0, b%2.0, 1.0], [(r+1.0)%2.0, (g+1.0)%2.0, (b+1.0)%2.0, 1.0])
}


fn main() {
    // Change this to OpenGL::V2_1 if not working.
    let opengl = OpenGL::V3_2;

    // Create an Glutin window.
    let mut window: Window = WindowSettings::new(
        "spinning-square",
        [400, 400]
    )
        .opengl(opengl)
        .exit_on_esc(true)
        .build()
        .unwrap();

    // Create a new game and run it.
    let mut app = App {
        gl: GlGraphics::new(opengl),
        color: [0.0, 0.0, 0.0, 1.0],
        background: [1.0, 1.0, 1.0, 1.0],
        rotation: 0.0,
        position: (200.0, 200.0),
        velocity: (0.0, 0.0),
        frame_count: 0
    };

    let mut events = Events::new(EventSettings::new());
    while let Some(e) = events.next(&mut window) {
        if let Some(r) = e.render_args() {
            app.render(&r);
        }

        if let Some(u) = e.update_args() {
            app.update(&u);
        }

        if let Some(Button::Keyboard(key)) = e.press_args() {
            match key {
                Key::Up => app.move_up(),
                Key::Down => app.move_down(),
                Key::Left => app.move_left(),
                Key::Right => app.move_right(),
                _ => {}
            }
        }

        if let Some(Button::Keyboard(key)) = e.release_args() {
            match key {
                Key::Up | Key::Down => app.stop_vertical(),
                Key::Left | Key::Right => app.stop_horizontal(),
                _ => {}
            }
        }
    }
}

src/main-with-channels.rs (same thing, with channels and threads)

extern crate piston;
extern crate graphics;
extern crate glutin_window;
extern crate opengl_graphics;

use piston::window::WindowSettings;
use piston::event_loop::*;
use piston::input::*;
use opengl_graphics::{ GlGraphics, OpenGL };
use glutin_window::GlutinWindow as Window;

use std::thread;
use std::time;
use std::sync::mpsc;

pub struct App {
    gl: GlGraphics, // OpenGL drawing backend.
    color: [f32; 4],
    background: [f32; 4],
    rotation: f64,   // Rotation for the square.
    position: (f64, f64),
    velocity: (f64, f64),
    frame_count: i32,

    color_sender: mpsc::Sender<([f32; 4], [f32; 4])>,
    color_receiver: mpsc::Receiver<([f32; 4], [f32; 4])>
}

impl App {
    fn move_up(&mut self) {
        self.velocity.1 = -1.0;
    }
    fn move_down(&mut self) {
        self.velocity.1 = 1.0;
    }
    fn move_left(&mut self) {
        self.velocity.0 = -1.0;
    }
    fn move_right(&mut self) {
        self.velocity.0 = 1.0;
    }
    fn stop_vertical(&mut self) {
        self.velocity.1 = 0.0;
    }
    fn stop_horizontal(&mut self) {
        self.velocity.0 = 0.0;
    }

    fn render(&mut self, args: &RenderArgs) {
        use graphics::*;

        let square = rectangle::square(0.0, 0.0, 50.0);
        let rotation = self.rotation;
        let (x, y) = self.position;

        let color = self.color;
        let background = self.background;

        self.gl.draw(args.viewport(), |c, gl| {
            // Clear the screen.
            clear(background, gl);

            let transform = c.transform.trans(x, y)
                .rot_rad(rotation)
                .trans(-25.0, -25.0);

            // Draw a box rotating around the middle of the screen.
            rectangle(color, square, transform, gl);
        });
    }

    fn update(&mut self, args: &UpdateArgs) {
        self.frame_count += 1;
        // Rotate 2 radians per second.
        self.rotation += 2.0 * args.dt;
        self.position.0 += self.velocity.0;
        self.position.1 += self.velocity.1;

        if self.frame_count % 300 == 0 {
            do_expensive_work(self.color, self.color_sender.clone());
        }

        if let Ok((color, background)) = self.color_receiver.try_recv() {
            self.color = color;
            self.background = background;
        }
    }

}

fn do_expensive_work(current_color: [f32; 4],
                     color_sender: mpsc::Sender<([f32; 4], [f32; 4])>) {
    // Another approach, if you don't want to keep spinning up a new
    // thread for 2 seconds of work, would be to have a longrunning
    // thread and a channel to pass work into it.

    thread::spawn(move || {
        //rather than doing work immediately, let's take a nap
        thread::sleep(time::Duration::from_secs(2));
        let b = current_color[2] + 1.0;
        let g = current_color[1] + if b > 1.0 { 1.0 } else { 0.0 };
        let r = current_color[0] + if g > 1.0 { 1.0 } else { 0.0 };
        let send_result = color_sender.send(
            ([r%2.0, g%2.0, b%2.0, 1.0],
             [(r+1.0)%2.0, (g+1.0)%2.0, (b+1.0)%2.0, 1.0])
        );

        if let Err(send_err) = send_result {
            println!("Error on sending colors back to main thread: {}", send_err);
        }
    });
}


fn main() {
    // Change this to OpenGL::V2_1 if not working.
    let opengl = OpenGL::V3_2;

    // Create an Glutin window.
    let mut window: Window = WindowSettings::new(
        "spinning-square",
        [400, 400]
    )
        .opengl(opengl)
        .exit_on_esc(true)
        .build()
        .unwrap();

    // Create a new game and run it.
    let (color_sender, color_receiver) = mpsc::channel();
    let mut app = App {
        gl: GlGraphics::new(opengl),
        color: [0.0, 0.0, 0.0, 1.0],
        background: [1.0, 1.0, 1.0, 1.0],
        rotation: 0.0,
        position: (200.0, 200.0),
        velocity: (0.0, 0.0),
        frame_count: 0,
        color_sender: color_sender,
        color_receiver: color_receiver
    };

    let mut events = Events::new(EventSettings::new());
    while let Some(e) = events.next(&mut window) {
        if let Some(r) = e.render_args() {
            app.render(&r);
        }

        if let Some(u) = e.update_args() {
            app.update(&u);
        }

        if let Some(Button::Keyboard(key)) = e.press_args() {
            match key {
                Key::Up => app.move_up(),
                Key::Down => app.move_down(),
                Key::Left => app.move_left(),
                Key::Right => app.move_right(),
                _ => {}
            }
        }

        if let Some(Button::Keyboard(key)) = e.release_args() {
            match key {
                Key::Up | Key::Down => app.stop_vertical(),
                Key::Left | Key::Right => app.stop_horizontal(),
                _ => {}
            }
        }
    }
}

If you'd like to share this article on social media, please use this link: https://www.worthe-it.co.za/blog/2017-06-19-a-rusty-tale-of-shared-memories-and-passed-messages-1.html

Copy link to clipboard

Tags: blog, rust


Support

If you get value from these blog articles, consider supporting me on Patreon. Support via Patreon helps to cover hosting, buying computer stuff, and will allow me to spend more time writing articles and open source software.


Related Articles

A Rusty Tale of Shared Memories and Passed Messages Part 2: Pointers to Shared Memory

Writing multithreaded code can be challenging. Luckily, the Rust programming language aims to make it easier by baking information about multithreading directly into the language's type system. This post explores the second way that you can have multiple threads communicate with each other in Rust: pointers to shared memory, with concurrent access protected with a lock.

Game Programming Inspires My Software Development

I've been fascinated by computer games for as long as I can remember. In fact, almost all of my early computer knowledge came about because I wanted to play games. A lot of time has passed since then and I now work as a software engineer, and most of the systems I work on don't look much like games. Throughout my career, I've maintained an interest in what game engineers are doing. This article presents a few concepts from the context of computer games that help me to solve the problems that I do face.

Subscribe to my RSS feed.