威尼斯人线上娱乐

Net多线程编制程序,中的线程安全集合类

2 4月 , 2019  

1、IProducerConsumerCollection (线程安全接口)
  此接口的装有实现必须都启用此接口的兼具成员,若要从八个线程同时选拔。

Thread Local Storage: Thread-Relative Static Fields and Data Slots



小说摘自msdn library官方文档

能够行使托管线程本地存款和储蓄区 (TLS) 存款和储蓄某一线程和动用程序域所独有的数目。
.NET Framework 提供了二种选用托管 TLS 的措施:线程相关的静态字段和数据槽。

  • 一旦您能够在编写翻译时预料到您的妥当需求,请使用线程相关的静态字段(在
    Visual Basic 中为线程相关的 Shared 字段)。
    线程相关的静态字段可提供最好质量。 它们还具备编写翻译时类型检查的帮助和益处。

  • Net多线程编制程序,中的线程安全集合类。假设不得不在运维时意识你的实际上须求,请使用数据槽。
    数据槽比线程相关的静态字段慢1些且更为费时使用,并且数据存款和储蓄为
    Object.aspx)
    类型,由此必须将其挟持转换为正确的门类才能利用。

在非托管 C++ 中,可以动用 TlsAlloc 来动态分配槽,使用
__declspec(thread) 来证明变量应在线程相关的存款和储蓄区中展开分配。
线程相关的静态字段和数据槽提供了此行为的托管版本。

在 .NET Framework 4中,能够选用
System.Threading.ThreadLocal<T>.aspx)
类创设线程当地对象,在首先次接纳该对象时它将惰式发轫化。
有关详细消息,请参阅推迟早先化.aspx)。

托管 TLS 中多少的唯一性

 

不论是使用线程相关的静态字段依然采纳数据槽,托管 TLS
中的数据都是线程和利用程序域组合所独有的。

  • 在动用程序域内部,贰个线程不能够修改另一个线程中的数据,固然这五个线程使用同叁个字段或槽时也无法。

  • 当线程从多个利用程序域中做客同1个字段或槽时,会在每一个应用程序域中维护2个单身的值。

例如,要是某些线程设置线程相关的静态字段的值,接着它进入另叁个用到程序域,然后搜索该字段的值,则在其次个使用程序域中搜寻的值将分裂于第一个应用程序域中的值。
在其次个利用程序域中为该字段设置1个新值不会影响率先个使用程序域中该字段的值。

未有差距于,当有些线程获取多少个例外选用程序域中的同一命名数据槽时,第二个使用程序域中的数据将一向与第二个使用程序域中的数据毫不相关。

线程相关的静态字段

 

假若您精晓有个别数量连接有些线程和运用程序域组合所独有的,请向该静态字段应用
ThreadStaticAttribute.aspx)
脾性。 与行使别的其余静态字段1样选择该字段。
该字段中的数据是各类使用它的线程所独有的。

线程相关的静态字段的品质优于数据槽,并且拥有编写翻译时类型检查的优点。

请小心,任何类构造函数代码都将在做客该字段的第三个上下文中的第二个线程上运行。
在平等应用程序域内的享有别的线程或左右文中,即使字段是援引类型,它们将被早先化为
null(在 Visual Basic 中为
Nothing);假若字段是值类型,它们将被开头化为它们的私下认可值。
因而,您不应信赖于类构造函数来初叶化线程相关的静态字段。
而应防止开端化线程相关的静态字段并假定它们初阶化为 null
(Nothing) 或它们的私下认可值。

数据槽

 

.NET Framework 提供了线程和利用程序域组合所独有的动态数据槽。
数据槽包蕴三种档次:命名槽和未命名槽。
两者都是通过运用LocalDataStoreSlot.aspx)
结构来促成的。

  • 若要创立命名数据槽,请使用
    Thread.AllocateNamedDataSlot.aspx)

    Thread.GetNamedDataSlot.aspx)
    方法。 若要取得对有些现有命名槽的引用,请将其名称传递给
    GetNamedDataSlot.aspx)
    方法。

  • 若要成立未命名数据槽,请使用
    Thread.AllocateDataSlot.aspx)
    方法。

对于命名槽和未命名槽,请使用
Thread.SetData.aspx)

Thread.GetData.aspx)
方法设置和检索槽中的音讯。
这一个都以静态方法,它们一直功效于方今正在推行它们的线程的数码。

命名槽或许很便宜,因为你能够在须求它时经过将其名目传递给
GetNamedDataSlot.aspx)
方法来查找该槽,而不是维护对未命名槽的引用。
然则,要是另1个零件使用相同的名称来命名其线程相关的存储区,并且有三个线程同时推行来自您的组件和该器件的代码,则那五个零部件只怕会损坏互相的数码。(本方案一经这七个零件在平等应用程序域内运营,并且它们并不用于共享相同数量。)

