专注、坚持

iOS 多线程技术实践之 @synchronized(四)

2021.06.01 by kingcos
Preface · 序
@synchronized 是 Obj-C 中常用的同步代码块,用于在多线程中保护资源访问的安全性。

What

大致流程

为了保护代码的并发执行,我们尝试使用 @synchronized 即同步代码块写一段代码:

// SomeClass.m

#import <Foundation/Foundation.h>

@implementation SomeClass

- (void)someFunc {
    printf(">>>>>\n");

    @synchronized(self) {
        printf("Protected code.\n");
    }

    printf("<<<<<\n");
}

@end

接着,我们使用 xcrun -sdk iphoneos clang -arch arm64 -rewrite-objc SomeClass.m -o SomeClass.cpp 命令将以上代码重写为 C++,并找到 someFunc 方法的 C++ 实现:

// main.mm

// 避免无法编译而添加的空实现
int objc_sync_enter(id obj) { return 0; }
int objc_sync_exit(id obj) { return 0; }
void objc_exception_throw(id exception) {}

// @implementation SomeClass

static void _I_SomeClass_someFunc(SomeClass * self, SEL _cmd) {
    printf(">>>>>\n"); // 将要进入 @synchronized Block

    {
        id _rethrow = 0;
        id _sync_obj = (id)self;    // 同步对象,这里为 self
        
        objc_sync_enter(_sync_obj); // 执行前调用 objc_sync_enter,传入同步对象
        
        try {
            struct _SYNC_EXIT {
                _SYNC_EXIT(id arg) : sync_exit(arg) {} // 构造方法 sync_exit = arg;
                ~_SYNC_EXIT() {                        // 析构方法
                    objc_sync_exit(sync_exit);         // 析构时(即局部变量超出其作用域时)调用 objc_sync_exit,传入同步对象
                }
                id sync_exit;                          // 成员变量
            } _sync_exit(_sync_obj);                   // 声明 _sync_exit 变量,类型为 _SYNC_EXIT 结构体

            printf("CRUD\n");                          // 同步代码块的实际执行内容
        } catch (id e) {                               // 代码块结束,_sync_exit 变量将要销毁,调用析构函数
            _rethrow = e;                              // 出现异常时,保存异常对象
        }
        
        {
            struct _FIN {
                _FIN(id reth) : rethrow(reth) {}                // 构造方法 rethrow = reth;
                ~_FIN() {                                       // 析构方法
                    if (rethrow) objc_exception_throw(rethrow); // 析构时调用 objc_exception_throw,传入异常对象
                }
                id rethrow;                                     // 成员变量
            } _fin_force_rethow(_rethrow);                      // 声明 _fin_force_rethow 变量,类型为 _FIN 结构体
        }                                                       // 代码块结束,_fin_force_rethow 变量将要销毁,调用析构函数
    }

    printf("<<<<<\n"); // 将要离开 @synchronized 代码块
}

// @end

由上可以得出 @synchronized 执行的大致流程:

objc_sync_enter(_sync_obj);                 // 执行前调用 objc_sync_enter,传入同步对象
printf("CRUD\n");                           // 同步代码块的实际执行内容,若出现异常,保存异常对象
objc_sync_exit(_sync_obj);                  // 析构时调用 objc_sync_exit,传入同步对象
if (rethrow) objc_exception_throw(rethrow); // 析构时调用 objc_exception_throw,传入异常对象

入口与出口

关于 objc_sync_enter 入口与 objc_sync_exit 出口函数,我们可以在 Apple 开源的 objc4 中查看其最新实现:

// objc-sync.mm

// -> 入口 ➡️

