LockFree思想

LockFree思想LockFree思想0x01摘要近期看一些源码,会有一些注释是LockFree。这到底啥玩意儿?之前我也不知道啊,遂赶紧上网查之,总结了一些东西作为记录,与大家分享。0x02LockFree2.1LockFree概念先上一张神图:由上图可以看出,LockFree程序必须满足三个条件:多线程共享内存不能彼此阻塞(死锁)具体来说,如果一个程序是LockFree的,则在运…

大家好,又见面了,我是你们的朋友全栈君。

LockFree思想

0x01 摘要

近期看一些源码,会有一些注释是LockFree。这到底啥玩意儿?之前我也不知道啊,遂赶紧上网查之,总结了一些东西作为记录,与大家分享。

0x02 LockFree

2.1 LockFree概念

先上一张神图:
LockFree

由上图可以看出,LockFree程序必须满足三个条件:

  1. 多线程
  2. 共享内存
  3. 不能彼此阻塞(死锁)

具体来说,如果一个程序是LockFree的,则在运行足够长的一段时间内至少一个线程能取得进展。如果一个程序死锁,这个程序内的所有线程都没法取得进展了,这种情况就不能成为LockFree。在LockFree的程序中,如果一个线程被挂起,决不能影响其他线程继续执行,也就是说是非阻塞的。

因为LockFree能保证有一个线程能执行并取得进展,所以在代码不是无限循环的情况下可以保证所有线程最终能完成自己的工作。

java.util.concurrent.atomic包是LockFree思想实现的例子。

2.2 LockFree编程

2.2.1 简介

LockFree编程的特点:

  • LockFree 技术很容易被错误的使用,代码后期的维护中也不容易意识到,所以非常容易引入 Bug,而且这样的 Bug 还非常难定位。
  • LockFree 技术的细节上依赖于内存系统模型、编译器优化、CPU架构等,而这在使用 Lock 机制时是不相关的,所以也增加了理解和维护的难度。

LockFree编程

2.2.2 原子读改写指令(atomic RMW)

原子操作不可再分割。

RMW的设计可让执行更复杂的事务操作变成原子的,使得多个写入者尝试对相同内存区域进行修改时,保证一次只能执行一个操作。

在x86/64的CPU架构中,通过CAS方式实现。其他架构各有不同实现方式。

2.2.3 ABA问题

上面提到了CAS,那么就会有ABA问题。
详细内容可以看我之前写的一篇文章:Java-并发-CAS中的ABA章节部分。

2.2.4 顺序一致性模型