出现集合

C# 的集合类型中, 都有Synchronized静态方法, 和SyncRoot实例方法

威尼斯人线上娱乐 1威尼斯人线上娱乐 2

一 怎么选用并发集合?

对于ArrayList以及Hashtable
集合类来讲,当须要做到线程安全的时候,最棒使用其自带的属性SyncRoot
来成功,纵然也得以运用其Synchronized()方法来兑现,可是使用质量会更好。

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace ConsoleApp1
{
    public class SafeStack<T> : IProducerConsumerCollection<T>
    {
        // Used for enforcing thread-safety
        private object m_lockObject = new object();

        // We'll use a regular old Stack for our core operations
        private Stack<T> m_sequentialStack = null;

        //
        // Constructors
        //
        public SafeStack()
        {
            m_sequentialStack = new Stack<T>();
        }

        public SafeStack(IEnumerable<T> collection)
        {
            m_sequentialStack = new Stack<T>(collection);
        }

        //
        // Safe Push/Pop support
        //
        public void Push(T item)
        {
            lock (m_lockObject) m_sequentialStack.Push(item);
        }

        public bool TryPop(out T item)
        {
            bool rval = true;
            lock (m_lockObject)
            {
                if (m_sequentialStack.Count == 0) { item = default(T); rval = false; }
                else item = m_sequentialStack.Pop();
            }
            return rval;
        }

        //
        // IProducerConsumerCollection(T) support
        //
        public bool TryTake(out T item)
        {
            return TryPop(out item);
        }

        public bool TryAdd(T item)
        {
            Push(item);
            return true; // Push doesn't fail
        }

        public T[] ToArray()
        {
            T[] rval = null;
            lock (m_lockObject) rval = m_sequentialStack.ToArray();
            return rval;
        }

        public void CopyTo(T[] array, int index)
        {
            lock (m_lockObject) m_sequentialStack.CopyTo(array, index);
        }



        //
        // Support for IEnumerable(T)
        //
        public IEnumerator<T> GetEnumerator()
        {
            // The performance here will be unfortunate for large stacks,
            // but thread-safety is effectively implemented.
            Stack<T> stackCopy = null;
            lock (m_lockObject) stackCopy = new Stack<T>(m_sequentialStack);
            return stackCopy.GetEnumerator();
        }


        //
        // Support for IEnumerable
        //
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        // 
        // Support for ICollection
        //
        public bool IsSynchronized
        {
            get { return true; }
        }

        public object SyncRoot
        {
            get { return m_lockObject; }
        }

        public int Count
        {
            get { return m_sequentialStack.Count; }
        }

        public void CopyTo(Array array, int index)
        {
            lock (m_lockObject) ((ICollection)m_sequentialStack).CopyTo(array, index);
        }
    }
}

由来重要有以下几点:

线程安全集合:
BlockingCollection:
1个线程安全集合类,可为任何项目标集纳提供线程安全

SafeStack

  • System.Collections和System.Collections.Generic名称空间中所提供的经文列表、集合和数组都不是线程安全的,若无同步机制,他们不符合于接受并发的下令来足够和删除元素。
  • 威尼斯人线上娱乐,在出现代码中使用上述经典集合供给复杂的一块儿管理,使用起来很不方便人民群众。
  • 运用复杂的联合机制会大大下跌质量。
  • NET Framework
    四所提供的新的集结尽只怕地减小须要利用锁的次数。那几个新的联谊通过利用比较并沟通(compare-and-swap,CAS)指令和内部存款和储蓄器屏障,制止采取互斥的轻重级锁。这对质量有保证。

曾几何时使用线程安全集合 
该小说解释了.net
framework四新引入的七个尤其支持四线程添加和删除操作而陈设的联谊类型。差异于此前版本的中集合类型中的SyncRoot属性
以及 Synchronized()方法,这几个新品类应用了迅猛的锁定和免锁定同步机制

威尼斯人线上娱乐 3威尼斯人线上娱乐 4

注意:

ConcurrentQueue(T)
ConcurrentStack(T)
ConcurrentDictionary(TKey, TValue)
ConcurrentBag(T)
BlockingCollection(T)

using System;
using System.Collections.Concurrent;

namespace ConsoleApp1
{
    class Program
    {
        static void Main()
        {
            TestSafeStack();

            // Keep the console window open in debug mode.
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }

