一、分布式全局锁
利用Redis实现多服务器多进程环境下的分布式全局锁
首先调用 INCR 并检测返回值,如果等于1就表示获得了锁
然后EXPIRE设置此Key的过期时间,
然后开始进行操作,
当操作完成后DEL删除这个KEY
伪代码如下
- if( INCR( 'EXCLUSION_COUNT' ) == 1 )
- {
- EXPIRE( 'EXCLUSION_COUNT', 60 ) // set ttl for 1 minute
-
- // DO some work
-
- DEL( 'EXCLUSION_COUNT' )
- }
这里的 EXPIRE(60) 表示我们假定接下来的操作一定会在1分钟内完成,需要根据实际情况调整这个值。如果当前获得了锁的进程或者机器在执行过程中崩溃了,其它进程或者机器也能在1分钟后重新获得锁执行。注意:INCR 与 EXPIRE 不是原子操作,如果进程恰好在两者之间崩溃,这个KEY将永远不会过期,锁也就无法再被获得。
如果处理的过程是一个不确定执行时间的过程,可以每隔一段时间renew一下这个KEY, 比如
- if( INCR( 'EXCLUSION_COUNT' ) == 1 )
- {
-
-
- for(...){
- EXPIRE( 'EXCLUSION_COUNT', 60 ) // set ttl for 1 minute
-
- // DO some work
- }
-
- DEL( 'EXCLUSION_COUNT' )
- }
二、基于Redis的分布式锁
核心类
using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using BookSleeve;
namespace ViewAlloc.Threading
{
/// <summary>
/// Distributed lock stored in a single Redis string key.
/// The key's value is the Unix time (whole seconds, UTC) after which the lock is
/// considered abandoned. Acquisition uses SETNX; a lock whose stored time has passed
/// is taken over with GETSET, and the client whose GETSET returns the stale value wins.
/// </summary>
/// <remarks>
/// Timestamps are computed from UTC so that hosts in different time zones (or crossing a
/// daylight-saving change) agree on expiry. Clients that still write local-time values
/// must not share keys with this implementation.
/// The value carries no owner identity, so <see cref="UnLock"/> cannot verify that the
/// caller is the current holder.
/// </remarks>
public class RedisBillLockHandler
{
    /// <summary>Seconds a lock may be held before other clients may take it over.</summary>
    private const int DefaultSingleExpireTime = 10;

    /// <summary>Redis database index used for every command.</summary>
    private const int DefaultDb = 0;

    /// <summary>Pause between two acquisition attempts.</summary>
    private const int RetryIntervalMilliseconds = 300;

    private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    private readonly RedisConnection client;

    /// <summary>
    /// Creates a handler that issues its commands on the given connection.
    /// </summary>
    /// <param name="client">Connection used for all lock commands.</param>
    public RedisBillLockHandler(RedisConnection client)
    {
        this.client = client;
    }

    /// <summary>
    /// Makes a single attempt to take the lock.
    /// </summary>
    /// <param name="key">Redis key that represents the lock.</param>
    /// <returns>true if the lock was acquired; otherwise false.</returns>
    /// <exception cref="RedisBillLockException">Any failure while talking to Redis.</exception>
    public bool TryLock(String key)
    {
        return TryLock(key, 0L);
    }

    /// <summary>
    /// Tries to take the lock, retrying every 300 ms until it succeeds or the timeout elapses.
    /// </summary>
    /// <param name="key">Redis key that represents the lock.</param>
    /// <param name="timeout">Maximum time to keep retrying, in seconds; 0 means one attempt only.</param>
    /// <returns>true if the lock was acquired; otherwise false.</returns>
    /// <exception cref="RedisBillLockException">Any failure while talking to Redis.</exception>
    public bool TryLock(String key, long timeout)
    {
        try
        {
            Stopwatch watch = Stopwatch.StartNew();
            do
            {
                if (TryAcquireOnce(key))
                {
                    return true;
                }
                if (timeout == 0)
                {
                    break;
                }
                Thread.Sleep(RetryIntervalMilliseconds);
            } while (watch.ElapsedMilliseconds < timeout * 1000);
            return false;
        }
        catch (Exception exc)
        {
            throw new RedisBillLockException(exc.Message, exc);
        }
    }

    /// <summary>
    /// Blocks until the lock is acquired, retrying every 300 ms.
    /// </summary>
    /// <param name="key">Redis key that represents the lock.</param>
    /// <exception cref="RedisBillLockException">Any failure while talking to Redis.</exception>
    public void Lock(String key)
    {
        try
        {
            while (!TryAcquireOnce(key))
            {
                Thread.Sleep(RetryIntervalMilliseconds);
            }
        }
        catch (Exception exc)
        {
            throw new RedisBillLockException(exc.Message, exc);
        }
    }

