Skip to main content

Rust Book Datastructure - Rc

Rust
KIGA
Author
KIGA
This is a personal blog, intended for sharing.
Table of Contents

RC
#

use std::rc::Rc;
use std::sync::Arc;
use std::thread;
fn main() {
    let a = Rc::new(String::from("test ref counting"));
    println!("count after creating a = {}", Rc::strong_count(&a));
    let b = Rc::clone(&a);
    println!("count after creating b = {}", Rc::strong_count(&a));
    {
        let c = Rc::clone(&a);
        println!("count after creating c = {}", Rc::strong_count(&c));
    }
    println!("count after c goes out of scope = {}", Rc::strong_count(&a));

    let s = Arc::new(String::from("多线程漫游者"));
    for _ in 0..10 {
        let s = Arc::clone(&s);
        let handle = thread::spawn(move || println!("{}", s));
    }
}
  • Rc/Arc 是不可变引用,你无法修改它指向的值,只能进行读取,如果要修改,需要配合后面章节的内部可变性 RefCell 或互斥锁 Mutex
  • 一旦最后一个拥有者消失,则资源会自动被回收,这个生命周期是在编译期就确定下来的
  • Rc 只能用于同一线程内部,想要用于线程之间的对象共享,你需要使用 Arc
  • Rc<T> 是一个智能指针,实现了 Deref 特征,因此你无需先解开 Rc 指针,再使用里面的T ,而是可以直接使用 T ,例如上例中的 gadget1.owner.name

Rc + RefCell 组合使用
#

use std::cell::RefCell;
use std::rc::Rc;
fn main() {
    let s = Rc::new(RefCell::new("我很善变,还拥有多个主人".to_string()));
    let s1 = s.clone();
    let s2 = s.clone();
    // let mut s2 = s.borrow_mut();
    s2.borrow_mut().push_str(", on yeah!");
    println!("{:?}\n{:?}\n{:?}", s, s1, s2);
}

weak 解决循环引用
#

use std::rc::Rc;
fn main() {
    // 创建Rc,持有一个值5
    let five = Rc::new(5);
    // 通过Rc,创建一个Weak指针
    let weak_five = Rc::downgrade(&five);
    // Weak引用的资源依然存在,取到值5
    let strong_five: Option<Rc<_>> = weak_five.upgrade();
    assert_eq!(*strong_five.unwrap(), 5);
    // 手动释放资源`five`
    drop(five);
    // Weak引用的资源已不存在,因此返回None
    let strong_five: Option<Rc<_>> = weak_five.upgrade();
    assert_eq!(strong_five, None);
}

解析Code

use std::cell::RefCell;
use std::rc::Rc;
use std::rc::Weak;
// 主人
struct Owner {
    name: String,
    gadgets: RefCell<Vec<Weak<Gadget>>>,
}
// 工具
struct Gadget {
    id: i32,
    owner: Rc<Owner>,
}
fn main() {
    // 创建一个 Owner
    // 需要注意,该 Owner 也拥有多个 `gadgets`
    let gadget_owner: Rc<Owner> = Rc::new(Owner {
        name: "Gadget Man".to_string(),
        gadgets: RefCell::new(Vec::new()),
    });
    // 创建工具,同时与主人进行关联:创建两个 gadget,他们分别持有 gadget_owner 的一个引用。
    let gadget1 = Rc::new(Gadget {
        id: 1,
        owner: gadget_owner.clone(),
    });
    let gadget2 = Rc::new(Gadget {
        id: 2,
        owner: gadget_owner.clone(),
    });
    // 为主人更新它所拥有的工具
    // 因为之前使用了 `Rc`,现在必须要使用 `Weak`,否则就会循环引用
    gadget_owner
        .gadgets
        .borrow_mut()
        .push(Rc::downgrade(&gadget1));
    gadget_owner
        .gadgets
        .borrow_mut()
        .push(Rc::downgrade(&gadget2));
    // 遍历 gadget_owner 的 gadgets 字段
    for gadget_opt in gadget_owner.gadgets.borrow().iter() {
        // gadget_opt 是一个 Weak<Gadget> 。 因为 weak 指针不能保证他所引用的对象
        // 仍然存在。所以我们需要显式的调用 upgrade() 来通过其返回值(Option<_>)来判
        // 断其所指向的对象是否存在。
        // 当然,Option 为 None 的时候这个引用原对象就不存在了。
        let gadget = gadget_opt.upgrade().unwrap();
        println!("Gadget {} owned by {}", gadget.id, gadget.owner.name);
    }
    // 在 main 函数的最后,gadget_owner,gadget1 和 gadget2 都被销毁。
    // 具体是,因为这几个结构体之间没有了强引用(`Rc<T>`),所以,当他们销毁的时候。
    // 首先 gadget2 和 gadget1 被销毁。
    // 然后因为 gadget_owner 的引用数量为 0,所以这个对象可以被销毁了。
    // 循环引用问题也就避免了
}

