精通
英语
和
开源
,
擅长
开发
与
培训
,
胸怀四海
第一信赖
锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容。
环形缓冲是解决一个生产者多个消费者问题的最佳解决方案,以前开发IPTV流媒体直播服务器就用到这样的机制,最近开发通信软件,对于数据组织开发继续加强下,翻译本文提高学习下。
我需要一个环形缓冲区来跟踪时间帧内的事件计数,比如一分钟。因为帧内可能有数以万计的事件,所以将计数量化为帧的某个样本大小是理想的。例如,如果帧是一分钟,量化每秒的计数意味着环形缓冲区只需要管理 60 个样本。作为一个环形缓冲区,帧之外的任何样本都会被丢弃并添加新的样本——这意味着如果帧是 1 分钟,则在 1 分钟之后记录的任何样本都将归零。出于我们的目的,总样本计数保持不变,即在任何给定时间点帧内量化计数的总和。
上面的屏幕截图显示了事件计数快照的条形图,在这种情况下,来自不同客户端的 Web API 请求 - 水平条是每分钟的请求数,垂直轴(名称已编辑)是客户端的名称请求。
全文:
using System; using System.Collections.Generic; using System.Linq; namespace QuantizedTemporalFrameRingBuffer { public class QBuffer { protected int runningCount = 0; protected int[] qFrameCounts; protected int sampleSize; protected Func<DateTime, int> calcQ;
protected Func<DateTime, DateTime, int> resetPastSamples;
protected DateTime lastEvent; ////// Get the frame as copy of the internal frame, mainly for unit tests. /// public int[] Frame => qFrameCounts.ToArray(); public int GetRunningCount() { lock (locker) { return runningCount; } } // Used to allow Capture to be run on a separate thread from CountEvent. // Particularly useful when the counting HTTP events and // separately monitoring the sample set. protected object locker = new object();
public QBuffer(int frameSize, int quantization, Func<DateTime, int> calcQ, Func<DateTime, DateTime, int> resetPastSamples)
{ Assertion.That(frameSize > 0, "frameSize cannot be negative or 0."); Assertion.That(quantization > 0, "quantization cannot be negative or 0."); Assertion.That(frameSize % quantization == 0, "frameSize must be divisible by quantization without a remainder."); Assertion.NotNull(calcQ, "calculation of Q cannot be null."); Assertion.NotNull(resetPastSamples, "reset of past samples cannot be null."); lastEvent = DateTime.Now; sampleSize = frameSize / quantization; qFrameCounts = new int[sampleSize]; this.calcQ = calcQ; this.resetPastSamples = resetPastSamples; } public void Reset(DateTime dt) { int q = calcQ(dt); int resetCount = Math.Min(sampleSize, resetPastSamples(lastEvent, dt)); // We only reset up to the sample size. // This handles situations where the time elapsed between events // is greater than the frame size for the specified quantization. // We effectively drop off the last n quantized samples, // where n is the quantized frame size. // We limit this to the sample size, in case the current event // occurs at some point > frame size. // At all times, the "past samples" are the very next samples. while (resetCount > 0) { int pastQ = (q + resetCount) % sampleSize; runningCount -= qFrameCounts[pastQ]; qFrameCounts[pastQ] = 0; --resetCount; } } public void CountEvent(DateTime dt) { lock (locker) { int q = calcQ(dt); ++runningCount; ++qFrameCounts[q]; lastEvent = dt; } } public (int total, Listsamples) Capture() { lock (locker) { var ret = (runningCount, qFrameCounts.ToList()); return ret; } } } }
该用法需要定义一个回调来确定量化索引 (Q), 并确定有多少过去的事件在环形缓冲区量化样本大小之外。
一个简单的例子是一个 60 秒的环形缓冲区和 1 秒的量化:
using System; namespace QuantizedTemporalFrameRingBuffer { class Program { static void Main(string[] args) { // Create a 1 minute ring buffer quantized at one second. Buffer buffer = new Buffer(60, 1, CalcQ, ResetQ); } static int CalcQ(DateTime dt) { // This is easy, because we're sampling one for 1 minute and quantized at one second, // so Q is by its very nature simply the Second within the time component. return dt.Second; } static int ResetQ(DateTime lastEvent, DateTime now) { // Again, straightforward because we're quantizing per second. return (int)(now - lastEvent).TotalSeconds; } } }
回调的原因是环形缓冲区不需要知道您如何计算量化索引和需要重置的“插槽”数量。例如,您可能需要一个环形缓冲区来跟踪 24 小时内的事件,并在一小时内进行量化。构造函数的前两个参数:
public Buffer(int frameSize, int quantization,
与时间“单元”无关,而是与帧大小和所需量化相关联。用于计算的回调Q和需要重置的插槽数决定了所需的使用情况。
注:委托做为参数,让程序更灵活,是大家需要关注的。
在我将其连接到的 WCF 应用程序中,客户端每分钟的 Web 请求被跟踪如下:
public static Dictionary<string, QBuffer> buffers = new Dictionary<string, QBuffer>();
... QBuffer qbuffer; if (!buffers.TryGetValue(client, out qbuffer)) { qbuffer = new QBuffer(60, 1, dt => dt.Second, (lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds); buffers[tenant] = qbuffer; } DateTime now = DateTime.Now; qbuffer.Reset(now); qbuffer.CountEvent(now);
乍一看,这个Reset电话可能看起来很奇怪。这样做是将过去的事件槽归零,在这种情况下是一分钟。它执行回调:
(lastEvent, eventTime) => ( int )(eventTime - lastEvent).TotalSeconds
因此:
while (resetCount > 0) { int pastQ = (q + resetCount) % sampleSize; runningCount -= qFrameCounts[pastQ]; qFrameCounts[pastQ] = 0; --resetCount; }
并且运行计数会减少每个插槽中的事件计数。
类似地,在检索当前事件帧时,“当前”表示请求获取事件的时间。因此,在检索事件的 API 端点中——请记住,这是我使用它的 WCF 应用程序。
public class ClientHitCount { public string Client { get; set; } public int Count { get; set; } } public Stream GetQBufferRunningCounts() { var clients = GetClientList(); var samples = clients.Select(c => { var buffer = buffers[c]; buffer.Reset(DateTime.Now); return new ClientHitCount() { Client = c, Count = buffers[c].GetRunningCount() }; }); return Response.AsJson(samples); }
同样,请注意Reset首先调用。假设最后一个事件发生在两分钟前——当我们调用获取帧中发生的事件数量时,我们“来不及”两分钟,所以Reset调用的目的是使环形缓冲区与当前时间范围。
可以使用AnyChart在网页上完成非常简单的渲染:
<!DOCTYPE html>
<html height="100%">
<head>
<title>Events</title>
<script src="https://cdn.anychart.com/releases/8.0.0/js/anychart-base.min.js"></script>
</head>
<body height="100%">
<div id="container" style="width: 100%; height: 90vh"></div>
<script>
function getData(callback) {
var xhttp = new XMLHttpRequest();
xhttp.onreadystatechange = function() {
if (this.readyState == 4 && this.status == 200) {
callback(this.responseText);
}
};
xhttp.open("GET", "[your endpoint]", true);
xhttp.send();
}
function updateChart(chart, strJson) {
console.log();
let json = JSON.parse(strJson);
let fjson = json.filter(r => r.Count > 0);
// let fjson = json; // use this line to include 0 counts
let data = {rows: fjson.map(r => [r.Client, r.Count])};
chart.data(data);
chart.draw();
}
anychart.onDocumentReady(function() {
var chart = anychart.bar();
chart.container('container');
chart.barsPadding(10);
setInterval(() => {
let data = getData(json => updateChart(chart, json));
}, 1000);
});
</script>
</body>
</html>
一些有用的单元测试。
[TestMethod] public void EventsInOneQIndexTest() { QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds); DateTime now = DateTime.Now; buffer.Reset(now); buffer.CountEvent(now); buffer.Reset(now); buffer.CountEvent(now); buffer.Reset(now); buffer.CountEvent(now); // Note that the count is in the current "second" slot. int index = now.Second; Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3."); Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3."); }
[TestMethod] public void EventsInTwoQIndicesTest() { QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds); DateTime now = DateTime.Now; DateTime next = now.AddSeconds(1); buffer.Reset(now); buffer.CountEvent(now); buffer.Reset(next); buffer.CountEvent(next); buffer.Reset(next); buffer.CountEvent(next); int index = now.Second; Assert.IsTrue(buffer.Frame[index] == 1, "Expected a count of 1."); Assert.IsTrue(buffer.Frame[index + 1] == 2, "Expected a count of 2."); Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3."); }
[TestMethod] public void FrameClearedTest() { QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds); DateTime now = DateTime.Now; DateTime next = now.AddSeconds(1); DateTime frameNext = next.AddSeconds(60); buffer.Reset(now); buffer.CountEvent(now); // 1 at now buffer.Reset(next); buffer.CountEvent(next); // 2 at now + 1 buffer.Reset(next); buffer.CountEvent(next); buffer.Reset(frameNext); buffer.CountEvent(frameNext); // 3 at now + 61 buffer.Reset(frameNext); buffer.CountEvent(frameNext); buffer.Reset(frameNext); buffer.CountEvent(frameNext); int index = frameNext.Second; Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3."); Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3."); }
这里没什么好说的——这是一种简单但有用的量化事件的方法,以减少内存使用并创建所需持续时间的样本帧大小。