        // Test our implementation of IProducerConsumerCollection(T)
        // Demonstrates:
        //      IPCC(T).TryAdd()
        //      IPCC(T).TryTake()
        //      IPCC(T).CopyTo()
        static void TestSafeStack()
        {
            SafeStack<int> stack = new SafeStack<int>();
            IProducerConsumerCollection<int> ipcc = (IProducerConsumerCollection<int>)stack;

            // Test Push()/TryAdd()
            stack.Push(10); Console.WriteLine("Pushed 10");
            ipcc.TryAdd(20); Console.WriteLine("IPCC.TryAdded 20");
            stack.Push(15); Console.WriteLine("Pushed 15");

            int[] testArray = new int[3];

            // Try CopyTo() within boundaries
            try
            {
                ipcc.CopyTo(testArray, 0);
                Console.WriteLine("CopyTo() within boundaries worked, as expected");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() within boundaries unexpectedly threw an exception: {0}", e.Message);
            }

            // Try CopyTo() that overflows
            try
            {
                ipcc.CopyTo(testArray, 1);
                Console.WriteLine("CopyTo() with index overflow worked, and it SHOULD NOT HAVE");
            }
            catch (Exception e)
            {
                Console.WriteLine("CopyTo() with index overflow threw an exception, as expected: {0}", e.Message);
            }

            // Test enumeration
            Console.Write("Enumeration (should be three items): ");
            foreach (int item in stack)
                Console.Write("{0} ", item);
            Console.WriteLine("");

            // Test TryPop()
            int popped = 0;
            if (stack.TryPop(out popped))
            {
                Console.WriteLine("Successfully popped {0}", popped);
            }
            else Console.WriteLine("FAILED to pop!!");

            // Test Count
            Console.WriteLine("stack count is {0}, should be 2", stack.Count);

            // Test TryTake()
            if (ipcc.TryTake(out popped))
            {
                Console.WriteLine("Successfully IPCC-TryTaked {0}", popped);
            }
            else Console.WriteLine("FAILED to IPCC.TryTake!!");
        }
    }
}

与经典集合比较,并发集合会有更大的付出,因而在串行代码中央银行使并发集合无意义,只会扩大额外的开支且运维速度比访问经典集合慢。

IProducerConsumerCollection<T>
概念了操作线程安全集合的不2秘籍,以供产品/使用者利用

Program

 

以身作则请看:
IProducerConsumerCollection<T>
Interface

二、ConcurrentStack类:安全堆栈

二 并发集合

法定示例给的是依照堆栈的线程安全完毕,他持续自该接口。然后加锁lock来促成线程安全,该接口有八个措施:

威尼斯人线上娱乐 5威尼斯人线上娱乐 6

1)ConcurrentQueue:线程安全的先进先出 (FIFO) 集合

