自旋读写锁实现
自旋读写锁实现
读写锁
读写锁是并发控制的一种同步机制,也称 “共享-互斥锁”、多读者-单写者锁。读操作可以并发重入,写操作是互斥的。
读写锁实现有多种方式,本文描述的是 自旋读写锁 的实现。
优先策略
读写锁的策略分为:
- 读操作优先:允许最大并发,但写操作可能饿死。
- 写操作优先:一旦所有已经开始的读操作完成,等待的写操作立即获得锁。
- 未指定优先级
本文实现的读写锁策略是 写操作优先
自旋读写锁的设计
采用 uint64_t(64位整形)类型作为锁内部值。
写操作占用 1 位最高位,其余位用于读操作。
写操作位用十六进制表示为 0x8000000000000000,每次只能有一个写锁操作。退出时重置写操作位。
读操作位支持多个并发读操作,最高支持 0x7FFFFFFFFFFFFFFF 个读操作。每发生一次读锁定操作,则增加 1,退出时减少 1。
备注:读写都使用 CAS 操作。
实现
自旋读写锁 C 语言实现
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <assert.h>
static const uint64_t SHARED_LOCK_INIT = 0;
static const uint64_t SHARED_LOCK_WRITER_BIT = 1UL << 63;
typedef struct shared_rwlock_s shared_rwlock_t;
struct shared_rwlock_s {
uint64_t lock;
};
void shared_lock_init(shared_rwlock_t *p)
{
p->lock = SHARED_LOCK_INIT;
}
void shared_read_lock(shared_rwlock_t *p)
{
uint64_t value;
for ( ;; ) {
value = p->lock;
// is wirte locked?
if (value >= SHARED_LOCK_WRITER_BIT) {
continue;
}
// increase reader bit.
if (__sync_bool_compare_and_swap(&p->lock, value, value + 1)) {
break;
}
}
}
void shared_read_unlock(shared_rwlock_t *p)
{
assert(p->lock > SHARED_LOCK_INIT);
__sync_sub_and_fetch(&p->lock, 1);
}
void shared_write_lock(shared_rwlock_t *p)
{
uint64_t value;
for ( ;; ) {
value = p->lock;
// is wirte locked?
if (value >= SHARED_LOCK_WRITER_BIT) {
continue;
}
// set write lock bit.
if (__sync_bool_compare_and_swap(&p->lock, value, value | SHARED_LOCK_WRITER_BIT)) {
break;
}
}
// wait for active readers.
while (p->lock != SHARED_LOCK_WRITER_BIT) { /* void */ }
}
void shared_write_unlock(shared_rwlock_t *p)
{
assert(p->lock == SHARED_LOCK_WRITER_BIT);
__sync_sub_and_fetch(&p->lock, SHARED_LOCK_WRITER_BIT);
}
附加功能
在锁内部增加一个循环等待上限值,当循环计数到达阈值时,仍然没有获得锁,让出当前 CPU 时间片。
伪代码
count = 0;
for ( ;; ) {
if (is_write_locked(&rwlock)) {
if (++count >= limit_rate) {
count = 0;
sched_yield();
}
}
...
}
Rust 实现版本
该版本读写锁使用 RAII 哨兵,并增加了 owner 字段,能够发现自身线程在使用过程中产生的死锁问题。
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use crate::{Error, Result};
// The writer lock bit.
const SHARED_LOCK_WRITER_BIT: u64 = 1u64 << 63;
unsafe impl<T> Send for SharedLock<T> {}
unsafe impl<T> Sync for SharedLock<T> {}
/*
* A reader-writer lock
*/
pub struct SharedLock<T: ?Sized> {
inner: AtomicU64,
owner: AtomicU64,
data: UnsafeCell<T>,
}
impl<T> SharedLock<T> {
pub fn new(t: T) -> Self {
SharedLock {
inner: AtomicU64::default(),
owner: AtomicU64::default(),
data: UnsafeCell::new(t),
}
}
}
impl<T: ?Sized> SharedLock<T> {
pub fn read(&self) -> Result<SharedLockReadGuard<'_, T>> {
SharedLockReadGuard::new(self)
}
pub fn write(&self) -> Result<SharedLockWriteGuard<'_, T>> {
SharedLockWriteGuard::new(self)
}
fn is_hold(&self) -> bool {
let tid = self.owner.load(Ordering::Acquire);
tid > 0 && tid == unsafe { libc::pthread_self() } as u64
}
fn set_owner_id(&self, tid: u64) {
self.owner.store(tid, Ordering::Release);
}
}
/*
* RAII structure used to release the shared read access of a lock when dropped.
* This structure is created by the read methods on SharedLock.
*/
pub struct SharedLockReadGuard<'a, T: ?Sized + 'a> {
lock: &'a SharedLock<T>,
}
impl<'a, T: ?Sized> SharedLockReadGuard<'a, T> {
fn new(lock: &'a SharedLock<T>) -> Result<SharedLockReadGuard<'a, T>> {
if lock.is_hold() {
return Err(Error::DeadLockError);
}
loop {
let value = lock.inner.load(Ordering::Acquire);
if value >= SHARED_LOCK_WRITER_BIT {
continue;
}
if lock
.inner
.compare_exchange(value, value + 1, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
break;
}
}
Ok(SharedLockReadGuard { lock })
}
}
impl<T: ?Sized> Deref for SharedLockReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}
impl<T: ?Sized> Drop for SharedLockReadGuard<'_, T> {
fn drop(&mut self) {
self.lock.inner.fetch_sub(1, Ordering::Release);
}
}
/*
* RAII structure used to release the exclusive write access of a lock when dropped.
* This structure is created by the write methods on SharedLock.
*/
pub struct SharedLockWriteGuard<'a, T: ?Sized + 'a> {
lock: &'a SharedLock<T>,
}
impl<'a, T: ?Sized> SharedLockWriteGuard<'a, T> {
fn new(lock: &'a SharedLock<T>) -> Result<SharedLockWriteGuard<'a, T>> {
if lock.is_hold() {
return Err(Error::DeadLockError);
}
loop {
let value = lock.inner.load(Ordering::Acquire);
if value >= SHARED_LOCK_WRITER_BIT {
continue;
}
if lock
.inner
.compare_exchange(
value,
value | SHARED_LOCK_WRITER_BIT,
Ordering::Release,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
}
if lock.owner.load(Ordering::Acquire) != 0 {
return Err(Error::Poisoned);
}
lock.set_owner_id(unsafe { libc::pthread_self() } as u64);
// wait for active readers.
while lock.inner.load(Ordering::Acquire) != SHARED_LOCK_WRITER_BIT {}
Ok(SharedLockWriteGuard { lock })
}
}
impl<T: ?Sized> Deref for SharedLockWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.data.get() }
}
}
impl<T: ?Sized> Drop for SharedLockWriteGuard<'_, T> {
fn drop(&mut self) {
let value = self.lock.inner.load(Ordering::Acquire);
if value != SHARED_LOCK_WRITER_BIT {
panic!("write unlock inner value: {}", value);
}
// reset owner id.
if !self.lock.is_hold() {
panic!(
"Poisoned!!! owner id: {}",
self.lock.owner.load(Ordering::Acquire)
);
}
self.lock.set_owner_id(0);
self.lock
.inner
.fetch_sub(SHARED_LOCK_WRITER_BIT, Ordering::Release);
}
}
impl<T: ?Sized> DerefMut for SharedLockWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.data.get() }
}
}