NightPxy 个人技术博客

并发编程中的线程安全技巧

Posted on By NightPxy

概述

这篇不谈原理,只简单总结下关于线程安全的一些技巧
保持线程安全的方式无外乎以下几种方式

  • 不可变
  • 线程资源隔离
  • 带volatile的原子操作
  • 乐观锁(CAS)
  • 悲观锁

锁(悲观锁)机制,是一种强破坏并行操作的方式.通过强行转串行来防止并行执行中的资源争夺问题
所以对性能影响较大
分段锁

源码中的线程安全技巧

不可变

线程资源隔离

线程资源隔离是线程安全手段的首先,因为这意味无锁(彻底消灭串行),也就意味着能够得到最大的并行性能

chm的无锁并发扩容

chm(ConcurrentHashMap)1.8中,其HashMap的扩容是并发扩容的,这是一个非常了不起的设计
数组型(哈希的本质是数组)最大的缺陷就是扩容,因为数组型要求连续空间,这是优势也是劣势
如果能并发扩容,至少扩容暂停时间会减少很多

chm的无锁并发扩容的核心就是线程资源隔离
简单来说就是多线程中,每个线程只操作自己的数据块
chm的设计是

  • 用transferIndex等于扩容前的初始长度,视为总任务数
    每一个参与线程用CAS从这个总量数字中截下一段视为自己的任务数,每个线程负责就处理下标这个范围槽位负责迁移扩容到新容器中
    通过这样完成扩容资源分配,线程间就可以无锁的并行扩容了
  • 既然是无锁状态,也就意味着chm在扩容未完成之前也必须保持安全访问
    在这个扩容开始且未完成的中间状态,引入了一个中间节点概念ForwardingNode
    ForwardingNode是旧容器和新容器的桥梁,任意时刻扩容节点要么在旧容器中要么在新容器中
    所以扩容操作为原槽位放入新容器,旧容器内同时放入一个用ForwardingNode指向的桥梁
    这样任意未扩容成功前访问,通过旧容器始终可以读取目标槽位完成定位
  • 线程并行,彼此是独立的,带来的问题是即无法预知其它扩容执行结果(是否完成与是否成功)
    chm的解决方法是每一个执行完毕的线程都会检测自己是否是最后一个线程
    最后一个线程负责校验是否全部都已扩容完成,如果没有就完成剩余的全部工作
    如果最后一个线程失败了,就此放弃.而下一个使用chm的线程进来如果检测到依然处于扩容状态会继续参与扩容,它会继续之后的工作,以此类推直到迁移完成

RingBuffer

volatile

volatile本身是保证有序性和可见性的,只是不保证原子性
如果volatile修饰下本身执行的是原子操作,那就是同时保证有序,可见,原子的安全操作

什么操作下是原子的呢.最常见的就是赋值

volatile本身有相当多需要注意的地方

  • double,Long的赋值操作不是原子操作(2次32位操作)
    但是volatile下double和long是原子的(Hotspot)
    这是某些虚拟机为了方便使用特意加上的规则(必须在volatile修饰下才有这个特性)
  • volatile修饰下的引用对象属性不带有volatile特性(数组同理)
  • volatile修饰的引用不具有传递性
    比如在方法栈内写object a = volatileObject,通过a引用使用将失去volatile特性
    但是如果final a = volatileObject,则可以继续持有volatile特性(final本身有栅栏)
  • volatile必须注意伪共享,也就是注意补位

补位方式一

public final static class VolatileLong {
	public volatile long value = 0L;
	public long p1, p2, p3, p4, p5, p6;    
}

补位方式二

@sun.misc.Contended
public class VolatileLong {
    public volatile long value = 0L;
}

这两种补位方式各有优劣

  • 第一种是固定补64字节
    但事实上不能保证每个操作系统下的缓存行大小都是64字节(绝大部分是)
  • 第二种由虚拟机负责补全(内置动态识别缓存行)
    缺点是需要jvm启动时设置-XX:-RestrictContended才能真正生效

CAS

CAS是线程安全的次选
CAS是本身算是一种轻量锁实现,往往也视为无锁(无内核锁)
它的优势是没有线程上下文切换(频繁的线程上下文切换可以对性能造成数量级级别的下降)
它的劣势是如果冲突剧烈,会大量消耗CPU资源(CAS往往是以自旋活锁的形式使用)

大部分情况下,使用CAS更需要关注的冲突耗时而不是冲突几率()
如果冲突耗时极短(一两个CPU指令),CAS的性能都是非常不错的,实际情况中,一般根本达不到会在一两个指令间产生大量的冲突
如果冲突耗时本身很长,那就值得注意了,因为在这个过程中会因为不断的变更失败而消耗大量的资源

所以自旋CAS是一种必须谨慎使用的手段

  • 确认冲突耗时极短
  • 最好带上自旋超时时间,将自旋放在可控的程度上
//这是chm中初始化的一段
private final Node<K,V>[] initTable() {
	Node<K,V>[] tab; int sc;
	while ((tab = table) == null || tab.length == 0) {
	    // 注意这里的自旋失败是 Thread.yield()
	    // 因为初始过程是一个相对漫长的过程
	    // 所以没有用普通自旋,而是强制线程切换等待下次调度来避免长时自旋
		if ((sc = sizeCtl) < 0)
			Thread.yield(); 
		// 这是CAS的一个非常常用的手段  
		// 并发安全的保证有且只有一个线程执行某个逻辑
		else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
			......
		}
	}

数组CAS

数组CAS也是一个非常有用的手段(也可用在哈希中,哈希的本质是数组)

