Rust - Futures library
Futures is a library for writing asynchronous code.
导入库
futures = { version = "0.3.30", features = ["thread-pool"] }
开始
创建一个线程池
use futures::executor::ThreadPool;
fn main() {
// 创建一个线程池
let pool = ThreadPool::new().expect("Failed to build pool");
}
这个是最朴实无华的创建方式,他的本质如下:
pub fn new() -> Self {
Self {
pool_size: cmp::max(1, num_cpus::get()),
stack_size: 0,
name_prefix: None,
after_start: None,
before_stop: None,
}
}
创建高级一点的线程池
use futures::executor::ThreadPool;
fn main() {
// 创建一个线程池
let pool = ThreadPool::builder()
.pool_size(4) // 设置线程池的大小
.after_start(|size| {
println!("Thread started");
}) // 设置线程池的回调函数
.before_stop(|size| {
println!("Thread stopping");
}) // 设置线程池的回调函数
.name_prefix("my-pool-".to_string()) // 设置线程池的名字
.stack_size(8192) // 设置线程栈的大小
.create()
.expect("Failed to build pool");
}
什么是线程栈?
有些人会问什么是线程栈?
其实Linux并没有线程的概念。
是否共享地址空间几乎是进程和 Linux 中所谓线程的唯一区别。
线程创建的时候,加上了 CLONE_VM 标记,这样 线程的内存描述符 将直接指向 父进程的内存描述符。
对于主线程的栈,写时拷贝/动态增长
对于子线程的栈,线程栈不能动态增长
创建一个Future
use futures::executor;
let fut_values = async {
// do something
};
线程池执行feature
pool.spawn_ok(fut_values);
完整代码示例
use futures::channel::mpsc; // 消息通道
use futures::executor; // future 执行器
use futures::executor::ThreadPool; // 线程池
use futures::StreamExt; // Stream 流扩展(比如一些流操作组合)
fn main() {
// 构建一个线程池
// 一个很朴实无华的写法
//let pool = ThreadPool::new().expect("Failed to build pool");
// 一个稍微复杂一点的写法
let pool = ThreadPool::builder()
.pool_size(4) // 设置线程池的大小
.after_start(|size| {
println!("Thread started");
}) // 设置线程池的回调函数
.before_stop(|size| {
println!("Thread stopping");
}) // 设置线程池的回调函数
.name_prefix("my-pool-".to_string()) // 设置线程池的名字
.stack_size(8192) // 设置线程栈的大小
.create()
.expect("Failed to build pool");
// 构建一个无界的channel
// 他有什么用?
// 无界的channel是指channel的容量是无限的,不会有发送者阻塞的情况
// tx代表发送者,rx代表接收者
let (tx, rx) = mpsc::unbounded::<i32>();
// Create a future by an async block, where async is responsible for an
// implementation of Future. At this point no executor has been provided
// to this future, so it will not be running.
let fut_values = async {
// Create another async block, again where the Future implementation
// is generated by async. Since this is inside of a parent async block,
// it will be provided with the executor of the parent block when the parent
// block is executed.
//
// This executor chaining is done by Future::poll whose second argument
// is a std::task::Context. This represents our executor, and the Future
// implemented by this async block can be polled using the parent async
// block's executor.
let fut_tx_result = async move {
(0..100).for_each(|v| {
// 这里有一个send,他和unbounded_send有什么区别?
// send是有界的,当发送者发送的消息超过了接收者的容量,发送者会阻塞
// unbounded_send是无界的,当发送者发送的消息超过了接收者的容量,发送者不会阻塞
tx.unbounded_send(v).expect("Failed to send");
})
};
// Use the provided thread pool to spawn the generated future
// responsible for transmission
pool.spawn_ok(fut_tx_result);
let fut_values = rx
.map(|v| {
v * 2
})
.collect();
// Use the executor provided to this async block to wait for the
// future to complete.
fut_values.await
};
// Actually execute the above future, which will invoke Future::poll and
// subsequently chain appropriate Future::poll and methods needing executors
// to drive all futures. Eventually fut_values will be driven to completion.
let values: Vec<i32> = executor::block_on(fut_values);
println!("Values={:?}", values);
}