// Begin synchronizing on 'obj'.
// 开始在「obj」上进行同步。
// Allocates recursive mutex associated with 'obj' if needed.
// 必要时,将分配与「obj」关联的递归互斥锁。
// Returns OBJC_SYNC_SUCCESS once lock is acquired.
// 一旦当锁可以获得时,返回 OBJC_SYNC_SUCCESS(0)。
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS; // 枚举值,0

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE); // 同步对象转 SyncData
        ASSERT(data);
        data->mutex.lock(); // 加锁
    } else {
        // @synchronized(nil) does nothing
        // 传入对象为 nil 时,无效
        if (DebugNilSync) {
            // 可以通过在 Xcode - Edit Scheme - Environment Variables 中
            // 添加 OBJC_DEBUG_NIL_SYNC 环境变量为 YES 以在控制台显示 nil 时的日志
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}

// <- 出口 ⬅️

// End synchronizing on 'obj'. 
// 结束在「obj」上的同步。
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
// 返回 OBJC_SYNC_SUCCESS(0)或 OBJC_SYNC_NOT_OWNING_THREAD_ERROR(-1)。
int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); // 同步对象转 SyncData
        if (!data) {
            // data 为空时,无法获得线程
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            // 尝试解锁
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                // 解锁失败时,无法获得线程
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
        // 传入对象为 nil 时,无效
    }
 

    return result;
}

// objc-sync.h

enum {
    OBJC_SYNC_SUCCESS                 = 0,
    OBJC_SYNC_NOT_OWNING_THREAD_ERROR = -1
};

由上,在入口与出口函数中,主要是针对同步对象的判空,并在非空时调用 id2data 函数将同步对象封装为 SyncData,再进行加锁或解锁操作。

数据结构

在分析 id2data 函数前,我们先对涉及的数据结构进行一定的了解。

StripedMap<T>

// objc-private.h

// 缓存行大小是计算机体系结构中一个重要的概念,对于多线程编程来说,可以通过使数据对齐到缓存行的边界来避免伪共享等性能问题。
enum { CacheLineSize = 64 };

// objc-sync.mm

// 关于 alignas(expression),详见下文 ⬇️
typedef struct alignas(CacheLineSize) SyncData {
    struct SyncData* nextData;        // 指向下一个同步数据(链表)
    DisguisedPtr<objc_object> object; // 对象;关于 DisguisedPtr,可详见 Reference 中的链接
    int32_t threadCount;              // number of THREADS using this block 使用该代码块的线程数
    recursive_mutex_t mutex;          // 递归锁,允许同一线程多次获取锁
} SyncData;

struct SyncList {
    SyncData *data;
    spinlock_t lock;

    // constexpr:该关键字于 C++11 引入;
    // - 修饰构造函数时(如下),使得自定义类型的对象可以用作有效的常量表达式;
    // - 修饰函数时,若函数参数可以在编译时刻计算出来,那么该函数将可以产生编译时刻的常量,否则与普通函数无异;
    // - 修饰对象时,constexpr 限定为编译时刻常量,const 修饰则限定为运行时常量。

    // 在初始化时,data 为 nil,lock 为 fork_unsafe_lock
    // data = nil; lock = fork_unsafe_lock;
    constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
};

// Use multiple parallel lists to decrease contention among unrelated objects.
// 使用多个并行列表来减少不相关对象之间的竞争。
#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;
// objc-private.h

enum { CacheLineSize = 64 };

// 提供了一种高效的方式来管理大量的锁对象,以降低在并发环境中的锁竞争和性能损失

// StripedMap<T> is a map of void* -> T, sized appropriately 
// for cache-friendly lock striping. 
// StripedMap<T> 是一种 void* -> T 的映射,且恰好适用于缓存友好的锁分离。
// For example, this may be used as StripedMap<spinlock_t>
// or as StripedMap<SomeStruct> where SomeStruct stores a spin lock.
// 例如,其可能被用作 StripedMap<spinlock_t> 或当 SomeStruct 存储自旋锁时用作 StripedMap<SomeStruct>。
template<typename T>
class StripedMap {
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
    enum { StripeCount = 8 };
#else
    enum { StripeCount = 64 };
#endif

    struct PaddedT {
        // 确保 value 成员按照缓存行大小对齐
        T value alignas(CacheLineSize);
    };

    PaddedT array[StripeCount];

