锐英源软件
第一信赖

精通

英语

开源

擅长

开发

培训

胸怀四海 

第一信赖

当前位置:锐英源 / 开源技术 / C#开源 / C#算法开源英语 / .NET时间帧量化线程安全环形缓冲区
联系方式
固话:0371-63888850
手机:138-0381-0136
Q Q:396806883
微信:ryysoft
服务方向
人工智能数据处理
人工智能培训
kaldi数据准备
小语种语音识别
语音识别标注
语音识别系统
语音识别转文字
kaldi开发技术服务
软件开发
运动控制卡上位机
机械加工软件
软件开发培训
Java 安卓移动开发
VC++
C#软件
汇编和破解
驱动开发

锐英源精品原创,禁止全文或局部转载,禁止任何形式的非法使用,侵权必究。点名“简易百科”和闲暇巴盗用锐英源原创内容。

.NET时间帧量化线程安全环形缓冲区


背景

环形缓冲是解决一个生产者多个消费者问题的最佳解决方案,以前开发IPTV流媒体直播服务器就用到这样的机制,最近开发通信软件,对于数据组织开发继续加强下,翻译本文提高学习下。

 

 

我需要一个环形缓冲区来跟踪时间帧内的事件计数,比如一分钟。因为帧内可能有数以万计的事件,所以将计数量化为帧的某个样本大小是理想的。例如,如果帧是一分钟,量化每秒的计数意味着环形缓冲区只需要管理 60 个样本。作为一个环形缓冲区,帧之外的任何样本都会被丢弃并添加新的样本——这意味着如果帧是 1 分钟,则在 1 分钟之后记录的任何样本都将归零。出于我们的目的,总样本计数保持不变,即在任何给定时间点帧内量化计数的总和。

上面的屏幕截图显示了事件计数快照的条形图,在这种情况下,来自不同客户端的 Web API 请求 - 水平条是每分钟的请求数,垂直轴(名称已编辑)是客户端的名称请求。

环形缓冲区代码

全文:

C#
using System;
using System.Collections.Generic;
using System.Linq;

namespace QuantizedTemporalFrameRingBuffer
{
  public class QBuffer
  {
    protected int runningCount = 0;
    protected int[] qFrameCounts;
    protected int sampleSize;
    protected Func<DateTime, int> calcQ;
protected Func<DateTime, DateTime, int> resetPastSamples;
protected DateTime lastEvent; /// /// Get the frame as copy of the internal frame, mainly for unit tests. /// public int[] Frame => qFrameCounts.ToArray(); public int GetRunningCount() { lock (locker) { return runningCount; } } // Used to allow Capture to be run on a separate thread from CountEvent. // Particularly useful when the counting HTTP events and // separately monitoring the sample set. protected object locker = new object();
  public QBuffer(int frameSize, int quantization,
              Func<DateTime, int> calcQ, Func<DateTime, DateTime, int> resetPastSamples)
    {
      Assertion.That(frameSize > 0, "frameSize cannot be negative or 0.");
      Assertion.That(quantization > 0, "quantization cannot be negative or 0.");
      Assertion.That(frameSize % quantization == 0, 
                "frameSize must be divisible by quantization without a remainder.");
      Assertion.NotNull(calcQ, "calculation of Q cannot be null.");
      Assertion.NotNull(resetPastSamples, "reset of past samples cannot be null.");

      lastEvent = DateTime.Now;
      sampleSize = frameSize / quantization;
      qFrameCounts = new int[sampleSize];
      this.calcQ = calcQ;
      this.resetPastSamples = resetPastSamples;
    }