线程屏障(Barrier)
#

使用 Barrier 让多个线程都执行到某个点后

use std::sync::{Arc, Barrier};
use std::thread;
fn main() {
    let mut handles = Vec::with_capacity(6);
    let barrier = Arc::new(Barrier::new(6));
    for _ in 0..6 {
        let b = barrier.clone();
        handles.push(thread::spawn(move || {
            println!("before wait");
            b.wait();
            println!("after wait");
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}

线程局部变量(Thread Local Variable)
#

use std::cell::RefCell;
use std::sync::{Arc, Barrier};
use std::thread;

thread_local!(static FOO: RefCell<u32> = RefCell::new(1));

fn main() {
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 1);
        *f.borrow_mut() = 2;
    });
    // 每个线程开始时都会拿到线程局部变量的FOO的初始值
    let t = thread::spawn(move || {
        FOO.with(|f| {
            assert_eq!(*f.borrow(), 1);
            *f.borrow_mut() = 3;
        });
    });
    // 等待线程完成
    t.join().unwrap();
    // 尽管子线程中修改为了3,我们在这里依然拥有main线程中的局部值:2
    FOO.with(|f| {
        assert_eq!(*f.borrow(), 2);
    });
}
  • main 线程首先进入 while 循环,调用 wait 方法挂起等待子线程的通知,并释放了锁started
  • 子线程获取到锁,并将其修改为 true ,然后调用条件变量的 notify_one 方法来通知主线程继续执行
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = pair.clone();
    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        println!("changing started");
        *started = true;
        cvar.notify_one();
    });
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        started = cvar.wait(started).unwrap();
    }
    println!("started changed");
}

执行初始化过程一次,并且只执行一次

use std::sync::Once;
use std::thread;
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn main() {
    let handle1 = thread::spawn(move || {
        INIT.call_once(|| unsafe {
            VAL = 1;
        });
    });
    let handle2 = thread::spawn(move || {
        INIT.call_once(|| unsafe {
            VAL = 2;
        });
    });
    handle1.join().unwrap();
    handle2.join().unwrap();
    println!("{}", unsafe { VAL });
}

线程间的消息传递
#

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // 创建一个消息通道, 返回一个元组:(发送者,接收者)
    let (tx, rx) = mpsc::channel();

    // 克隆发送者,发送者可以通过clone方法创建多个发送者,这样就可以将其发送到多个线程中
    let tx1 = tx.clone();

    thread::spawn(move || {
        tx1.send(String::from("World")).unwrap();
    });
    // 创建线程,并发送消息
    thread::spawn(move || {
        let vars = vec![
            String::from("This"),
            String::from("is"),
            String::from("test"),
        ];

        for val in vars {
            // send方法返回Result<T,E>,通过unwrap进行快速错误处理
            tx.send(val).unwrap();
            // 下面代码将报错,因为编译器自动推导出通道传递的值是i32类型,那么Option<i32>类型将产生不匹配错误
            // tx.send(Some(1)).unwrap()
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
    // 在主线程中接收子线程发送的消息并输出
    // println!("receive {}", rx.recv().unwrap());
    // 不会阻塞线程,当通道中没有消息时,它会立刻返回一个错误receive Err(Empty)
    // println!("receive {:?}", rx.try_recv());
}

mpsc::channel 创建的通道是异步通道 与异步通道相反,同步通道发送消息是阻塞的,只有在消息被接收后才解除阻塞

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::sync_channel(0);
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}
睡眠之前
发送之前
睡眠之后
receive 1
发送之后

发送之后的输出是在receive 1之后,说明只有接收消息彻底成功后,发送消息才算完成。

#

use std::sync::Mutex;
fn main() {
    // 使用`Mutex`结构体的关联函数创建新的互斥锁实例
    let m = Mutex::new(5);
    {
        // 获取锁,然后deref为`m`的引用
        // lock返回的是Result.m.lock() 返回一个智能指针MutexGuard<T>
        let mut num = m.lock().unwrap();
        // 实现了 Deref 特征,会被自动解引用后获得一个引用类型,该引用指向 Mutex 内部的数据

        *num = 6;
        // 实现了 Drop 特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁
        // 锁自动被drop
    }
    println!("m = {:?}", m);
}
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("{}", *counter.lock().unwrap());
}

读写锁

