锐英源软件
第一信赖

精通

英语

开源

擅长

开发

培训

胸怀四海 

第一信赖

当前位置:锐英源 / 开源技术 / C#开源 / C#算法开源英语 / .NET多线程缓冲池
联系方式
固话:0371-63888850
手机:138-0381-0136
Q Q:396806883
微信:ryysoft
服务方向
人工智能数据处理
人工智能培训
kaldi数据准备
小语种语音识别
语音识别标注
语音识别系统
语音识别转文字
kaldi开发技术服务
软件开发
运动控制卡上位机
机械加工软件
软件开发培训
Java 安卓移动开发
VC++
C#软件
汇编和破解
驱动开发

.NET多线程缓冲池


背景

最近开发项目要缓冲数据,让数据库保存缓冲数据,涉及到多线程,从codeproject找了个例子太复杂,自己开发只需要缓冲跨线程传递,不过抱着学习态度,翻译学习下。要点是生产者消费者多线程模型多任务模型的思路,对不知道怎么锻炼多线程能力的朋友帮助极大。

 

 

多核时代——每个人都听说过——Microsoft .NET、Intel TBB 中的并行扩展,甚至新的 C++ 标准都包括对线程的基本支持。现在我们的小型台式机(笔记本电脑)或小型服务器拥有多核可能性——软件将探索这些优势。

提出解决方案的职责

  • 管理数据项池(可能保留或不保留项目顺序)
  • 管理空项目池——重用项目(当这些是大缓冲区时很有用)

特征

  • 多生产者-消费者模式的有界缓冲区问题解决方案。也可以归类为共享内存异步消息传递
  • 易于使用的解决方案——所有锁定机制都隐藏在里面
  • 版本:
    • unordered – 获取物品的顺序无关紧要 – 多个消费者(一个或多个消费者)
    • ordered– 应保留物品的取用顺序 – 一位消费者(一位或多位生产者)
  • 限制内存使用
  • 避免频繁的内存分配和释放——更可预测的运行时间(内存分配时间不可预测)
  • 使用 Parallel Extensions 运行线程的数量可以轻松自动地适应硬件(内核数量)

数据和任务并行性——管道处理。数据被分成块,在多个阶段并行处理。每个阶段可以由多个正在运行的线程执行。这个概念类似于流水线、超标量处理器架构。每个处理阶段可以存在任意数量的任务。

 

 

当任务耗时并且可以轻松并行化(任务袋模式)时,该解决方案是可用的。当数据从外部(文件、数据库)到达并且读取不是处理器负荷操作时,它也可以使用。在这种情况下,额外的线程可以对已经读取的数据块执行处理(当下一个块读取正在进行时)。

主意

每个项目都分配了数据和 ID。ID 应是唯一且连续的(对于OrderedItemsPool 类)。这是某种票证算法。下面的代码展示了 3 个阶段处理(一个生产者,多个处理任务,一个消费者)的示意性解决方案。

static void Producer(ItemsPool bufout)
{
    try
    {
        int i = 0;
        while (i < maxNumberOfItems)
        {
            ItemData v;
            v.Value = bufout.DequeueEmpty();
            v.Value = …
            v.Id = i;
            bufout.EnqueueData(v);
            ++i;
        }
        bufout.EnqueueEndOfData(i);
    }
    catch (Exception e)
    {
        bufout.SetError("Producer problem", e);
    }
}

static void Processing(ItemsPoolsInOut<t1,> bufsInOut)
{
    try
    {
        T2 outData;
        ItemData inData;

        while (bufsInOut.DequeueEmptyAndData(out inData, out outData))
        {
            outData = performSomeProcessing(inData.Value);
            bufsInOut.EnqueueDataAndEmpty(outData, inData);
        }
    }
    catch (Exception e)
    {
        bufsInOut.SetError("Processing problem", e);
    }
}

static void Consumer(ItemsPool<t2> bufin)
{
    try
    {
        ItemData data;

        while (bufin.DequeueData(out data))
        {
            //process data.Value

            ...

            bufin.EnqueueEmpty(data.Value);
        }
    }
    catch (Exception e)
    {
        bufin.SetError("Consumer problem", e);
    }
}

static void Main(string[] args)
{
    ItemsPool inputBuffers = new OrderedItemsPool(inBuffersNumber);
    ItemsPool outputBuffers = new OrderedItemsPool(outBuffersNumber);

    Thread first = new Thread(() => Producer(inputBuffers));
    first.Start();

    for (int i = 0; i < calcTaskNumber; ++i)
        Task.Create((o) => 
{ Processing(new ItemsPoolsInOut<t1,>(inputBuffers, outputBuffers)); });
    Thread consumer = new Thread(() => Consumer(outputBuffers));
    consumer.Start();

    consumer.Join();

    if (outputBuffers.IsError)
    {
        Console.WriteLine(outputBuffers.Error);
    }
}

