威尼斯人线上娱乐

ConcurrentBag的落实原理,HashMap与线程安全

2 4月 , 2019  

目录

Java要点2

HashMap与线程安全

HashMap与线程安全

  • 一、前言
  • 二、ConcurrentBag类
  • 3、
    ConcurrentBag线程安全完毕原理

    • 壹.
      ConcurrentBag的村办字段
    • 二.
      用于数据存储的TrehadLocalList类
    • 3.
      ConcurrentBag完毕新增成分
    • 肆. ConcurrentBag
      怎样促成迭代器方式
  • 四、总结
  • 小编水平有限,就算不当欢迎各位批评指正!

ConcurrentBag的落实原理,HashMap与线程安全。JAVA 集合类

1、HashMap 为啥是线程不安全的

1、HashMap 为啥是线程不安全的


一.JAVA常用集合类功用、差别和总体性

两大类:Collections,Map;Collections分List和Set

List接口与事实上现类

List类似于数组,能够经过索引来访问元素,达成该接口的常用类有ArrayList、LinkedList、Vector、Stack等。

   
HashMap是通过散列表来达成存款和储蓄结构的,具体内容请看本人的另一篇博客《HashMap深度解析》,那么HashMap为何线程不安全呢,首要有五个原因。

   
HashMap是经过散列表来实现存款和储蓄结构的,具体内容请看笔者的另一篇博客《HashMap深度分析》,那么HashMap为何线程不安全啊,首要有五个原因。


ArrayList

ArrayList是动态数组,能够依照插入的因素的数码自动扩容,而使用者不须求通晓其里面是何许时候进行扩张的,把它看做丰裕体量的数组来采纳即可。
ArrayList访问成分的艺术get是常数时间,因为是直接依据下标索引来访问的,而add方法的小时复杂度是O(n),因为急需活动成分,将新成分插入到适合的岗位。
ArrayList是非线程安全的,即它从未一起,不过,能够透过Collections.synchronizedList()静态方法再次来到三个联手的实例,如:

List synList = Collections.synchronizedList(list);

数组扩大体量:ArrayList在插入成分的时候,都会检查当前的数组大小是不是丰富,即使不够,将会扩大容积到近日体积
* 一.伍 +
壹(加壹是为着当前容积为壹时,也能扩大到二),即把原先的因素全体复制到多个两倍大小的新数组,将旧的数组放弃掉(等待垃圾回收),这些操作是比较耗费时间,因而建议在创设ArrayList的时候,依据要插入的因素的多少来起先估价Capacity,并开首化ArrayList,如:

ArrayList list = new ArrayList(100);

如此,在插入小于九十七个因素的时候都以不要求开始展览扩大容积的,能够带来性能的升官,当然,假诺对这一个容积估量大了,大概会拉动1些空中的费用。

首先肯定是多个线程同时去往集合里添加多少,第四个原因:五个线程同时加上相同的key值数据,当八个线程同时遍历完桶内的链表时,发现,未有该key值的多少,那是他们同时创造了三个Entry结点,都丰盛到了桶内的链表上,那样在该HashMap集合中就应运而生了四个Key相同的数量。首个原因:当多少个线程同时检验到size/capacity>负载因子时,在扩容的时候可能会在链表上发出死循环(为何会时有发生死循环,能够看有个别HashMap的死循环相关的博客),也只怕会爆发存款和储蓄非常。

先是肯定是多个线程同时去往集合里添加多少,第一个原因:三个线程同时添加相同的key值数据,当八个线程同时遍历完桶内的链表时,发现,没有该key值的数码,那是她们同时创制了贰个Entry结点,都助长到了桶内的链表上,那样在该HashMap集合中就涌出了多个Key相同的多寡。第二个原因:当四个线程同时检测到size/capacity>负载因马时,在扩大体量的时候或者会在链表上发生死循环(为何会时有产生死循环,能够看有的HashMap的死循环相关的博客),也说不定会发生存款和储蓄格外。

一、前言

小编眼前在做七个档次,项目中为了进步吞吐量,使用了音讯队列,中间实现了生产消费情势,在生产消费者格局中须求有3个凑合,来存款和储蓄生产者所生育的物品,小编利用了最常见的List<T>会晤类型。

是因为生产者线程有广大个,消费者线程也有广大个,所以不可幸免的就时有产生了线程同步的题材。初步小编是应用lock重在字,举办线程同步,然则质量并不是专门理想,然后有网民说能够利用SynchronizedList<T>来代表使用List<T>高达到规定的分数线程安全的指标。于是笔者就替换到了SynchronizedList<T>,不过发现质量依旧不佳,于是查看了SynchronizedList<T>的源代码,发现它正是简简单单的在List<T>提供的API的根底上加了lock,所以品质基本与小编实现格局相差无几。

末尾笔者找到了化解的方案,使用ConcurrentBag<T>类来贯彻,品质有十分大的转移,于是笔者查阅了ConcurrentBag<T>的源代码,完结充裕精致,特此在那记录一下。

LinkedList

LinkedList也落成了List接口,其内部贯彻是运用双向链表来保存成分,由此插入与删除成分的个性都呈现不错。它还提供了一部分任何操作方法,如在头顶、底部插入只怕去除成分,因而,可以用它来完结栈、队列、双向队列。
出于是使用链表保存成分的,所以随便走访成分的时候速度会相比较慢(须要遍历链表找到对象成分),那或多或少比较ArrayList的随机走访要差,ArrayList是接纳数组达成格局,直接使用下标能够访问到成分而不要求遍历。因而,在须要频仍随意访问成分的景色下,建议利用ArrayList。
与ArrayList1样,LinkedList也是非同步的,假诺急需完毕四线程访问,则须求自个儿在表面达成同台方法。当然也能够采纳Collections.synchronizedList()静态方法。