    public void Reset(DateTime dt)
    {
      int q = calcQ(dt);
      int resetCount = Math.Min(sampleSize, resetPastSamples(lastEvent, dt));

      // We only reset up to the sample size.
      // This handles situations where the time elapsed between events 
      // is greater than the frame size for the specified quantization.
      // We effectively drop off the last n quantized samples, 
      // where n is the quantized frame size.
      // We limit this to the sample size, in case the current event 
      // occurs at some point > frame size.
      // At all times, the "past samples" are the very next samples.
      while (resetCount > 0)
      {
        int pastQ = (q + resetCount) % sampleSize;
        runningCount -= qFrameCounts[pastQ];
        qFrameCounts[pastQ] = 0;
        --resetCount;
      }
    }

    public void CountEvent(DateTime dt)
    {
      lock (locker)
      {
        int q = calcQ(dt);
        ++runningCount;
        ++qFrameCounts[q];

        lastEvent = dt;
      }
    }

    public (int total, List samples) Capture()
    {
      lock (locker)
      {
        var ret = (runningCount, qFrameCounts.ToList());

        return ret;
      }
    }
  }
}

初始化

该用法需要定义一个回调来确定量化索引 (Q), 并确定有多少过去的事件在环形缓冲区量化样本大小之外。

一个简单的例子是一个 60 秒的环形缓冲区和 1 秒的量化:

C#
using System;

namespace QuantizedTemporalFrameRingBuffer
{
  class Program
  {
    static void Main(string[] args)
    {
      // Create a 1 minute ring buffer quantized at one second.
      Buffer buffer = new Buffer(60, 1, CalcQ, ResetQ);
    }

    static int CalcQ(DateTime dt)
    {
      // This is easy, because we're sampling one for 1 minute and quantized at one second, 
      // so Q is by its very nature simply the Second within the time component.
      return dt.Second;
    }

    static int ResetQ(DateTime lastEvent, DateTime now)
    {
      // Again, straightforward because we're quantizing per second.
      return (int)(now - lastEvent).TotalSeconds;
    }
  }
}

回调的原因是环形缓冲区不需要知道您如何计算量化索引和需要重置的“插槽”数量。例如,您可能需要一个环形缓冲区来跟踪 24 小时内的事件,并在一小时内进行量化。构造函数的前两个参数:

C#
public Buffer(int frameSize, int quantization, 

与时间“单元”无关,而是与帧大小和所需量化相关联。用于计算的回调Q和需要重置的插槽数决定了所需的使用情况。

注:委托做为参数,让程序更灵活,是大家需要关注的。

一个真实的例子

在我将其连接到的 WCF 应用程序中,客户端每分钟的 Web 请求被跟踪如下:

C#
public static Dictionary<string, QBuffer> buffers = new Dictionary<string, QBuffer>();
...
QBuffer qbuffer;

if (!buffers.TryGetValue(client, out qbuffer))
{
  qbuffer = new QBuffer(60, 1, dt => dt.Second, 
            (lastEvent, eventTime) => (int)(eventTime - lastEvent).TotalSeconds);
  buffers[tenant] = qbuffer;
}

DateTime now = DateTime.Now;
qbuffer.Reset(now);
qbuffer.CountEvent(now);

乍一看,这个Reset电话可能看起来很奇怪。这样做是将过去的事件槽归零,在这种情况下是一分钟。它执行回调:

C#
(lastEvent, eventTime) => ( int )(eventTime - lastEvent).TotalSeconds

因此:

  1. 如果我们仍在当前槽中(最后一个事件与当前事件发生在同一秒内),则没有任何内容被触及。
  2. 如果经过的秒数大于0,则这些时隙归零:
C#
while (resetCount > 0)
{
  int pastQ = (q + resetCount) % sampleSize;
  runningCount -= qFrameCounts[pastQ];
  qFrameCounts[pastQ] = 0;
  --resetCount;
}

并且运行计数会减少每个插槽中的事件计数。

类似地,在检索当前事件帧时,“当前”表示请求获取事件的时间。因此,在检索事件的 API 端点中——请记住,这是我使用它的 WCF 应用程序。

C#
public class ClientHitCount
{
  public string Client { get; set; }
  public int Count { get; set; }
}

public Stream GetQBufferRunningCounts()
{
  var clients = GetClientList();

  var samples = clients.Select(c =>
  {
    var buffer = buffers[c];
    buffer.Reset(DateTime.Now);

    return new ClientHitCount() { Client = c, Count = buffers[c].GetRunningCount() };
  });

  return Response.AsJson(samples);
}

同样,请注意Reset首先调用。假设最后一个事件发生在两分钟前——当我们调用获取帧中发生的事件数量时,我们“来不及”两分钟,所以Reset调用的目的是使环形缓冲区与当前时间范围。

一个简单的页面

可以使用AnyChart在网页上完成非常简单的渲染:

<!DOCTYPE html>
<html height="100%">
<head>
<title>Events</title>
<script src="https://cdn.anychart.com/releases/8.0.0/js/anychart-base.min.js"></script>
</head>
<body height="100%">
<div id="container" style="width: 100%; height: 90vh"></div>
<script>
function getData(callback) {
var xhttp = new XMLHttpRequest();
xhttp.onreadystatechange = function() {
if (this.readyState == 4 && this.status == 200) {
callback(this.responseText);
}
};

xhttp.open("GET", "[your endpoint]", true);
xhttp.send();
}

function updateChart(chart, strJson) {
console.log();
let json = JSON.parse(strJson);
let fjson = json.filter(r => r.Count > 0);
// let fjson = json; // use this line to include 0 counts
let data = {rows: fjson.map(r => [r.Client, r.Count])};
chart.data(data);
chart.draw();
}

anychart.onDocumentReady(function() {
var chart = anychart.bar();
chart.container('container');
chart.barsPadding(10);

setInterval(() => {
let data = getData(json => updateChart(chart, json));
}, 1000);
});
</script>
</body>
</html>

单元测试

一些有用的单元测试。

Q 测试中的事件

C#
[TestMethod]
public void EventsInOneQIndexTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(now);
  buffer.CountEvent(now);

  // Note that the count is in the current "second" slot.
  int index = now.Second; 

  Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

两个连续 Q 检验中的事件

C#
[TestMethod]
public void EventsInTwoQIndicesTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  DateTime next = now.AddSeconds(1);

  buffer.Reset(now);
  buffer.CountEvent(now);

  buffer.Reset(next);
  buffer.CountEvent(next);

  buffer.Reset(next);
  buffer.CountEvent(next);

  int index = now.Second;

  Assert.IsTrue(buffer.Frame[index] == 1, "Expected a count of 1.");
  Assert.IsTrue(buffer.Frame[index + 1] == 2, "Expected a count of 2.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

帧清除测试

C#
[TestMethod]
public void FrameClearedTest()
{
  QBuffer buffer = new QBuffer(60, 1, dt => dt.Second, 
          (lastEvent, currentEvent) => (int)(currentEvent - lastEvent).TotalSeconds);

  DateTime now = DateTime.Now;
  DateTime next = now.AddSeconds(1);
  DateTime frameNext = next.AddSeconds(60);
  buffer.Reset(now);
  buffer.CountEvent(now);       // 1 at now

  buffer.Reset(next);
  buffer.CountEvent(next);      // 2 at now + 1
  
  buffer.Reset(next);
  buffer.CountEvent(next);

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext); // 3 at now + 61

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext);

  buffer.Reset(frameNext);
  buffer.CountEvent(frameNext);

  int index = frameNext.Second;

  Assert.IsTrue(buffer.Frame[index] == 3, "Expected a count of 3.");
  Assert.IsTrue(buffer.GetRunningCount() == 3, "Expected a count of 3.");
}

结论

这里没什么好说的——这是一种简单但有用的量化事件的方法,以减少内存使用并创建所需持续时间的样本帧大小。

友情链接
版权所有 Copyright(c)2004-2021 锐英源软件
公司注册号:410105000449586 豫ICP备08007559号 最佳分辨率 1024*768
地址:郑州大学北校区院(文化路97号院)内劳动服务器公司办公楼一层