[__DynamicallyInvokable]
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
// Methods
[__DynamicallyInvokable]
void CopyTo(T[] array, int index);
[__DynamicallyInvokable]
T[] ToArray();
[__DynamicallyInvokable]
bool TryAdd(T item);
[__DynamicallyInvokable]
bool TryTake(out T item);
}除了CopyTo 之外的方法, 其余的都是该接口自己,基于堆栈的线程安全实现也就是加锁, 那为什么不调用堆栈数据结构中的SyncRoot 属性和Synchronized()方法来加锁实现同步?
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var taskStack = new ConcurrentStack<CustomTask>();
            var cts = new CancellationTokenSource();

            var taskSource = Task.Run(() => TaskProducer(taskStack));

            Task[] processors = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(
                    () => TaskProcessor(taskStack, "Processor " + processorId, cts.Token));
            }

            await taskSource;
            cts.CancelAfter(TimeSpan.FromSeconds(2));

            await Task.WhenAll(processors);
        }

        static async Task TaskProducer(ConcurrentStack<CustomTask> stack)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                stack.Push(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }

        static async Task TaskProcessor(
            ConcurrentStack<CustomTask> stack, string name, CancellationToken token)
        {
            await GetRandomDelay();
            do
            {
                CustomTask workItem;
                bool popSuccesful = stack.TryPop(out workItem);
                if (popSuccesful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}

珍视情势:

参照:
C# Synchronized 和 SyncRoot
达成线程同步的源码分析及泛型集合的线程安全访问
SyncRoot 属性

Program

  • Enqueue(T item);将对象添加到集合结尾。
  • TryDequeue(out T result);
    尝试移除并赶回位于集合起来处的靶子,再次回到值表示操作是不是中标。
  • TryPeek(out T
    result);尝试再次来到集合起来处的对象,但不将其移除,再次回到值表示操作是还是不是中标。

一经调用得是集合类的SyncRoot属性的话,其锁是目的级其他,而static
则是项目级别的。具体的悔过再切磋下。

3、ConcurrentQueue类:安全队列

说明:

BlockingCollection类型这些集合类依然挺有趣的,他达成了IProducerConsumerCollection<T>的持有办法,可以兑现其余自定义类型的线程安全。越发是她的计时不通操作,具体代码示例请看:
如何:在 BlockingCollection
中每个增进和取出项
BlockingCollection
概述

威尼斯人线上娱乐 7威尼斯人线上娱乐 8

  • ConcurrentQueue是一点一滴无锁的,但当CAS操作退步且面临财富争用时,它只怕会自旋并且重试操作。
  • ConcurrentQueue是FIFO集合,有些和出入顺序无关的地方,尽量不要用ConcurrentQueue。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();
            var cts = new CancellationTokenSource();

            var taskSource = Task.Run(() => TaskProducer(taskQueue));

            Task[] processors = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(
                    () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
            }

            await taskSource;
            cts.CancelAfter(TimeSpan.FromSeconds(2));

            await Task.WhenAll(processors);
        }

        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                queue.Enqueue(workItem);
                Console.WriteLine("插入Task {0} has been posted ThreadID={1}", workItem.Id, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static async Task TaskProcessor(
            ConcurrentQueue<CustomTask> queue, string name, CancellationToken token)
        {
            CustomTask workItem;
            bool dequeueSuccesful = false;

            await GetRandomDelay();
            do
            {
                dequeueSuccesful = queue.TryDequeue(out workItem);
                if (dequeueSuccesful)
                {
                    Console.WriteLine("读取Task {0} has been processed by {1} ThreadID={2}",
                                        workItem.Id, name, Thread.CurrentThread.ManagedThreadId);
                }

                await GetRandomDelay();
            }
            while (!token.IsCancellationRequested);
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        class CustomTask
        {
            public int Id { get; set; }
        }
    }
}

 

Program

2)ConcurrentStack:线程安全的后进先出 (LIFO) 集合

4、ConcurrentDictionary类
  ConcurrentDictionary类写操作比使用锁的平日字典(Dictionary)要慢的多,而读操作则要快些。由此对字典要大方的线程安全的读操作,ConcurrentDictionary类是最棒的挑叁拣四
  ConcurrentDictionary类的落成接纳了细粒度锁(fine-grained
locking)技术
,那在10二线程写入方面比采纳锁的一般的字典(也被号称粗粒度锁)

重中之重方法及品质:

威尼斯人线上娱乐 9威尼斯人线上娱乐 10

  • Push(T item);将对象插入集合的顶部。
  • TryPop(out T
    result);尝试弹出并重临集合顶部的指标,重返值表示操作是不是成功。
  • TryPeek(out T
    result);尝试再次来到集合起来处的靶子,但不将其移除,再次来到值表示操作是还是不是中标。
  • IsEmpty { get; }提示集合是或不是为空。
  • PushRange(T[] items);将四个对象插入集合的顶部。
  • TryPopRange(T[] items);弹出顶部多少个要素,再次来到结果为弹出成分个数。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var concurrentDictionary = new ConcurrentDictionary<int, string>();
            var dictionary = new Dictionary<int, string>();

            var sw = new Stopwatch();

            sw.Start();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    dictionary[i] = Item;
                }
            }
            sw.Stop();
            Console.WriteLine("Writing to dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                concurrentDictionary[i] = Item;
            }
            sw.Stop();
            Console.WriteLine("Writing to a concurrent dictionary: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                lock (dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
            sw.Stop();
            Console.WriteLine("Reading from dictionary with a lock: {0}", sw.Elapsed);

            sw.Restart();
            for (int i = 0; i < 1000000; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
            sw.Stop();
            Console.WriteLine("Reading from a concurrent dictionary: {0}", sw.Elapsed);
        }

        const string Item = "Dictionary item";
        public static string CurrentItem;
    }
}

说明:

Program

  • 与ConcurrentQueue相似地,ConcurrentStack完全无锁的,但当CAS操作败北且面临财富争用时,它或者会自旋并且重试操作。
  • 取得集合是不是带有成分运用IsEmpty属性,而不是因而判断Count属性是还是不是超过零。调用Count比调用IsEmpty开支大。
  • 使用PushRange(T[]
    items)和TryPopRange(T[]
    items)时只顾缓冲引起的额外成本和额外的内存消耗。

5、ConcurrentBag类

 

威尼斯人线上娱乐 11威尼斯人线上娱乐 12

3) ConcurrentBag:成分可另行的严节聚集

namespace ConsoleApp1
{
    class CrawlingTask
    {
        public string UrlToCrawl { get; set; }

