精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
最近开发项目要缓冲数据,让数据库保存缓冲数据,涉及到多线程,从codeproject找了个例子太复杂,自己开发只需要缓冲跨线程传递,不过抱着学习态度,翻译学习下。要点是生产者消费者多线程模型多任务模型的思路,对不知道怎么锻炼多线程能力的朋友帮助极大。
多核时代——每个人都听说过——Microsoft .NET、Intel TBB 中的并行扩展,甚至新的 C++ 标准都包括对线程的基本支持。现在我们的小型台式机(笔记本电脑)或小型服务器拥有多核可能性——软件将探索这些优势。
数据和任务并行性——管道处理。数据被分成块,在多个阶段并行处理。每个阶段可以由多个正在运行的线程执行。这个概念类似于流水线、超标量处理器架构。每个处理阶段可以存在任意数量的任务。
当任务耗时并且可以轻松并行化(任务袋模式)时,该解决方案是可用的。当数据从外部(文件、数据库)到达并且读取不是处理器负荷操作时,它也可以使用。在这种情况下,额外的线程可以对已经读取的数据块执行处理(当下一个块读取正在进行时)。
每个项目都分配了数据和 ID。ID 应是唯一且连续的(对于OrderedItemsPool 类)。这是某种票证算法。下面的代码展示了 3 个阶段处理(一个生产者,多个处理任务,一个消费者)的示意性解决方案。
static void Producer(ItemsPoolbufout) { try { int i = 0; while (i < maxNumberOfItems) { ItemData v; v.Value = bufout.DequeueEmpty(); v.Value = … v.Id = i; bufout.EnqueueData(v); ++i; } bufout.EnqueueEndOfData(i); } catch (Exception e) { bufout.SetError("Producer problem", e); } }
static void Processing(ItemsPoolsInOut<t1,> bufsInOut)
{ try { T2 outData; ItemDatainData; while (bufsInOut.DequeueEmptyAndData(out inData, out outData)) { outData = performSomeProcessing(inData.Value); bufsInOut.EnqueueDataAndEmpty(outData, inData); } } catch (Exception e) { bufsInOut.SetError("Processing problem", e); } }
static void Consumer(ItemsPool<t2> bufin)
{ try { ItemDatadata; while (bufin.DequeueData(out data)) { //process data.Value ... bufin.EnqueueEmpty(data.Value); } } catch (Exception e) { bufin.SetError("Consumer problem", e); } } static void Main(string[] args) { ItemsPool inputBuffers = new OrderedItemsPool (inBuffersNumber); ItemsPool outputBuffers = new OrderedItemsPool (outBuffersNumber); Thread first = new Thread(() => Producer(inputBuffers)); first.Start(); for (int i = 0; i < calcTaskNumber; ++i) Task.Create((o) =>
{ Processing(new ItemsPoolsInOut<t1,>(inputBuffers, outputBuffers)); });
Thread consumer = new Thread(() => Consumer(outputBuffers)); consumer.Start(); consumer.Join(); if (outputBuffers.IsError) { Console.WriteLine(outputBuffers.Error); } }
一般使用重锁。:) 解决方案是基于监视器。我们这里有 2 个监视器——第一个用于对空项堆栈的操作,第二个用于对数据项队列的操作。
当生产者线程需要新的空项目(DequeueEmpty 方法)时,它可以从空项目池中返回或创建,但对要创建的项目的最大数量有限制(内存限制)。当没有空闲的空缓冲区并且无法创建更多项目时,线程必须等待空监视器的条件变量。当不再需要 item 时,必须将其返回到空池(EnqueueEmpty 方法)或显式释放(DropEmpty 方法)。返回或释放由消费者线程完成,而由生产者线程出队变成空。
当生产者用数据填充空项时,应将其放入数据队列(EnqueueData 方法)。另一方面,消费者线程使用DequeueData 方法获取项目。当不存在数据或读取已订购(OrderedItemsPool 类)并且适当的项目尚未到达时,读取可能会被阻塞。在这种情况下,线程等待完整监视器的条件变量。
在OrderedItemsPool 类的情况下,项目存储在优先级队列(SortedList 使用的类)中,对于UnorderedItemsPool 类使用普通Queue 类。
当所有项目都产生后,线程应标记数据结束。这是使用 EnqueueEndOfData 方法完成的。信息在所有任务之间传播,它们可以停止处理——当没有更多要处理的项目时DequeueData 方法返回false
EndOfData 是id号最大的特殊物品。EndOfData存储后 ,无法存储具有较大 id 的项目(EndOfDataException 将出现)。
在多个线程中完成的工作会遇到错误传播问题。当一个线程出现异常并且该线程是处理链的一部分时,应通知其他线程(任务)。否则会出现死锁。因此,应使用SetError 方法捕获异常并将其传递到通信链中。之后,ErrorInProcessingException 将出现在所有处理数据的任务中作为一个问题的信号。原始异常存储为内部异常。
IsError 可以将使用池的错误和Error 属性通知主线程。属性的值应从主线程正在等待的线程使用的池中获取(参见下面的代码):
consumer.Join(); if (outputBuffers.IsError) { Console.WriteLine(outputBuffers.Error); }
当任务位于处理链的中间时,重要的是先获取输出项,然后再输入项,以避免死锁。当没有可用的输出项时会出现死锁(并且在处理链中的顺序应保留 -OrderedItemsPool 类)。当输入池的项目多于输出池时,这种情况很容易看到。添加了帮助类ItemsPoolsInOut 作为此类案例(DequeueEmptyAndData 方法)的示例解决方案。
此外,有序项目池(OrderedItemsPool 类)永远不会出现在无序项目池(UnorderedItemsPool 类)之后,否则可能会发生死锁。
所提出的解决方案的用途是处理大文件,即加密和压缩。加密是用RijndaelManaged 类实现的,而压缩是用DeflateStream 类实现的。数据以 512 KB 的块读取、处理并写入输出文件。第二个程序允许对数据进行解压缩和解密。
工作分为 4 组:读取器(一个线程)、加密器(多个任务、多个线程)、压缩器(多个任务、多个线程)和写入器(一个线程)。第二个程序(解压缩器、解密器)看起来类比。
注意每组任务使用不同的TaskManager 对象,否则可能会死锁。此外,读取器和写入器是单独的线程。
当然,将压缩(解压缩)和加密(解密)放在一个任务中(没有额外的缓冲区和同步)可能更明智,但这是一个示例,只是为了展示可能性。
此外,使用 deflate 的压缩是废话——我知道——这只是一个例子——如果需要真正的压缩,请替换算法。:)
该代码使用 Microsoft Parallel Extensions Jun08 CTP,因此需要下载并安装此库。
实现的附加说明 -DeflateStream 类构造函数具有类leaveOpen 中不存在的参数CryptoStream ,因此streams 不能重用。这在设计上似乎很不一致——也许一些微软架构师可以解释为什么?