这个怎么运作

一般使用重锁。:) 解决方案是基于监视器。我们这里有 2 个监视器——第一个用于对空项堆栈的操作,第二个用于对数据项队列的操作。

当生产者线程需要新的空项目(DequeueEmpty 方法)时,它可以从空项目池中返回或创建,但对要创建的项目的最大数量有限制(内存限制)。当没有空闲的空缓冲区并且无法创建更多项目时,线程必须等待空监视器的条件变量。当不再需要 item 时,必须将其返回到空池(EnqueueEmpty 方法)或显式释放(DropEmpty 方法)。返回或释放由消费者线程完成,而由生产者线程出队变成空。

当生产者用数据填充空项时,应将其放入数据队列(EnqueueData 方法)。另一方面,消费者线程使用DequeueData 方法获取项目。当不存在数据或读取已订购(OrderedItemsPool 类)并且适当的项目尚未到达时,读取可能会被阻塞。在这种情况下,线程等待完整监视器的条件变量。

在OrderedItemsPool 类的情况下,项目存储在优先级队列(SortedList 使用的类)中,对于UnorderedItemsPool 类使用普通Queue 类。

敲定

当所有项目都产生后,线程应标记数据结束。这是使用 EnqueueEndOfData 方法完成的。信息在所有任务之间传播,它们可以停止处理——当没有更多要处理的项目时DequeueData 方法返回false

EndOfData 是id号最大的特殊物品。EndOfData存储后 ,无法存储具有较大 id 的项目(EndOfDataException 将出现)。

错误

在多个线程中完成的工作会遇到错误传播问题。当一个线程出现异常并且该线程是处理链的一部分时,应通知其他线程(任务)。否则会出现死锁。因此,应使用SetError 方法捕获异常并将其传递到通信链中。之后,ErrorInProcessingException 将出现在所有处理数据的任务中作为一个问题的信号。原始异常存储为内部异常。

IsError 可以将使用池的错误和Error 属性通知主线程。属性的值应从主线程正在等待的线程使用的池中获取(参见下面的代码):

consumer.Join();
if (outputBuffers.IsError)
{
   Console.WriteLine(outputBuffers.Error);
}

死锁

当任务位于处理链的中间时,重要的是先获取输出项,然后再输入项,以避免死锁。当没有可用的输出项时会出现死锁(并且在处理链中的顺序应保留 -OrderedItemsPool 类)。当输入池的项目多于输出池时,这种情况很容易看到。添加了帮助类ItemsPoolsInOut 作为此类案例(DequeueEmptyAndData 方法)的示例解决方案。

此外,有序项目池(OrderedItemsPool 类)永远不会出现在无序项目池(UnorderedItemsPool 类)之后,否则可能会发生死锁。

问题

  • 未返回池(或显式删除)的项目 - 可能导致死锁。通过从空池中赋值可以轻松解决该问题(不幸的是内存限制)。
  • 重锁 - .NET 监视器比进程间互斥锁或信号量更轻,但仍然有很多同步 - 是否有更无锁的解决方案可用?
  • 发生异常时后台(分离)线程的问题
  • 处理停止未实现

例子

所提出的解决方案的用途是处理大文件,即加密和压缩。加密是用RijndaelManaged 类实现的,而压缩是用DeflateStream 类实现的。数据以 512 KB 的块读取、处理并写入输出文件。第二个程序允许对数据进行解压缩和解密。

工作分为 4 组:读取器(一个线程)、加密器(多个任务、多个线程)、压缩器(多个任务、多个线程)和写入器(一个线程)。第二个程序(解压缩器、解密器)看起来类比。

注意每组任务使用不同的TaskManager 对象,否则可能会死锁。此外,读取器和写入器是单独的线程。

当然,将压缩(解压缩)和加密(解密)放在一个任务中(没有额外的缓冲区和同步)可能更明智,但这是一个示例,只是为了展示可能性。

此外,使用 deflate 的压缩是废话——我知道——这只是一个例子——如果需要真正的压缩,请替换算法。:)

该代码使用 Microsoft Parallel Extensions Jun08 CTP,因此需要下载并安装此库。

实现的附加说明 -DeflateStream 类构造函数具有类leaveOpen 中不存在的参数CryptoStream ,因此streams 不能重用。这在设计上似乎很不一致——也许一些微软架构师可以解释为什么?

友情链接
版权所有 Copyright(c)2004-2021 锐英源软件
公司注册号:410105000449586 豫ICP备08007559号 最佳分辨率 1024*768
地址:郑州大学北校区院(文化路97号院)内劳动服务器公司办公楼一层