use std::sync::RwLock;
fn main() {
    let lock = RwLock::new(5);
    // 同一时间允许多个读
    {
        let r1 = lock.read().unwrap();
        let r2 = lock.read().unwrap();
        assert_eq!(*r1, 5);
        assert_eq!(*r2, 5);
    } // 读锁在此处被drop
      // 同一时间只允许一个写
    {
        let mut w = lock.write().unwrap();
        *w += 1;
        assert_eq!(*w, 6);
        // 以下代码会panic,因为读和写不允许同时存在
        // 写锁w直到该语句块结束才被释放,因此下面的读锁依然处于`w`的作用域中
        // let r1 = lock.read();
        // println!("{:?}",r1);
    } // 写锁在此处被drop
}

解决资源访问顺序的问题
#

用条件变量(Condvar)控制线程的同步

use std::sync::{Arc, Condvar, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
fn main() {
    let flag = Arc::new(Mutex::new(false));
    let cond = Arc::new(Condvar::new());
    let cflag = flag.clone();
    let ccond = cond.clone();
    let hdl = spawn(move || {
        let mut m = { *cflag.lock().unwrap() };
        let mut counter = 0;
        while counter < 3 {
            while !m {
                m = *ccond.wait(cflag.lock().unwrap()).unwrap();
            }
            {
                m = false;
                *cflag.lock().unwrap() = false;
            }
            counter += 1;
            println!("inner counter: {}", counter);
        }
    });
    let mut counter = 0;
    loop {
        sleep(Duration::from_millis(1000));
        *flag.lock().unwrap() = true;
        counter += 1;
        if counter > 3 {
            break;
        }
        println!("outside counter: {}", counter);
        cond.notify_one();
    }
    hdl.join().unwrap();
    println!("{:?}", flag);
}

使用 Atomic 作为全局变量
#

Atomic 的值具有内部可变性,无需将其声明为 mut

use std::ops::Sub;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Instant;
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
fn add_n_times(n: u64) -> JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n {
            R.fetch_add(1, Ordering::Relaxed);
        }
    })
}
fn main() {
    let s = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);
    for _ in 0..N_THREADS {
        threads.push(add_n_times(N_TIMES));
    }
    for thread in threads {
        thread.join().unwrap();
    }
    assert_eq!(N_TIMES * N_THREADS as u64, R.load(Ordering::Relaxed));
    println!("{:?}", Instant::now().sub(s));
}

内存屏障
#

防止编译器和 CPU 将屏障前(Release)和屏障后(Acquire)中的数据操作重新排在屏障围成的范围之外

use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};
static mut DATA: u64 = 0;
static READY: AtomicBool = AtomicBool::new(false);
fn reset() {
    unsafe {
        DATA = 0;
    }
    READY.store(false, Ordering::Relaxed);
}
fn producer() -> JoinHandle<()> {
    thread::spawn(move || {
        unsafe {
            DATA = 100; // A
        }
        READY.store(true, Ordering::Release); // B: 内存屏障 ↑
    })
}
fn consumer() -> JoinHandle<()> {
    thread::spawn(move || {
        while !READY.load(Ordering::Acquire) {} // C: 内存屏障 ↓
        assert_eq!(100, unsafe { DATA }); // D
    })
}
fn main() {
    loop {
        reset();
        let t_producer = producer();
        let t_consumer = consumer();
        t_producer.join().unwrap();
        t_consumer.join().unwrap();
    }
}

原则上, Acquire 用于读取,而 Release 用于写入。 但是由于有些原子操作同时拥有读取和写入的功能,此时就需要使用 AcqRel 来设置内存顺序了。 在内存屏障中被写入的数据,都可以被其它线程读取到,不会有 CPU 缓存的问题。

多线程中使用 Atomic
#

在多线程环境中要使用 Atomic 需要配合 Arc

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{hint, thread};
fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));
    let spinlock_clone = Arc::clone(&spinlock);
    let thread = thread::spawn(move || {
        spinlock_clone.store(0, Ordering::SeqCst);
    });
    // 等待其它线程释放锁
    while spinlock.load(Ordering::SeqCst) != 0 {
        hint::spin_loop();
    }
    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}

Atomic 不能替代锁
#

对于复杂的场景下,锁的使用简单粗暴,不容易有坑

  • std::sync::atomic 包中仅提供了数值类型的原子操作: AtomicBool , AtomicIsize ,AtomicUsize , AtomicI8 , AtomicU16 等,而锁可以应用于各种类型
  • 在有些情况下,必须使用锁来配合,例如上一章节中使用 Mutex 配合 Condvar

Atomic 的应用场景

  • Atomic 虽然对于用户不太常用,但是对于高性能库的开发者、标准库开发者都非常常用, 它是并发原语的基石,除此之外,还有一些场景适用:
  • 无锁(lock free)数据结构
  • 全局变量,例如全局自增 ID, 在后续章节会介绍
  • 跨线程计数器,例如可以用于统计指标
  • 以上列出的只是 Atomic 适用的部分场景,具体场景需要大家未来根据自己的需求进行权衡选择。