    /// <summary>
    /// Releases the lock by deleting the key, but only while the stored expiry time is
    /// still in the future. Does nothing when the key no longer exists.
    /// </summary>
    /// <param name="key">Redis key that represents the lock.</param>
    /// <exception cref="RedisBillLockException">Any failure while talking to Redis.</exception>
    public void UnLock(String key)
    {
        try
        {
            long now = CurrentUnixSeconds();
            byte[] stored = client.Strings.Get(DefaultDb, key).Result;
            if (stored == null)
            {
                // The key is already gone (released or removed elsewhere); nothing to delete.
                return;
            }
            // An expired value is left in place: another client may be taking it over via GETSET.
            // NOTE(review): GET and DEL are two separate commands and the value has no owner id,
            // so a lock re-acquired by another client in between can still be deleted here.
            if (now < DecodeTimestamp(stored))
            {
                client.Keys.Remove(DefaultDb, key).Wait();
            }
        }
        catch (Exception exc)
        {
            throw new RedisBillLockException(exc.Message, exc);
        }
    }

    /// <summary>
    /// Performs one acquisition attempt: SETNX, then GETSET takeover if the stored expiry has passed.
    /// </summary>
    private bool TryAcquireOnce(String key)
    {
        long now = CurrentUnixSeconds();
        byte[] newExpiry = EncodeTimestamp(now + DefaultSingleExpireTime + 1);

        bool created;
        byte[] stored;
        // SETNX and GET are sent in one transaction so the value read belongs to the same
        // instant as the SETNX outcome. "using" disposes the transaction even if Execute fails.
        using (var tran = client.CreateTransaction())
        {
            var taskSetIfNotExists = tran.Strings.SetIfNotExists(DefaultDb, key, newExpiry);
            var taskGet = tran.Strings.Get(DefaultDb, key);
            tran.Execute().Wait();
            created = taskSetIfNotExists.Result;
            stored = taskGet.Result;
        }

        if (created)
        {
            return true;
        }
        if (stored == null)
        {
            // Not expected right after a failed SETNX; treat as "not acquired" and let the caller retry.
            return false;
        }

        long storedExpiry = DecodeTimestamp(stored);
        if (now <= storedExpiry)
        {
            // The current holder's lease is still valid.
            return false;
        }

        // Lease expired: race other waiters with GETSET. Only the client that reads back the
        // stale value it observed wins; the others see a newer value and keep waiting.
        byte[] previous = client.Strings.GetSet(DefaultDb, key, newExpiry).Result;
        if (previous == null)
        {
            // The key was deleted between the two commands, so GETSET just created it for us.
            return true;
        }
        return DecodeTimestamp(previous) == storedExpiry;
    }

    /// <summary>Whole seconds elapsed since 1970-01-01 UTC.</summary>
    private static long CurrentUnixSeconds()
    {
        return (long)(DateTime.UtcNow - UnixEpoch).TotalSeconds;
    }

    /// <summary>Serialises a timestamp as culture-invariant decimal text in UTF-8.</summary>
    private static byte[] EncodeTimestamp(long value)
    {
        return Encoding.UTF8.GetBytes(value.ToString(System.Globalization.CultureInfo.InvariantCulture));
    }

    /// <summary>Parses a value previously written by <see cref="EncodeTimestamp"/>.</summary>
    private static long DecodeTimestamp(byte[] raw)
    {
        return long.Parse(Encoding.UTF8.GetString(raw), System.Globalization.CultureInfo.InvariantCulture);
    }
}
}
为了不破坏原有的代码逻辑,我又加了下面两个类
using System;
namespace ViewAlloc.Threading
{
/// <summary>
/// Declares that a member should run under a Redis-backed distributed lock.
/// Carries the connection target and the lock key to use.
/// </summary>
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Property)]
public class RedisBillLockAttribute : Attribute
{
    /// <summary>
    /// Creates the attribute with its connection target and lock key.
    /// </summary>
    /// <param name="scheme">Stored in <see cref="Scheme"/>.</param>
    /// <param name="key">Stored in <see cref="Key"/>.</param>
    public RedisBillLockAttribute(string scheme, string key)
    {
        Scheme = scheme;
        Key = key;
    }

    /// <summary>Connection target for the Redis server (the sample uses a host address).</summary>
    public string Scheme { get; set; }

    /// <summary>Name of the Redis key that represents the lock.</summary>
    public string Key { get; set; }
}
}
#########################
using System;
using System.Reflection;
using System.Runtime.Remoting;
using System.Runtime.Remoting.Messaging;
namespace ViewAlloc.Threading
{
/// <summary>
/// Decorator that wraps an object in a transparent proxy so that members marked with
/// <see cref="RedisBillLockAttribute"/> run while holding the distributed lock.
/// Avoids hand-writing a wrapper per class, at the cost of some proxy overhead per call.
/// </summary>
public class RedisBillLockWrapper
{
    /// <summary>
    /// Returns a transparent proxy for <paramref name="target"/>.
    /// </summary>
    /// <typeparam name="T">Type of the wrapped object; must derive from MarshalByRefObject.</typeparam>
    /// <param name="target">Instance that receives the forwarded calls.</param>
    /// <returns>A proxy typed as <typeparamref name="T"/>.</returns>
    public static T Wrap<T>(T target) where T : MarshalByRefObject
    {
        return (T)new MyProxy(typeof(T), target).GetTransparentProxy();
    }