        public string ProducerName { get; set; }
    }
}

关键情势及品质:

CrawlingTask

  • TryPeek(out T
    result);尝试从集合重临一个对象,但不移除该指标,再次回到值表示是还是不是中标收获该目的。
  • TryTake(out T
    result);尝试从集合再次回到一个指标并移除该指标,重返值表示是还是不是成功收获该对象。
  • Add(T item);将目的添加到集合中。
  • IsEmpty { get; }解释同ConcurrentStack

威尼斯人线上娱乐 13威尼斯人线上娱乐 14

说明:

using System.Collections.Generic;

namespace ConsoleApp1
{
    static class Module
    {
        public static Dictionary<string, string[]> _contentEmulation = new Dictionary<string, string[]>();

        public static void CreateLinks()
        {
            _contentEmulation["http://microsoft.com/"] = new[] { "http://microsoft.com/a.html", "http://microsoft.com/b.html" };
            _contentEmulation["http://microsoft.com/a.html"] = new[] { "http://microsoft.com/c.html", "http://microsoft.com/d.html" };
            _contentEmulation["http://microsoft.com/b.html"] = new[] { "http://microsoft.com/e.html" };

            _contentEmulation["http://google.com/"] = new[] { "http://google.com/a.html", "http://google.com/b.html" };
            _contentEmulation["http://google.com/a.html"] = new[] { "http://google.com/c.html", "http://google.com/d.html" };
            _contentEmulation["http://google.com/b.html"] = new[] { "http://google.com/e.html", "http://google.com/f.html" };
            _contentEmulation["http://google.com/c.html"] = new[] { "http://google.com/h.html", "http://google.com/i.html" };

            _contentEmulation["http://facebook.com/"] = new[] { "http://facebook.com/a.html", "http://facebook.com/b.html" };
            _contentEmulation["http://facebook.com/a.html"] = new[] { "http://facebook.com/c.html", "http://facebook.com/d.html" };
            _contentEmulation["http://facebook.com/b.html"] = new[] { "http://facebook.com/e.html" };

            _contentEmulation["http://twitter.com/"] = new[] { "http://twitter.com/a.html", "http://twitter.com/b.html" };
            _contentEmulation["http://twitter.com/a.html"] = new[] { "http://twitter.com/c.html", "http://twitter.com/d.html" };
            _contentEmulation["http://twitter.com/b.html"] = new[] { "http://twitter.com/e.html" };
            _contentEmulation["http://twitter.com/c.html"] = new[] { "http://twitter.com/f.html", "http://twitter.com/g.html" };
            _contentEmulation["http://twitter.com/d.html"] = new[] { "http://twitter.com/h.html" };
            _contentEmulation["http://twitter.com/e.html"] = new[] { "http://twitter.com/i.html" };
        }
    }
}
  • ConcurrentBag为每贰个访问集合的线程维护了三个本地队列,在或许的气象下,它会以无锁的法门访问本地队列。
  • ConcurrentBag在同三个线程添加和删除成分的场馆下效用尤其高。
  • 因为ConcurrentBag有时会须要锁,在劳动者线程和消费者线程完全分开的风貌下成效异常低。
  • ConcurrentBag调用IsEmpty的开发一点都非常大,因为那亟需近期得到这几个严节组的装有锁。

Module

 

威尼斯人线上娱乐 15威尼斯人线上娱乐 16

4)BlockingCollection:实现

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Module.CreateLinks();
            Task t = RunProgram();
            t.Wait();
        }

        static async Task RunProgram()
        {
            var bag = new ConcurrentBag<CrawlingTask>();

            string[] urls = new[] { "http://microsoft.com/", "http://google.com/", "http://facebook.com/", "http://twitter.com/" };

            var crawlers = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string crawlerName = "Crawler " + i.ToString();
                bag.Add(new CrawlingTask { UrlToCrawl = urls[i - 1], ProducerName = "root" });
                crawlers[i - 1] = Task.Run(() => Crawl(bag, crawlerName));
            }

            await Task.WhenAll(crawlers);
        }

        static async Task Crawl(ConcurrentBag<CrawlingTask> bag, string crawlerName)
        {
            CrawlingTask task;
            //尝试从bag中取出对象
            while (bag.TryTake(out task))
            {
                IEnumerable<string> urls = await GetLinksFromContent(task);
                if (urls != null)
                {
                    foreach (var url in urls)
                    {
                        var t = new CrawlingTask
                        {
                            UrlToCrawl = url,
                            ProducerName = crawlerName
                        };
                        //将子集插入到bag中 
                        bag.Add(t);
                    }
                }
                Console.WriteLine("Indexing url {0} posted by {1} is completed by {2}!",
                    task.UrlToCrawl, task.ProducerName, crawlerName);
            }
        }

        static async Task<IEnumerable<string>> GetLinksFromContent(CrawlingTask task)
        {
            await GetRandomDelay();

            if (Module._contentEmulation.ContainsKey(task.UrlToCrawl)) return Module._contentEmulation[task.UrlToCrawl];

            return null;
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(150, 200);
            return Task.Delay(delay);
        }


    }
}

