Rust

Any donation is very welcome
Fork me on GitHub

III. Aller plus loin

5. Les threads

Commençons par un exemple tout bête :

Runuse std::thread;

fn main() {
    // on lance le thread
    let handle = thread::spawn(|| {
        "Salutations depuis un thread !"
    });

    // on attend que le thread termine son travail avant de quitter
    handle.join().unwrap();
}

La fonction thread::spawn exécute le code de la closure dans un nouveau thread. On appelle ensuite la méthode JoinHandle::join pour attendre la fin de l'exécution du thread.

Jusque là, on reste dans le classique. Que peut bien apporter Rust ici ? Hé bien essayons maintenant de partager des variables entre les threads :

Runlet mut data = vec![1u32, 2, 3];

for i in 0..3 {
    // on lance le thread
    thread::spawn(move || {
        data[i] += 1;
    });
}

// on attend 50 millisecondes, le temps que les threads finissent leur travail
thread::sleep_ms(50);

Vous devriez obtenir une magnifique erreur :

error: capture of moved value: `data`
        data[i] += 1;

Le système de propriété que vous haïssez sans doute rentre ici aussi en jeu. Nous avons trois références mutables sur un même objet et Rust ne le permet pas, c'est aussi simple que ça. Pour contourner ce problème, plusieurs solutions s'offrent à vous :

Mutex

Le type Mutex permet donc d'échanger des informations entre threads. Une solution naïve serait de les utiliser de cette façon :

Runuse std::thread;
use std::sync::Mutex;

fn main() {
    let mut data = Mutex::new(vec![1u32, 2, 3]); // on crée notre mutex

    for i in 0..3 {
        let data = data.lock().unwrap(); // on lock
        // on lance le thread
        thread::spawn(move || {
            data[i] += 1;
        });
    }

    // on attend 50 millisecondes, le temps que les threads finissent leur travail
    thread::sleep_ms(50);
}

Cependant nous tombons sur un autre problème :

<anon>:9:9: 9:22 error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` [E0277]
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~
<anon>:9:9: 9:22 note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec<u32>>` cannot be sent between threads safely
<anon>:11         thread::spawn(move || {
                  ^~~~~~~~~~~~~

Le trait Sync n'est pas implémenté sur le type MutexGuard retourné par la méthode Mutex::lock, impossible d'utiliser les données partagées de manière sûre ! C'est ici que rentre en jeu le type Arc !

Arc

Vous l'aurez deviné (ou peut-être pas), le type Arc est le même type que Rc mais thread-safe car il implémente le trait Sync. Corrigeons le code précédent :

Runuse std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // on crée notre mutex
    let data = Arc::new(Mutex::new(vec![1u32, 2, 3]));

    for i in 0..3 {
        // on incrémente le compteur interne de Arc
        let data = data.clone();
        thread::spawn(move || {
            let mut ret = data.lock(); // on lock

            // on vérifie qu'il n'y a pas de problème
            match ret {
                Ok(ref mut d) => {
                    // tout est bon, on peut modifier la donnée en toute sécurité !
                    d[i] += 1;
                },
                Err(e) => {
                    // une erreur s'est produite
                    println!("Impossible d'accéder aux données {:?}", e);
                }
            }
        });
    }

    // on attend 50 millisecondes, le temps que les threads finissent leur travail
    thread::sleep_ms(50);
}

Nous avons vu comment partager des données entre threads mais il nous reste cette ligne dont on voudrait bien se débarrasser :

Runthread::sleep_ms(50);

les channels sont la solution à notre problème !

Les channels

Nous aimerions donc bien pouvoir continuer l'exécution de notre programme mais uniquement après que les threads aient terminé. On crée un channel via la fonction mpsc::channel. Exemple :

Runuse std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;

fn main() {
    let data = Arc::new(Mutex::new(0u32));

    // on crée le channel
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let (data, tx) = (data.clone(), tx.clone());

        thread::spawn(move || {
            let mut data = data.lock().unwrap();
            *data += 1;

            // on envoie le signal de fin du thread
            tx.send(());
        });
    }

    for _ in 0..10 {
        // on attend le signal de fin du thread
        rx.recv();
    }
}

Dans ce code, on crée 10 threads qui vont chacun envoyer une donnée dans le channel avant de se terminer. Il nous suffit donc d'attendre d'avoir reçu 10 données pour savoir que tous les threads se sont terminés.

Dans le code que je viens de vous montrer, on ne s'en sert que comme d'un signal en envoyant des données vides. Il est cependant possible d'envoyer des données, du moment qu'elles implémentent le trait Send. Exemple :

Runuse std::thread;
use std::sync::mpsc;

fn main() {
    // on crée le channel
    let (tx, rx) = mpsc::channel();

    for _ in 0..10 {
        let tx = tx.clone();

        thread::spawn(move || {
            let answer = 42u32;

            // on envoie la donnée dans le channel
            tx.send(answer);
        });
    }

    match rx.recv() {
        Ok(data) => println!("Le channel vient de recevoir : {}", data),
        Err(e) => println!("Une erreur s'est produite : {:?}", e)
    };
}

Et voilà ! Il est important de noter que seule la méthode send est non-bloquante. Si vous souhaitez ne pas attendre que des données soient disponibles, il vous faudra utiliser la méthode try_recv.

Utilisation détournée

Il est possible d'utiliser un thread pour isoler du code de cette façon :

Runuse std::thread;

match thread::spawn(move || {
    panic!("oops!");
}).join() {
    Ok(_) => println!("Tout s'est bien déroulé"),
    Err(e) => println!("Le thread a planté ! Erreur : {:?}", e)
};

Magique !

Empoisonnement de Mutex

Vous savez maintenant comment partager les données de manière sûre entre des threads. Il reste cependant un petit détail à savoir concernant les mutex : si jamais un thread panic! alors qu'il a le lock, le Mutex sera "empoisonné".

Runuse std::sync::{Arc, Mutex};
use std::thread;

let lock = Arc::new(Mutex::new(0_u32));
let lock2 = lock.clone();

let _ = thread::spawn(move || -> () {
    // On lock
    let _lock = lock2.lock().unwrap();

    // On lance un panic! alors que le mutex est toujours locké
    panic!();
}).join();

Et maintenant vous vous retrouvez dans l'incapacité de lock de nouveau le Mutex dans les autres threads. Il est toutefois possible de "désempoisonner" le mutex :

Runlet mut guard = match lock.lock() {
    Ok(guard) => guard,
    // on récupère les données malgré le fait que le mutex soit lock
    Err(poisoned) => poisoned.into_inner(),
};

*guard += 1;

Autres façons d'utiliser les threads

Il existe un plusieurs crates dans l'écosystème de Rust qui permettent maintenant d'utiliser les threads de manière bien plus simple. Je vous recommande au moins d'y jeter un coup d'oeil :