    // 根据指针的地址值计算出在数组中的索引位置。通过对地址值进行位移和异或操作,可以增加哈希的随机性,并减小哈希冲突的可能性,从而更均匀地分配指针到数组中不同的位置。
    static unsigned int indexForPointer(const void *p) {
        uintptr_t addr = reinterpret_cast<uintptr_t>(p);
        return ((addr >> 4) ^ (addr >> 9)) % StripeCount;
    }

 public:
    T& operator[] (const void *p) { 
        return array[indexForPointer(p)].value; 
    }
    const T& operator[] (const void *p) const { 
        return const_cast<StripedMap<T>>(this)[p]; 
    }

    // Shortcuts for StripedMaps of locks.
    void lockAll() {
        for (unsigned int i = 0; i < StripeCount; i++) {
            array[i].value.lock();
        }
    }

    void unlockAll() {
        for (unsigned int i = 0; i < StripeCount; i++) {
            array[i].value.unlock();
        }
    }

    void forceResetAll() {
        for (unsigned int i = 0; i < StripeCount; i++) {
            array[i].value.forceReset();
        }
    }

    void defineLockOrder() {
        for (unsigned int i = 1; i < StripeCount; i++) {
            lockdebug_lock_precedes_lock(&array[i-1].value, &array[i].value);
        }
    }

    void precedeLock(const void *newlock) {
        // assumes defineLockOrder is also called
        lockdebug_lock_precedes_lock(&array[StripeCount-1].value, newlock);
    }

    void succeedLock(const void *oldlock) {
        // assumes defineLockOrder is also called
        lockdebug_lock_precedes_lock(oldlock, &array[0].value);
    }

    const void *getLock(int i) {
        if (i < StripeCount) return &array[i].value;
        else return nil;
    }
    
#if DEBUG
    StripedMap() {
        // Verify alignment expectations.
        uintptr_t base = (uintptr_t)&array[0].value;
        uintptr_t delta = (uintptr_t)&array[1].value - base;
        ASSERT(delta % CacheLineSize == 0);
        ASSERT(base % CacheLineSize == 0);
    }
#else
    constexpr StripedMap() {}
#endif
};

关于 alignas(expression)

在 C++ 11 中,引入了该对齐修饰符(Alignas Specifier),即在声明变量或自定义结构体等类型时,可以用该修饰符控制内存对齐,相关文档介绍如下:

alignas(expression) must be an integral constant expression that evaluates to zero, or to a valid value for an alignment or extended alignment.

译:alignas(expression) 必须是一个整数常数表达式,其值为零,或者是一个对齐或扩展对齐的有效值。

—— alignas specifier, C++ Reference

先从一个简单的结构体开始,其中包含 ab 两个成员:

// main.mm

struct A {
    int a;
    int b;
};

struct A a = { 1, 2 };
struct A b = { 3, 4 };

printf("%zu, %zu\n", alignof(int), sizeof(int));           // 4, 4
printf("%zu, %zu\n", alignof(struct A), sizeof(struct A)); // 4, 8

// LLDB:
// (lldb) p &a
// (A *) $0 = 0x00007ffeefbff3f8
// (lldb) p &b
// (A *) $1 = 0x00007ffeefbff3f0
// (lldb) x/16xb 0x00007ffeefbff3f0
// 0x7ffeefbff3f0: 0x03 0x00 0x00 0x00 0x04 0x00 0x00 0x00
// 0x7ffeefbff3f8: 0x01 0x00 0x00 0x00 0x02 0x00 0x00 0x00

由上面的输出我们可以得出,struct A 按 4 字节对齐,该类型变量的大小为对齐数的两倍 —— 8 字节,其中的两个成员分别占用 4 字节。接着我们尝试使用 alignas(expression)

struct B {
    alignas(long) int a;
    int b;
};

struct B a = { 1, 2 };
struct B b = { 3, 4 };

printf("%zu, %zu\n", alignof(long), sizeof(long));         // 8, 8
printf("%zu, %zu\n", alignof(struct B), sizeof(struct B)); // 8, 8