System.Collections.Concurrent.IProducerConsumerCollection<T>
的线程安全集合,提供阻塞和范围功用

Program

重要措施及品质:

6、BlockingCollection类
  BlockingCollection类:
大家能够转移任务存款和储蓄在堵塞集合中的情势。暗中同意情状下它采纳的是ConcurrentQueue容器,但是咱们能够利用别的达成了IProducerConsumerCollection泛型接口的联谊。

  • BlockingCollection(int
    boundedCapacity);boundedCapacity代表集合限制大小。
  • CompleteAdding();将BlockingCollection实例标记为不再接受别的添加。
  • IsCompleted { get; }此聚众是不是已标记为已成功增加并且为空。
  • GetConsumingEnumerable();从集合中移除并回到移除的成分
  • Add(T item);添新币素到集结。
  • TryTake(T item, int millisecondsTimeout, CancellationToken
    cancellationToken);

威尼斯人线上娱乐 17威尼斯人线上娱乐 18

说明:

namespace ConsoleApp1
{
    class CustomTask
    {
        public int Id { get; set; }
    }
}
  • 运用BlockingCollection()构造函数实例化BlockingCollection,意味着不安装boundedCapacity,那么boundedCapacity为默许值:
    int.马克斯Value。
  • 分界:使用BlockingCollection(int
    boundedCapacity),设置boundedCapacity的值,当集合容积达到这一个值得时候,向BlockingCollection添新币素的线程将会被卡住,直到有成分被删去。

CustomTask

分界功用可决定内部存储器中集合最大尺寸,那对于急需处理大批量要素的时候卓殊管用。

威尼斯人线上娱乐 19威尼斯人线上娱乐 20

  • 暗中同意意况下,BlockingCollection封装了叁个ConcurrentQueue。能够在构造函数中内定贰个兑现了IProducerConsumerCollection接口的出现集合,包含:ConcurrentStack、ConcurrentBag。
  • 应用此聚众包罗易于无界定等待的危机,所以接纳TryTake尤其,因为TryTake提供了晚点控制,钦定的时光内得以从集合中移除有些项,则为
    true;不然为 false。
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    static class Module
    {
        public static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }
    }
}

 

Module

5)ConcurrentDictionary:可由多少个线程同时做客的键值对的线程安全集合。

威尼斯人线上娱乐 21威尼斯人线上娱乐 22

主要措施

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Using a Queue inside of BlockingCollection");
            Console.WriteLine();
            Task t = RunProgram();
            t.Wait();

            //Console.WriteLine();
            //Console.WriteLine("Using a Stack inside of BlockingCollection");
            //Console.WriteLine();
            //Task t = RunProgram(new ConcurrentStack<CustomTask>());
            //t.Wait();
        }

        static async Task RunProgram(IProducerConsumerCollection<CustomTask> collection = null)
        {
            var taskCollection = new BlockingCollection<CustomTask>();
            if (collection != null)
                taskCollection = new BlockingCollection<CustomTask>(collection);
            //初始化collection中的数据
            var taskSource = Task.Run(() => TaskProducer(taskCollection));

            Task[] processors = new Task[4];
            for (int i = 1; i <= 4; i++)
            {
                string processorId = "Processor " + i;
                processors[i - 1] = Task.Run(
                    () => TaskProcessor(taskCollection, processorId));
            }

            await taskSource;

            await Task.WhenAll(processors);
        }
        /// <summary>
        /// 初始化collection中的数据
        /// </summary>
        /// <param name="collection"></param>
        /// <returns></returns>
        static async Task TaskProducer(BlockingCollection<CustomTask> collection)
        {
            for (int i = 1; i <= 20; i++)
            {
                await Task.Delay(20);
                var workItem = new CustomTask { Id = i };
                collection.Add(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
            collection.CompleteAdding();
        }
        /// <summary>
        /// 打印collection中的数据
        /// </summary>
        /// <param name="collection"></param>
        /// <param name="name"></param>
        /// <returns></returns>
        static async Task TaskProcessor(
            BlockingCollection<CustomTask> collection, string name)
        {
            await Module.GetRandomDelay();
            foreach (CustomTask item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Task {0} has been processed by {1}", item.Id, name);
                await Module.GetRandomDelay();
            }
        }
    }
}
  • AddOrUpdate(TKey key, 电视alue addValue, Func<TKey, 电视机alue,
    电视alue>
    updateValueFactory);如若钦点的键尚不存在,则将键/值对丰富到
    字典中;若是钦命的键已存在,则更新字典中的键/值对。
  • GetOrAdd(TKey key, 电视alue
    value);要是钦赐的键尚不存在,则将键/值对拉长到字典中。
  • TryRemove(TKey key, out TValue
    value);尝试从字典中移除并重临具有钦赐键的值。
  • TryUpdate(TKey key, 电视alue newValue, 电视机alue
    comparisonValue);将内定键的水保值与钦定值实行相比较,倘诺相等,则用第十三个值更新该键。

