在聊聊高并发(9)实现几种自旋锁(4)中实现的限时队列锁是1个基于链表的限时无界队列锁,它的tryLock方法支持限时操作和中断操作,无饥饿,保证了先来先服务的公平性,在多个同享状态上自旋,是低争用的。但是它的1个缺点是牺牲了空间,为了让线程可以屡次使用锁,每次Lock的时候都要new
QNode,并设置给线程,而不能重复使用原来的节点。
这篇说说限时有界队列锁,它采取了有界队列,并且和ArrayLock不同,它不限制线程的个数。它的特点主要有
1. 采取有界队列,减小了空间复杂度,L把锁的空间复杂度在最坏的情况下(有界队列长度为1)是O(L)
2. 非公平,不保证先来先服务,这也是1个很常见的需求
3. 由于是有界队列,所以在高并发下存在高争用,需要结合回退锁来下降争用
它的实现思路是:
1. 采取了1个有界的等待队列,等待队列的每一个节点都有多种状态,每一个节点是可复用的
2. 采取了1个工作队列,Tail指针指向工作队列的队尾节点。获得和是不是锁的操作是在工作队列中的节点之间进行
3. 由因而限时队列,并支持中断,所以队列中的节点都是可以退出队列的
4. 算法分为3步,第1步是线程从有界的等待队列中取得1个节点,并设置为WAITING,如果没有取得,就自旋
第2步是把这个节点加入工作队列,并取得前1个节点的指针
第3步是在前1个节点的状态上自旋,直到取得锁,并把前1个节点RELEASED状态改成FREE
节点有4种状态:
1. FREE: 表示节点可以被取得。当前1个节点释放锁,并设置状态为RELEASED的时候,后1个节点需要把前1个节点设置为FREE。当节点在没有进入工作队列时超时,也被设置为FREE.
2. RELEASED:节点释放锁时设置为RELEASED,需要后续节点把它设置为FREE。如果是工作队列的最后1个节点,那末RELEASED状态的节点在第1步时可被取得
3. WAITING:表示取得了锁或在工作队列中等待锁。是在第1步中被设置的,第1步的结果就是取得1个状态为WAITING的节点
4. ABORTED:工作队列中的节点超时或中断的节点被设置为ABORTED。 队尾的ABORTED节点可以被第1步取得,队中的ABORTED节点不能被第1步获得,只能把它的preNode指针指向它的前1个节点,表示它自己不能被获得了
理解节点这4种状态的转变是理解这个设计的关键。这个设计比较复杂,从篇幅斟酌,这篇只介绍Lock和UnLock操作,下1篇说tryLock限时操作
1. 创建枚举类型State来表示状态
2. 创建QNode表示节点,使用1个AtomicReference原子变量指向它的State,以便于支持CAS操作。节点保护1个PreNode援用,只有节点被Aborted的时候才设置这个援用的值,表示跳过这个节点
3. 1个有界的QNode队列,使用数组表示
4. MIN_BACKOFF和MAX_BACKOFF支持回退操作,单位是毫秒。这两个值依赖于硬件性能,需要通过不断测试来获得最优值
5. 1个Random随机数,来产生随即的数组下标,非公平性需要
6. 1个AtomicStampedReference类型的原子变量作为队尾指针tail。AtomicStampedReference采取了版本号来避免CAS操作的ABA问题。这很重要,由于有界等待队列的节点会屡次进出工作队列,所以可能产生同1个节点被前1个线程准备CAS操作时,已被后几个线程进出了工作队列,致使第1个线程拿到的QNode的状态不正确。
7. lock实现分为3步,上文已说过了
8. unlock操作就是两步,第1修改状态通知其他线程获得锁。第2是设置自己的节点援用,以便下次可再次取得锁而不影响其他线程的状态。这里是把线程指向的节点状态设置为RELEASED,同时设置线程的节点援用为空,这样其他线程可以继续使用这个节点。
package com.zc.lock;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 限时有界队列锁,并且直接不限数量的线程
* 由因而有界的队列,所以争用剧烈,可以复合回退锁的概念,减少高争用
* 分为3步:
* 第1步是获得1个State为FREE的节点,设置为WAITING
* 第2步是把这个节点加入队列,获得前1个节点
* 第3步是在前1个节点上自旋
*
* 优点是L个锁的空间复杂度是O(L),而限时无界队列锁的空间复杂度为O(Ln)
* **/
public class CompositeLock implements TryLock{
enum State {FREE, WAITING, RELEASED, ABORTED}
class QNode{
AtomicReference<State> state = new AtomicReference<CompositeLock.State>(State.FREE);
volatile QNode preNode;
}
private final int SIZE = 10;
private final int MIN_BACKOFF = 1;
private final int MAX_BACKOFF = 10;
private Random random = new Random();
// 有界的QNode数组,表示队列总共可使用的节点数
private QNode[] waitings = new QNode[10];
// 指向队尾节点,使用AtomicStampedReference带版本号的原子援用变量,可以避免ABA问题,由于这个算法实现需要对同1个Node屡次进出队列
private AtomicStampedReference<QNode> tail = new AtomicStampedReference<CompositeLock.QNode>(null, 0);
// 每一个线程保护1个QNode援用
private ThreadLocal<QNode> myNode = new ThreadLocal<CompositeLock.QNode>(){
public QNode initialValue(){
return null;
}
};
public CompositeLock(){
for(int i = 0; i < SIZE; i ++){
waitings[i] = new QNode();
}
}
@Override
public void lock() {
Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
QNode node = waitings[random.nextInt(SIZE)];
// 第1步: 先取得数组里的1个Node,并把它的状态设置为WAITING,否则就自旋
GETNODE:
while(true){
while(node.state.get() != State.FREE){
// 由于释放锁时只是设置了State为RELEASED,由后继的线程来设置RELEASED为FREE
// 如果该节点已是队尾节点了并且是RELEASED,那末可以直接可以被使用
// 获得当前原子援用变量的版本号
int[] currentStamp = new int[1];
QNode tailNode = tail.get(currentStamp);
if(tailNode == node && tailNode.state.get() == State.RELEASED){
if(tail.compareAndSet(tailNode, null, currentStamp[0], currentStamp[0] + 1)){
node.state.set(State.WAITING);
break GETNODE;
}
}
}
if(node.state.compareAndSet(State.FREE, State.WAITING)){
break;
}
try {
backoff.backoff();
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted, stop to get the lock");
}
}
// 第2步加入队列
int[] currentStamp = new int[1];
QNode preTailNode = null;
do{
preTailNode = tail.get(currentStamp);
}
// 如果没加入队列,就1直自旋
while(!tail.compareAndSet(preTailNode, node, currentStamp[0], currentStamp[0] + 1));
// 第3步在前1个节点自旋,如果前1个节点为null,证明是第1个加入队列的节点
if(preTailNode != null){
// 在前1个节点的状态自旋
while(preTailNode.state.get() != State.RELEASED){}
// 设置前1个节点的状态为FREE,可以被其他线程使用
preTailNode.state.set(State.FREE);
}
// 将线程的myNode指向取得锁的node
myNode.set(node);
return;
}
@Override
public void unlock() {
QNode node = myNode.get();
node.state.set(State.RELEASED);
myNode.set(null);
}
@Override
public boolean trylock(long time, TimeUnit unit)
throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
}
采取我们之前的验证锁正确性的测试用例来测试lock, unlock操作。
package com.zc.lock;
public class Main {
//private static Lock lock = new TimeCost(new ArrayLock(150));
private static Lock lock = new CompositeLock();
//private static TimeCost timeCost = new TimeCost(new TTASLock());
private static volatile int value = 0;
public static void method(){
lock.lock();
System.out.println("Value: " + ++value);
lock.unlock();
}
public static void main(String[] args) {
for(int i = 0; i < 50; i ++){
Thread t = new Thread(new Runnable(){
@Override
public void run() {
method();
}
});
t.start();
}
}
}
结果是顺序打印的,证明锁是正确的,每次只有1个线程取得了锁
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 6
Value: 7
Value: 8
Value: 9
Value: 10
Value: 11
Value: 12
Value: 13
Value: 14
Value: 15
Value: 16
Value: 17
Value: 18
Value: 19
Value: 20
Value: 21
Value: 22
Value: 23
Value: 24
Value: 25
Value: 26
Value: 27
Value: 28
Value: 29
Value: 30
Value: 31
Value: 32
Value: 33
Value: 34
Value: 35
Value: 36
Value: 37
Value: 38
Value: 39
Value: 40
Value: 41
Value: 42
Value: 43
Value: 44
Value: 45
Value: 46
Value: 47
Value: 48
Value: 49