Acheter version papier

Rust

Fork me on GitHub

III. Aller plus loin

7. Le multi-threading

Commençons par un exemple tout simple :

Runuse std::thread;

fn main() {
    // On lance le thread.
    let handle = thread::spawn(|| {
        "Salutations depuis un thread !"
    });

    // On attend que le thread termine son travail avant de quitter.
    handle.join().unwrap();
}

La fonction thread::spawn exécute le code de la closure dans un nouveau thread. On appelle ensuite la méthode JoinHandle::join pour attendre la fin de l'exécution du thread.

Jusque-là, on reste dans le classique. Que peut bien apporter Rust ici ? Hé bien essayons maintenant de partager des variables entre les threads :

Runuse std::thread;

let mut data = vec![1u32, 2, 3];
// On va stocker les handlers des threads dans ce `Vec` pour pouvoir
// attendre la fin de leur exécution.
let mut handles = Vec::new();

for i in 0..3 {
    // On lance le thread.
    handles.push(thread::spawn(move || {
        data[i] += 1;
    }));
}

// On attend que les threads aient fini.
for handle in handles {
    handle.join().expect("`join` a échoué");
}

Vous devriez obtenir une magnifique erreur :

error: capture of moved value: `data`
        data[i] += 1;

Le système de propriété rentre ici aussi en jeu. Nous avons trois références mutables sur un même objet et Rust ne le permet pas, c'est aussi simple que cela. Pour contourner ce problème, plusieurs solutions s'offrent à nous :

Mutex

Le type Mutex permet d'utiliser une même donnée depuis plusieurs endroits. Une solution naïve serait de les utiliser de cette façon :

Runuse std::thread;
use std::sync::Mutex;

fn main() {
    // On crée notre mutex.
    let mut data = Mutex::new(vec![1u32, 2, 3]);
    let mut handles = Vec::new();

    for i in 0..3 {
        // On locke.
        let data = data.lock().unwrap();
        // On lance le thread.
        handles.push(thread::spawn(move || {
            data[i] += 1;
        }));
    }

    for handle in handles {
        handle.join().expect("`join` a échoué");
    }
}

Cependant nous tombons sur un autre problème :

<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~

Le trait Sync n'est pas implémenté sur le type MutexGuard retourné par la méthode Mutex::lock. Impossible de partager l'accès aux données de manière sûre ! C'est ici que rentre en jeu le type Arc !

Arc

Le type Arc est le même type que Rc, mais thread-safe car il implémente le trait Sync. Corrigeons le code précédent :

Runuse std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // On crée notre mutex,
    let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));
    let mut handles = Vec::new();

    for i in 0..3 {
        // On incrémente le compteur interne de Arc.
        let data = data.clone();
        handles.push(thread::spawn(move || {
            // On locke.
            let mut ret = data.lock();

            // on vérifie qu'il n'y a pas de problème
            match ret {
                Ok(ref mut d) => {
                    // Tout est bon, on peut modifier la donnée en toute sécurité !
                    d[i] += 1;
                }
                Err(e) => {
                    // Une erreur s'est produite.
                    println!("Impossible d'accéder aux données {:?}", e);
                }
            }
        }));
    }

    for handle in handles {
       handle.join().expect("`join` a échoué");
    }
}

Nous avons vu comment partager des données entre threads. Il existe cependant une autre façon de faire ça : les channels.

Les channels

On peut se servir des channels pour envoyer des données entre threads. Dans le cas présent, on va s'en servir pour notifier le thread principal qu'un des threads a fini son exécution. On crée un channel via la fonction mpsc::channel :

Runuse std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0u32));

    // On crée le channel.
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            // On envoie le signal de fin du thread.
            tx.send(()).expect("échec de l'envoi des données");
        });
    }

    for _ in 0..10 {
        // On attend le signal de fin du thread.
        rx.recv().expect("échec de réception des données");
    }
    // On va maintenant récupérer la donnée contenue dans `Arc<Mutex<>>`.
    let mutex = Arc::into_inner(data).expect("echec de récupération du contenu de Arc");
    let data = mutex.into_inner().expect("echec de récupération du contenu du mutex");
    println!("data: {}", data);
}

Dans ce code, on crée 10 threads qui vont chacun envoyer une donnée dans le channel avant de se terminer. Il nous suffit donc d'attendre d'avoir reçu 10 fois quelque chose pour savoir que tous les threads se sont terminés.

Dans cet exemple, on ne s'en sert que comme d'un signal en envoyant un tuple vide. Il est cependant possible d'envoyer plus que ça, du moment que le type envoyé implémente le trait Send :

Runuse std::thread;
use std::sync::mpsc;

fn main() {
    // On crée le channel.
    let (tx, rx) = mpsc::channel();

    for index in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = format!("index : {}", index);

            // On envoie la donnée dans le channel.
            tx.send(answer).expect("échec de l'envoi des données");
        });
    }

    for _ in 0..10 {
        match rx.recv() {
            Ok(data) => println!("Le channel vient de recevoir : {:?}", data),
            Err(e) => println!("Une erreur s'est produite : {:?}", e),
        }
    }
}

Ici, nous avons généré des String dans nos threads que nous avons ensuite réceptionné et utilisé dans le thread principal.

Les channels sont particulièrement pratiques quand on veut lancer un (ou plusieurs) thread en arrière-plan pour faire des calculs lourds pour éviter de bloquer le thread principal. Par-exemple, si vous faites une interface graphique pour lire des emails, pendant que les requêtes pour récuperer les données sont en cours, vous ne voulez pas bloquer l'interface graphique, donc vous allez faire ça dans un thread et utiliser un channel pour récupérer la donnée quand elle sera arrivée.

Dernier point : il est important de noter que seule la méthode send est non-bloquante. Si vous souhaitez ne pas attendre que des données soient disponibles, il vous faudra utiliser la méthode try_recv.

Utilisation détournée

Il est possible d'utiliser un thread pour isoler du code de cette façon :

Runuse std::thread;

match thread::spawn(move || {
    panic!("oops!");
}).join() {
    Ok(_) => println!("Tout s'est bien déroulé"),
    Err(e) => println!("Le thread a planté ! Erreur : {:?}", e),
};

Cela permet d'exécuter du code qui pourrait paniquer tout en empêchant le programme de s'arrêter si c'est le cas. Cela peut se révéler pratique dans de rares cas.

Les atomiques

Les atomiques sont des types primitifs qu'on peut utiliser pour communiquer entre des threads. À l'exception de AtomicBool, ce sont tous des entiers. Chaque opération sur un atomique doit préciser de quel façon on veut que la mémoire soit synchronisée. Prenons un exemple où l'on va augmenter la valeur d'un entier entre plusieurs threads :

Runuse std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let atomic = Arc::new(AtomicU32::new(0));
    let mut handles = Vec::new();

    for index in 1..10 {
        let atomic = Arc::clone(&atomic);
        handles.push(thread::spawn(move || {
            for _ in 0..index {
                atomic.fetch_add(1, Ordering::Relaxed);
            }
        }));
    }

    for handle in handles {
        handle.join().expect("`join` a échoué");
    }
    println!("On a fait {} itérations", atomic.load(Ordering::Relaxed));
}

Dans cet exemple, peu importe l'ordre dans lequel les opérations sur l'atomique sont exécutés donc on a utilisé Ordering::Relaxed, mais dans certains cas, il peut être utile de choisir des restrictions différentes. Si tel est votre cas, je vous recommande de jeter un oeil à la documentation de Ordering et surtout lire le chapitre sur les atomiques dans le rustnomicon que vous trouverez ici.

Empoisonnement de Mutex

Vous savez maintenant comment partager les données de manière sûre entre des threads. Il reste cependant un petit détail à connaître concernant les mutex : si jamais un thread panic alors qu'il a le lock, le Mutex sera "empoisonné".

Runuse std::sync::{Arc, Mutex};
use std::thread;

let lock = Arc::new(Mutex::new(0_u32));
let lock2 = lock.clone();

let _ = thread::spawn(move || -> () {
    // On locke.
    let _lock = lock2.lock().unwrap();

    // On lance un panic! alors que le mutex est toujours locké.
    panic!();
}).join();

Et maintenant vous vous retrouvez dans l'incapacité de lock de nouveau le Mutex dans les autres threads. Il est toutefois possible de "désempoisonner" le mutex :

Runlet mut guard = match lock.lock() {
    Ok(guard) => guard,
    // On récupère les données malgré le fait que le mutex soit lock.
    Err(poisoned) => poisoned.into_inner(),
};

*guard += 1;

Autres façons d'utiliser les threads

Il existe un plusieurs crates dans l'écosystème de Rust qui permettent d'utiliser les threads de manière bien plus simple. Je vous recommande au moins d'y jeter un coup d'oeil :