https://github.com/Tencent/MMKV/wiki/android_ipc
https://github.com/Tencent/MMKV/wiki/design
MMKV 本质上是将文件 mmap 到内存块中,将新增的 key-value 统统 append 到内存中;到达边界后,进行重整回写以腾出空间,空间还是不够的话,就 double 内存空间;对于内存文件中可能存在的重复键值,MMKV 只选用最后写入的作为有效键值。
我们可以在每个进程内部缓存自己的写指针,然后在写入键值的同时,还要把最新的写指针位置也写到 mmap 内存中;这样每个进程只需要对比一下缓存的指针与 mmap 内存的写指针,如果不一样,就说明其他进程进行了写操作。事实上 MMKV 原本就在文件头部保存了有效内存的大小,这个数值刚好就是写指针的内存偏移量,我们可以重用这个数值来校对写指针。
考虑使用一个单调递增的序列号,每次发生内存重整,就将序列号递增。将这个序列号也放到 mmap 内存中,每个进程内部也缓存一份,只需要对比序列号是否一致,就能够知道其他进程是否触发了内存重整。
事实上 MMKV 在内存增长之前,会先尝试通过内存重整来腾出空间,重整后还不够空间才申请新的内存。所以内存增长可以跟内存重整一样处理。至于新的内存大小,可以通过查询文件大小来获得,无需在 mmap 内存另外存放。
到这里我们已经完成了数据的多进程同步工作,是时候回头处理锁事了,亦即前面提到的递归锁和锁升级/降级。
意思是如果一个进程/线程已经拥有了锁,那么后续的加锁操作不会导致卡死,并且解锁也不会导致外层的锁被解掉。对于文件锁来说,前者是满足的,后者则不然。因为文件锁是状态锁,没有计数器,无论加了多少次锁,一个解锁操作就全解掉。只要用到子函数,就非常需要递归锁。
锁升级是指将已经持有的共享锁,升级为互斥锁,亦即将读锁升级为写锁;锁降级则是反过来。文件锁支持锁升级,但是容易死锁:假如 A、B 进程都持有了读锁,现在都想升级到写锁,就会陷入相互等待的困境,发生死锁。另外,由于文件锁不支持递归锁,也导致了锁降级无法进行,一降就降到没有锁。
为了解决递归锁和锁升级/降级这两个难题,需要对文件锁(系统调用flock)进行封装,增加读锁、写锁计数器。处理逻辑如下表:
| 读锁计数器 | 写锁计数器 | 加读锁 | 加写锁 | 解读锁 | 解写锁 |
|---|---|---|---|---|---|
| 0 | 0 | 加读锁 | 加写锁 | - | - |
| 0 | 1 | +1 | +1 | - | 解写锁 |
| 0 | N | +1 | +1 | - | -1 |
| 1 | 0 | +1 | 解读锁再加写锁 | 解读锁 | - |
| 1 | 1 | +1 | +1 | -1 | 加读锁 |
| 1 | N | +1 | +1 | -1 | -1 |
| N | 0 | +1 | 解读锁再加写锁 | -1 | - |
| N | 1 | +1 | +1 | -1 | 加读锁 |
| N | N | +1 | +1 | -1 | -1 |
需要注意的地方有两点:
graph LR
加读锁-->直接加
加写锁-->首次加写锁,如果有读锁,需要先unlock,防止死锁
加写锁-->非首次加写锁,直接+1
解读锁-->直接解
解写锁-->写锁数量大于1直接-1
解写锁-->写锁数量为1,如果有读锁,加读锁锁降级,避免解写锁时读锁也解了,锁失效
public static String initialize(String rootDir, LibLoader loader, MMKVLogLevel logLevel) {
if (loader != null) {
if (BuildConfig.FLAVOR.equals("SharedCpp")) {
loader.loadLibrary("c++_shared");
}
loader.loadLibrary("mmkv");
} else {
if (BuildConfig.FLAVOR.equals("SharedCpp")) {
System.loadLibrary("c++_shared");
}
System.loadLibrary("mmkv");
}
jniInitialize(rootDir, logLevel2Int(logLevel));
MMKV.rootDir = rootDir;
return MMKV.rootDir;
}
// content change notification of other process
// trigger by getXXX() or setXXX() or checkContentChangedByOuterProcess()
private static MMKVContentChangeNotification gContentChangeNotify;
public static void registerContentChangeNotify(MMKVContentChangeNotification notify) {
gContentChangeNotify = notify;
setWantsContentChangeNotify(gContentChangeNotify != null);
}
private static void onContentChangedByOuterProcess(String mmapID) {
if (gContentChangeNotify != null) {
gContentChangeNotify.onContentChangedByOuterProcess(mmapID);
}
}
public static MMKV defaultMMKV() {
long handle = getDefaultMMKV(SINGLE_PROCESS_MODE, null);
return checkProcessMode(handle, "DefaultMMKV", SINGLE_PROCESS_MODE);
}
extern "C" JNIEXPORT JNICALL jint JNI_OnLoad(JavaVM *vm, void *reserved) {
g_currentJVM = vm;
JNIEnv *env;
if (vm->GetEnv(reinterpret_cast<void **>(&env), JNI_VERSION_1_6) != JNI_OK) {
return -1;
}
static const char *clsName = "com/tencent/mmkv/MMKV";
jclass instance = env->FindClass(clsName);
g_cls = reinterpret_cast<jclass>(env->NewGlobalRef(instance));
int ret = registerNativeMethods(env, g_cls);
return JNI_VERSION_1_6;
}
static int registerNativeMethods(JNIEnv *env, jclass cls) {
return env->RegisterNatives(cls, g_methods, sizeof(g_methods) / sizeof(g_methods[0]));
}
MMKV_JNI void jniInitialize(JNIEnv *env, jobject obj, jstring rootDir, jint logLevel) {
const char *kstr = env->GetStringUTFChars(rootDir, nullptr);
if (kstr) {
MMKV::initializeMMKV(kstr, (MMKVLogLevel) logLevel);
env->ReleaseStringUTFChars(rootDir, kstr);
}
}
MMKV_JNI void setWantsContentChangeNotify(JNIEnv *env, jclass type, jboolean notify) {
if (notify == JNI_TRUE) {
MMKV::registerContentChangeHandler(onContentChangedByOuterProcess);
} else {
MMKV::unRegisterContentChangeHandler();
}
}
static void onContentChangedByOuterProcess(const std::string &mmapID) {
auto currentEnv = getCurrentEnv();
if (currentEnv && g_callbackOnContentChange) {
jstring str = string2jstring(currentEnv, mmapID);
currentEnv->CallStaticVoidMethod(g_cls, g_callbackOnContentChange, str);
}
}
MMKV_JNI jlong getDefaultMMKV(JNIEnv *env, jobject obj, jint mode, jstring cryptKey) {
MMKV *kv = nullptr;
if (cryptKey) {
string crypt = jstring2string(env, cryptKey);
if (crypt.length() > 0) {
kv = MMKV::defaultMMKV((MMKVMode) mode, &crypt);
}
}
if (!kv) {
kv = MMKV::defaultMMKV((MMKVMode) mode, nullptr);
}
return (jlong) kv;
}
MMKV_JNI jlong getMMKVWithID(JNIEnv *env, jobject, jstring mmapID, jint mode, jstring cryptKey, jstring rootPath) {
string str = jstring2string(env, mmapID);
kv = MMKV::mmkvWithID(str, DEFAULT_MMAP_SIZE, (MMKVMode) mode, &crypt, &path);
return (jlong) kv;
}
void MMKV::initializeMMKV(const MMKVPath_t &rootDir, MMKVLogLevel logLevel) {
g_currentLogLevel = logLevel;
ThreadLock::ThreadOnce(&once_control, initialize);
g_rootDir = rootDir;
mkPath(g_rootDir);
}
void initialize() {
g_instanceDic = new unordered_map<string, MMKV *>;
g_instanceLock = new ThreadLock();
g_instanceLock->initialize();
mmkv::DEFAULT_MMAP_SIZE = mmkv::getPageSize();
}
void MMKV::registerContentChangeHandler(mmkv::ContentChangeHandler handler) {
g_contentChangeHandler = handler;
}
MMKV *MMKV::defaultMMKV(MMKVMode mode, string *cryptKey) {
#ifndef MMKV_ANDROID
return mmkvWithID(DEFAULT_MMAP_ID, mode, cryptKey);
#else
return mmkvWithID(DEFAULT_MMAP_ID, DEFAULT_MMAP_SIZE, mode, cryptKey);
#endif
}
MMKV *MMKV::mmkvWithID(const string &mmapID, int size, MMKVMode mode, string *cryptKey, string *rootPath) {
SCOPED_LOCK(g_instanceLock);
auto mmapKey = mmapedKVKey(mmapID, rootPath);
auto itr = g_instanceDic->find(mmapKey);
if (itr != g_instanceDic->end()) {
MMKV *kv = itr->second;
return kv;
}
auto kv = new MMKV(mmapID, size, mode, cryptKey, rootPath);
(*g_instanceDic)[mmapKey] = kv;
return kv;
}
MMKV::MMKV(const string &mmapID, int size, MMKVMode mode, string *cryptKey, string *rootPath)
: m_mmapID(mmapedKVKey(mmapID, rootPath)) // historically Android mistakenly use mmapKey as mmapID
, m_path(mappedKVPathWithID(m_mmapID, mode, rootPath))
, m_crcPath(crcPathWithID(m_mmapID, mode, rootPath))
, m_dic(nullptr)
, m_dicCrypt(nullptr)
, m_file(new MemoryFile(m_path, size, (mode & MMKV_ASHMEM) ? MMFILE_TYPE_ASHMEM : MMFILE_TYPE_FILE))
, m_metaFile(new MemoryFile(m_crcPath, DEFAULT_MMAP_SIZE, m_file->m_fileType))
, m_metaInfo(new MMKVMetaInfo())
, m_crypter(nullptr)
, m_lock(new ThreadLock())
, m_fileLock(new FileLock(m_metaFile->getFd(), (mode & MMKV_ASHMEM)))
, m_sharedProcessLock(new InterProcessLock(m_fileLock, SharedLockType))
, m_exclusiveProcessLock(new InterProcessLock(m_fileLock, ExclusiveLockType))
, m_isInterProcess((mode & MMKV_MULTI_PROCESS) != 0 || (mode & CONTEXT_MODE_MULTI_PROCESS) != 0) {
m_actualSize = 0;
m_output = nullptr;
// force use fcntl(), otherwise will conflict with MemoryFile::reloadFromFile()
m_fileModeLock = new FileLock(m_file->getFd(), true);
m_sharedProcessModeLock = new InterProcessLock(m_fileModeLock, SharedLockType);
m_exclusiveProcessModeLock = nullptr;
# ifndef MMKV_DISABLE_CRYPT
if (cryptKey && cryptKey->length() > 0) {
m_dicCrypt = new MMKVMapCrypt();
m_crypter = new AESCrypt(cryptKey->data(), cryptKey->length());
} else
# endif
{
m_dic = new MMKVMap();
}
m_needLoadFromFile = true;
m_hasFullWriteback = false;
m_crcDigest = 0;
m_sharedProcessLock->m_enable = m_isInterProcess;
m_exclusiveProcessLock->m_enable = m_isInterProcess;
// sensitive zone
{
SCOPED_LOCK(m_sharedProcessLock);
loadFromFile();
}
}
void MMKV::loadFromFile() {
if (!m_file->isFileValid()) {
m_file->reloadFromFile();
}
auto ptr = (uint8_t *) m_file->getMemory();
if (loadFromFile && m_actualSize > 0) {
MMBuffer inputBuffer(ptr + Fixed32Size, m_actualSize, MMBufferNoCopy);
}
void MemoryFile::reloadFromFile() {
m_fd = open(m_name.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, S_IRWXU);
FileLock fileLock(m_fd);
InterProcessLock lock(&fileLock, ExclusiveLockType);
SCOPED_LOCK(&lock);
mmkv::getFileSize(m_fd, m_size);
// round up to (n * pagesize)
if (m_size < DEFAULT_MMAP_SIZE || (m_size % DEFAULT_MMAP_SIZE != 0)) {
size_t roundSize = ((m_size / DEFAULT_MMAP_SIZE) + 1) * DEFAULT_MMAP_SIZE;
truncate(roundSize);
} else {
auto ret = mmap();
if (!ret) {
doCleanMemoryCache(true);
}
}
}
bool MemoryFile::mmap() {
m_ptr = (char *) ::mmap(m_ptr, m_size, PROT_READ | PROT_WRITE, MAP_SHARED, m_fd, 0);
if (m_ptr == MAP_FAILED) {
MMKVError("fail to mmap [%s], %s", m_name.c_str(), strerror(errno));
m_ptr = nullptr;
return false;
}
return true;
}
void *getMemory() { return m_ptr; }
FileLock::FileLock(MMKVFileHandle_t fd, bool isAshmem)
: m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0), m_isAshmem(isAshmem) {
m_lockInfo.l_type = F_WRLCK;
m_lockInfo.l_start = 0;
m_lockInfo.l_whence = SEEK_SET;
m_lockInfo.l_len = 0;
m_lockInfo.l_pid = 0;
}
bool FileLock::lock(LockType lockType) {
return doLock(lockType, true);
}
bool FileLock::doLock(LockType lockType, bool wait, bool *tryAgain) {
if (lockType == SharedLockType) {
// don't want shared-lock to break any existing locks
if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
m_sharedLockCount++;
return true;
}
} else {
// don't want exclusive-lock to break existing exclusive-locks
if (m_exclusiveLockCount > 0) {
m_exclusiveLockCount++;
return true;
}
// prevent deadlock
if (m_sharedLockCount > 0) {
unLockFirstIfNeeded = true;
}
}
auto ret = platformLock(lockType, wait, unLockFirstIfNeeded, tryAgain);
if (ret) {
if (lockType == SharedLockType) {
m_sharedLockCount++;
} else {
m_exclusiveLockCount++;
}
}
return ret;
}
bool FileLock::unlock(LockType lockType) {
if (lockType == SharedLockType) {
if (m_sharedLockCount == 0) {
return false;
}
// don't want shared-lock to break any existing locks
if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
m_sharedLockCount--;
return true;
}
} else {
if (m_exclusiveLockCount == 0) {
return false;
}
if (m_exclusiveLockCount > 1) {
m_exclusiveLockCount--;
return true;
}
// restore shared-lock when all exclusive-locks are done
if (m_sharedLockCount > 0) {
unlockToSharedLock = true;
}
}
auto ret = platformUnLock(unlockToSharedLock);
if (ret) {
if (lockType == SharedLockType) {
m_sharedLockCount--;
} else {
m_exclusiveLockCount--;
}
}
return ret;
}
bool FileLock::platformLock(LockType lockType, bool wait, bool unLockFirstIfNeeded, bool *tryAgain) {
if (m_isAshmem) {
return ashmemLock(lockType, wait, unLockFirstIfNeeded, tryAgain);
}
auto realLockType = LockType2FlockType(lockType);
auto cmd = wait ? realLockType : (realLockType | LOCK_NB);
if (unLockFirstIfNeeded) {
// try lock
auto ret = flock(m_fd, realLockType | LOCK_NB);
if (ret == 0) {
return true;
}
// let's be gentleman: unlock my shared-lock to prevent deadlock
ret = flock(m_fd, LOCK_UN);
if (ret != 0) {
MMKVError("fail to try unlock first fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
}
}
auto ret = flock(m_fd, cmd);
......
}
bool FileLock::platformUnLock(bool unlockToSharedLock) {
if (m_isAshmem) {
return ashmemUnLock(unlockToSharedLock);
}
int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
auto ret = flock(m_fd, cmd);
}
InterProcessLock(FileLock *fileLock, LockType lockType)
: m_fileLock(fileLock), m_lockType(lockType), m_enable(true) {
MMKV_ASSERT(m_fileLock);
}
void lock() {
if (m_enable) {
m_fileLock->lock(m_lockType);
}
}
void unlock() {
if (m_enable) {
m_fileLock->unlock(m_lockType);
}
}
explicit ScopedLock(T *oLock) : m_lock(oLock) {
MMKV_ASSERT(m_lock);
lock();
}
~ScopedLock() {
unlock();
m_lock = nullptr;
}
template <typename T>
class ScopedLock {
T *m_lock;
void lock() {
if (m_lock) {
m_lock->lock();
}
}
void unlock() {
if (m_lock) {
m_lock->unlock();
}
}
}
pthread_once用来确保在C++下多线程并发时,callback只调用一次,可用于C++中的单例模式
void ThreadLock::ThreadOnce(ThreadOnceToken_t *onceToken, void (*callback)()) {
pthread_once(onceToken, callback);
}