// LLDB:
// (lldb) p &a
// (B *) $0 = 0x00007ffeefbff3f8
// (lldb) p &b
// (B *) $1 = 0x00007ffeefbff3f0
// (lldb) x/16xb 0x00007ffeefbff3f0
// 0x7ffeefbff3f0: 0x03 0x00 0x00 0x00 0x04 0x00 0x00 0x00
// 0x7ffeefbff3f8: 0x01 0x00 0x00 0x00 0x02 0x00 0x00 0x00

此时,struct B 将按照成员的最大对齐数 —— 即 8 字节的 alignas(long) int a; 对齐,但该成员仍然只占用 int 类型的 4 字节,最终结构体本身占用 8 字节,也符合内存对齐。我们可以在 struct B 的基础上再添加一个成员:

struct C {
    alignas(long) int a;
    int b;
    char c;
};

struct C a = { 1, 2, 5 };
struct C b = { 3, 4, 6 };

printf("%zu, %zu\n", alignof(long), sizeof(long));         // 8, 8
printf("%zu, %zu\n", alignof(char), sizeof(char));         // 1, 1
printf("%zu, %zu\n", alignof(struct C), sizeof(struct C)); // 8, 16

// LLDB:
// (lldb) p &a
// (C *) $0 = 0x00007ffeefbff3f0
// (lldb) p &b
// (C *) $1 = 0x00007ffeefbff3e0
// (lldb) x/16xb 0x00007ffeefbff3e0
// 0x7ffeefbff3e0: 0x03 0x00 0x00 0x00 0x04 0x00 0x00 0x00
// 0x7ffeefbff3e8: 0x06 0x00 0x00 0x00 0x00 0x00 0x00 0x00
// (lldb) x/32xb 0x00007ffeefbff3e0
// 0x7ffeefbff3e0: 0x03 0x00 0x00 0x00 0x04 0x00 0x00 0x00
// 0x7ffeefbff3e8: 0x06 0x00 0x00 0x00 0x00 0x00 0x00 0x00
// 0x7ffeefbff3f0: 0x01 0x00 0x00 0x00 0x02 0x00 0x00 0x00
// 0x7ffeefbff3f8: 0x05 0x00 0x00 0x00 0x00 0x00 0x00 0x00

此时,即使新增成员仅占用 1 个字节,但由于结构体按照内部成员最大对齐数对齐,因此 struct C 类型的结构体变量将实际占用内存对齐的 2 倍 —— 即 16 字节。

id2data

id2data 的本质是将传入的 object 封装成 SyncData,当缓存中存在或全局 sDataLists 包含时则直接使用,否则新建 SyncData 并缓存:

// objc-sync.mm

// Use multiple parallel lists to decrease contention among unrelated objects.
#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;

enum usage { ACQUIRE, RELEASE, CHECK };