Program

说明:

7、使用ThreadStatic特性
  ThreadStatic天性是最简易的TLS使用,且只支持静态字段,只须要在字段上标记那么些特点就能够了

  • ConcurrentDictionary对于读操作是截然无锁的。当三个职责或线程向里面添英镑素或改动数据的时候,ConcurrentDictionary使用细粒度的锁。使用细粒度的锁只会锁定真正必要锁定的有个别,而不是整个字典。

威尼斯人线上娱乐 23威尼斯人线上娱乐 24

 

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        //TLS中的str变量
        //可以看到,str静态字段在两个线程中都是独立存储的,互相不会被修改。
        [ThreadStatic]
        static string str = "hehe";

        static void Main(string[] args)
        {
            //另一个线程只会修改自己TLS中的hehe
            Thread th = new Thread(() => { str = "Mgen"; Display(); });
            th.Start();
            th.Join();
            Display();
        }
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, str);
        }

    }
}

6)IProducerConsumerCollection:概念供生产者/消费者用来操作线程安全集合的主意。
此接口提供1个集合的意味(为劳动者/消费者集合),从而更高级别抽象如
System.Collections.Concurrent.BlockingCollection<T>能够应用集合营为基础的存储机制。

Program

 

八、使用命名的LocalDataStoreSlot类型
  显明ThreadStatic天性只援救静态字段太受限制了。.NET线程类型中的LocalDataStoreSlot提供更好的TLS支持。咱们先来看看命名的LocalDataStoreSlot类型,能够通过Thread.AllocateNamedDataSlot来分配三个命名的半空中,通过Thread.FreeNamedDataSlot来销毁四个命名的空中。空间数据的得到和设置则透过Thread类型的GetData方法和SetData方法。

叁.常用方式

威尼斯人线上娱乐 25威尼斯人线上娱乐 26

壹)并行的劳动者-消费者方式

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            //创建Slot
            LocalDataStoreSlot slot = Thread.AllocateNamedDataSlot("slot");

            //设置TLS中的值
            Thread.SetData(slot, "hehe");

            //修改TLS的线程
            Thread th = new Thread(() =>
            {
                Thread.SetData(slot, "Mgen");
                Display();
            });

            th.Start();
            th.Join();
            Display();

            //清除Slot
            Thread.FreeNamedDataSlot("slot");
        }

        //显示TLS中Slot值
        static void Display()
        {
            LocalDataStoreSlot dataslot = Thread.GetNamedDataSlot("slot");
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(dataslot));
        }

    }
}

定义:

Program

生成者和买主是此格局中的两类对象模型,消费者依赖于劳动者的结果,生产者生成结果的同时,消费者应用结果。

玖、使用未命名的LocalDataStoreSlot类型
  线程同样支撑未命名的LocalDataStoreSlot,未命名的LocalDataStoreSlot不须求手动清除,分配则须要Thread.AllocateDataSlot方法。注意由于未命名的LocalDataStoreSlot没盛名称,由此不能够运用Thread.GetNamedDataSlot方法,只可以在五个线程中援引同一个LocalDataStoreSlot才方可对TLS空间实行操作,将方面包车型地铁命名的LocalDataStoreSlot代码改成未命名的LocalDataStoreSlot执行

威尼斯人线上娱乐 27 

威尼斯人线上娱乐 28威尼斯人线上娱乐 29

图壹 并行的劳动者-消费者方式

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        //静态LocalDataStoreSlot变量
        static LocalDataStoreSlot slot;

        static void Main(string[] args)
        {
            //创建Slot
            slot = Thread.AllocateDataSlot();

            //设置TLS中的值
            Thread.SetData(slot, "hehe");

            //修改TLS的线程
            Thread th = new Thread(() =>
            {
                Thread.SetData(slot, "Mgen");
                Display();

            });

            th.Start();
            th.Join();
            Display();
        }

        //显示TLS中Slot值
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, Thread.GetData(slot));
        }

    }
}