二、怎样线程安全的行使HashMap

贰、如何线程安全的应用HashMap

二、ConcurrentBag类

ConcurrentBag<T>实现了IProducerConsumerCollection<T>接口,该接口首要用于生产者消费者情势下,可见该类基本正是为生产消费者格局定制的。然后还完结了例行的IReadOnlyCollection<T>类,完成了此类就须求贯彻IEnumerable<T>、IEnumerable、 ICollection类。

ConcurrentBag<T>对外提供的办法未有List<T>那么多,不过同样有Enumerable贯彻的扩张方法。类自身提供的措施如下所示。

名称 说明
Add 将对象添加到 ConcurrentBag 中。
CopyTo 从指定数组索引开始,将 ConcurrentBag 元素复制到现有的一维 Array 中。
Equals(Object) 确定指定的 Object 是否等于当前的 Object。 (继承自 Object。)
Finalize 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 Object。)
GetEnumerator 返回循环访问 ConcurrentBag 的枚举器。
GetHashCode 用作特定类型的哈希函数。 (继承自 Object。)
GetType 获取当前实例的 Type。 (继承自 Object。)
MemberwiseClone 创建当前 Object 的浅表副本。 (继承自 Object。)
ToArray 将 ConcurrentBag 元素复制到新数组。
ToString 返回表示当前对象的字符串。 (继承自 Object。)
TryPeek 尝试从 ConcurrentBag 返回一个对象但不移除该对象。
TryTake 尝试从 ConcurrentBag 中移除并返回对象。

Vector

Vector是ArrayList的线程同步版本,正是说Vector是1起的,扶助多线程访问。除了那个之外,还有1些不暂时,当体积不够时,Vector私下认可扩展一倍容积,而ArrayList是当下体积
* 1.5 + 1

方法一:Hashtable

方法一:Hashtable

三、 ConcurrentBag线程安全完毕原理

Stack

Stack是一种后进先出的数据结构,继承自Vector类,提供了push、pop、peek(得到栈顶成分)等办法。

   
Hashtable是Java低版本中建议来的,由于在那之中间的加锁机制,是的其质量较低,近期早就不常用了。所以当三个线程访问Hashtable的联合方法时,别的线程如若也要拜访同步方法,会被阻塞住。举个例子,当3个线程使用put方法时,另3个线程不但不得以选用put方法,连get方法都不可能,作用相当的低。

   
Hashtable是Java低版本中建议来的,由于其里面包车型地铁加锁机制,是的其质量较低,如今曾经不常用了。所以当2个线程访问Hashtable的联合方法时,别的线程假使也要访问同步方法,会被阻塞住。举个例子,当三个线程使用put方法时,另1个线程不但不可能运用put方法,连get方法都不能,作用非常的低。

1. ConcurrentBag的村办字段

ConcurrentBag线程安全达成首借使透过它的数据存款和储蓄的布局和细颗粒度的锁。

   public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList对象包含每个线程的数据
        ThreadLocal<ThreadLocalList> m_locals;

        // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中
        // 允许在线程局部对象上枚举
        volatile ThreadLocalList m_headList, m_tailList;

        // 这个标志是告知操作线程必须同步操作
        // 在GlobalListsLock 锁中 设置
        bool m_needSync;

}

首要选取大家来看它申明的个人字段,在那之中要求专注的是集聚的数额是存放在在ThreadLocal线程本地存款和储蓄中的。也正是说访问它的种种线程会维护3个友好的集聚数据列表,1个聚众中的数据大概会存放在差别线程的本土存储空间中,所以一旦线程访问自个儿本地存储的靶子,那么是从未难点的,那正是促成线程安全的首先层,动用线程本地存款和储蓄数据

接下来能够见见ThreadLocalList m_headList, m_tailList;本条是存放在着地面列表对象的头指针和尾指针,通过那两个指针,大家就能够通过遍历的方法来走访具有地点列表。它选择volatile修饰,不容许线程实行地面缓存,每一种线程的读写都以直接操作在共享内部存款和储蓄器上,那就保证了变量始终具有一致性。任何线程在别的时间开始展览读写操作均是新型值。对于volatile修饰符,感谢自身是攻城狮建议描述失实。

终极又定义了贰个标志,那几个标志告知操作线程必须开始展览同步操作,那是促成了三个细颗粒度的锁,因为唯有在多少个标准满意的情状下才须求实行线程同步。

Set接口

Set是不能包括重合成分的器皿,其完毕类有HashSet,继承于它的接口有SortedSet接口等。Set中提供了加、减、和交等集合操作函数。Set不可能依照索引随机访问成分,那是它与List的二人命关天不相同。

   
HashTable源码中是采纳synchronized来担保线程安全的,比如上边包车型大巴get方法和put方法:
public synchronized V get(Object key) {}

   
HashTable源码中是选取synchronized来担保线程安全的,比如上面包车型地铁get方法和put方法:
public synchronized V get(Object key) {}

2. 用来数据存款和储蓄的TrehadLocalList类

接下去大家来看一下ThreadLocalList类的布局,该类就是事实上存款和储蓄了数码的职位。实际上它是行使双向链表那种协会举行多少存储。

[Serializable]
// 构造了双向链表的节点
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操作类型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 线程锁定的类
/// </summary>
internal class ThreadLocalList
{
    // 双向链表的头结点 如果为null那么表示链表为空
    internal volatile Node m_head;

    // 双向链表的尾节点
    private volatile Node m_tail;

    // 定义当前对List进行操作的种类 
    // 与前面的 ListOperation 相对应
    internal volatile int m_currentOp;

