介绍 锁的概念 锁 – 是保证线程安全常见的同步工具。锁是一种非强制的机制,如果使用了锁的话,每一个线程在访问数据或者资源前,要先获取锁,并在访问结束之后释放锁。如果锁已经被占用,其它试图获取锁的线程会等待,直到锁重新可用。
两种锁——自旋锁与互斥锁 自旋锁
如果共享数据已经有其它线程加锁了,资源申请线程会以死循环的方式等待锁。一旦被访问的资源被解锁,则等待资源的线程会立即执行。
不会引起调用线程睡眠,处于忙等状态。
当获取到锁之后,等待资源的线程会立即执行,不会进行线程调度,CPU时间片轮转等耗时操作。
虽然会一直自旋等待获取锁,但不会一直占用CPU,超过了操作系统分配的时间片会被强制挂起。
自旋锁如果不能保证所有线程都是同一优先级,则可能造成死锁。
不适用于较长时间的任务,因为会一直占用 CPU 资源。
互斥锁
如果共享数据已经有其它线程加锁了,资源申请线程会进入休眠状态等待锁。一旦被访问的资源被解锁,则等待资源的线程会被唤醒。
互斥锁又分为递归锁和非递归锁:
让等待的线程进入休眠等待,一定程度上影响效率,会进行线程调度,CPU时间片轮转等耗时操作。
适用于较长时间的任务。
主动让出时间片(互斥的做法,休眠)并不总是代表效率高。让出时间片会导致操作系统切换到另一个线程,这种上下文切换通常需要 10 微秒左右,而且至少需要两次切换。如果等待时间很短,比如只有几个微秒,忙等就比线程睡眠更高效。
两种锁的使用选择 自旋锁
预计线程等待锁的时间很短。
加锁的代码(临界区)经常被调用,但竞争情况很少发生。
CPU 资源不紧张。
多核处理器。
互斥锁
预计线程等待锁的时间较长。
临界区有 IO 操作。
临界区代码复杂或者循环量大。
临界区竞争非常激烈。
单核处理器。
几种常见的锁
OSSpinLock 自旋锁
不再安全的 OSSpinLock 大神的文章已经说了这个锁的问题,苹果也废弃了这个锁。
如果一个低优先级的线程获得锁并访问共享资源,这时一个高优先级的线程也尝试获得这个锁,它会处于 spin lock 的忙等状态从而占用大量 CPU 。此时低优先级线程无法与高优先级线程争夺 CPU 时间,从而导致任务迟迟完不成、无法释放 lock 。
os_unfair_lock 互斥锁
os_unfair_lock 是苹果替换 OSSpinLock 的锁。可以在 objc-runtime 源码中可以看到 runtime 中使用的 OSSpinLock 的都已经替换为了 os_unfair_lock。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // objc-locks.h extern mutex_t selLock; #if CONFIG_USE_CACHE_LOCK extern mutex_t cacheUpdateLock; #endif extern recursive_mutex_t loadMethodLock; extern mutex_t crashlog_lock; extern spinlock_t objcMsgLogLock; extern mutex_t AltHandlerDebugLock; extern mutex_t AssociationsManagerLock; // objc-os.h using spinlock_t = mutex_tt<LOCKDEBUG>; using mutex_t = mutex_tt<LOCKDEBUG>; using recursive_mutex_t = recursive_mutex_tt<LOCKDEBUG>;
上面 mutex_tt、recursive_mutex_tt 都是类。它们的源码太多,这里就不贴出来了。它们都是基于 os_unfair_lock 的封装使用。
os_unfair_lock 用于取代不安全的 OSSpinLock,从 iOS10 开始才支持。从底层调用看,等待 os_unfair_lock 锁的线程会处于休眠状态,并非忙等。
所以,os_unfair_lock 属于互斥锁。
1 2 3 4 5 6 7 8 // 初始化 os_unfair_lock lock = OS_UNFAIR_LOCK_INIT; //尝试加锁 os_unfair_lock_trylock(&lock); // 加锁 os_unfair_lock_lock(&lock); // 解锁 os_unfair_lock_unlock(&lock);
atomic 直接查看源码中 atomic 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void objc_setProperty_atomic(id self, SEL _cmd, id newValue, ptrdiff_t offset) { reallySetProperty(self, _cmd, newValue, offset, true, false, false); } static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy) { // ... if (!atomic) { // 不是 atomic 修饰 oldValue = *slot; *slot = newValue; } else { // 如果是 atomic 修饰,加一把同步锁,保证 setter 的安全 spinlock_t& slotlock = PropertyLocks[slot]; slotlock.lock(); oldValue = *slot; *slot = newValue; slotlock.unlock(); } } id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) { // ... // 非原子属性,直接返回值 if (!atomic) return *slot; // 原子属性,加同步锁,保证 getter 的安全 spinlock_t& slotlock = PropertyLocks[slot]; slotlock.lock(); id value = objc_retain(*slot); slotlock.unlock(); }
这里在 set 和 get 的方法中,都添加了一个 spinlock_t 锁(是一个 os_unfair_lock),保证了 atomic 属性的 getter/setter 操作的完整性。
所以 atomic 不能保证对象多线程的安全。它只是能保证你访问的时候给你返回一个完好无损的 value 而已。
@synchronized @synchronized 的实现源码,我们可以在 objc_runtime 源码中 objc_sync.mm 文件下找到。
@synchronize 的使用本质上就是使用 objc_sync_enter 和 objc_sync_exit 的成对使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 int objc_sync_enter(id obj) { int result = OBJC_SYNC_SUCCESS; if (obj) { SyncData* data = id2data(obj, ACQUIRE); ASSERT(data); data->mutex.lock(); } else { // @synchronized(nil) does nothing if (DebugNilSync) { _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug"); } objc_sync_nil(); } return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int objc_sync_exit(id obj) { int result = OBJC_SYNC_SUCCESS; if (obj) { SyncData* data = id2data(obj, RELEASE); if (!data) { result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR; } else { bool okay = data->mutex.tryUnlock(); if (!okay) { result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR; } } } else { // @synchronized(nil) does nothing } return result; }
在继续看源码之前先将其中涉及到的数据结构先看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 // 可以看成是一个链表的节点 typedef struct alignas(CacheLineSize) SyncData { struct SyncData* nextData; // 指向另一个 SyncData 对象的指针 DisguisedPtr<objc_object> object; // 传入的 object int32_t threadCount; // 使用这个 SyncData 对象中锁的线程数量 recursive_mutex_t mutex; // 递归锁 } SyncData; // 每个 SyncList 结构体都有个指向 SyncData 节点链表头部的指针,也有一个用于防止多个线程对此列表做并发修改的锁。 struct SyncList { SyncData *data; spinlock_t lock; constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { } }; static StripedMap<SyncList> sDataLists; // 缓存中的 item typedef struct { SyncData *data; unsigned int lockCount; // 这个线程锁定这个 object 的次数 } SyncCacheItem; // 线程缓存 typedef struct SyncCache { unsigned int allocated; unsigned int used; SyncCacheItem list[0]; } SyncCache;
接下来可以来看看源码中的实现了,在具体的实现逻辑中,我们可以看到通过 id2data 方法,对 obj 进行了捕获和释放的操作,并生成了一个 SyncData 类型的对象。我们发现 SyncData 是一个结构体,而且有一个SyncData类型的nextData变量,指向下个数据,所以我们可以知道SyncData是一个链表结构中的一个元素。所以这是一个递归锁。
准备SyncData
我们可以看到会会通过 LOCK_FOR_OBJ 和 LIST_FOR_OBJ 取出 object 所对应的 lockp 和 listp。
1 2 3 4 5 6 7 static SyncData* id2data(id object, enum usage why) { spinlock_t *lockp = &LOCK_FOR_OBJ(object); SyncData **listp = &LIST_FOR_OBJ(object); SyncData* result = NULL; ... }
既然我们在任何地方都可以直接通过调用方法来使用,那么说明底层必然维护着一套内部的存储。通过代码我们也可以看出,系统在底层维护了一个哈希表,里面存储了 SyncList 结构的数据,而 SyncList 是一个结构体,包含一个 SyncData 的头结点和一个 spinlock_t 锁对象
快速检查线程缓存
此步操作会通过 tls 封装的相关 pthead 操作线程的相关增删改查方法,获取到单个线程中缓存的 SyncData 数据,并进行快速查询和缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 static SyncData* id2data(id object, enum usage why) { ... #if SUPPORT_DIRECT_THREAD_KEYS // Check per-thread single-entry fast cache for matching object ✅// 检查每线程单项快速缓存中是否有匹配的对象 bool fastCacheOccupied = NO; ✅// 通过tls相关封装的pthead方法获取是否有再底层存储的SyncData SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY); if (data) { fastCacheOccupied = YES; ✅// 如果获取到的数据和传入数据相同 if (data->object == object) { // Found a match in fast cache. uintptr_t lockCount; result = data; lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY); if (result->threadCount <= 0 || lockCount <= 0) { _objc_fatal("id2data fastcache is buggy"); } switch(why) { case ACQUIRE: { // 如果是 entry,则对 lockCount 加 1,并通过 tls 保存 lockCount++; tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount); break; } case RELEASE: // 如果是 exit,则对 lockCount 减 1,并通过 tls 保存 lockCount--; tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount); if (lockCount == 0) { // remove from fast cache // 如果 lockCount 为 0,则从高速缓存中删除 tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL); // atomic because may collide with concurrent ACQUIRE OSAtomicDecrement32Barrier(&result->threadCount); } break; case CHECK: // do nothing break; } return result; } } #endif ... }
检查有锁线程中的缓存
检查所有线程中的缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static SyncData* id2data(id object, enum usage why) { ... // Check per-thread cache of already-owned locks for matching object // 检查已拥有锁的每个线程高速缓存中是否有匹配的对象 SyncCache *cache = fetch_cache(NO); if (cache) { unsigned int i; for (i = 0; i < cache->used; i++) { SyncCacheItem *item = &cache->list[i]; if (item->data->object != object) continue; // Found a match. result = item->data; if (result->threadCount <= 0 || item->lockCount <= 0) { _objc_fatal("id2data cache is buggy"); } switch(why) { case ACQUIRE: item->lockCount++; break; case RELEASE: item->lockCount--; if (item->lockCount == 0) { // remove from per-thread cache cache->list[i] = cache->list[--cache->used]; // atomic because may collide with concurrent ACQUIRE OSAtomicDecrement32Barrier(&result->threadCount); } break; case CHECK: // do nothing break; } return result; } } ... }
全局哈希表查找
如果上述两步中,单个线程和已经锁住的线程中的缓存数据都没有找到的话,那么就会来到此步,回来系统保存的哈希表中 SyncList 结果中,进行链式查找。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static SyncData* id2data(id object, enum usage why) { ... { SyncData* p; SyncData* firstUnused = NULL; for (p = *listp; p != NULL; p = p->nextData) { if ( p->object == object ) { result = p; // atomic because may collide with concurrent RELEASE OSAtomicIncrement32Barrier(&result->threadCount); goto done; } if ( (firstUnused == NULL) && (p->threadCount == 0) ) firstUnused = p; } // no SyncData currently associated with object if ( (why == RELEASE) || (why == CHECK) ) goto done; // an unused one was found, use it if ( firstUnused != NULL ) { result = firstUnused; result->object = (objc_object *)object; result->threadCount = 1; goto done; } } ... }
生成新数据并写入缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 static SyncData* id2data(id object, enum usage why) { ... posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData)); result->object = (objc_object *)object; result->threadCount = 1; new (&result->mutex) recursive_mutex_t(fork_unsafe_lock); result->nextData = *listp; *listp = result; done: lockp->unlock(); if (result) { // Only new ACQUIRE should get here. // All RELEASE and CHECK and recursive ACQUIRE are // handled by the per-thread caches above. ✅// 只有创建的 SyncData 才能进入这里。 ✅// 所有的释放、检查和递归获取都是由上面的线程缓存处理 if (why == RELEASE) { // Probably some thread is incorrectly exiting // while the object is held by another thread. return nil; } if (why != ACQUIRE) _objc_fatal("id2data is buggy"); if (result->object != object) _objc_fatal("id2data is buggy"); #if SUPPORT_DIRECT_THREAD_KEYS if (!fastCacheOccupied) { // Save in fast thread cache ✅// 存入快速线程缓存 tls_set_direct(SYNC_DATA_DIRECT_KEY, result); tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1); } else #endif { // Save in thread cache ✅// 存入线程缓存 if (!cache) cache = fetch_cache(YES); cache->list[cache->used].data = result; cache->list[cache->used].lockCount = 1; cache->used++; } } return result; }
至此一个 @synchronized 的相关操作已经执行完成。总结来说就是底层保存了一个哈希表,其中存储了 SyncData 结构的一个链表,通过线程缓存等操作,来进行增删改查,从来实现加解锁。但是操作结构复杂,步骤多,导致性能较差,而且需要注意传入的 obj 不能为空,否则无法进行锁操作。
dispatch_semaphore GCD 同步的一种方式,通过 dispatch_semaphore_t 信号量来实现。
当我们将信号量的初始值设置为 1 时,可以将其当做互斥锁来使用。
1 2 3 4 5 6 7 8 // 生成信号量,参数 value 是信号量计数的初始值 _lock = dispatch_semaphore_create(1); // 让信号量值减一,当信号量值为0时会等待(直到超时),否则正常执行 dispatch_semaphore_wait(_lock, DISPATCH_TIME_FOREVER); // 让信号量值加一,如果有通过 dispatch_semaphore_wait 函数等待 Dispatch Semaphore 的计数值增加的线程,会由系统唤醒最先等待的线程执行 dispatch_semaphore_signal(_lock);
拓展一下
YYDisskCache 在磁盘缓存的情况下,使用的信号量的方式对多线程进行加锁处理。磁盘 IO 的情况下,线程的等待时间较长,在这种场景下信号量的休眠等待机制较自旋锁的忙等(下面 pthread_mutex 介绍了 YYMemoryCache 的自旋忙等)更有优势。
pthread_mutex 这里还是借用一下大神的图:
可以看到 pthread_mutex 的效率还是很强大的。大神的 YYMemoryCache 中也是使用 pthread_mutex_t 来对多线程情况下进行的加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 // 声明互斥锁 pthread_mutex_t lock; // 声明互斥锁属性对象 pthread_mutexattr_t attr; // 初始化属性对象 pthread_mutexattr_init(&attr); // 设置互斥锁的类型属性 pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL); // pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK); // pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); // pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_DEFAULT); // PTHREAD_MUTEX_NORMAL // 利用属性初始化锁 pthread_mutex_init(&lock, &attr); pthread_mutex_init(&lock, NULL); pthread_mutex_trylock(&lock); pthread_mutex_lock(&lock); pthread_mutex_unlock(&lock); pthread_mutexattr_destroy(&attr); pthread_mutex_destroy(&lock);
在 iOS 中使用时,默认为 PTHREAD_MUTEX_NORMAL。
此类型的互斥锁不会检测死锁。如果线程在不首先解除互斥锁的情况下尝试重新锁定该互斥锁,则会产生死锁。尝试解除由其他线程锁定的互斥锁会产生不确定的行为。如果尝试解除锁定的互斥锁未锁定,则会产生不确定的行为。
此类型的互斥锁可提供错误检查。如果线程在不首先解除锁定互斥锁的情况下尝试重新锁定该互斥锁,则会返回错误。如果线程尝试解除锁定的互斥锁已经由其他线程锁定,则会返回错误。如果线程尝试解除锁定的互斥锁未锁定,则会返回错误。
类型的互斥锁不同,对此类型互斥锁进行重新锁定时不会产生死锁情况。多次锁定互斥锁需要进行相同次数的解除锁定才可以释放该锁,然后其他线程才能获取该互斥锁。如果线程尝试解除锁定的互斥锁已经由其他线程锁定,则会返回错误。 如果线程尝试解除锁定的互斥锁未锁定,则会返回错误。
上面就是 pthread_mutex_t 的基本使用。
更多的使用方式可以参考这篇文章 ,里面详细的介绍了 pthread_mutext 的配置。
拓展一下
pthread_mutex_t 是互斥锁,它有一个特性:当多个线程出现数据竞争时,除了”竞争成功”的那个线程外,其他线程都会进入被动挂起状态,而当”竞争成功”的那个线程解锁时,会主动去将其他线程激活,这个过程包含了上下文的切换,CPU 抢占,信号发送等开销,很明显,互斥锁的起始开销有些大,效率低于自旋锁。
所以在 YYMemoryCache 使用了 pthread_mutex_trylock () 尝试解锁,若解锁失败该方法会立即返回,让当前线程不会进入被动的挂起状态(也可以说阻塞),在下一次循环时又继续尝试获取锁。这个过程很有意思,感觉是手动实现了一个自旋锁。而自旋锁有个需要注意的问题是:死循环等待的时间越长,对 CPU 的消耗越大。所以作者做了一个很短的睡眠 usleep(10 * 1000);,有效的减小了循环的调用次数,至于这个睡眠时间的长度为什么是 10ms, 作者应该做了测试。
NSLock、NSRecurisiveLock、NSCondition、NSConditionLock NSLock 源码在 CoreFundation 框架中,无法进行查看,所以我们看 Swift 版本的 CoreFundation 实现,来类比 NSLock 实现,源码在这里 。
本来只是查看 NSLock 的,然后发现 NSRecurisiveLock、NSCondition、NSConditionLock 都在这里,就放在一起记录一下吧。
下面我们先看看源码:(这里的源码是删减版的,若是有兴趣可以去查看完整代码 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 private typealias _MutexPointer = UnsafeMutablePointer<pthread_mutex_t> private typealias _RecursiveMutexPointer = UnsafeMutablePointer<pthread_mutex_t> private typealias _ConditionVariablePointer = UnsafeMutablePointer<pthread_cond_t> open class NSLock: NSObject, NSLocking { internal var mutex = _MutexPointer.allocate(capacity: 1) private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1) private var timeoutMutex = _MutexPointer.allocate(capacity: 1) public override init() { pthread_mutex_init(mutex, nil) pthread_cond_init(timeoutCond, nil) pthread_mutex_init(timeoutMutex, nil) } open func lock() { pthread_mutex_lock(mutex) } open func unlock() { pthread_mutex_unlock(mutex) // Wakeup any threads waiting in lock(before:) pthread_mutex_lock(timeoutMutex) pthread_cond_broadcast(timeoutCond) pthread_mutex_unlock(timeoutMutex) } } open class NSRecursiveLock: NSObject, NSLocking { internal var mutex = _RecursiveMutexPointer.allocate(capacity: 1) private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1) private var timeoutMutex = _MutexPointer.allocate(capacity: 1) public override init() { super.init() var attrib = pthread_mutexattr_t() withUnsafeMutablePointer(to: &attrib) { attrs in pthread_mutexattr_init(attrs) pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE)) pthread_mutex_init(mutex, attrs) } pthread_cond_init(timeoutCond, nil) pthread_mutex_init(timeoutMutex, nil) } } open class NSCondition: NSObject, NSLocking { internal var mutex = _MutexPointer.allocate(capacity: 1) internal var cond = _ConditionVariablePointer.allocate(capacity: 1) public override init() { pthread_mutex_init(mutex, nil) pthread_cond_init(cond, nil) } open func lock() { pthread_mutex_lock(mutex) } open func unlock() { pthread_mutex_unlock(mutex) } open func wait() { pthread_cond_wait(cond, mutex) } open func signal() { pthread_cond_signal(cond) } open func broadcast() { pthread_cond_broadcast(cond) } } open class NSConditionLock : NSObject, NSLocking { internal var _cond = NSCondition() internal var _value: Int internal var _thread: _swift_CFThreadRef? open var condition: Int { return _value } public convenience override init() { self.init(condition: 0) } public init(condition: Int) { _value = condition } open func lock() { let _ = lock(before: Date.distantFuture) } open func unlock() { _cond.lock() _thread = nil _cond.broadcast() _cond.unlock() } open func lock(before limit: Date) -> Bool { _cond.lock() while _thread != nil { if !_cond.wait(until: limit) { _cond.unlock() return false } } _thread = pthread_self() _cond.unlock() return true } }
从源码我们看到,这四种锁都是对 pthread_mutex 的上层封装。
NSLock 不是递归锁,只是对
NSRecursiveLock 只是在内部初始化 pthread_mutex 的时候添加了属性 PTHREAD_MUTEX_RECURSIVE。
NSCondition 底层实现其实就是一个互斥锁和条件变量的封装。
NSCondition 是对 mutex 和 cond 的一种封装。cond 就是用于访问和操作特定类型数据的指针。
wait 操作在没有超时时,会阻塞线程,使其进入休眠状态,需要在 lock 状态下使用。
signal 操作是唤醒一个正在休眠等待的线程,需要在 lock 状态下使用。
broadcast 唤醒所有正在等待的线程,需要在 lock 状态下使用。
NSConditionLock 是 NSCondition 加线程数的封装,继承 NSLocking 协议,也有 lock 和 unlock 等方法。实现了类似dispatch_semaphore的效果。