//CAS写入数组与CASE写入对象某一个属性没有区别 (数组可以视为N个字段的对象)  
//只需找出某个对应下标的内存偏移量就可以了
Class<?> ak = Node[].class;
//数字元素的起始偏移量
ABASE = U.arrayBaseOffset(ak);
//数组元素的元素大小
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
	throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
    //第N个元素 就是数组的起始偏移量+元素大小*N 就是了
    //chm中使用了更有技巧的写法,就是做位运算得到  
	return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    //获取对象同理
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
}

链表双向队列CAS

链表队列CAS也是非常有用的,一个无锁的线程安全链表队列是非常有用的
链表CAS实现要复杂的多

  • 链表一般为双向两边,也就是有前驱后驱两个指针
  • 链表本身一般还有HeadTail两个哨兵指针
    CAS的特性是只能保证某一步的线程安全,链表中至少有四步

ConcurrentLinkedDeque(cld)的设计

  • 首先是抛弃两个哨兵指针的完全同步(允许哨兵指向延时)
    Head不一定指向第一个元素,Tail不一定指向最后一个元素,改用另外的方式识别头尾
    Head往前中前驱为指向自己的节点
    Tail往后中后驱指向自己的节点
    通过这种方式,就可以在哨兵指针出现延时依然能正常使用了
    这种方式最大的问题是,哨兵指针如果指向中段了,就以为每一次使用头尾都意味着遍历半个链表了
    所以还要一个规则,即不允许头尾离开真正的头尾节点超过两个节点,这样最多前(后)驱一次,就可以找到真正的头
    这个保证是延时的(已经保证在任何情况下都能正确找到),也就是在任一个找寻过程中如果发现规则不匹配就会纠正
  • 解决了哨兵指针,还有前驱后驱两个指针.这是链表的基石,不能运行延时的
    这时就靠队列特性,队列的特性是不允许中段插入.只能头(尾)插入(假设这个队列同时提供栈功能)
    也就是任意新插入的节点,前驱或者后驱只有一个是有值的
    如果是头插入,则前驱为执行自己,后驱指向头,然后CAS写入当前头的前驱就行
    如果是尾插入,则前驱指向尾,后驱指向自己,只需要CAS写入当前尾的后驱就行
  • 节点删除
    写入时,根据队列特性,其实只需要保证单边就行了.但是删除必须是前驱和后驱都需要保证的
    这时cld的方法是引入逻辑删除(item为null)表示已经被逻辑删除的节点了
    两个节点的前驱后驱呢,分开解决.引入逻辑删除就是在这个分开过程中保持业务完整
    以删除头节点为例
    首先将当前头的Item用CAS置null,从业务上删除它
    其次将当前头(期望被删那个)的后节点的前驱用CAS指向自己(它成为了真正的头,同时调整哨兵)
    最后将真正被删的头的后驱设null,此时被删的头已经彻底失去引用,可以被GC了

// 节点定义
// 给出CAS写进前驱(后驱)的方法  
static final class Node<E> {
	volatile Node<E> prev;
	volatile E item;
	volatile Node<E> next;

	Node() { 
	}

	/**
	 * Constructs a new node.  Uses relaxed write because item can
	 * only be seen after publication via casNext or casPrev.
	 */
	Node(E item) {
		UNSAFE.putObject(this, itemOffset, item);
	}

	boolean casItem(E cmp, E val) {
		return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
	}

	void lazySetNext(Node<E> val) {
		UNSAFE.putOrderedObject(this, nextOffset, val);
	}

	boolean casNext(Node<E> cmp, Node<E> val) {
		return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
	}

	void lazySetPrev(Node<E> val) {
		UNSAFE.putOrderedObject(this, prevOffset, val);
	}

	boolean casPrev(Node<E> cmp, Node<E> val) {
		return UNSAFE.compareAndSwapObject(this, prevOffset, cmp, val);
	}
	......
	//代码太多了
}

悲观锁

悲观锁就是常规理解中的锁(synchronize 或 Lock 这里不谈两者区别与优缺点)
锁的本质是破坏并行,类似于为了预防吃饭被噎死所以强制不准吃饭一样,锁对并行性能的伤害是巨大的

分段锁

因为锁的破坏力巨大,使用务必慎重,一个常规优化方案就是分段锁
分段锁的核心是控制串行范围,也就是部分串行的意思

chm中的槽位线程安全就是用的分段锁

这里对比下HashTable的线程安全方式,是一个全局锁.它的意思是无论添加修改删除容器内的任何元素统统串行

chm的做法是以槽位为单位控制串行,也就是只有哈希冲突(HashCode一致但对象不是同一个)的对象存取之间才保持串行,哈希码不同的对象之间是完全并行的
这样的性能提升是非常明显的(chm的哈希设计,让哈希冲突的实际发生率非常低)

final V putVal(K key, V value, boolean onlyIfAbsent) {
	if (key == null || value == null) throw new NullPointerException();
	int hash = spread(key.hashCode());
	int binCount = 0;
	for (Node<K,V>[] tab = table;;) {
		Node<K,V> f; int n, i, fh;
		if (tab == null || (n = tab.length) == 0)
			tab = initTable();
		else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
			if (casTabAt(tab, i, null,
						 new Node<K,V>(hash, key, value, null)))
				break;                   // no lock when adding to empty bin
		}
		else if ((fh = f.hash) == MOVED)
			tab = helpTransfer(tab, f);
		else {
			V oldVal = null;
			//以槽位为单位锁而不是全局锁
			//但是槽位为空怎么办呢?(Null槽可锁不起来) 这里有一个技巧性的做法  
			//如果槽位为空,先用CAS把槽位补上,之后再以这个槽位控制元素进出
			synchronized (f) {
				.......
			}
		}
	}