    // 这个列表元素的计数
    private int m_count;

    // The stealing count
    // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数
    internal int m_stealCount;

    // 下一个列表 可能会在其它线程中
    internal volatile ThreadLocalList m_nextList;

    // 设定锁定是否已进行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有当列表从空变为非空统计是底层
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 构造器
    /// </summary>
    /// <param name="ownerThread">拥有这个集合的线程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一个新的item到链表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新计数.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 因为进行初始化了,所以将空状态改为非空状态
        }
        else
        {
            // 使用头插法 将新的元素插入链表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新计数以避免此添加同步时溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 从列表的头部删除一个item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 双向链表删除头结点数据的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表头部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 从列表的尾部获取一个item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

从上面包车型客车代码中大家能够进一步证实以前的眼光,便是ConcurentBag<T>在五个线程中贮存数据时,使用的是双向链表ThreadLocalList福寿绵绵了一组对链表增加和删除改查的法子。

HashSet

HashSet达成了Set接口,其里面是利用HashMap达成的。放入HashSet的靶子最棒重写hashCode、equals方法,因为暗中认可的那两个方法很恐怕与您的事体逻辑是差异等的,而且,要同时重写那多个函数,假如只重写当中三个,很简单产生不测的题材。
记住上边几条规则:

十一分对象,hashCode一定相等。
不等对象,hashCode不肯定不等于。
多少个指标的hashCode相同,不自然相等。
五个目的的hashCode分歧,一定相等。

public synchronized V put(K key, V value) {}

public synchronized V put(K key, V value) {}

三. ConcurrentBag兑现新增成分

接下去大家看一看ConcurentBag<T>是何等新增成分的。

/// <summary>
/// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里
/// 这是避免内存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此时必须持有全局锁
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 从头线程列表开始枚举 找到那些已经被关闭的线程
    // 将它所在的列表对象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地帮助方法,通过线程对象检索线程线程本地列表
/// </summary>
/// <param name="forceCreate">如果列表不存在,那么创建新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 获取用于更新操作的 m_tailList 锁
        lock (GlobalListsLock)
        {
            // 如果头列表等于空,那么说明集合中还没有元素
            // 直接创建一个新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中
                // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程
                // 然后将已停止线程的数据 移交到当前线程管理
                list = GetUnownedList();
                // 如果没有 那么就新建一个列表 然后更新尾指针的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add)
    ThreadLocalList list = GetThreadList(true);
    // 实际的数据添加操作 在AddInternal中执行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁
        // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁
        if (list.Count < 2 || m_needSync)
        {
            // 将其重置为None 以避免与窃取线程的死锁
            list.m_currentOp = (int)ListOperation.None;
            // 锁定当前对象
            Monitor.Enter(list, ref lockTaken);
        }
        // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中
        // 如果已经锁定 那么说明线程安全  可以更新Count 计数
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

从地点代码中,我们得以很明亮的理解Add()办法是什么样运作的,在这之中的机要正是GetThreadList()形式,通过该办法能够获得当前线程的数据存款和储蓄列表对象,借使不设有多少存款和储蓄列表,它会活动创造或然通过GetUnownedList()情势来寻觅那个被终止不过还蕴藏有数量列表的线程,然后将数据列表再次来到给当下线程中,防止了内存泄漏。

在数量增进的进程中,达成了细颗粒度的lock壹起锁,所以品质会很高。删除和任何操作与新增类似,本文不再赘言。

TreeSet

TreeSet同样的Set接口的完成类,同样无法存放相同的目标。它与HashSet分歧的是,TreeSet的元素是比照顺序排列的,因而用TreeSet存放的对象急需贯彻Comparable接口。

方法二:SynchronizedMap

方法二:SynchronizedMap

四. ConcurrentBag 怎么着达成迭代器方式

看完上边的代码后,作者很奇异ConcurrentBag<T>是何许促成IEnumerator来完成迭代访问的,因为ConcurrentBag<T>是经过分散在不一样线程中的ThreadLocalList来囤积数据的,那么在达成迭代器方式时,进度会比较复杂。

末尾再查看了源码之后,发现ConcurrentBag<T>为了达成迭代器格局,将分在分歧线程中的数据全都存到二个List<T>集结中,然后回来了该副本的迭代器。所以每一回访问迭代器,它都会新建二个List<T>的副本,那样固然浪费了迟早的存款和储蓄空间,不过逻辑上更为简约了。

/// <summary>
/// 本地帮助器方法释放所有本地列表锁
/// </summary>
private void ReleaseAllLocks()
{
    // 该方法用于在执行线程同步以后 释放掉所有本地锁
    // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 从冻结状态解冻包的本地帮助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先释放掉 每个线程中 本地变量的锁
    // 然后释放全局锁
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地帮助器函数等待所有未同步的操作
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操作完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地帮助器方法获取所有本地列表锁
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;

    // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁
    while (currentList != null)
    {
        // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 这将强制同步任何将来的添加/执行操作
    m_needSync = true;

    // 获取所有列表的锁
    AcquireAllLocks();

    // 等待所有操作完成
    WaitAllOperations();
}

/// <summary>
/// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。
/// 这不是线程安全, 应该被称为冻结/解冻袋块
/// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 创建一个新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先冻结整个 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 然后ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由地点的代码可精通,为了得到迭代器对象,总共举行了三步关键的操作。

  1. 使用FreezeBag()方法,冻结一切ConcurrentBag<T>相会。因为急需转移集合的List<T>副本,生成副本时期不能够有别的线程更改损坏数据。
  2. ConcurrrentBag<T>生成List<T>副本。因为ConcurrentBag<T>储存数据的办法比较新鲜,直接促成迭代器方式困难,思量到线程安全和逻辑,最棒的主意是生成三个副本。
  3. 成就上述操作之后,就可以动用UnfreezeBag()措施解冻整个集合。

那么FreezeBag()方法是何等来冻结一切集合的啊?也是分为三步走。

  1. 率先取得全局锁,通过Monitor.Enter(GlobalListsLock, ref lockTaken);那般一条语句,那样任何线程就无法冻结集合。
  2. 接下来拿走具有线程中ThreadLocalList的锁,通过`AcquireAllLocks()方法来遍历获取。那样任何线程就无法对它进行操作损坏数据。
  3. 等候已经进入了操作流程线程停止,通过WaitAllOperations()艺术来促成,该方法会遍历每2个ThreadLocalList对象的m_currentOp性格,确认保证全部处在None操作。

达成上述流程后,那么便是当真的冷冻了总体ConcurrentBag<T>会见,要解冻的话也接近。在此不再赘言。

Map接口

Map集合提供了如约“键值对”存款和储蓄成分的艺术,一个键唯一映射三个值。集合中“键值对”全部作为1个实体成分时,类似List集合,不过如若分别来年,Map是一个两列成分的集合:键是1列,值是壹列。与Set集合壹样,Map也尚无提供随机走访的力量,只能通过键来访问对应的值。
Map的每一个要素都是二个Map.Entry,那些实体的协会是< Key, Value
>样式。

   
调用synchronizedMap()方法后会重临三个SynchronizedMap类的对象,而在SynchronizedMap类中使用了synchronized同步关键字来保管对Map的操作是线程安全的。

   
调用synchronizedMap()方法后会重返三个SynchronizedMap类的对象,而在SynchronizedMap类中动用了synchronized同步关键字来确定保障对Map的操作是线程安全的。

四、总结

下边给出一张图,描述了ConcurrentBag<T>是哪些存款和储蓄数据的。通过各样线程中的ThreadLocal来促成线程当地存款和储蓄,每一个线程中都有那般的结构,互不干扰。然后各类线程中的m_headList连接指向ConcurrentBag<T>的首先个列表,m_tailList本着最后一个列表。列表与列表之间通过m_locals
下的 m_nextList频频,构成三个单链表。

多少存储在各样线程的m_locals中,通过Node类构成1个双向链表。
PS:
要注意m_tailListm_headList并不是储存在ThreadLocal中,而是拥有的线程共享壹份。

威尼斯人线上娱乐 1

如上正是有关ConcurrentBag<T>类的落到实处,笔者的片段笔录和分析。

HashMap

HashMap完结了Map接口,但它是非线程安全的。HashMap允许key值为null,value也足以为null。

源码如下

源码如下

作者水平有限,假设不当欢迎各位批评指正!

附上ConcurrentBag<T>源码地址:戳一戳

Hashtable

Hashtable也是Map的贯彻类,继承自Dictionary类。它与HashMap不一致的是,它是线程安全的。而且它差别意key为null,value也不能够为null。
出于它是线程安全的,在成效上紧跟于HashMap。

private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}
private static class SynchronizedMap<K,V>

    implements Map<K,V>, Serializable {

    // use serialVersionUID from JDK 1.2.2 for interoperability

    private static final long serialVersionUID =1978198479659022715L;

    private final Map<K,V> m;     // Backing Map

        final Object      mutex;    // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {

            if (m==null)

                throw new NullPointerException();

            this.m = m;

            mutex = this;

        }

    SynchronizedMap(Map<K,V> m, Object mutex) {

            this.m = m;

            this.mutex = mutex;

        }

    public int size() {

        synchronized(mutex) {return m.size();}

        }

    public boolean isEmpty(){

        synchronized(mutex) {return m.isEmpty();}

        }

    public boolean containsKey(Object key) {

        synchronized(mutex) {return m.containsKey(key);}

        }

    public boolean containsValue(Object value){

        synchronized(mutex) {return m.containsValue(value);}

        }

    public V get(Object key) {

        synchronized(mutex) {return m.get(key);}

        }

    public V put(K key, V value) {

        synchronized(mutex) {return m.put(key, value);}

        }

    public V remove(Object key) {

        synchronized(mutex) {return m.remove(key);}

        }

    public void putAll(Map<? extends K, ? extends V> map) {

        synchronized(mutex) {m.putAll(map);}

        }

    public void clear() {

        synchronized(mutex) {m.clear();}

    }

    private transient Set<K> keySet = null;

    private transient Set<Map.Entry<K,V>> entrySet = null;

    private transient Collection<V> values = null;

    public Set<K> keySet() {

            synchronized(mutex) {

                if (keySet==null)

                    keySet = new                                     SynchronizedSet<K>(m.keySet(),mutex); 

                return keySet;

            }

    }

    public Set<Map.Entry<K,V>> entrySet() {

            synchronized(mutex) {

                if (entrySet==null)

                    entrySet = new SynchronizedSet<Map.Entry<K,V>>(m.entrySet(), mutex);

                return entrySet;

            }

    }

    public Collection<V> values() {

            synchronized(mutex) {

                if (values==null)

                    values = new SynchronizedCollection<V>(m.values(), mutex);

                return values;

            }

        }

    public boolean equals(Object o) {

            if (this == o)

                return true;

            synchronized(mutex) {return m.equals(o);}

        }

    public int hashCode() {

            synchronized(mutex) {return m.hashCode();}

        }

    public String toString() {

        synchronized(mutex) {return m.toString();}

        }

        private void writeObject(ObjectOutputStream s) throws IOException {

        synchronized(mutex) {s.defaultWriteObject();}

        }

}

List总结

ArrayList内部贯彻选用动态数组,当容量不够时,自动扩大容积至(当前体积*一.5+1)。元素的各类遵照插入的顺序排列。暗许初叶容量为十。
contains复杂度为O(n),add复杂度为分摊的常数,即添加n个成分须要O(n)时刻,remove为O(n),get复杂度为O(1)
私自访问作用高,随机插入、删除功效低。ArrayList是非线程安全的。

LinkedList内部使用双向链表完结,随机走访效用低,随机插入、删除功用高。能够看作储藏室、队列、双向队列来利用。LinkedList也是非线程安全的。

Vector跟ArrayList是周围的,内部贯彻也是动态数组,随机走访成效高。Vector是线程安全的。

Stack是栈,继承于Vector,其种种操作也是依照Vector的种种操作,因此当中间贯彻也是动态数组,先进后出。Stack是线程安全的。

方法三:ConcurrentHashMap

方法三:ConcurrentHashMap

List使用情形

对此急需飞速插入、删除成分,应该使用LinkedList
对于须求快捷随机访问元素,应该选用ArrayList
假使List必要被多线程操作,应该运用Vector,倘若只会被单线程操作,应该接纳ArrayList
Set总结

HashSet内部是运用HashMap完毕的,HashSet的key值是不容许再一次的,倘若放入的靶子是自定义对象,那么最CANON够同时重写hashCode与equals函数,那样就能自定义添加的目的在怎么的情景下是壹律的,即能担保在作业逻辑下能添加对象到HashSet中,保障工作逻辑的不利。其它,HashSet里的要素不是遵照顺序存款和储蓄的。HashSet是非线程安全的。

TreeSet存款和储蓄的因素是按顺序存款和储蓄的,尽管是储存的要素是自定义对象,那么必要贯彻Comparable接口。TreeSet也是非线程安全的。

LinkedHashSet继承自HashSet,它与HashSet分化的是,LinkedHashSet存款和储蓄成分的顺序是依据成分的插入顺序存款和储蓄的。LinkedHashSet也是非线程安全的。

    ConcurrentHashMap是java.util.concurrent包中的三个类,

    ConcurrentHashMap是java.util.concurrent包中的二个类,

Map总结

HashMap存款和储蓄键值对。当程序试图将三个key-value对放入 HashMap
中时,程序首先依照该key的hashCode()重返值决定该Entry的积存地方:若是八个Entry的key的hashCode()
重临值相同,这它们的囤积地方相同。如若那八个Entry的key通过equals比较重返true,新添加Entry的value将覆盖集合中原来Entry的
value,但key不会覆盖。假如那七个Entry的key通过equals
相比较重回false,新增进的Entry将与聚集中原来Entry形成Entry
链,而且新加上的 Entry 位于 Entry
链的头顶。看上面HashMap添加键值对的源代码:

    public V put(K key, V value) {
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
    }
    void addEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    if (size++ >= threshold)
        resize(2 * table.length);
    }

HashMap允许key、value值为null。HashMap是非线程安全的。

Hashtable是HashMap的线程安全版本。而且,key、value都不一样意为null。

哈希值的运用不相同: Hashtable直接动用对象的hashCode,如下代码:

int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;

而HashMap重新总计hash值,如下代码:

  int hash = hash(key.hashCode());
  int i = indexFor(hash, table.length);
  static int hash(int h) {
  // This function ensures that hashCodes that differ only by
  // constant multiples at each bit position have a bounded
  // number of collisions (approximately 8 at default load factor).
  h ^= (h >>> 20) ^ (h >>> 12);
  return h ^ (h >>> 7) ^ (h >>> 4);
  }
  static int indexFor(int h, int length) {
  return h & (length-1);
  }

扩容区别: Hashtable中hash数组私下认可大小是1一,扩大的法子是
old*二+一。HashMap中hash数组的私下认可大小是16,而且必然是二的指数。

首先,大家先来打听一下以此集合的规律。hashtable是做了伙同的,但是品质降低了,因为
hashtable每趟同步施行的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的二个桶,锁的粒度大大减小,如下图:

先是,咱们先来精通一下那几个集合的原理。hashtable是做了合伙的,但是品质下落了,因为
hashtable每一次同步施行的时候都要锁住整个结构。于是ConcurrentHashMap
修改了其锁住整个结构的格式,改为了只锁住HashMap的三个桶,锁的粒度大大减小,如下图:

贰.并发相关的集合类

出现功效的升官是索要从尾部JVM指令级别开头再度设计,优化,立异同步锁的编写制定才能兑现的,java.util.concurrent
的指标正是要落到实处 Collection
框架对数据结构所实行的现身操作。通过提供一组可信的、高品质并发营造块,开发职员能够增强并发类的线程安全、可伸缩性、品质、可读性和可信性。JDK
伍.0 中的并发立异有以下三组:

• JVM 级别更改。当先2/4现代总计机对出现对某一硬件级别提供支持,日常以
compare-and-swap (CAS)指令方式。CAS
是1种低级别的、细粒度的技巧,它同意多个线程更新三个内存地方,同时能够检查测试别的线程的争论并举办复苏。它是累累高质量并发算法的基本功。在
JDK 5.0 此前,Java
语言中用于协调线程之间的拜会的惟1原语是同步,同步是更重量级和粗粒度的。公开
CAS 能够支付中度可伸缩的面世 Java 类。那几个改动重要由 JDK
库类使用,而不是由开发职员使用。

• 低级实用程序类 — 锁定和原子类。使用 CAS 作为并发原语,ReentrantLock
类提供与 synchronized
原语相同的锁定和内部存款和储蓄器语义,可是这么能够更好地控制锁定(如计时的锁定等待、锁定轮询和可间歇的锁定等待)和提供更好的可伸缩性(竞争时的高品质)。半数以上开发人士将不再直接行使
ReentrantLock 类,而是选拔在 ReentrantLock 类上营造的高级类。

 威尼斯人线上娱乐 2

 威尼斯人线上娱乐 3

高档实用程序类。那一个类完毕并发创设块,每一个总括机科学文本中都会讲述这一个类

非随机信号、互斥、闩锁、屏障、沟通程序、线程池和线程安全集合类等。超过2/四开发人士都得以在应用程序中用这个类,来替换许多(若是或不是整套)同步、wait()
和 notify() 的使用,从而增强品质、可读性和不利。

 

 

Hashtable

Hashtable
提供了壹种易于使用的、线程安全的、关联的map功用,那自然也是便于的。然则,线程安全性是凭代价换成的――
Hashtable 的兼具办法都以同台的。
此时,无竞争的联合会导致可观的性质代价。 Hashtable 的后继者 HashMap
是作为JDK1.2中的集合框架的1有些现身的,它经过提供贰个不相同台的基类和三个同步的包装器
Collections.synchronizedMap ,消除了线程安全性难点。
通过将着力的效果从线程安全性中分离开来, Collections.synchronizedMap
允许需求联合的用户能够拥有二头,而不供给一块的用户则无需为共同付出代价。
Hashtable 和 synchronizedMap 所利用的拿走同步的简短方法(同步 Hashtable
中只怕联合的 Map
包装器对象中的各样方法)有八个重大的贫乏。首先,那种格局对于可伸缩性是一种障碍,因为2遍只可以有3个线程能够访问hash表。
同时,那样仍不足以提供真正的线程安全性,许多公用的混合操作照旧供给额外的一路。即便诸如
get() 和 put()
之类的简练操作能够在不供给非常同步的事态下安全地形成,但要么有局地公用的操作体系,例如迭代或然put-if-absent(空则放入),供给外表的壹块,以制止数据争用。

还要ConcurrentHashMap的读取操作大概是完全的出现操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作大致从未锁,所以比起此前的Hashtable大大变快了(那一点在桶愈来愈多时显示得更简明些)。唯有在求size等操作时才要求锁定任何表。笔者以为ConcurrentHashMap是线程安全的联谊中最快速的。

与此同时ConcurrentHashMap的读取操作大约是截然的产出操作。所以ConcurrentHashMap
读操作的加锁加锁粒度变小,个体操作差不多从未锁,所以比起在此之前的Hashtable大大变快了(这点在桶越多时表现得更醒目些)。唯有在求size等操作时才要求锁定任何表。笔者觉得ConcurrentHashMap是线程安全的聚众中最高效的。

减小锁粒度

抓好 HashMap
的并发性同时还提供线程安全性的1种方法是屏弃对整个表使用二个锁的法子,而利用对hash表的各样bucket都采纳1个锁的办法(恐怕,更常见的是,使用一个锁池,每一个锁负责维护多少个bucket)
。这表示八个线程能够而且地访问二个 Map
的不如部分,而不用争用单个的集结范围的锁。这种艺术能够平昔进步插入、检索以及移除操作的可伸缩性。不幸的是,那种并发性是以自然的代价换到的――那使得对任何
集合举行操作的某个情势(例如 size() 或 isEmpty()
)的贯彻更为不便,因为那一个办法需要三遍拿走许多的锁,并且还设有再次来到不正确的结果的高风险。然则,对于1些意况,例如落实cache,那样做是二个很好的妥洽――因为检索和插入操作相比频繁,而
size() 和 isEmpty() 操作则少得多。

威尼斯人线上娱乐,   
而在迭代时,ConcurrentHashMap使用了差异于守旧集合的便捷战败迭代器的另1种迭代格局,大家称为弱壹致迭代器。在那种迭代格局中,当iterator被创立后集合再爆发变更就不再是抛出
ConcurrentModificationException,取而代之的是在改动时new新的数量从而不影响原有的数量,iterator完结后再将头指针替换为新的数目,那样iterator线程能够应用原来老的数码,而写线程也能够出现的形成改变,更要紧的,那保险了多个线程并发执行的接二连三性和增添性,是性质提高的主要。

   
而在迭代时,ConcurrentHashMap使用了分裂于古板集合的快速战败迭代器的另1种迭代格局,大家称为弱壹致迭代器。在那种迭代情势中,当iterator被成立后集合再发生转移就不再是抛出
ConcurrentModificationException,取而代之的是在改变时new新的多少从而不影响原有的多少,iterator完结后再将头指针替换为新的多寡,那样iterator线程能够采取原来老的数据,而写线程也能够出现的形成改变,更主要的,那保险了多个线程并发执行的三番五次性和扩张性,是性质进步的要紧。

ConcurrentHashMap

util.concurrent 包中的 ConcurrentHashMap 类(也将出以往JDK 一.5中的
java.util.concurrent 包中)是对 Map 的线程安全的贯彻,比起
synchronizedMap
来,它提供了好得多的并发性。多少个读操作差不离总能够并发地执行,同时进行的读和写操作平日也能并发地执行,而还要开始展览的写操作还能够常常地涌出进行(相关的类也提供了近乎的多少个读线程的并发性,可是,只同意有二个移动的写线程)
。ConcurrentHashMap 被规划用来优化检索操作;实际上,成功的 get()
操作完毕今后常常根本不会有锁着的财富。要在不选取锁的地方下得到线程安全性需求自然的技巧性,并且须求对Java内部存款和储蓄器模型(Java
Memory Model)的底细有深入的理解。

在Java 七中采纳的是对Segment(Hash表的3个桶)加锁的章程

在Java 7中动用的是对Segment(Hash表的贰个桶)加锁的方式

CopyOnWriteArrayList

在那多少个遍历操作大大地多于插入或移除操作的产出应用程序中,壹般用
CopyOnWriteArrayList 类替代 ArrayList
。倘若是用以存放贰个侦听器(listener)列表,例如在AWT或Swing应用程序中,大概在大规模的JavaBean中,那么这种景色很宽泛(相关的
CopyOnWriteArraySet 使用三个 CopyOnWriteArrayList 来落到实处 Set 接口) 。
若是你正在利用1个平时的 ArrayList
来存放在二个侦听器列表,那么一旦该列表是可变的,而且只怕要被多少个线程访问,您
就务供给么在对其开始展览迭代操作时期,要么在迭代前实行的克隆操作时期,锁定任何列表,那三种做法的支付都十分大。当对列表执行会引起列表发生变化的操作时,
CopyOnWriteArrayList
并不是为列表创设一个全新的副本,它的迭代器肯定能够回到在迭代器被成立时列表的情状,而不会抛出
ConcurrentModificationException
。在对列表实行迭代此前不要克隆列表或许在迭代之间锁
定列表,因为迭代器所观察的列表的副本是不变的。换句话说,
CopyOnWriteArrayList
含有对贰个不足变数组的贰个可变的引用,因而,只要保留好可怜引用,您就足以获得不可变的线程安全性的功利,而且并非锁
定列表。

ConcurrentHashMap中驷不如舌实体类正是几个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

ConcurrentHashMap中任重(英文名:rèn zhòng)而道远实体类就是八个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)。

BlockingQueue

闭塞队列(BlockingQueue)是3个支撑多少个叠加操作的队列。那多少个附加的操作是:在队列为空时,获取成分的线程会等待队列变为非空。当队列满时,存储成分的线程会等待队列可用。阻塞队列常用于生产者和买主的场合,生产者是往队列里添比索素的线程,消费者是从队列里拿成分的线程。阻塞队列就是生产者存放成分的器皿,而顾客也只从容器里拿成分。

堵塞队列提供了两种处理措施:

措施处理格局 抛出极度 归来特殊值 直白不通 逾期退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
反省措施 element() peek() 不可用 不可用

抛出卓殊:是指当阻塞队列满时候,再往队列里插入成分,会抛出IllegalStateException(“Queue
full”)分外。当队列为空时,从队列里拿走成分时会抛出NoSuchElementException至极。
回来特殊值:插入方法会再次回到是或不是中标,成功则赶回true。移除方法,则是从队列里拿出二个因素,假使未有则赶回null
直白不通:当阻塞队列满时,假诺劳动者线程往队列里put成分,队列会直接不通生产者线程,直到得到数码,只怕响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
过期退出:当阻塞队列满时,队列会阻塞生产者线程1段时间,借使跨越一定的小时,生产者线程就会退出。

  1. Java里的封堵队列

JDK柒提供了多少个闭塞队列。分别是

ArrayBlockingQueue :二个由数组结构构成的有界阻塞队列。
LinkedBlockingQueue :二个由链表结构构成的有界阻塞队列。
PriorityBlockingQueue :贰个支撑先行级排序的无界阻塞队列。
DelayQueue:一个利用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储成分的堵截队列。
LinkedTransferQueue:三个由链表结构重组的无界阻塞队列。
LinkedBlockingDeque:2个由链表结构组成的双向阻塞队列。

Segment的源码

Segment的源码

ArrayBlockingQueue

ArrayBlockingQueue是三个用数组完毕的有界阻塞队列。此行列遵照先进先出(FIFO)的条件对成分举办排序。私下认可情况下不保障访问者公平的走访队列,所谓公平访问队列是指阻塞的装有生产者线程或消费者线程,当队列可用时,能够依照阻塞的先后顺序访问队列,即先阻塞的劳动者线程,能够先往队列里插入成分,先堵塞的买主线程,能够先从队列里拿走成分。

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

static final class Segment<K,V> extends
ReentrantLock implements Serializable {

LinkedBlockingQueue

LinkedBlockingQueue是贰个用链表完结的有界阻塞队列。此行列的暗许和最大尺寸为Integer.MAX_VALUE。此行列遵照先进先出的原则对成分进行排序。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你能够从队列的双边插入和移出元素。双端队列因为多了二个操作队列的进口,在十2线程同时入队时,也就收缩了大体上的竞争。比较别的的堵塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等措施,以First单词结尾的主意,表示插入,获取(peek)或移除双端队列的首先个因素。以Last单词结尾的章程,表示插入,获取或移除双端队列的末梢1个成分。别的插入方法add等同于addLast,移除方法remove等效于removeFirst。不过take方法却一样takeFirst,不了然是还是不是Jdk的bug,使用时依旧用带有First和Last后缀的格局更明了。在初阶化LinkedBlockingDeque时得以早先化队列的体积,用来防备其再扩大容积时过渡膨胀。其余双向阻塞队列能够选拔在“工作窃取”情势中。

           private static final long serialVersionUID
= 2249069246763182397L;

           private static final long serialVersionUID
= 2249069246763182397L;

PriorityBlockingQueue

PriorityBlockingQueue是二个援助先行级的无界队列。暗中同意处境下成分运用自然顺序排列,也得以通过比较器comparator来钦赐元素的排序规则。成分遵照升序排列。

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

           static final int MAX_SCAN_RETRIES
=Runtime.getRuntime().

DelayQueue

DelayQueue是2个帮衬延时获取成分的无界阻塞队列。队列使用PriorityQueue来完成。队列中的成分必须实现Delayed接口,在开立成分时得以钦点多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中领取成分。我们得以将DelayQueue运用在偏下应用场景:

缓存系统的统筹:能够用DelayQueue保存缓存成分的有效期,使用叁个线程循环查询DelayQueue,1旦能从DelayQueue中收获成分时,表示缓存有效期到了。
定时任务调度。使用DelayQueue保存当天将会执行的职务和施行时间,一旦从DelayQueue中获取到职责就从头实施,从比如TimerQueue便是使用DelayQueue实现的。
队列中的Delayed必须达成compareTo来钦赐成分的壹一。比如让延时时刻最长的放在队列的终极。达成代码如下:

    public int compareTo(Delayed other) {
       if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask x = (ScheduledFutureTask)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
   else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -
                  other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

怎么样落到实处Delayed接口

咱俩得以参照ScheduledThreadPoolExecutor里ScheduledFutureTask类。那个类达成了Delayed接口。首先:在指标创立的时候,使用time记录前对象如曾几何时候能够动用,代码如下:

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

下一场利用getDelay能够查询当前成分还索要延时多长期,代码如下:

    public long getDelay(TimeUnit unit) {
                 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

透过构造函数能够见见延迟时间参数ns的单位是皮秒,自个儿设计的时候最棒应用阿秒,因为getDelay时能够内定任意单位,1旦以微秒作为单位,而延时的时光又准确不到皮秒就劳动了。使用时请注意当time小于当前时光时,getDelay会重临负数。

何以促成延时队列

延时队列的实现很简短,当消费者从队列里获取成分时,假若成分未有直达延时时间,就短路当前线程。

    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                else if (leader != null)
                    available.await();

                availableProcessors() > 1 ? 64 : 1;

                availableProcessors() > 1 ? 64 : 1;

卡住队列的完成原理

设若队列是空的,消费者会平昔守候,当生产者添韩成分时候,消费者是什么知道当前队列有成分的啊?要是让您来设计阻塞队列你会怎么着设计,让劳动者和买主能够高作用的举办电视发表呢?让大家先来探视JDK是如何兑现的。

采取公告方式完毕。所谓文告形式,就是当生产者往满的行列里添比索素时会阻塞住生产者,当消费者消费了三个队列中的成分后,会打招呼劳动者当前队列可用。通过翻看JDK源码发现ArrayBlockingQueue使用了Condition来实现,代码如下:

    private final Condition notFull;
    private final Condition notEmpty;

    public ArrayBlockingQueue(int capacity, boolean fair) {
    //省略其他代码
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

    public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
    }

    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return extract();
            } finally {
        lock.unlock();
    }
        }

        private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

当大家往队列里插入一个要素时,倘使队列不可用,阻塞生产者首要通过LockSupport.park(this);来兑现

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)

                    reportInterruptAfterWait(interruptMode);
    }

接轨进入源码,发现调用setBlocker先保存下就要阻塞的线程,然后调用unsafe.park阻塞当前线程。

    public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

unsafe.park是个native方法,代码如下:

park那个方法会阻塞当前线程,唯有以下两种景况中的1种产生时,该情势才会回到。

与park对应的unpark执行或曾经实施时。注意:已经实施是指unpark先实行,然后再进行的park。
线程被搁浅时。
借使参数中的time不是零,等待了点名的微秒数时。
发生十分现象时。这几个卓殊事先不或许显明。
我们继承看一下JVM是什么贯彻park方法的,park在分歧的操作系统使用分歧的办法贯彻,在linux下是选取的是系统方法pthread_cond_wait落到实处。达成代码在JVM源码路径src/os/linux/vm/os_linux.cpp里的
os::Platform伊芙nt::park方法,代码如下:

    void os::PlatformEvent::park() {
         int v ;
     for (;;) {
             v = _Event ;
     if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
     }
     guarantee (v >= 0, "invariant") ;
     if (v == 0) {
     // Do this the hard way by blocking ...
     int status = pthread_mutex_lock(_mutex);
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
     while (_Event < 0) {
     status = pthread_cond_wait(_cond, _mutex);
     // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
     // Treat this the same as if the wait was interrupted
     if (status == ETIME) { status = EINTR; }
     assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;

     // In theory we could move the ST of 0 into _Event past the unlock(),
     // but then we'd need a MEMBAR after the ST.
     _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);
     assert_status(status == 0, status, "mutex_unlock");
     }
     guarantee (_Event >= 0, "invariant") ;
     }

 }

pthread_cond_wait是3个十二线程的原则变量函数,cond是condition的缩写,字面意思能够知晓为线程在等候贰个标准发生,这些规则是三个全局变量。那一个措施接收八个参数,2个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是采纳pthread_cond_signal达成的。park
在windows下则是利用WaitForSingleObject完结的。

           transient volatile HashEntry<K,V>[]
table;

           transient volatile HashEntry<K,V>[]
table;

三.某些常用集合类的内部贯彻格局

           transient int count;

           transient int count;

           transient int modCount;

           transient int modCount;

           transient int threshold;

           transient int threshold;

           final float loadFactor;

           final float loadFactor;

      }

      }

HashEntry的源码

HashEntry的源码

static final class HashEntry<K,V> { 

static final class HashEntry<K,V> { 

    final K key; 

    final K key; 

    final int hash; 

    final int hash; 

    volatile V value; 

    volatile V value; 

    final HashEntry<K,V> next; 

    final HashEntry<K,V> next; 

在Java 第88中学丢掉了Segment的定义,利用CAS算法做了新办法

在Java 第88中学丢掉了Segment的定义,利用CAS算法做了新措施

CAS算法选用“数组+链表+红黑树”的办法贯彻

CAS算法接纳“数组+链表+红黑树”的法门完结

                                      ————亓慧杰

                                      ————亓慧杰


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图