    /// <summary>
    /// Proxy that takes the lock before forwarding a call and releases it afterwards.
    /// </summary>
    private class MyProxy : System.Runtime.Remoting.Proxies.RealProxy
    {
        private MarshalByRefObject target;

        public MyProxy(Type t, MarshalByRefObject target)
            : base(t)
        {
            this.target = target;
        }

        /// <summary>
        /// Forwards the call to the wrapped object; when the invoked method (or the property
        /// it is an accessor of) carries a <see cref="RedisBillLockAttribute"/>, the call is
        /// executed between Lock and UnLock on a connection opened for this call only.
        /// </summary>
        /// <param name="msg">The intercepted method call.</param>
        /// <returns>The return message produced by executing the call on the target.</returns>
        public override IMessage Invoke(IMessage msg)
        {
            IMethodCallMessage call = (IMethodCallMessage)msg;
            RedisBillLockAttribute lockAttribute = FindLockAttribute(call.MethodBase);
            if (lockAttribute == null)
            {
                return RemotingServices.ExecuteMessage(target, call);
            }

            BookSleeve.RedisConnection client = new BookSleeve.RedisConnection(lockAttribute.Scheme);
            try
            {
                // Inside the try block so the connection is closed even if opening throws.
                // NOTE(review): the result of Open() is not waited on, as in the original;
                // presumably BookSleeve queues commands until the connection is ready - confirm.
                client.Open();
                RedisBillLockHandler lockHandler = new RedisBillLockHandler(client);
                lockHandler.Lock(lockAttribute.Key);
                try
                {
                    return RemotingServices.ExecuteMessage(target, call);
                }
                finally
                {
                    lockHandler.UnLock(lockAttribute.Key);
                }
            }
            finally
            {
                client.Close(false);
            }
        }

        /// <summary>
        /// Finds the lock attribute for a call: first on the method itself, then, for a
        /// property accessor, on the property that declares it. The attribute may target
        /// properties, but reflection does not report a property's attributes on its accessors.
        /// </summary>
        private static RedisBillLockAttribute FindLockAttribute(MethodBase method)
        {
            object[] onMethod = method.GetCustomAttributes(typeof(RedisBillLockAttribute), false);
            if (onMethod.Length > 0)
            {
                return (RedisBillLockAttribute)onMethod[0];
            }

            Type owner = method.DeclaringType;
            if (!method.IsSpecialName || owner == null)
            {
                return null;
            }

            const BindingFlags declaredInstance =
                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.DeclaredOnly;
            foreach (PropertyInfo property in owner.GetProperties(declaredInstance))
            {
                if (!IsAccessorOf(property, method))
                {
                    continue;
                }
                object[] onProperty = property.GetCustomAttributes(typeof(RedisBillLockAttribute), false);
                return onProperty.Length > 0 ? (RedisBillLockAttribute)onProperty[0] : null;
            }
            return null;
        }

        /// <summary>Returns true when <paramref name="method"/> is a get/set accessor of the property.</summary>
        private static bool IsAccessorOf(PropertyInfo property, MethodBase method)
        {
            foreach (MethodInfo accessor in property.GetAccessors(true))
            {
                // Compare handles rather than references so differing ReflectedType does not matter.
                if (accessor.MethodHandle.Equals(method.MethodHandle))
                {
                    return true;
                }
            }
            return false;
        }
    }
}
}
#########################
原先的业务逻辑类
/// <summary>
/// Sample business class before the lock is applied.
/// </summary>
class TestLock
{
    /// <summary>
    /// Writes a timestamped line to the console, then pauses for one second.
    /// </summary>
    public void Run()
    {
        string line = string.Format("{0:yyyyMMddHHmmssfff}获取了锁", DateTime.Now);
        Console.WriteLine(line);
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}
#########################
修改后的
/// <summary>
/// Sample business class after the change: it derives from MarshalByRefObject and
/// its Run method is marked with the lock attribute.
/// </summary>
class TestLock : MarshalByRefObject
{
    /// <summary>
    /// Writes a timestamped line to the console, then pauses for one second.
    /// </summary>
    [RedisBillLock("127.0.0.1", "viewalloc_lock_service_key_test")]
    public void Run()
    {
        string line = string.Format("{0:yyyyMMddHHmmssfff}获取了锁", DateTime.Now);
        Console.WriteLine(line);
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}
#########################
调用
// Wrap the instance so calls go through the wrapper's proxy, then invoke the method.
var testLock = RedisBillLockWrapper.Wrap(new TestLock());
testLock.Run();