自旋读写锁实现

读写锁

读写锁是并发控制的一种同步机制,也称 “共享-互斥锁”、多读者-单写者锁。读操作可以并发重入,写操作是互斥的。

读写锁实现有多种方式,本文描述的是 自旋读写锁 的实现。

优先策略

读写锁的策略分为:

  • 读操作优先:允许最大并发,但写操作可能饿死。
  • 写操作优先:一旦所有已经开始的读操作完成,等待的写操作立即获得锁。
  • 未指定优先级

本文实现的读写锁策略是 写操作优先

自旋读写锁的设计

采用 uint64_t(64位整形)类型作为锁内部值。

写操作占用 1 位最高位,其余位用于读操作。

写操作位用十六进制表示为 0x8000000000000000,每次只能有一个写锁操作。退出时重置写操作位。

读操作位支持多个并发读操作,最高支持 0x7FFFFFFFFFFFFFFF 个读操作。每发生一次读锁定操作,则增加 1,退出时减少 1。

备注:读写都使用 CAS 操作。

实现

自旋读写锁 C 语言实现

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <assert.h>


static const uint64_t SHARED_LOCK_INIT = 0;
static const uint64_t SHARED_LOCK_WRITER_BIT = 1UL << 63;


typedef struct shared_rwlock_s    shared_rwlock_t;

struct shared_rwlock_s {
    uint64_t  lock;
};


void shared_lock_init(shared_rwlock_t *p)
{
    p->lock = SHARED_LOCK_INIT;
}


void shared_read_lock(shared_rwlock_t *p)
{
    uint64_t value;

    for ( ;; ) {
        value = p->lock;

        // is wirte locked?
        if (value >= SHARED_LOCK_WRITER_BIT) {
            continue;
        }

        // increase reader bit.
        if (__sync_bool_compare_and_swap(&p->lock, value, value + 1)) {
            break;
        }
    }
}


void shared_read_unlock(shared_rwlock_t *p)
{
    assert(p->lock > SHARED_LOCK_INIT);

    __sync_sub_and_fetch(&p->lock, 1);
}


void shared_write_lock(shared_rwlock_t *p)
{
    uint64_t value;

    for ( ;; ) {
        value = p->lock;

        // is wirte locked?
        if (value >= SHARED_LOCK_WRITER_BIT) {
            continue;
        }

        // set write lock bit.
        if (__sync_bool_compare_and_swap(&p->lock, value, value | SHARED_LOCK_WRITER_BIT)) {
            break;
        }
    }

    // wait for active readers.
    while (p->lock != SHARED_LOCK_WRITER_BIT) { /* void */ }
}


void shared_write_unlock(shared_rwlock_t *p)
{
    assert(p->lock == SHARED_LOCK_WRITER_BIT);

    __sync_sub_and_fetch(&p->lock, SHARED_LOCK_WRITER_BIT);
}

附加功能

在锁内部增加一个循环等待上限值,当循环计数到达阈值时,仍然没有获得锁,让出当前 CPU 时间片。

伪代码

count = 0;

for ( ;; ) {
    if (is_write_locked(&rwlock)) {
        if (++count >= limit_rate) {
            count = 0;
            sched_yield();
        }
    }
    ...
}

Rust 实现版本

该版本读写锁使用 RAII 哨兵,并增加了 owner 字段,能够发现自身线程在使用过程中产生的死锁问题。

use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};

use crate::{Error, Result};

// The writer lock bit.
const SHARED_LOCK_WRITER_BIT: u64 = 1u64 << 63;

unsafe impl<T> Send for SharedLock<T> {}
unsafe impl<T> Sync for SharedLock<T> {}

/*
 * A reader-writer lock
 */
pub struct SharedLock<T: ?Sized> {
    inner: AtomicU64,
    owner: AtomicU64,
    data: UnsafeCell<T>,
}

impl<T> SharedLock<T> {
    pub fn new(t: T) -> Self {
        SharedLock {
            inner: AtomicU64::default(),
            owner: AtomicU64::default(),
            data: UnsafeCell::new(t),
        }
    }
}

