Rust - Futures library

Futures is a library for writing asynchronous code.

official website

导入库

futures = { version = "0.3.30", features = ["thread-pool"] }

开始

创建一个线程池

use futures::executor::ThreadPool;

fn main() {
    // 创建一个线程池
    let pool = ThreadPool::new().expect("Failed to build pool");
}

这个是最朴实无华的创建方式,他的本质如下:

pub fn new() -> Self {
    Self {
        pool_size: cmp::max(1, num_cpus::get()),
        stack_size: 0,
        name_prefix: None,
        after_start: None,
        before_stop: None,
    }
}

创建高级一点的线程池

use futures::executor::ThreadPool;

fn main() {
    // 创建一个线程池
    let pool = ThreadPool::builder()
        .pool_size(4) // 设置线程池的大小
        .after_start(|size| {
            println!("Thread started");
        }) // 设置线程池的回调函数
        .before_stop(|size| {
            println!("Thread stopping");
        }) // 设置线程池的回调函数
        .name_prefix("my-pool-".to_string()) // 设置线程池的名字
        .stack_size(8192) // 设置线程栈的大小
        .create()
        .expect("Failed to build pool");
}

什么是线程栈?

有些人会问什么是线程栈?

其实Linux并没有线程的概念。

是否共享地址空间几乎是进程和 Linux 中所谓线程的唯一区别。

线程创建的时候,加上了 CLONE_VM 标记,这样 线程的内存描述符 将直接指向 父进程的内存描述符。

对于主线程的栈,写时拷贝/动态增长

对于子线程的栈,线程栈不能动态增长

创建一个Future

use futures::executor;

let fut_values = async {
    // do something
};

线程池执行feature

pool.spawn_ok(fut_values);

完整代码示例

use futures::channel::mpsc;   // 消息通道
use futures::executor;  // future 执行器
use futures::executor::ThreadPool; // 线程池
use futures::StreamExt; // Stream 流扩展(比如一些流操作组合)

fn main() {
    // 构建一个线程池
    // 一个很朴实无华的写法
    //let pool = ThreadPool::new().expect("Failed to build pool");

    // 一个稍微复杂一点的写法
    let pool = ThreadPool::builder()
        .pool_size(4) // 设置线程池的大小
        .after_start(|size| {
            println!("Thread started");
        }) // 设置线程池的回调函数
        .before_stop(|size| {
            println!("Thread stopping");
        }) // 设置线程池的回调函数
        .name_prefix("my-pool-".to_string()) // 设置线程池的名字
        .stack_size(8192) // 设置线程栈的大小
        .create()
        .expect("Failed to build pool");


    // 构建一个无界的channel
    // 他有什么用?
    // 无界的channel是指channel的容量是无限的,不会有发送者阻塞的情况
    // tx代表发送者,rx代表接收者
    let (tx, rx) = mpsc::unbounded::<i32>();

    // Create a future by an async block, where async is responsible for an
    // implementation of Future. At this point no executor has been provided
    // to this future, so it will not be running.
    let fut_values = async {
        // Create another async block, again where the Future implementation
        // is generated by async. Since this is inside of a parent async block,
        // it will be provided with the executor of the parent block when the parent
        // block is executed.
        //
        // This executor chaining is done by Future::poll whose second argument
        // is a std::task::Context. This represents our executor, and the Future
        // implemented by this async block can be polled using the parent async
        // block's executor.
        let fut_tx_result = async move {
            (0..100).for_each(|v| {
                // 这里有一个send,他和unbounded_send有什么区别?
                // send是有界的,当发送者发送的消息超过了接收者的容量,发送者会阻塞
                // unbounded_send是无界的,当发送者发送的消息超过了接收者的容量,发送者不会阻塞
                tx.unbounded_send(v).expect("Failed to send");
            })
        };

        // Use the provided thread pool to spawn the generated future
        // responsible for transmission
        pool.spawn_ok(fut_tx_result);

        let fut_values = rx
            .map(|v| {
                v * 2
            })
            .collect();

        // Use the executor provided to this async block to wait for the
        // future to complete.
        fut_values.await
    };

    // Actually execute the above future, which will invoke Future::poll and
    // subsequently chain appropriate Future::poll and methods needing executors
    // to drive all futures. Eventually fut_values will be driven to completion.
    let values: Vec<i32> = executor::block_on(fut_values);

    println!("Values={:?}", values);
}