顺序一致性内存模型有两大特性

  1. 一个线程内部的所有操作必须按照程序的顺序来执行
  2. (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。

2.2.5 代码示例

以Stack为例子,实现一个LockFree的Stack。

这个例子中使用单链表实现栈,有两个方法:

  • push:栈顶方向压入元素
  • pop:栈顶弹出一个元素
2.2.5.1 非LockFree版本
public class SimpleStack<T>
{ 
   
    private class Node<TNode>
    { 
   
        public Node<TNode> next;
        public TNode item;
        @Override
        public String toString()
        { 
   
            return item.toString();
        }
    }

    private Node<T> head;

    public SimpleStack()
    { 
   
        head = new Node<T>();
    }

    /** * 头插法 * @param item */
    public void push(T item)
    { 
   
        Node<T> node = new Node<T>();
        node.item = item;
        node.next = head.next;
        head.next = node;
    }

    /** * 删除栈顶元素并返回该元素 * 没有元素是返回null * @return */
    public T pop()
    { 
   
        Node<T> node = head.next;
        if (node == null)
            return null;
        head.next = node.next;
        return node.item;
    }
}

测试代码如下:

// push1000个元素,然后再多线程中pop元素观察结果
public class SimpleStackTest
{ 
   
    private static final Logger logger = LoggerFactory.getLogger(SimpleStackTest.class);

    public static void main(String[] args)
    { 
   
        SimpleStack<Integer> stack = new SimpleStack<Integer>();

        for (int i = 1; i <= 1000; i++)
        { 
   
            stack.push(i);
        }

        boolean[] poppedItems = new boolean[1001];
        BlockingQueue workQueue = new LinkedBlockingDeque<>();

        ExecutorService executorService = new ThreadPoolExecutor(5,10,1, TimeUnit.MINUTES,workQueue, new ThreadPoolExecutor.DiscardPolicy());

        AtomicInteger count = new AtomicInteger();
        for(int i = 0 ; i < 10 ; i++){ 
   
            executorService.submit(() -> { 
   
                for (int j = 0; j < 100; j++) { 
   
                    count.incrementAndGet();
                    int item = stack.pop();
                    if (poppedItems[item] == true) { 
   
                        logger.error("Thread {}: Item {} was popped before!", Thread.currentThread().getName(), item);
                    }
                    else { 
   
                        poppedItems[item] = true;
                    }
                }
            });
        }

        try { 
   
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) { 
   
            logger.error("an exception happened:", e);
        }
        executorService.shutdownNow();

        logger.info("Done, count={}" , count);
    }
}

运行结果如下:

2018-12-10 10:45:47.782 ERROR [pool-1-thread-1] demos.lockfree.nolockstackdemo.SimpleStackTest,41 - Thread pool-1-thread-1: Item 878 was popped before!
2018-12-10 10:45:52.786 INFO  [main] demos.lockfree.nolockstackdemo.SimpleStackTest,58 - Done, count=1000

也就是说出现了同一个元素被pop多次的情况!

2.2.5.2 同步锁版本

在push和pop方法上加入了synchronized修饰。

/** * 头插法 * @param item */
public synchronized void push(T item)
{ 
   
    Node<T> node = new Node<T>();
    node.item = item;
    node.next = head.next;
    head.next = node;
}

/** * 删除栈顶元素并返回该元素 * 没有元素是返回null * @return */
public synchronized T pop()
{ 
   
    Node<T> node = head.next;
    if (node == null)
        return null;
    head.next = node.next;
    return node.item;
}

运行结果正确:

2018-12-10 11:10:53.091 INFO  [main] demos.lockfree.simplelockstackdemo.SimpleLockStackTest,59 - Done, count=1000
2.2.5.3 LockFree版本

但是,以上采用的同步锁方式在高并发场景下,大量出现线程竞争的情况,效率并不高。

下面采用lockfree思想,即Java cas的方法来实现。

  • CASStack
import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class CASStack<T>
{ 
   

    private static class Node<TNode>
    { 
   
        public Node<TNode> next;
        public TNode item;

        @Override
        public String toString()
        { 
   
            return item.toString();
        }
    }

    private Node<T> head;

    public CASStack()
    { 
   
        head = new Node<T>();
    }

    /** * 头插法 * * @param item */
    public void push(T item)
    { 
   
        Node<T> node = new Node<T>();
        node.item = item;
        int count = 0;
        Node<T> oldNextNode;
        do { 
   
            oldNextNode = head.next;
            node.next = oldNextNode;
            count++;
            if(count > 1){ 
   
                System.out.println(Thread.currentThread().getName() + " repeated CAS push");
            }
        }while (!U.compareAndSwapObject(head, NEXT_OFFSET, oldNextNode, node));
    }

    /** * 删除栈顶元素并返回该元素 * 没有元素是返回null * * @return */
    public  T pop()
    { 
   
        Node<T> node;
        int count = 0;
        do{ 
   
            node = head.next;
            if (node == null)
                return null;
            count++;
            if(count > 1){ 
   
                System.out.println(Thread.currentThread().getName() + " repeated CAS pop");
            }
        }
        while (!U.compareAndSwapObject(head, NEXT_OFFSET, node, node.next));
        return node.item;
    }

    private static Unsafe U;
    private static final long NEXT_OFFSET;
    static { 
   
        try { 
   
            Field f ;
            f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            U=(Unsafe)f.get(null);

            Class<?> nodeClass = Node.class;
            NEXT_OFFSET = U.objectFieldOffset(nodeClass.getDeclaredField("next"));
        }
        catch (Exception e) { 
   
            throw new Error(e);
        }
    }

}
  • 测试代码
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** * Created by chengc on 2018/12/10. */
public class CASStackTest
{ 
   
    private static final Logger logger = LoggerFactory.getLogger(CASStackTest.class);

    public static void main(String[] args)
    { 
   
        CASStack<Integer> stack = new CASStack<Integer>();

        BlockingQueue putQueue = new LinkedBlockingDeque<>();
        ExecutorService putThreadPool = new ThreadPoolExecutor(5,10,1, TimeUnit.MINUTES, putQueue, new ThreadPoolExecutor.DiscardPolicy());

        CountDownLatch pushCountDownLatch = new CountDownLatch(10);
        AtomicInteger pushCount = new AtomicInteger();
        for(int i = 0 ; i < 10 ; i++) { 
   
            final int finalI = i;
            putThreadPool.submit(() -> { 
   
                for (int j = 1; j <= 100; j++) { 
   
                    stack.push(j + finalI*100);
                    pushCount.incrementAndGet();
                }
                pushCountDownLatch.countDown();
            });
        }
        try { 
   
            pushCountDownLatch.await();
        }
        catch (InterruptedException e) { 
   
            e.printStackTrace();
        }
        logger.info("push done, pushCount={}" , pushCount);
        putThreadPool.shutdown();

        boolean[] poppedItems = new boolean[1001];
        BlockingQueue popQueue = new LinkedBlockingDeque<>();
        ExecutorService popThreadPool = new ThreadPoolExecutor(5,10,1, TimeUnit.MINUTES, popQueue, new ThreadPoolExecutor.DiscardPolicy());

        AtomicInteger popCount = new AtomicInteger();
        CountDownLatch popCountDownLatch = new CountDownLatch(10);

        for(int i = 0 ; i < 10 ; i++){ 
   
            popThreadPool.submit(() -> { 
   
                for (int j = 0; j < 100; j++) { 
   
                    int item = stack.pop();
                    if (poppedItems[item] == true) { 
   
                        logger.error("Thread {}: Item {} was popped before!", Thread.currentThread().getName(), item);
                    }
                    else { 
   
                        poppedItems[item] = true;
                    }
                    popCount.incrementAndGet();
                }
                popCountDownLatch.countDown();
            });
        }

        try { 
   
            popCountDownLatch.await();
        }
        catch (InterruptedException e) { 
   
            e.printStackTrace();
        }
        popThreadPool.shutdown();

        logger.info("Pop done, popCount={}" , popCount);
    }
}
  • 运行结果
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-1 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-5 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-4 repeated CAS push
pool-1-thread-3 repeated CAS push
pool-1-thread-3 repeated CAS push
pool-1-thread-3 repeated CAS push
pool-1-thread-3 repeated CAS push
2018-12-10 13:29:10.164 INFO  [main] demos.lockfree.casstackdemo.CASStackTest,46 - push done, pushCount=1000
pool-2-thread-2 repeated CAS pop
pool-2-thread-4 repeated CAS pop
pool-2-thread-4 repeated CAS pop
pool-2-thread-3 repeated CAS pop
pool-2-thread-3 repeated CAS pop
pool-2-thread-1 repeated CAS pop
pool-2-thread-1 repeated CAS pop
pool-2-thread-2 repeated CAS pop
pool-2-thread-2 repeated CAS pop
2018-12-10 13:29:10.170 INFO  [main] demos.lockfree.casstackdemo.CASStackTest,80 - Pop done, popCount=1000

可以看到,虽然cas过程中有若干次冲突,但是占比并不大。最终结果正确。

0x03 LockLess

LockLess,即无锁编程,是一种用于在不使用锁的情况下安全地操作共享数据的编程思想。有无锁算法可用于传递消息,共享列表和数据队列以及其他任务。

无锁编程非常复杂。 例如所有纯功能性质的数据结构本质上都是无锁的,因为它们是不可变的。

0x04 WaitFree

WaitFree即无等待,他是一个比LockFree更强的场景。

如果一个程序被设计为WaitFree的,这意味着无论线程执行的时间和顺序如何,每个线程都可以保证在任意时间段内都能取得一定进展。

所以我们,在WaitFree场景中,可以说线程都是独立运行的。

注意,所有WaitFree的程序都是LockFree的。

java.util.concurrent.ConcurrentLinkedQueue是WaitFree思想实现的一个例子。

0x05 更多资料

Lock-Free Programming

0xFF 参考文档

What’s the difference between lockless and lockfree?

Examples/Illustration of Wait-free And Lock-free Algorithms

Non-blocking algorithm

Lock-Free 编程

lock free的理解

An Introduction to Lock-Free Programming

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/161675.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 数学的本质

    数学的本质数学的本质李国伟现代数学在方法上的特征现代数学在方法上最明显的特色是它的演绎性,就是由基本定义与公理出发,经逻辑推论到所有定理的发展方式。采取这种方法并非偶然,而是有内在的需求。我们要把一套概念讲清楚,必须用比较简单的概念来解释,但是这些概念又需要再加澄清,如此继续下去,如果不曾周而复始得到一个什么也说不清的恶性循环,便会无限延伸下去,达到一个不可知的前端。人类寻求知识的目的在组织自己

    2022年6月16日
    44
  • Java面向对象编程三大特征 – 封装

    Java面向对象编程三大特征 – 封装本文关键字:Java、面向对象、三大特征、封装。封装是面向对象编程中的三大特征之一,在对封装性进行解释时我们有必要先了解一些面向对象的思想,以及相关的概念。

    2022年7月25日
    3
  • 实现数组转对象(reduce)「建议收藏」

    实现数组转对象(reduce)「建议收藏」//数组转对象letarr=[{id:1,name:’张三’},{id:2,name:’李四’},{id:3,name:’王五’}];lettransToObj=function(arr){if(!Array.isArray(arr))throw’参数错误’;returnarr.reduce((pre,cur,index,arr)=>{…

    2022年9月11日
    0
  • jar中没有主清单属性啥意思啊_maven打jar包无主清单属性

    jar中没有主清单属性啥意思啊_maven打jar包无主清单属性在maven-assembly-plugin插件配置中添加,如下内容:maven-assembly-plugin

    2022年9月5日
    2
  • Java Metric 入门详解

    Java Metric 入门详解翻译自:http://metrics.dropwizard.io/3.1.0/getting-started/    待完成……

    2022年6月17日
    41
  • 单源最短路径dijkstra算法_dijkstra是谁

    单源最短路径dijkstra算法_dijkstra是谁年轻的探险家来到了一个印第安部落里。在那里他和酋长的女儿相爱了,于是便向酋长去求亲。酋长要他用 10000 个金币作为聘礼才答应把女儿嫁给他。探险家拿不出这么多金币,便请求酋长降低要求。酋长说:”嗯,如果你能够替我弄到大祭司的皮袄,我可以只要 8000 金币。如果你能够弄来他的水晶球,那么只要 5000 金币就行了。”探险家就跑到大祭司那里,向他要求皮袄或水晶球,大祭司要他用金币来换,或者替他弄来其他的东西,他可以降低价格。探险家于是又跑到其他地方,其他人也提出了类似的要求,或者直接用金币换,或

    2022年8月9日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号