MMKV

References

https://github.com/Tencent/MMKV/wiki/android_ipc

https://github.com/Tencent/MMKV/wiki/design

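Summary of How It Works

MMKV essentially mmaps a file into a block of memory and appends every new key-value pair to that memory. When the write position reaches the end of the block, MMKV compacts the data and writes it back to free up space; if there is still not enough room, it doubles the size of the mapping. Because the mapped file may contain several entries for the same key, MMKV treats only the most recently written entry as valid.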
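The append, compact, and double cycle described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `AppendLog` is a hypothetical class, a `std::vector` stands in for the mmap'ed file, and the "size" of a record is simply the key length plus the value length. It is not MMKV's actual encoding.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the mmap'ed file: records are only ever appended.
class AppendLog {
public:
    explicit AppendLog(std::size_t capacity) : m_capacity(capacity) { assert(capacity > 0); }

    void set(const std::string &key, const std::string &value) {
        std::size_t need = key.size() + value.size();
        if (m_used + need > m_capacity) {
            compact(); // first try to reclaim space taken by overwritten keys
            while (m_used + need > m_capacity) {
                m_capacity *= 2; // still too small: double the capacity
            }
        }
        m_records.emplace_back(key, value);
        m_used += need;
    }

    // Replaying the log in order makes the last record for each key win.
    std::map<std::string, std::string> load() const {
        std::map<std::string, std::string> dict;
        for (const auto &rec : m_records) {
            dict[rec.first] = rec.second;
        }
        return dict;
    }

    std::size_t recordCount() const { return m_records.size(); }
    std::size_t capacity() const { return m_capacity; }

private:
    // Rewrite the log so that it holds exactly one record per live key.
    void compact() {
        auto dict = load();
        m_records.assign(dict.begin(), dict.end());
        m_used = 0;
        for (const auto &rec : m_records) {
            m_used += rec.first.size() + rec.second.size();
        }
    }

    std::size_t m_capacity;
    std::size_t m_used = 0;
    std::vector<std::pair<std::string, std::string>> m_records;
};
```

With a capacity of 8, writing key `a` three times and key `b` once fills the log; the next write triggers a compaction that keeps one record per key, and only a value that still does not fit causes the capacity to double.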

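State Synchronization

Synchronizing the write pointer

Each process can cache its own write pointer and, whenever it writes a key-value pair, also store the latest write pointer position in the mmap'ed memory. A process then only has to compare its cached pointer with the one in the mmap'ed memory: if they differ, another process has written data. In fact, MMKV already stores the size of the valid data in the file header, and that value is exactly the offset of the write pointer, so it can be reused to check the write pointer.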
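A minimal sketch of this check, assuming a hypothetical layout in which the first 4 bytes of the shared region hold the valid data size. A `std::vector` stands in for the mmap'ed memory, and `ProcessView`, `appendBytes`, and `readWriteOffset` are illustrative names, not MMKV functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumed layout: a 4-byte header holding the valid data size, followed by the data.
constexpr std::size_t kHeaderSize = sizeof(uint32_t);

// Read the write offset (valid data size) published in the header.
uint32_t readWriteOffset(const std::vector<uint8_t> &shared) {
    assert(shared.size() >= kHeaderSize);
    uint32_t offset = 0;
    std::memcpy(&offset, shared.data(), kHeaderSize);
    return offset;
}

// Append the payload after the existing data, then publish the new write offset.
void appendBytes(std::vector<uint8_t> &shared, const std::vector<uint8_t> &payload) {
    uint32_t offset = readWriteOffset(shared);
    assert(kHeaderSize + offset + payload.size() <= shared.size());
    if (!payload.empty()) {
        std::memcpy(shared.data() + kHeaderSize + offset, payload.data(), payload.size());
    }
    offset += static_cast<uint32_t>(payload.size());
    std::memcpy(shared.data(), &offset, kHeaderSize);
}

// Per-process state: the write offset this process saw last.
struct ProcessView {
    uint32_t cachedOffset = 0;

    // Returns true if the shared offset moved since the last check, and catches up.
    bool sync(const std::vector<uint8_t> &shared) {
        uint32_t current = readWriteOffset(shared);
        if (current == cachedOffset) {
            return false;
        }
        cachedOffset = current;
        return true;
    }
};
```

A reader that calls `sync()` before each access learns about foreign writes without any extra signalling channel, because the header it compares against lives in the same shared mapping as the data.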

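Detecting memory compaction

Use a monotonically increasing sequence number and increment it every time a compaction happens. Store the sequence number in the mmap'ed memory as well, and keep a cached copy in each process; comparing the two tells a process whether another process has triggered a compaction.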

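Detecting memory growth

Before growing the memory, MMKV first tries to free space through compaction, and only requests more memory if that is still not enough. Memory growth can therefore be handled the same way as compaction. The new memory size can be obtained by querying the file size, so it does not need to be stored separately in the mmap'ed memory.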
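Taken together, the three checks suggest a small decision function. The sketch below is an assumption about how they could be combined, not MMKV's code: `SharedState`, `SyncAction`, and `decideSync` are hypothetical names, and the snapshot fields are plain integers rather than values read from a real mapping.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical snapshot of the state a process can observe.
struct SharedState {
    uint32_t sequence;    // incremented on every compaction
    uint32_t writeOffset; // valid data size stored in the file header
    std::size_t fileSize; // obtained by querying the file, not stored in the mapping
};

enum class SyncAction { None, LoadAppended, ReloadAll, RemapAndReloadAll };

// Decide what a process must do to catch up with the shared state.
SyncAction decideSync(const SharedState &cached, const SharedState &current) {
    if (cached.sequence != current.sequence) {
        // A compaction happened. Growth only follows a compaction attempt,
        // so the file size is checked on this path only.
        if (cached.fileSize != current.fileSize) {
            return SyncAction::RemapAndReloadAll;
        }
        return SyncAction::ReloadAll;
    }
    if (cached.writeOffset != current.writeOffset) {
        // Same layout, more data: only the appended tail is new.
        return SyncAction::LoadAppended;
    }
    return SyncAction::None;
}
```

Checking the sequence number first matters: after a compaction the write offset may be smaller, larger, or even equal to the cached one, so the offset alone cannot be trusted until the sequence numbers match.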

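Choosing an inter-process lock

  • File lock: naturally robust (the kernel releases it when the owning process exits), but it does not support recursive locking or read-write lock upgrade/downgrade, so these have to be implemented by hand.
  • pthread_mutex: the pthread library supports recursive locking and read-write lock upgrade/downgrade, but the lock is not robust (it stays locked if the owning process dies) and has to be cleaned up by hand.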

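File lock

With multi-process data synchronization in place, it is time to return to the locking problems mentioned earlier: recursive locking and lock upgrade/downgrade.

Recursive lock

A recursive lock means that once a process/thread already holds the lock, acquiring it again does not block, and releasing it does not release the outer lock. A file lock satisfies the first property but not the second: it is a state lock without a counter, so no matter how many times it has been locked, a single unlock releases it completely. Recursive locking becomes essential as soon as locked code calls helper functions that take the same lock.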
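The "one unlock releases everything" behaviour can be observed directly with `flock`. The sketch below assumes a Linux-like system and uses two independent descriptors on the same temporary file inside one process as a stand-in for two processes (`flock` locks belong to the open file description, so the two descriptors contend with each other). The function name and the temporary path are illustrative.

```cpp
#include <cassert>
#include <cstdlib>
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

// Takes a shared lock `lockCalls` times on one descriptor, unlocks it once, and
// reports whether a second descriptor was blocked before and free afterwards.
bool oneUnlockReleasesAll(int lockCalls) {
    char path[] = "/tmp/flock_demo_XXXXXX";
    int holder = mkstemp(path); // creates and opens a unique temporary file
    assert(holder >= 0);
    int contender = open(path, O_RDWR); // independent open file description
    assert(contender >= 0);

    for (int i = 0; i < lockCalls; i++) {
        int rc = flock(holder, LOCK_SH); // repeated locking does not block...
        assert(rc == 0);
        (void) rc;
    }
    // While the shared lock is held, a non-blocking exclusive request must fail.
    bool blockedWhileHeld = (flock(contender, LOCK_EX | LOCK_NB) != 0);

    flock(holder, LOCK_UN); // ...but a single unlock drops the lock entirely
    bool freeAfterOneUnlock = (flock(contender, LOCK_EX | LOCK_NB) == 0);

    flock(contender, LOCK_UN);
    close(contender);
    close(holder);
    unlink(path);
    return blockedWhileHeld && freeAfterOneUnlock;
}
```

Calling `oneUnlockReleasesAll(3)` locks three times and unlocks once; the exclusive request succeeding afterwards is exactly the missing-counter problem that the wrapper has to solve.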

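Lock upgrade/downgrade

Lock upgrade turns a shared lock that is already held into an exclusive lock, that is, a read lock into a write lock; lock downgrade is the reverse. File locks support upgrade, but it is deadlock-prone: if processes A and B both hold a read lock and both try to upgrade to a write lock, each waits for the other and they deadlock. In addition, because file locks are not recursive, downgrade is impossible: releasing the write lock leaves no lock at all.

To solve both problems, recursive locking and lock upgrade/downgrade, the file lock (the flock system call) has to be wrapped with a read-lock counter and a write-lock counter. The handling logic is shown in the table below: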

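| Read-lock count | Write-lock count | Acquire read lock | Acquire write lock | Release read lock | Release write lock |
|---|---|---|---|---|---|
| 0 | 0 | acquire read lock | acquire write lock | - | - |
| 0 | 1 | +1 | +1 | - | release write lock |
| 0 | N | +1 | +1 | - | -1 |
| 1 | 0 | +1 | release read lock, then acquire write lock | release read lock | - |
| 1 | 1 | +1 | +1 | -1 | acquire read lock |
| 1 | N | +1 | +1 | -1 | -1 |
| N | 0 | +1 | release read lock, then acquire write lock | -1 | - |
| N | 1 | +1 | +1 | -1 | acquire read lock |
| N | N | +1 | +1 | -1 | -1 |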

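Two points need attention:

  • When acquiring a write lock while already holding a read lock, first try to take the write lock without blocking. If the try_lock fails, another process holds a read lock, so we must release our own read lock before blocking on the write lock, to avoid deadlock. (The deadlock here is the upgrade deadlock: each process keeps its read lock while waiting for a write lock that can only be granted once the other's read lock is released, which never happens.)
  • When releasing the write lock, if a read lock was held before, the write lock cannot simply be released, because that would release the read lock too. Instead, acquire a read lock to downgrade the lock.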
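graph LR
RL["Acquire read lock"] --> RL1["Acquire directly"]
WL["Acquire write lock"] --> WL1["First write lock: if a read lock is held, unlock it first to prevent deadlock"]
WL --> WL2["Not the first write lock: just +1"]

RU["Release read lock"] --> RU1["Release directly"]
WU["Release write lock"] --> WU1["Write-lock count greater than 1: just -1"]
WU --> WU2["Write-lock count is 1: if a read lock is held, acquire a read lock to downgrade, so that releasing the write lock does not also drop the read lock"]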
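The counter rules above can be sketched as a small state machine. This is an illustrative model, not MMKV's `FileLock`: `RecordingLock` is a hypothetical stand-in that only records which OS-level operation would be issued, and the sketch follows the table literally (it always releases the read lock before taking the first write lock, without the non-blocking attempt).

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class LockKind { Shared, Exclusive };

// Hypothetical stand-in for the OS lock: records the operations it is asked to perform.
struct RecordingLock {
    std::vector<std::string> ops;
    void lockShared() { ops.push_back("LOCK_SH"); }
    void lockExclusive() { ops.push_back("LOCK_EX"); }
    void unlock() { ops.push_back("LOCK_UN"); }
};

class CountingLock {
public:
    explicit CountingLock(RecordingLock &os) : m_os(os) {}

    void lock(LockKind kind) {
        if (kind == LockKind::Shared) {
            // Any lock already held also covers reading: only count it.
            if (m_shared == 0 && m_exclusive == 0) {
                m_os.lockShared();
            }
            m_shared++;
        } else {
            if (m_exclusive == 0) {
                if (m_shared > 0) {
                    m_os.unlock(); // drop our own read lock first to avoid deadlock
                }
                m_os.lockExclusive();
            }
            m_exclusive++;
        }
    }

    bool unlock(LockKind kind) {
        if (kind == LockKind::Shared) {
            if (m_shared == 0) {
                return false;
            }
            if (m_shared == 1 && m_exclusive == 0) {
                m_os.unlock(); // last read lock and no write lock: really unlock
            }
            m_shared--;
        } else {
            if (m_exclusive == 0) {
                return false;
            }
            if (m_exclusive == 1) {
                if (m_shared > 0) {
                    m_os.lockShared(); // downgrade instead of unlocking
                } else {
                    m_os.unlock();
                }
            }
            m_exclusive--;
        }
        return true;
    }

private:
    RecordingLock &m_os;
    int m_shared = 0;
    int m_exclusive = 0;
};
```

For the sequence read, read, write, write followed by the matching releases, the recorded OS operations are `LOCK_SH`, `LOCK_UN`, `LOCK_EX`, `LOCK_SH`, `LOCK_UN`: nested acquisitions only touch the counters, and the last write release downgrades to a shared lock.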

MMKV

initialize

public static String initialize(String rootDir, LibLoader loader, MMKVLogLevel logLevel) {
    if (loader != null) {
        if (BuildConfig.FLAVOR.equals("SharedCpp")) {
            loader.loadLibrary("c++_shared");
        }
        loader.loadLibrary("mmkv");
    } else {
        if (BuildConfig.FLAVOR.equals("SharedCpp")) {
            System.loadLibrary("c++_shared");
        }
        System.loadLibrary("mmkv");
    }
    jniInitialize(rootDir, logLevel2Int(logLevel));
    MMKV.rootDir = rootDir;
    return MMKV.rootDir;
}

loadLibrary

jniinitialize

registerContentChangeNotify

// content change notification of other process
// trigger by getXXX() or setXXX() or checkContentChangedByOuterProcess()
private static MMKVContentChangeNotification gContentChangeNotify;
public static void registerContentChangeNotify(MMKVContentChangeNotification notify) {
    gContentChangeNotify = notify;
    setWantsContentChangeNotify(gContentChangeNotify != null);
}

setwantscontentchangenotify

onContentChangedByOuterProcess

private static void onContentChangedByOuterProcess(String mmapID) {
    if (gContentChangeNotify != null) {
        gContentChangeNotify.onContentChangedByOuterProcess(mmapID);
    }
}

defaultMMKV

public static MMKV defaultMMKV() {
    long handle = getDefaultMMKV(SINGLE_PROCESS_MODE, null);
    return checkProcessMode(handle, "DefaultMMKV", SINGLE_PROCESS_MODE);
}

native-bridge.cpp

JNI_OnLoad

extern "C" JNIEXPORT JNICALL jint JNI_OnLoad(JavaVM *vm, void *reserved) {
    g_currentJVM = vm;
    JNIEnv *env;
    if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_6) != JNI_OK) {
        return -1;
    }
    static const char *clsName = "com/tencent/mmkv/MMKV";
    jclass instance = env->FindClass(clsName);
    g_cls = reinterpret_cast<jclass>(env->NewGlobalRef(instance));
  
    int ret = registerNativeMethods(env, g_cls);
    return JNI_VERSION_1_6;
}

registerNativeMethods

static int registerNativeMethods(JNIEnv *env, jclass cls) {
    return env->RegisterNatives(cls, g_methods, sizeof(g_methods) / sizeof(g_methods[0]));
}

jniInitialize

MMKV_JNI void jniInitialize(JNIEnv *env, jobject obj, jstring rootDir, jint logLevel) {
    const char *kstr = env->GetStringUTFChars(rootDir, nullptr);
    if (kstr) {
        MMKV::initializeMMKV(kstr, (MMKVLogLevel) logLevel);
        env->ReleaseStringUTFChars(rootDir, kstr);
    }
}

setWantsContentChangeNotify

MMKV_JNI void setWantsContentChangeNotify(JNIEnv *env, jclass type, jboolean notify) {
    if (notify == JNI_TRUE) {
        MMKV::registerContentChangeHandler(onContentChangedByOuterProcess);
    } else {
        MMKV::unRegisterContentChangeHandler();
    }
}

registercontentchangehandler

onContentChangedByOuterProcess_n

static void onContentChangedByOuterProcess(const std::string &mmapID) {
    auto currentEnv = getCurrentEnv();
    if (currentEnv && g_callbackOnContentChange) {
        jstring str = string2jstring(currentEnv, mmapID);
        currentEnv->CallStaticVoidMethod(g_cls, g_callbackOnContentChange, str);
    }
}

g_callbackOnContentChange

getDefaultMMKV

MMKV_JNI jlong getDefaultMMKV(JNIEnv *env, jobject obj, jint mode, jstring cryptKey) {
    MMKV *kv = nullptr;

    if (cryptKey) {
        string crypt = jstring2string(env, cryptKey);
        if (crypt.length() > 0) {
            kv = MMKV::defaultMMKV((MMKVMode) mode, &crypt);
        }
    }
    if (!kv) {
        kv = MMKV::defaultMMKV((MMKVMode) mode, nullptr);
    }

    return (jlong) kv;
}

getMMKVWithID

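MMKV_JNI jlong getMMKVWithID(JNIEnv *env, jobject, jstring mmapID, jint mode, jstring cryptKey, jstring rootPath) {
    // abridged excerpt: the null checks on mmapID, cryptKey and rootPath are omitted
    string str = jstring2string(env, mmapID);
    string crypt = jstring2string(env, cryptKey);
    string path = jstring2string(env, rootPath);
    MMKV *kv = MMKV::mmkvWithID(str, DEFAULT_MMAP_SIZE, (MMKVMode) mode, &crypt, &path);
    return (jlong) kv;
}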

mmkvwithid

MMKV.cpp

initializeMMKV

void MMKV::initializeMMKV(const MMKVPath_t &rootDir, MMKVLogLevel logLevel) {
    g_currentLogLevel = logLevel;

    ThreadLock::ThreadOnce(&once_control, initialize);

    g_rootDir = rootDir;
    mkPath(g_rootDir);

}

initialize

void initialize() {
    g_instanceDic = new unordered_map<string, MMKV *>;
    g_instanceLock = new ThreadLock();
    g_instanceLock->initialize();

    mmkv::DEFAULT_MMAP_SIZE = mmkv::getPageSize();
}

registerContentChangeHandler

void MMKV::registerContentChangeHandler(mmkv::ContentChangeHandler handler) {
    g_contentChangeHandler = handler;
}

defaultMMKV

MMKV *MMKV::defaultMMKV(MMKVMode mode, string *cryptKey) {
#ifndef MMKV_ANDROID
    return mmkvWithID(DEFAULT_MMAP_ID, mode, cryptKey);
#else
    return mmkvWithID(DEFAULT_MMAP_ID, DEFAULT_MMAP_SIZE, mode, cryptKey);
#endif
}

mmkvWithID

MMKV *MMKV::mmkvWithID(const string &mmapID, int size, MMKVMode mode, string *cryptKey, string *rootPath) {
    SCOPED_LOCK(g_instanceLock);

    auto mmapKey = mmapedKVKey(mmapID, rootPath);
    auto itr = g_instanceDic->find(mmapKey);
    if (itr != g_instanceDic->end()) {
        MMKV *kv = itr->second;
        return kv;
    }
    auto kv = new MMKV(mmapID, size, mode, cryptKey, rootPath);
    (*g_instanceDic)[mmapKey] = kv;
    return kv;
}
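The lookup-or-create pattern in mmkvWithID can be reproduced in isolation. The sketch below is a simplified assumption rather than MMKV's code: `Store` and `StoreRegistry` are hypothetical names, a `std::mutex` replaces `g_instanceLock`, and `std::unique_ptr` owns the instances, whereas the excerpt above stores raw pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for an MMKV instance.
struct Store {
    std::string id;
    explicit Store(std::string storeId) : id(std::move(storeId)) {}
};

class StoreRegistry {
public:
    // Returns the instance registered under `id`, creating it on first use.
    Store *get(const std::string &id) {
        std::lock_guard<std::mutex> guard(m_mutex); // one lookup/creation at a time
        auto it = m_instances.find(id);
        if (it != m_instances.end()) {
            return it->second.get(); // reuse the cached instance
        }
        auto inserted = m_instances.emplace(id, std::make_unique<Store>(id));
        return inserted.first->second.get();
    }

    std::size_t size() {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_instances.size();
    }

private:
    std::mutex m_mutex;
    std::unordered_map<std::string, std::unique_ptr<Store>> m_instances;
};
```

Because the find and the insert happen under the same lock, two threads asking for the same ID cannot both create an instance, which matters when each instance maps the same file.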

MMKV()

MMKV::MMKV(const string &mmapID, int size, MMKVMode mode, string *cryptKey, string *rootPath)
    : m_mmapID(mmapedKVKey(mmapID, rootPath)) // historically Android mistakenly use mmapKey as mmapID
    , m_path(mappedKVPathWithID(m_mmapID, mode, rootPath))
    , m_crcPath(crcPathWithID(m_mmapID, mode, rootPath))
    , m_dic(nullptr)
    , m_dicCrypt(nullptr)
    , m_file(new MemoryFile(m_path, size, (mode & MMKV_ASHMEM) ? MMFILE_TYPE_ASHMEM : MMFILE_TYPE_FILE))
    , m_metaFile(new MemoryFile(m_crcPath, DEFAULT_MMAP_SIZE, m_file->m_fileType))
    , m_metaInfo(new MMKVMetaInfo())
    , m_crypter(nullptr)
    , m_lock(new ThreadLock())
    , m_fileLock(new FileLock(m_metaFile->getFd(), (mode & MMKV_ASHMEM)))
    , m_sharedProcessLock(new InterProcessLock(m_fileLock, SharedLockType))
    , m_exclusiveProcessLock(new InterProcessLock(m_fileLock, ExclusiveLockType))
    , m_isInterProcess((mode & MMKV_MULTI_PROCESS) != 0 || (mode & CONTEXT_MODE_MULTI_PROCESS) != 0) {
    m_actualSize = 0;
    m_output = nullptr;

    // force use fcntl(), otherwise will conflict with MemoryFile::reloadFromFile()
    m_fileModeLock = new FileLock(m_file->getFd(), true);
    m_sharedProcessModeLock = new InterProcessLock(m_fileModeLock, SharedLockType);
    m_exclusiveProcessModeLock = nullptr;

#    ifndef MMKV_DISABLE_CRYPT
    if (cryptKey && cryptKey->length() > 0) {
        m_dicCrypt = new MMKVMapCrypt();
        m_crypter = new AESCrypt(cryptKey->data(), cryptKey->length());
    } else
#    endif
    {
        m_dic = new MMKVMap();
    }

    m_needLoadFromFile = true;
    m_hasFullWriteback = false;

    m_crcDigest = 0;

    m_sharedProcessLock->m_enable = m_isInterProcess;
    m_exclusiveProcessLock->m_enable = m_isInterProcess;

    // sensitive zone
    {
        SCOPED_LOCK(m_sharedProcessLock);
        loadFromFile();
    }
}

loadFromFile

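void MMKV::loadFromFile() {
    // make sure the data file is mapped before reading it
    if (!m_file->isFileValid()) {
        m_file->reloadFromFile();
    }
    ......
    auto ptr = (uint8_t *) m_file->getMemory();
    ......
    // loadFromFile here is a local flag whose declaration is elided in this excerpt
    if (loadFromFile && m_actualSize > 0) {
        // skip the fixed-size length header and wrap the payload without copying
        MMBuffer inputBuffer(ptr + Fixed32Size, m_actualSize, MMBufferNoCopy);
        ......
    }
}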

MemoryFile

reloadFromFile

void MemoryFile::reloadFromFile() {
    m_fd = open(m_name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRWXU);

    FileLock fileLock(m_fd);
    InterProcessLock lock(&fileLock, ExclusiveLockType);
    SCOPED_LOCK(&lock);

    mmkv::getFileSize(m_fd, m_size);
    // round up to (n * pagesize)
    if (m_size < DEFAULT_MMAP_SIZE || (m_size % DEFAULT_MMAP_SIZE != 0)) {
        size_t roundSize = ((m_size / DEFAULT_MMAP_SIZE) + 1) * DEFAULT_MMAP_SIZE;
        truncate(roundSize);
    } else {
        auto ret = mmap();
        if (!ret) {
            doCleanMemoryCache(true);
        }
    }
}
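The rounding rule in reloadFromFile is easy to check in isolation. The helper below is a hypothetical extraction of that arithmetic; `roundUpToPage` is not an MMKV function, and the page size is passed in explicitly instead of using `DEFAULT_MMAP_SIZE`.

```cpp
#include <cassert>
#include <cstddef>

// Round `size` up to a non-zero multiple of `pageSize`, following the excerpt:
// sizes that are already a positive multiple of the page size are left unchanged.
std::size_t roundUpToPage(std::size_t size, std::size_t pageSize) {
    assert(pageSize > 0);
    if (size < pageSize || size % pageSize != 0) {
        return (size / pageSize + 1) * pageSize;
    }
    return size;
}
```

With a 4096-byte page, an empty file becomes 4096 bytes, 4097 bytes become 8192, and an 8192-byte file keeps its size. The `size < pageSize` test is what makes a new, zero-length file grow to one full page.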

mmap

bool MemoryFile::mmap() {
    m_ptr = (char *) ::mmap(m_ptr, m_size, PROT_READ | PROT_WRITE, MAP_SHARED, m_fd, 0);
    if (m_ptr == MAP_FAILED) {
        MMKVError("fail to mmap [%s], %s", m_name.c_str(), strerror(errno));
        m_ptr = nullptr;
        return false;
    }

    return true;
}
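To see what a `MAP_SHARED` mapping provides, the following sketch writes through a mapping and reads the same bytes back through the file descriptor. It assumes a POSIX system; the function name and temporary path are illustrative, and the `msync` call is included for portability rather than because the excerpt above uses it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <string>
#include <sys/mman.h>
#include <unistd.h>

// Writes `text` through a shared mapping of a fresh temporary file and returns
// what a regular pread() on the same file sees afterwards.
std::string writeThroughMapping(const std::string &text) {
    char path[] = "/tmp/mmap_demo_XXXXXX";
    int fd = mkstemp(path);
    assert(fd >= 0);
    long page = sysconf(_SC_PAGESIZE);
    assert(page > 0);
    assert(text.size() <= static_cast<std::size_t>(page));

    std::string result;
    // The file must be at least as large as the mapping before it is touched.
    if (ftruncate(fd, page) == 0) {
        void *ptr = mmap(nullptr, page, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (ptr != MAP_FAILED) {
            std::memcpy(ptr, text.data(), text.size()); // a plain memory write
            msync(ptr, page, MS_SYNC);                  // flush the page to the file
            result.resize(text.size());
            ssize_t n = pread(fd, &result[0], result.size(), 0);
            if (n != static_cast<ssize_t>(result.size())) {
                result.clear();
            }
            munmap(ptr, page);
        }
    }
    close(fd);
    unlink(path);
    return result;
}
```

`writeThroughMapping("hello mmkv")` returns the same string: stores into the mapping end up in the file, which is what lets several processes that map the same file share data.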

getMemory

void *getMemory() { return m_ptr; }

FileLock

FileLock::FileLock(MMKVFileHandle_t fd, bool isAshmem)
    : m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0), m_isAshmem(isAshmem) {
    m_lockInfo.l_type = F_WRLCK;
    m_lockInfo.l_start = 0;
    m_lockInfo.l_whence = SEEK_SET;
    m_lockInfo.l_len = 0;
    m_lockInfo.l_pid = 0;
}

lock

bool FileLock::lock(LockType lockType) {
    return doLock(lockType, true);
}

doLock

bool FileLock::doLock(LockType lockType, bool wait, bool *tryAgain) {
    bool unLockFirstIfNeeded = false;

    if (lockType == SharedLockType) {
        // don't want shared-lock to break any existing locks
        if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
            m_sharedLockCount++;
            return true;
        }
    } else {
        // don't want exclusive-lock to break existing exclusive-locks
        if (m_exclusiveLockCount > 0) {
            m_exclusiveLockCount++;
            return true;
        }
        // prevent deadlock
        if (m_sharedLockCount > 0) {
            unLockFirstIfNeeded = true;
        }
    }

    auto ret = platformLock(lockType, wait, unLockFirstIfNeeded, tryAgain);
    if (ret) {
        if (lockType == SharedLockType) {
            m_sharedLockCount++;
        } else {
            m_exclusiveLockCount++;
        }
    }
    return ret;
}

unlock

bool FileLock::unlock(LockType lockType) {
    bool unlockToSharedLock = false;

    if (lockType == SharedLockType) {
        if (m_sharedLockCount == 0) {
            return false;
        }
        // don't want shared-lock to break any existing locks
        if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
            m_sharedLockCount--;
            return true;
        }
    } else {
        if (m_exclusiveLockCount == 0) {
            return false;
        }
        if (m_exclusiveLockCount > 1) {
            m_exclusiveLockCount--;
            return true;
        }
        // restore shared-lock when all exclusive-locks are done
        if (m_sharedLockCount > 0) {
            unlockToSharedLock = true;
        }
    }

    auto ret = platformUnLock(unlockToSharedLock);
    if (ret) {
        if (lockType == SharedLockType) {
            m_sharedLockCount--;
        } else {
            m_exclusiveLockCount--;
        }
    }
    return ret;
}

platformLock

bool FileLock::platformLock(LockType lockType, bool wait, bool unLockFirstIfNeeded, bool *tryAgain) {
    if (m_isAshmem) {
        return ashmemLock(lockType, wait, unLockFirstIfNeeded, tryAgain);
    }
    auto realLockType = LockType2FlockType(lockType);
    auto cmd = wait ? realLockType : (realLockType | LOCK_NB);
    if (unLockFirstIfNeeded) {
        // try lock
        auto ret = flock(m_fd, realLockType | LOCK_NB);
        if (ret == 0) {
            return true;
        }
        // let's be gentleman: unlock my shared-lock to prevent deadlock
        ret = flock(m_fd, LOCK_UN);
        if (ret != 0) {
            MMKVError("fail to try unlock first fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
        }
    }
    auto ret = flock(m_fd, cmd);
    ......
}

platformUnLock

bool FileLock::platformUnLock(bool unlockToSharedLock) {
    if (m_isAshmem) {
        return ashmemUnLock(unlockToSharedLock);
    }
    int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
    auto ret = flock(m_fd, cmd);
    return ret == 0; // flock() returns 0 on success (error logging elided)
}

InterProcessLock

InterProcessLock(FileLock *fileLock, LockType lockType)
    : m_fileLock(fileLock), m_lockType(lockType), m_enable(true) {
    MMKV_ASSERT(m_fileLock);
}

lock

void lock() {
    if (m_enable) {
        m_fileLock->lock(m_lockType);
    }
}

unlock

void unlock() {
    if (m_enable) {
        m_fileLock->unlock(m_lockType);
    }
}

ScopedLock

explicit ScopedLock(T *oLock) : m_lock(oLock) {
    MMKV_ASSERT(m_lock);
    lock();
}

~ScopedLock() {
    unlock();
    m_lock = nullptr;
}

lock unLock

template <typename T>
class ScopedLock {
    T *m_lock;

    void lock() {
        if (m_lock) {
            m_lock->lock();
        }
    }

    void unlock() {
        if (m_lock) {
            m_lock->unlock();
        }
    }
};
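The scoped-lock idea is ordinary RAII: lock in the constructor, unlock in the destructor. The sketch below shows the pattern with hypothetical names (`Guard`, `CountingMutex`, `depthInsideScope`) so that the lock and unlock calls can be observed; it is not the MMKV class itself.

```cpp
#include <cassert>

// Hypothetical lock that only counts calls, so the guard's behaviour is observable.
struct CountingMutex {
    int depth = 0;
    int lockCalls = 0;
    int unlockCalls = 0;
    void lock() {
        depth++;
        lockCalls++;
    }
    void unlock() {
        depth--;
        unlockCalls++;
    }
};

template <typename T>
class Guard {
public:
    explicit Guard(T *lock) : m_lock(lock) {
        assert(m_lock);
        m_lock->lock(); // acquire on construction
    }
    ~Guard() { m_lock->unlock(); } // release on every exit path
    Guard(const Guard &) = delete;
    Guard &operator=(const Guard &) = delete;

private:
    T *m_lock;
};

// Reads the lock depth while a guard is alive; the guard unlocks on return.
int depthInsideScope(CountingMutex &mutex) {
    Guard<CountingMutex> guard(&mutex);
    return mutex.depth;
}
```

`depthInsideScope` returns 1, and after it returns the depth is back to 0 with exactly one lock and one unlock call. Any type with `lock()` and `unlock()` members fits the template, which is how one guard can serve both thread locks and inter-process locks.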

ThreadLock.cpp

ThreadOnce

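pthread_once guarantees that the callback runs only once even when multiple threads call it concurrently, which makes it usable for implementing the singleton pattern in C++.

A brief analysis of the pthread_once implementation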

void ThreadLock::ThreadOnce(ThreadOnceToken_t *onceToken, void (*callback)()) {
    pthread_once(onceToken, callback);
}
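The once-only guarantee can be checked with a few threads racing on the same token. This is a minimal sketch: `g_onceToken`, `g_initCalls`, `initOnce`, and `runConcurrentInit` are illustrative names, and `std::thread` is used only to create the contention.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>
#include <thread>
#include <vector>

static pthread_once_t g_onceToken = PTHREAD_ONCE_INIT;
static std::atomic<int> g_initCalls{0};

// The initialization routine: counts how many times it actually runs.
static void initOnce() {
    g_initCalls.fetch_add(1);
}

// Starts `threadCount` threads that all call pthread_once with the same token
// and returns how many times the initialization routine ran in total.
int runConcurrentInit(int threadCount) {
    std::vector<std::thread> threads;
    for (int i = 0; i < threadCount; i++) {
        threads.emplace_back([] { pthread_once(&g_onceToken, initOnce); });
    }
    for (auto &t : threads) {
        t.join();
    }
    return g_initCalls.load();
}
```

However many threads are started, `runConcurrentInit` reports a single call, because the token is shared and every later caller simply returns once initialization has completed.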