说明:

Program

  • 并发集合用在此形式下格外确切,因为并发集合协理此格局中目的的并行操作。
  • 若不选择并发集合,那么就要加入一起机制,从而使程序变得比较复杂,难于保养和精通,同时大大下降质量。
  • 上海教室为劳动者消费者形式示意图,纵轴为时间轴,生成者与买主的并不在一条时间线上,但互相有6续,目的在于注明生成者首发生结果,而后消费者才真的使用了生成者发生的多寡。

10、使用.NET 4.0的ThreadLocal<T>类型
  .NET
四.0在线程方面插手了好多事物,在那之中就包含ThreadLocal<T>类型,他的面世更大的简化了TLS的操作。ThreadLocal<T>类型和Lazy<T>惊人相似,构造函数参数是Func<T>用来创设对象(当然也足以精通成靶子的暗中同意值),然后用Value属性来得到只怕设置那一个目的。
  ThreadLocal的操作或多或少有点像上面的未命名的LocalDataStoreSlot,但ThreadLocal感觉更简洁更好通晓。

2)流程格局

威尼斯人线上娱乐 30威尼斯人线上娱乐 31

定义:

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static ThreadLocal<string> local;

        static void Main(string[] args)
        {
            //创建ThreadLocal并提供默认值
            local = new ThreadLocal<string>(() => "hehe");

            //修改TLS的线程
            Thread th = new Thread(() =>
            {

                local.Value = "Mgen";
                Display();
            });

            th.Start();
            th.Join();
            Display();
        }

        //显示TLS中数据值
        static void Display()
        {
            Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
        }

    }
}

流程由八个阶段构成,每一种阶段由一种类的生产者和顾客构成。一般来讲前3个品级是后二个品级的生成者;依靠相邻四个阶段之间的缓冲区队列,每一个阶段能够并发执行。

Program

威尼斯人线上娱乐 32 

 

图二 并行的流程格局

说明:

  • 常使用BlockingCollection<T>作为缓冲罐区队列。
  • 流程的快慢近似等于流水生产线最慢阶段的进度。
  • 上海教室为流水生产线情势示意图,前一等级为后一等级的生成者,那里展现了极致简练和大旨的流程形式,更复杂的格局能够认为是各样阶段都不外乎了对数据更加多的处理进程。

四 使用方法

仅以ConcurrentBag和BlockingCollection为例,别的的面世集合与之相似。

ConcurrentBag

1 List<string> list = ......
2 ConcurrentBag<string> bags = new ConcurrentBag<string>();
3 Parallel.ForEach(list, (item) => 
4 {
5     //对list中的每个元素进行处理然后,加入bags中
6     bags.Add(itemAfter);
7 });

BlockingCollection—生产者消费者形式

 1 public static void Execute()
 2 {
 3             //调用Invoke,使得生产者任务和消费者任务并行执行
 4             //Producer方法和Customer方法在Invoke中的参数顺序任意,不论何种顺序都会获得正确的结果
 5             Parallel.Invoke(()=>Customer(),()=>Producer());
 6             Console.WriteLine(string.Join(",",customerColl));
 7 }
 8 
 9 //生产者集合
10 private static BlockingCollection<int> producerColl = new BlockingCollection<int>();
11  //消费者集合
12 private static BlockingCollection<string> customerColl = new BlockingCollection<string>();
13 
14 public static void Producer()
15 {
16             //循环将数据加入生成者集合
17             for (int i = 0; i < 100; i++)
18             {
19                 producerColl.Add(i);
20             }
21 
22             //设置信号,表明不在向生产者集合中加入新数据
23             //可以设置更加复杂的通知形式,比如数据量达到一定值且其中的数据满足某一条件时就设置完成添加
24             producerColl.CompleteAdding();
25 }
26 
27 public static void Customer()
28 {
29             //调用IsCompleted方法,判断生产者集合是否在添加数据,是否还有未"消费"的数据
30             //注意不要使用IsAddingCompleted,IsAddingCompleted只表明集合标记为已完成添加,而不能说明其为空
31             //而IsCompleted为ture时,那么IsAddingCompleted为ture且集合为空
32             while (!producerColl.IsCompleted)
33             {
34                 //调用Take或TryTake "消费"数据,消费一个,移除一个
35                 //TryAdd的好处是提供超时机制
36                 customerColl.Add(string.Format("消费:{0}", producerColl.Take()));
37             }
38 }

转发与引用请注脚出处。

日子仓促,水平有限,如有不当之处,欢迎指正。


相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图