static SyncData* id2data(id object, enum usage why)
{
    spinlock_t *lockp = &LOCK_FOR_OBJ(object); // 自旋锁
    SyncData **listp = &LIST_FOR_OBJ(object);  // 数据 -> sDataLists[object].data -> array[indexForPointer(object)].value.data -> SyncData
    SyncData* result = NULL;

#if SUPPORT_DIRECT_THREAD_KEYS
    // Check per-thread single-entry fast cache for matching object
    bool fastCacheOccupied = NO;
    // 从线程本地存储中获取与 SYNC_DATA_DIRECT_KEY 关联的数据,并将其转换为 SyncData* 类型
    SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
    if (data) {
        fastCacheOccupied = YES;

        if (data->object == object) {
            // 快速缓存中匹配到
            // Found a match in fast cache.
            uintptr_t lockCount;

            result = data;
            lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
            if (result->threadCount <= 0  ||  lockCount <= 0) {
                _objc_fatal("id2data fastcache is buggy");
            }

            switch(why) {
            case ACQUIRE: {
                lockCount++;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                break;
            }
            case RELEASE:
                lockCount--;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                if (lockCount == 0) {
                    // remove from fast cache
                    tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }
#endif

    // Check per-thread cache of already-owned locks for matching object
    // 检查当前线程的缓存,看是否存在已经拥有的与指定对象相关的锁
    SyncCache *cache = fetch_cache(NO);
    if (cache) {
        unsigned int i;
        for (i = 0; i < cache->used; i++) {
            SyncCacheItem *item = &cache->list[i];
            if (item->data->object != object) continue;

            // Found a match.
            result = item->data;
            if (result->threadCount <= 0  ||  item->lockCount <= 0) {
                _objc_fatal("id2data cache is buggy");
            }
                
            switch(why) {
            case ACQUIRE:
                item->lockCount++;
                break;
            case RELEASE:
                item->lockCount--;
                if (item->lockCount == 0) {
                    // remove from per-thread cache
                    cache->list[i] = cache->list[--cache->used];
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }

    // Thread cache didn't find anything.
    // 线程缓存未命中。
    // Walk in-use list looking for matching object
    // Spinlock prevents multiple threads from creating multiple 
    // locks for the same new object.
    // We could keep the nodes in some hash table if we find that there are
    // more than 20 or so distinct locks active, but we don't do that now.
    
    lockp->lock();
    // 使用自旋锁,在数据中查找

    {
        SyncData* p;
        SyncData* firstUnused = NULL;
        for (p = *listp; p != NULL; p = p->nextData) {
            if ( p->object == object ) {
                result = p;
                // atomic because may collide with concurrent RELEASE
                OSAtomicIncrement32Barrier(&result->threadCount);
                goto done;
            }
            if ( (firstUnused == NULL) && (p->threadCount == 0) )
                firstUnused = p;
        }
    
        // no SyncData currently associated with object
        if ( (why == RELEASE) || (why == CHECK) )
            goto done;
    
        // an unused one was found, use it
        if ( firstUnused != NULL ) {
            result = firstUnused;
            result->object = (objc_object *)object;
            result->threadCount = 1;
            goto done;
        }
    }

    // Allocate a new SyncData and add to list.
    // 分配新的 SyncData 并添加到列表。
    // XXX allocating memory with a global lock held is bad practice,
    // might be worth releasing the lock, allocating, and searching again.
    // But since we never free these guys we won't be stuck in allocation very often.
    posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
    result->object = (objc_object *)object;
    result->threadCount = 1;
    new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
    result->nextData = *listp;
    *listp = result;
    
 done:
    lockp->unlock();
    if (result) {
        // Only new ACQUIRE should get here.
        // All RELEASE and CHECK and recursive ACQUIRE are 
        // handled by the per-thread caches above.
        if (why == RELEASE) {
            // Probably some thread is incorrectly exiting 
            // while the object is held by another thread.
            return nil;
        }
        if (why != ACQUIRE) _objc_fatal("id2data is buggy");
        if (result->object != object) _objc_fatal("id2data is buggy");

#if SUPPORT_DIRECT_THREAD_KEYS
        if (!fastCacheOccupied) {
            // Save in fast thread cache
            tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
            tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
        } else 
#endif
        {
            // Save in thread cache
            if (!cache) cache = fetch_cache(YES);
            cache->list[cache->used].data = result;
            cache->list[cache->used].lockCount = 1;
            cache->used++;
        }
    }

    return result;
}

问与答

为什么同步对象为空时无效

// objc.os.h

/* Use this for functions that are intended to be breakpoint hooks.
   If you do not, the compiler may optimize them away.
   BREAKPOINT_FUNCTION( void stop_on_error(void) ); */
#   define BREAKPOINT_FUNCTION(prototype)                             \
    OBJC_EXTERN __attribute__((noinline, used, visibility("hidden"))) \
    prototype { asm(""); }
OBJC_EXTERN __attribute__((noinline, used, visibility("hidden"))) void objc_sync_nil(void) { asm(""); }

Reference