博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
memcached 并发原语CAS与GETS操作
阅读量:2488 次
发布时间:2019-05-11

本文共 1916 字,大约阅读时间需要 6 分钟。

   最近笔者自己的项目中,遇到了乐观锁的需求。但是redis没有这个操作,无奈,看了memcache天然的支持这种并发原语,即:GETS和CAS操作。因此准备打算继续使用REDIS,业务没有那么强的时序执行要求,因此可以使用没有CAS的算法在某种程度上解决。 

    我们为什么要使用这种并发原语呢?如果是单机版的,我们可以通过通过加锁同步就可以解决执行时序的问题。但是我们的应用是分布式的,无状态的应用服务器通过负载均衡,部署到了多台。加锁也解决不了多台服务器的时序执行。 


    如果不采用CAS,则有如下的情景: 

第一步,A取出数据对象X; 

第二步,B取出数据对象X; 

第三步,B修改数据对象X,并将其放入缓存; 

第四步,A修改数据对象X,并将其放入缓存。 

我们可以发现,第四步中会产生数据写入冲突。
 


    如果采用CAS协议,则是如下的情景。 

第一步,A取出数据对象X,并获取到CAS-ID1; 

第二步,B取出数据对象X,并获取到CAS-ID2; 

第三步,B修改数据对象X,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“一致”,就将修改后的带有CAS-ID2的X写入到缓存。 

第四步,A修改数据对象Y,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“不一致”,则拒绝写入,返回存储失败。 

我们可以通过重试,或者其他业务逻辑解决第四步设置失败的问题。
 


没有CAS的方案
 

一般使用redis用这个算法,可以从某种程度上保证时序性。 
SETNX key value 
将key 的值设为value ,当且仅当key 不存在。 
返回值: 
设置成功,返回1。 
设置失败,返回0。 

Java代码  
  1. if (memcache.get(key) == null) {  
  2.     // 3 min timeout to avoid mutex holder crash  
  3.     if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {  
  4.         value = db.get(key);  
  5.         memcache.set(key, value);  
  6.         memcache.delete(key_mutex);  
  7.     } else {  
  8.         sleep(50);  
  9.         retry();  
  10.     }  
  11. }  

最初的解决方案: 

利用memcached的add操作的原子性来控制并发,具体方式如下: 

1.申请锁:在校验是否创建过活动前,执行add操作key为key,如果add操作失败,则表示有另外的进程在并发的为该key创建活动,返回创建失败。否则表示无并发 

2.执行创建活动 

3.释放锁:创建活动完成后,执行delete操作,删除该key。 

问题: 

1.memcached中存放的值有有效期,即过期后自动失效,如add过M1后,M1失效,可以在此add成功 

2.即使通过配置,可以使memcached永久有效,即不设有效期,memcached有容量限制,当容量不够后会进行自动替换,即有可能add过M1后,M1被其他key值置换掉,则再次add可以成功。 

3.此外,memcached是基于内存的,掉电后数据会全部丢失,导致重启后所有memberId均可重新add。 

解决方案 

针对上述的几个问题,根本原因是add操作有时效性,过期,被替换,重启,都会是原来的add操作失效。解决该问题有方法 

1.减轻时效性的影响,使用memcached CAS(check and set)方式。 


使用CAS的方案
 

CAS的基本原理 

基本原理非常简单,一言以蔽之,就是“版本号”。每个存储的数据对象,多有一个版本号。我们可以从下面的例子来理解: 

Java代码  
  1. final GetsResponse<Object> response = memcachedClient.gets("key");  
  2. boolean flag = memcachedClient.cas("key"new CASOperation<Object>() {  
  3.     @Override  
  4.     public int getMaxTries() {  
  5.         //重试次数  
  6.         return 1;  
  7.     }  
  8.   
  9.     @Override  
  10.     public Object getNewValue(long currentCAS,  
  11.                                Object currentValue) {  
  12.         oldCAS = response.getCas();  
  13.         if (checkCAS()) {
    //比较CAS值  
  14.             value = db.get(key);  
  15.             return value;  
  16.         }else{  
  17.             //抛出异常,或者其他业务逻辑  
  18.         }  
  19.           
  20.     }  
  21. });  
  22. }  

转载地址:http://oekrb.baihongyu.com/

你可能感兴趣的文章
Django 源码阅读:服务启动(wsgi)
查看>>
Django 源码阅读:url解析
查看>>
Docker面试题(一)
查看>>
第一轮面试题
查看>>
2020-11-18
查看>>
Docker面试题(二)
查看>>
一、redis面试题及答案
查看>>
消息队列2
查看>>
C++ 线程同步之临界区CRITICAL_SECTION
查看>>
测试—自定义消息处理
查看>>
MFC中关于虚函数的一些问题
查看>>
根据图层名获取图层和图层序号
查看>>
规范性附录 属性值代码
查看>>
提取面狭长角
查看>>
Arcsde表空间自动增长
查看>>
Arcsde报ora-29861: 域索引标记为loading/failed/unusable错误
查看>>
记一次断电恢复ORA-01033错误
查看>>
C#修改JPG图片EXIF信息中的GPS信息
查看>>
从零开始的Docker ELK+Filebeat 6.4.0日志管理
查看>>
Sequelize的原始查询的时区问题
查看>>