impl<T: ?Sized> SharedLock<T> {
    pub fn read(&self) -> Result<SharedLockReadGuard<'_, T>> {
        SharedLockReadGuard::new(self)
    }

    pub fn write(&self) -> Result<SharedLockWriteGuard<'_, T>> {
        SharedLockWriteGuard::new(self)
    }

    fn is_hold(&self) -> bool {
        let tid = self.owner.load(Ordering::Acquire);
        tid > 0 && tid == unsafe { libc::pthread_self() } as u64
    }

    fn set_owner_id(&self, tid: u64) {
        self.owner.store(tid, Ordering::Release);
    }
}

/*
 * RAII structure used to release the shared read access of a lock when dropped.
 * This structure is created by the read methods on SharedLock.
 */
pub struct SharedLockReadGuard<'a, T: ?Sized + 'a> {
    lock: &'a SharedLock<T>,
}

impl<'a, T: ?Sized> SharedLockReadGuard<'a, T> {
    fn new(lock: &'a SharedLock<T>) -> Result<SharedLockReadGuard<'a, T>> {
        if lock.is_hold() {
            return Err(Error::DeadLockError);
        }

        loop {
            let value = lock.inner.load(Ordering::Acquire);
            if value >= SHARED_LOCK_WRITER_BIT {
                continue;
            }

            if lock
                .inner
                .compare_exchange(value, value + 1, Ordering::Release, Ordering::Relaxed)
                .is_ok()
            {
                break;
            }
        }

        Ok(SharedLockReadGuard { lock })
    }
}

impl<T: ?Sized> Deref for SharedLockReadGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<T: ?Sized> Drop for SharedLockReadGuard<'_, T> {
    fn drop(&mut self) {
        self.lock.inner.fetch_sub(1, Ordering::Release);
    }
}

/*
 * RAII structure used to release the exclusive write access of a lock when dropped.
 * This structure is created by the write methods on SharedLock.
 */
pub struct SharedLockWriteGuard<'a, T: ?Sized + 'a> {
    lock: &'a SharedLock<T>,
}

impl<'a, T: ?Sized> SharedLockWriteGuard<'a, T> {
    fn new(lock: &'a SharedLock<T>) -> Result<SharedLockWriteGuard<'a, T>> {
        if lock.is_hold() {
            return Err(Error::DeadLockError);
        }

        loop {
            let value = lock.inner.load(Ordering::Acquire);
            if value >= SHARED_LOCK_WRITER_BIT {
                continue;
            }

            if lock
                .inner
                .compare_exchange(
                    value,
                    value | SHARED_LOCK_WRITER_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                break;
            }
        }

        if lock.owner.load(Ordering::Acquire) != 0 {
            return Err(Error::Poisoned);
        }
        lock.set_owner_id(unsafe { libc::pthread_self() } as u64);

        // wait for active readers.
        while lock.inner.load(Ordering::Acquire) != SHARED_LOCK_WRITER_BIT {}

        Ok(SharedLockWriteGuard { lock })
    }
}

impl<T: ?Sized> Deref for SharedLockWriteGuard<'_, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<T: ?Sized> Drop for SharedLockWriteGuard<'_, T> {
    fn drop(&mut self) {
        let value = self.lock.inner.load(Ordering::Acquire);

        if value != SHARED_LOCK_WRITER_BIT {
            panic!("write unlock inner value: {}", value);
        }

        // reset owner id.
        if !self.lock.is_hold() {
            panic!(
                "Poisoned!!! owner id: {}",
                self.lock.owner.load(Ordering::Acquire)
            );
        }
        self.lock.set_owner_id(0);

        self.lock
            .inner
            .fetch_sub(SHARED_LOCK_WRITER_BIT, Ordering::Release);
    }
}

impl<T: ?Sized> DerefMut for SharedLockWriteGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.lock.data.get() }
    }
}

参考

https://github.com/cppcoffee/sharelock-rs

https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock