Thanks!
X
Paypal
Github sponsorship
Patreon
Acheter version papier

Rust

Fork me on GitHub

III. Aller plus loin

6. Les threads

Commençons par un exemple tout simple :

Runuse std::thread;

fn main() {
    // On lance le thread.
    let handle = thread::spawn(|| {
        "Salutations depuis un thread !"
    });

    // On attend que le thread termine son travail avant de quitter.
    handle.join().unwrap();
}

La fonction thread::spawn exécute le code de la closure dans un nouveau thread. On appelle ensuite la méthode JoinHandle::join pour attendre la fin de l'exécution du thread.

Jusque-là, on reste dans le classique. Que peut bien apporter Rust ici ? Hé bien essayons maintenant de partager des variables entre les threads :

Runlet mut data = vec![1u32, 2, 3];

for i in 0..3 {
    // On lance le thread.
    thread::spawn(move || {
        data[i] += 1;
    });
}

// On attend 50 millisecondes, le temps que les threads finissent leur travail.
thread::sleep_ms(50);

Vous devriez obtenir une magnifique erreur :

error: capture of moved value: `data`
        data[i] += 1;

Le système de propriété rentre ici aussi en jeu. Nous avons trois références mutables sur un même objet et Rust ne le permet pas, c'est aussi simple que cela. Pour contourner ce problème, plusieurs solutions s'offrent à nous :

Mutex

Le type Mutex permet d'utiliser une même donnée depuis plusieurs endroits. Une solution naïve serait de les utiliser de cette façon :

Runuse std::thread;
use std::sync::Mutex;

fn main() {
    // On crée notre mutex.
    let mut data = Mutex::new(vec![1u32, 2, 3]);

    for i in 0..3 {
        // On locke.
        let data = data.lock().unwrap();
        // On lance le thread.
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    // On attend 50 millisecondes, le temps que les threads finissent leur
    // travail.
    thread::sleep_ms(50);
}

Cependant nous tombons sur un autre problème :

<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~

Le trait Sync n'est pas implémenté sur le type MutexGuard retourné par la méthode Mutex::lock. Impossible de partager l'accès aux données de manière sûre ! C'est ici que rentre en jeu le type Arc !

Arc

Le type Arc est le même type que Rc, mais thread-safe car il implémente le trait Sync. Corrigeons le code précédent :

Runuse std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // On crée notre mutex,
    let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));

    for i in 0..3 {
        // On incrémente le compteur interne de Arc.
        let data = data.clone();
        thread::spawn(move || {
            // On locke.
            let mut ret = data.lock();

            // on vérifie qu'il n'y a pas de problème
            match ret {
                Ok(ref mut d) => {
                    // Tout est bon, on peut modifier la donnée en toute sécurité !
                    d[i] += 1;
                },
                Err(e) => {
                    // Une erreur s'est produite.
                    println!("Impossible d'accéder aux données {:?}", e);
                }
            }
        });
    }

    // On attend 50 millisecondes, le temps que les threads finissent leur travail.
    thread::sleep_ms(50);
}

Nous avons vu comment partager des données entre threads mais il nous reste cette ligne dont on voudrait bien se débarrasser :

Runthread::sleep_ms(50);

les channels sont la solution à notre problème !

Les channels

Nous aimerions donc bien pouvoir continuer l'exécution de notre programme mais uniquement après que les threads aient terminé. On crée un channel via la fonction mpsc::channel. Exemple :

Runuse std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0u32));

    // On crée le channel.
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            // On envoie le signal de fin du thread.
            tx.send(());
        });
    }

    for _ in 0..10 {
        // On attend le signal de fin du thread.
        rx.recv();
    }
}

Dans ce code, on crée 10 threads qui vont chacun envoyer une donnée dans le channel avant de se terminer. Il nous suffit donc d'attendre d'avoir reçu 10 fois quelque chose pour savoir que tous les threads se sont terminés.

Dans le code que je viens de vous montrer, on ne s'en sert que comme d'un signal en envoyant des données vides. Il est cependant possible d'envoyer des données, du moment qu'elles implémentent le trait Send :

Runuse std::thread;
use std::sync::mpsc;

fn main() {
    // On crée le channel.
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = 42u32;

            // On envoie la donnée dans le channel.
            tx.send(answer);
        });
    }

    match rx.recv() {
        Ok(data) => println!("Le channel vient de recevoir : {}", data),
        Err(e) => println!("Une erreur s'est produite : {:?}", e)
    };
}

Et voilà ! Il est important de noter que seule la méthode send est non-bloquante. Si vous souhaitez ne pas attendre que des données soient disponibles, il vous faudra utiliser la méthode try_recv.

Utilisation détournée

Il est possible d'utiliser un thread pour isoler du code de cette façon :

Runuse std::thread;

match thread::spawn(move || {
    panic!("oops!");
}).join() {
    Ok(_) => println!("Tout s'est bien déroulé"),
    Err(e) => println!("Le thread a planté ! Erreur : {:?}", e)
};

Magique !

Empoisonnement de Mutex

Vous savez maintenant comment partager les données de manière sûre entre des threads. Il reste cependant un petit détail à connaître concernant les mutex : si jamais un thread panic! alors qu'il a le lock, le Mutex sera "empoisonné".

Runuse std::sync::{Arc, Mutex};
use std::thread;

let lock = Arc::new(Mutex::new(0_u32));
let lock2 = lock.clone();

let _ = thread::spawn(move || -> () {
    // On locke.
    let _lock = lock2.lock().unwrap();

    // On lance un panic! alors que le mutex est toujours locké.
    panic!();
}).join();

Et maintenant vous vous retrouvez dans l'incapacité de lock de nouveau le Mutex dans les autres threads. Il est toutefois possible de "désempoisonner" le mutex :

Runlet mut guard = match lock.lock() {
    Ok(guard) => guard,
    // On récupère les données malgré le fait que le mutex soit lock.
    Err(poisoned) => poisoned.into_inner(),
};

*guard += 1;

Autres façons d'utiliser les threads

Il existe un plusieurs crates dans l'écosystème de Rust qui permettent d'utiliser les threads de manière bien plus simple. Je vous recommande au moins d'y jeter un coup d'oeil :