RC #
use std::rc::Rc;
use std::sync::Arc;
use std::thread;
fn main() {
let a = Rc::new(String::from("test ref counting"));
println!("count after creating a = {}", Rc::strong_count(&a));
let b = Rc::clone(&a);
println!("count after creating b = {}", Rc::strong_count(&a));
{
let c = Rc::clone(&a);
println!("count after creating c = {}", Rc::strong_count(&c));
}
println!("count after c goes out of scope = {}", Rc::strong_count(&a));
let s = Arc::new(String::from("多线程漫游者"));
for _ in 0..10 {
let s = Arc::clone(&s);
let handle = thread::spawn(move || println!("{}", s));
}
}
- Rc/Arc 是不可变引用,你无法修改它指向的值,只能进行读取,如果要修改,需要配合后面章节的内部可变性 RefCell 或互斥锁 Mutex
- 一旦最后一个拥有者消失,则资源会自动被回收,这个生命周期是在编译期就确定下来的
- Rc 只能用于同一线程内部,想要用于线程之间的对象共享,你需要使用 Arc
Rc<T>
是一个智能指针,实现了 Deref 特征,因此你无需先解开 Rc 指针,再使用里面的T ,而是可以直接使用 T ,例如上例中的 gadget1.owner.name
Rc + RefCell 组合使用 #
use std::cell::RefCell;
use std::rc::Rc;
fn main() {
let s = Rc::new(RefCell::new("我很善变,还拥有多个主人".to_string()));
let s1 = s.clone();
let s2 = s.clone();
// let mut s2 = s.borrow_mut();
s2.borrow_mut().push_str(", on yeah!");
println!("{:?}\n{:?}\n{:?}", s, s1, s2);
}
weak 解决循环引用 #
use std::rc::Rc;
fn main() {
// 创建Rc,持有一个值5
let five = Rc::new(5);
// 通过Rc,创建一个Weak指针
let weak_five = Rc::downgrade(&five);
// Weak引用的资源依然存在,取到值5
let strong_five: Option<Rc<_>> = weak_five.upgrade();
assert_eq!(*strong_five.unwrap(), 5);
// 手动释放资源`five`
drop(five);
// Weak引用的资源已不存在,因此返回None
let strong_five: Option<Rc<_>> = weak_five.upgrade();
assert_eq!(strong_five, None);
}
解析Code
use std::cell::RefCell;
use std::rc::Rc;
use std::rc::Weak;
// 主人
struct Owner {
name: String,
gadgets: RefCell<Vec<Weak<Gadget>>>,
}
// 工具
struct Gadget {
id: i32,
owner: Rc<Owner>,
}
fn main() {
// 创建一个 Owner
// 需要注意,该 Owner 也拥有多个 `gadgets`
let gadget_owner: Rc<Owner> = Rc::new(Owner {
name: "Gadget Man".to_string(),
gadgets: RefCell::new(Vec::new()),
});
// 创建工具,同时与主人进行关联:创建两个 gadget,他们分别持有 gadget_owner 的一个引用。
let gadget1 = Rc::new(Gadget {
id: 1,
owner: gadget_owner.clone(),
});
let gadget2 = Rc::new(Gadget {
id: 2,
owner: gadget_owner.clone(),
});
// 为主人更新它所拥有的工具
// 因为之前使用了 `Rc`,现在必须要使用 `Weak`,否则就会循环引用
gadget_owner
.gadgets
.borrow_mut()
.push(Rc::downgrade(&gadget1));
gadget_owner
.gadgets
.borrow_mut()
.push(Rc::downgrade(&gadget2));
// 遍历 gadget_owner 的 gadgets 字段
for gadget_opt in gadget_owner.gadgets.borrow().iter() {
// gadget_opt 是一个 Weak<Gadget> 。 因为 weak 指针不能保证他所引用的对象
// 仍然存在。所以我们需要显式的调用 upgrade() 来通过其返回值(Option<_>)来判
// 断其所指向的对象是否存在。
// 当然,Option 为 None 的时候这个引用原对象就不存在了。
let gadget = gadget_opt.upgrade().unwrap();
println!("Gadget {} owned by {}", gadget.id, gadget.owner.name);
}
// 在 main 函数的最后,gadget_owner,gadget1 和 gadget2 都被销毁。
// 具体是,因为这几个结构体之间没有了强引用(`Rc<T>`),所以,当他们销毁的时候。
// 首先 gadget2 和 gadget1 被销毁。
// 然后因为 gadget_owner 的引用数量为 0,所以这个对象可以被销毁了。
// 循环引用问题也就避免了
}
线程屏障(Barrier) #
使用 Barrier 让多个线程都执行到某个点后
use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
let mut handles = Vec::with_capacity(6);
let barrier = Arc::new(Barrier::new(6));
for _ in 0..6 {
let b = barrier.clone();
handles.push(thread::spawn(move || {
println!("before wait");
b.wait();
println!("after wait");
}));
}
for handle in handles {
handle.join().unwrap();
}
}
线程局部变量(Thread Local Variable) #
use std::cell::RefCell;
use std::sync::{Arc, Barrier};
use std::thread;
thread_local!(static FOO: RefCell<u32> = RefCell::new(1));
fn main() {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 2;
});
// 每个线程开始时都会拿到线程局部变量的FOO的初始值
let t = thread::spawn(move || {
FOO.with(|f| {
assert_eq!(*f.borrow(), 1);
*f.borrow_mut() = 3;
});
});
// 等待线程完成
t.join().unwrap();
// 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
FOO.with(|f| {
assert_eq!(*f.borrow(), 2);
});
}
- main 线程首先进入 while 循环,调用 wait 方法挂起等待子线程的通知,并释放了锁started
- 子线程获取到锁,并将其修改为 true ,然后调用条件变量的 notify_one 方法来通知主线程继续执行
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = pair.clone();
thread::spawn(move || {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
println!("changing started");
*started = true;
cvar.notify_one();
});
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
println!("started changed");
}
执行初始化过程一次,并且只执行一次
use std::sync::Once;
use std::thread;
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn main() {
let handle1 = thread::spawn(move || {
INIT.call_once(|| unsafe {
VAL = 1;
});
});
let handle2 = thread::spawn(move || {
INIT.call_once(|| unsafe {
VAL = 2;
});
});
handle1.join().unwrap();
handle2.join().unwrap();
println!("{}", unsafe { VAL });
}
线程间的消息传递 #
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// 创建一个消息通道, 返回一个元组:(发送者,接收者)
let (tx, rx) = mpsc::channel();
// 克隆发送者,发送者可以通过clone方法创建多个发送者,这样就可以将其发送到多个线程中
let tx1 = tx.clone();
thread::spawn(move || {
tx1.send(String::from("World")).unwrap();
});
// 创建线程,并发送消息
thread::spawn(move || {
let vars = vec![
String::from("This"),
String::from("is"),
String::from("test"),
];
for val in vars {
// send方法返回Result<T,E>,通过unwrap进行快速错误处理
tx.send(val).unwrap();
// 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
// tx.send(Some(1)).unwrap()
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {}", received);
}
// 在主线程中接收子线程发送的消息并输出
// println!("receive {}", rx.recv().unwrap());
// 不会阻塞线程,当通道中没有消息时,它会立刻返回一个错误receive Err(Empty)
// println!("receive {:?}", rx.try_recv());
}
mpsc::channel 创建的通道是异步通道 与异步通道相反,同步通道发送消息是阻塞的,只有在消息被接收后才解除阻塞
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::sync_channel(0);
let handle = thread::spawn(move || {
println!("发送之前");
tx.send(1).unwrap();
println!("发送之后");
});
println!("睡眠之前");
thread::sleep(Duration::from_secs(3));
println!("睡眠之后");
println!("receive {}", rx.recv().unwrap());
handle.join().unwrap();
}
睡眠之前
发送之前
睡眠之后
receive 1
发送之后
发送之后的输出是在receive 1之后,说明只有接收消息彻底成功后,发送消息才算完成。
锁 #
use std::sync::Mutex;
fn main() {
// 使用`Mutex`结构体的关联函数创建新的互斥锁实例
let m = Mutex::new(5);
{
// 获取锁,然后deref为`m`的引用
// lock返回的是Result.m.lock() 返回一个智能指针MutexGuard<T>
let mut num = m.lock().unwrap();
// 实现了 Deref 特征,会被自动解引用后获得一个引用类型,该引用指向 Mutex 内部的数据
*num = 6;
// 实现了 Drop 特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁
// 锁自动被drop
}
println!("m = {:?}", m);
}
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("{}", *counter.lock().unwrap());
}
读写锁
use std::sync::RwLock;
fn main() {
let lock = RwLock::new(5);
// 同一时间允许多个读
{
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
assert_eq!(*r1, 5);
assert_eq!(*r2, 5);
} // 读锁在此处被drop
// 同一时间只允许一个写
{
let mut w = lock.write().unwrap();
*w += 1;
assert_eq!(*w, 6);
// 以下代码会panic,因为读和写不允许同时存在
// 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
// let r1 = lock.read();
// println!("{:?}",r1);
} // 写锁在此处被drop
}
解决资源访问顺序的问题 #
用条件变量(Condvar)控制线程的同步
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
fn main() {
let flag = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
let cflag = flag.clone();
let ccond = cond.clone();
let hdl = spawn(move || {
let mut m = { *cflag.lock().unwrap() };
let mut counter = 0;
while counter < 3 {
while !m {
m = *ccond.wait(cflag.lock().unwrap()).unwrap();
}
{
m = false;
*cflag.lock().unwrap() = false;
}
counter += 1;
println!("inner counter: {}", counter);
}
});
let mut counter = 0;
loop {
sleep(Duration::from_millis(1000));
*flag.lock().unwrap() = true;
counter += 1;
if counter > 3 {
break;
}
println!("outside counter: {}", counter);
cond.notify_one();
}
hdl.join().unwrap();
println!("{:?}", flag);
}
使用 Atomic 作为全局变量 #
Atomic 的值具有内部可变性,无需将其声明为 mut
use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
thread::spawn(move || {
for _ in 0..n {
R.fetch_add(1, Ordering::Relaxed);
}
})
}
fn main() {
let s = Instant::now();
let mut threads = Vec::with_capacity(N_THREADS);
for _ in 0..N_THREADS {
threads.push(add_n_times(N_TIMES));
}
for thread in threads {
thread.join().unwrap();
}
assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
println!("{:?}", Instant::now().sub(s));
}
内存屏障 #
防止编译器和 CPU 将屏障前(Release)和屏障后(Acquire)中的数据操作重新排在屏障围成的范围之外
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);
fn reset() {
unsafe {
DATA = 0;
}
READY.store(false, Ordering::Relaxed);
}
fn producer() -> JoinHandle<()> {
thread::spawn(move || {
unsafe {
DATA = 100; // A
}
READY.store(true, Ordering::Release); // B: 内存屏障 ↑
})
}
fn consumer() -> JoinHandle<()> {
thread::spawn(move || {
while !READY.load(Ordering::Acquire) {} // C: 内存屏障 ↓
assert_eq!(100, unsafe { DATA }); // D
})
}
fn main() {
loop {
reset();
let t_producer = producer();
let t_consumer = consumer();
t_producer.join().unwrap();
t_consumer.join().unwrap();
}
}
原则上, Acquire 用于读取,而 Release 用于写入。 但是由于有些原子操作同时拥有读取和写入的功能,此时就需要使用 AcqRel 来设置内存顺序了。 在内存屏障中被写入的数据,都可以被其它线程读取到,不会有 CPU 缓存的问题。
多线程中使用 Atomic #
在多线程环境中要使用 Atomic 需要配合 Arc
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{hint, thread};
fn main() {
let spinlock = Arc::new(AtomicUsize::new(1));
let spinlock_clone = Arc::clone(&spinlock);
let thread = thread::spawn(move || {
spinlock_clone.store(0, Ordering::SeqCst);
});
// 等待其它线程释放锁
while spinlock.load(Ordering::SeqCst) != 0 {
hint::spin_loop();
}
if let Err(panic) = thread.join() {
println!("Thread had an error: {:?}", panic);
}
}
Atomic 不能替代锁 #
对于复杂的场景下,锁的使用简单粗暴,不容易有坑
- std::sync::atomic 包中仅提供了数值类型的原子操作: AtomicBool , AtomicIsize ,AtomicUsize , AtomicI8 , AtomicU16 等,而锁可以应用于各种类型
- 在有些情况下,必须使用锁来配合,例如上一章节中使用 Mutex 配合 Condvar
Atomic 的应用场景
- Atomic 虽然对于用户不太常用,但是对于高性能库的开发者、标准库开发者都非常常用, 它是并发原语的基石,除此之外,还有一些场景适用:
- 无锁(lock free)数据结构
- 全局变量,例如全局自增 ID, 在后续章节会介绍
- 跨线程计数器,例如可以用于统计指标
- 以上列出的只是 Atomic 适用的部分场景,具体场景需要大家未来根据自己的需求进行权衡选择。