锐英源软件
第一信赖

精通

英语

开源

擅长

开发

培训

胸怀四海 

第一信赖

当前位置:锐英源 / 开源技术 / C#开源 / WPF开源 / WPF和MVVM开发实时交易应用
服务方向
人工智能数据处理
人工智能培训
kaldi数据准备
小语种语音识别
语音识别标注
语音识别系统
语音识别转文字
kaldi开发技术服务
软件开发
运动控制卡上位机
机械加工软件
软件开发培训
Java 安卓移动开发
VC++
C#软件
汇编和破解
驱动开发
联系方式
固话:0371-63888850
手机:138-0381-0136
Q Q:396806883
微信:ryysoft

锐英源精品翻译,原文出自codeproject

WPF和MVVM开发实时交易应用

界面的刷新机制决定了界面的刷新性能,也是软件考虑的主要指标,本文对刷新性能的提升有帮助使用,特意翻译,自学和共享,欢迎大家收藏和关注锐英源。另外对于Mvvmlight、Prism和Toolkit.mvvm的开发原理有兴趣的朋友,也可以看这篇翻译文章。如果爱好者对代码有兴趣,但是看不懂,也可以找锐英源软件进行解答。

--------------------------------------

序幕

2009 年 7 月 1 日,我接到一个招聘代理打来的电话,承诺给我一个“我无法拒绝的提议”。我不知道他是如何知道我的姓名、电话号码或工作地点的,但他似乎认为我和 WPF/MVVM 之间存在非常紧密的联系,他在第二个 M 上不断重复提高声音。有一次,我很高兴被钉在某家银行中台 IT 的“不太容易到达的楼层”。我正在从事的项目具有战略意义,但毫无意义,几乎完美无缺,所以我决定不会轻易让步。

为了避免他对我的 MVVM 连接的怀疑,我同意与客户会面,在快速交叉检查后,我的 MVVM 参与变得明显。判决在此之后不久交付,我已签约构建“使用 WPF/MVVM 的实时低延迟多线程前台交易应用程序”

要求

我的新经理很准确、简洁,并且不失时机地向我提出了一系列要求。他们来了:

  • 应用程序需要始终响应,“非常快”地显示交易数据,永不挂起、冻结、崩溃或内存不足。

我已经多次移动办公桌试图隐藏(我之前成功使用的技术),但我的经理每次都找到我并追着我更新。我别无选择,只能继续我的任务。所以我“google了”,显然“响应性和快速”与实时或低延迟有关(我们将在一分钟内详细了解这一点),“挂起、冻结和崩溃”是复杂的研究领域自适应系统,我决定将复合 UI 放在最上面,因为它是现代 UI 系统的必需品。

所以我的技术要求是:

  • 实时可靠性
  • 适应性
  • 可扩展性、可扩展性和模块化(这都是一回事,但听起来更好用一句话放在一起)

实时系统开发速成课程

术语“实时系统”可以指代很多不同的事物。RT 系统的特性之一是它们知道它们运行的​​硬件并使用非常低级的编程语言来访问特定于硬件的调用。它们通常直接在硬件(无操作系统)上运行,或者在实时管理人员或实时操作系统的帮助下运行。现在,因为它们被设计为在特定硬件上运行,所以它们不容易扩展、便携等。这就是为什么通用设计模式不适用于实时系统的原因——它们“太通用了”。

然而,RT 系统的主要特性是它们在设计时具有“实时约束”。这并不意味着它们运行得很快(这只是低延迟),而是所有任务都保证在定义的“实时约束”内完成执行。即我保证在 50 毫秒内显示任何新到达的数据。

因此,每个 RT 系统的核心都是实时调度算法。总体思路是系统中的所有任务都可以拆分为周期性的或零星的。定期是众所周知的,我们知道它们什么时候开始,它们运行多长时间(预算或截止日期),它们的优先级。零星的任务可以随时开始,我们只能在它们需要被中断之前分配估计的预算(处理器时间)。一些更广为人知的算法是“基于优先级的抢先调度程序”和“最早截止日期优先”。顾名思义,最早的截止日期首先是按照截止日期(必须执行任务的时间)来组织任务。基于优先级的算法按照优先级的顺序组织任务,如果具有更高优先级的任务已经到达,则中断任务。基于优先级的算法之一是速率单调调度器。下面是它的工作原理:

速率单调调度器将所有周期性任务放入等待队列。在新周期开始时,调度程序将检查等待队列并将选定的任务移动到“准备运行”队列中。此时,如果“准备运行队列”中有任何任务的期限小于当前正在调度的任务的期限,则调度程序将执行上下文切换。速率单调算法将累积零星任务并将它们作为指定周期任务的批处理运行,因为它们被认为是较低优先级。

众所周知,从 .NET 4.0 开始,微软发布了任务并行库 (TPL),允许开发人员定义任务并编写自定义任务计划程序来运行它们。这是我们实现实时框架的理想场所。我们将定义PeriodicTask 和SporadicTask,它们是继承自 System.Threading.Task 类的类,我们将实现我们自己PriorityTaskScheduler类,它继承自 TaskScheduler类。由于我们可以缓冲到达的市场数据(在 Rx 的帮助下)并定期将其发送到 GUI,我们称这些任务为周期性的。任何用户交互(点击按钮等)都无法预测,因此我们称这些任务是零星的。我们将使用速率单调算法作为基础,唯一的区别在于我们处理零星任务的方式。我的要求是“GUI 需要始终响应”,这就是为什么我认为零星任务比任何周期性任务都具有更高的优先级,并且会首先运行它们。

复杂的自适应系统

虽然我看过很多关于复杂系统应该如何适应不同条件、监控资源、自我验证和恢复组件的研究材料,但我在 UI 世界中并没有看到很多这样的实现。所以我打算在框架中构建一些这样的功能,希望这足以满足我“永不挂起、冻结或崩溃”的要求。

由于我使用的是复合或模块化 UI,因此很容易实现一些组件监控功能。主 shell 与其模块建立连接并定期发送心跳以确保系统的所有部分都响应。如果任何模块无法响应,我们将需要从内存中卸载此屏幕,并在可能的情况下重新加载它的新实例。我将把每个视图作为一个新的 Window 类来运行,以便可以为其分配自己的调度程序。这样,如果一个特定的视图“挂起”,那么 UI 的其余部分仍然是“活跃的”。我们的主外壳“从不挂起或冻结”,只是因为我们没有在它的调度程序上做任何工作。它的工作是加载系统的组件,监控心跳并重新加载其他模块/视图。

每个视图还将被分配一个MaintenanceEngineer类,其工作是监视视图的内存使用情况、发送心跳和监视调度程序队列。它可以决定我们是否落后于定期更新或用户交互(零星任务)。

复合用户界面

当涉及到复合 UI WPF 开发人员时,选择就被宠坏了。出于我的目的,我将选择范围缩小到三个框架,即 Prism、Marlon Grech 的 MeffedMVVM 和 Sacha Barber 的 Cinch。我只是遵循这样一个原则,即使用一种来源是剽窃,但不止一种是研究工作。这很适合我,因为我只是要为我的实时框架挑选一些有用的功能。

Prism

Prism 最好的一点是……嗯……模块化。Bootstrapper这是在加载模块的类的帮助下实现的。我更喜欢 Marlon 使用 MEF 的方式,所以我想我们会从他的 MeffedMVVM 中捏出来。第二个最好的事情是……解耦……通过 EventAggregator类实现。再一次,我更喜欢 Sacha 的Mediator方式,此方式是他从 Karl Shifflett 的作品中提取的类,后者从 Marlon 的作品中提取,Marlon 从 Josh Smith 的作品中提取(但不一定按此顺序)。我还将对其进行一些更改,我们稍后可以讨论。还有一些我不会使用的区域适配器(因为它在我看来是过度设计并且非常无用)和 Unity 我也不会使用,因为我可以使用 MEF 来实例化我的对象而我不会想像 Unity 那样构建长依赖链(我有这个想法,ViewModels应该和Services 完全解耦,只通过Mediator订阅主题的地方进行通信,并向这些主题广播异步消息 - 有点像 GUI 企业总线,请耐心等待,也许我可以稍后解释得更好)。所以事实上,我们从 Prism 学习,也可能很肤浅。

MeffedMVVM

在 MeffedMVVM 中,Marlon 使用 MEF 链接Views和ViewModels. 我将使用他的想法,但我将使用 MEF 来解决ViewModels和Services. 它们不需要相互引用,因为它们都引用了单例Mediator类,该类提供发布/订阅机制并选择 TaskScheduler运行任务。还有在设计时定义的强大的“一个视图 - 一个视图模型”连接,因此无需在运行时动态链接它们。Marlon 还为他的视图定义了“设计时”行为,因此每个控件都有一些可以在 Blend 中呈现的虚拟数据。我将使用这个想法,但将其应用于服务而不是视图。所以……每个服务都有自己的 MockService 版本,通过简单的配置更改,您应该能够使用真实或模拟服务运行 GUI。这允许您在服务器关闭(如他们所做的那样)或服务器端组件未准备好时在 GUI 上运行和工作(我认为这不是 Marlon 的“设计时数据”的意思,但无论如何...... .) 所以我们要从 MeffedMVVM 借用......好吧 MEF 和 MVVM

Cinch

现在这是我们要从中提取很多东西的地方。所以...谢谢萨莎。我们从 WeakEventAction、WeakAction、EventToCommandTrigger和稍作修改Mediator 开始。它们非常出色,我无需重复它们的工作原理,因为您可以参考Sacha Barber 在 Cinch上的原始帖子。 

MVVM 之类的

正如您可能已经猜到的那样,我将使用 MVVM,所以有一些相关类SimpleCommand和AttachedProperties,它们现在可能无法追踪这些作者,所以无论如何我都会将它们归因于 Sacha Barber。

另一个值得一提的类是Entity. 是一个古老的 .NET 技巧,自定义PropertyDescriptor和TypedList支持.NET 将数据绑定到网格。因此,所有数据都作为Entity对象传递,如果您需要有关其工作原理的更多详细信息,请搜索“property bag .NET”。

我还使用 Reactive Extensions (Rx) 来限制和合并到达的数据。

设计

嗯……我不得不说这家银行的餐厅绝对糟糕,所以我决定继续我的设计。

因此,在所有常见的 MVVM 角色出现的情况下 -Views、ViewModels和Services--我们也有Shell,它仅加载/卸载模块并监控心跳率/资源等的模块。我们有实现业务特定逻辑的模块。每个模块都有Views、RibbonView、ViewModels和ServiceObservers 。Shell启动Bootstrapper时, 将使用 MEF 发现磁盘上的模块。它将添加RibbonViews到 Shell 的内容,在单独的 Windows 中加载Views(每个都在自己的调度程序上)并创建Services . 它还将实例化Mediator,ViewModels和服务“注册他们的兴趣”到Mediator里。所以会发生什么事,Mediator有一个主题列表,定义了系统中所有可能的流程。如果正在针对特定主题广播消息,它将保留要运行的代表列表。

因此ViewModels和Services都加载到内存中,之间都通过 Mediator消息总线聊天,Mediator 选择一个TaskScheduler保证实时更新,ViewModels更新Views,Shell控制MaintenanceEngineers以确保每个人都玩得很好。

注:这上面说的架构已经超出了MVVM的层次。

勺子?没有勺子 (xaml.cs)

我正在(和我自己)辩论是否Bootstrapper应该解决View 或者ViewModel,这让我们回到旧的论点是 V 应该引用 VM 还是 VM 应该引用视图。老实说,只要你选择你的模式并坚持下去,我认为没有太大的不同。所有 MVVM、MVC 和 MVP 都非常相似,但有一些细微的差别(请参阅我之前的 MVVM 帖子)。无论如何,关键是 MVVM 的主要目标是允许“混合能力”,而这对我来说只意味着一件事——没有 xaml.cs。有很多很好的框架,人们仍然会在 xaml.cs 中编写,因此您需要在 VM 和 xaml.cs 之间切换以查找内容的位置,这会导致一些混乱。

所以我从字面上理解了“没有 xaml.cs”,然后.. 删除了 xaml.cs。因此,如果您查看该项目,您只会发现没有相应 xaml.cs 的 xaml 文件。这是可能的,因为您真正需要从 .cs 文件中唯一需要的是调用 Initialise() 并分配 DataContext,这是我在 .cs 文件中所做的BaseViewModel。因此,每个人都有效地ViewModel创建了它View,初始化了它的内容并分配了 DataContext,这对我来说很有意义,因为它的 VM 的工作是控制 View

在另一次“更新会议”之后,我的经理说服我,我仍然没有看到更大的图景。就是这样——更大的图景:

而且因为我认为绘制“大图”图是孩子们的游戏——我让我四岁和三岁的孩子为我做这件事(嗯..实际上不是我自己做的:()

 

因此,如果我们从后向前开始,数据输入的第一个点就是服务。所有服务都有真实的和模拟的实现类,可以由 MEF 发现和加载。在我的例子中,真正的服务被加载在一个单独的进程中,并通过 WCF NamedPipes 进行通信,其中模拟服务只是被主应用程序引用。您需要做的就是更改 VS 中的解决方案配置以构建真实或模拟版本。这是通过在构建后更改配置文件值的构建后脚本来实现的。这是在不连接到服务器的情况下测试/开发应用程序的好方法。服务接收数据并建立键值对数据合约。这些事件正在被 Rx “爆炸”和观察。然后,它们将被限制并以定期观察的时间间隔将其扔到 GUI 上(这是我们实时约束的第一部分)。通常,服务和服务观察者之间是一对一的连接,但不一定。一个ServiceObserver可以观察多个服务并使用 Rx Merge 扩展来呈现这些事件,就好像它们来自同一个源一样(看看 BaseServiceObserver类)。 如果新版本在前一个版本发送之前到达,ServiceObserver则会合并任何数据。它将构建Entity 并EntityBuilder使用Mediator将这些广播Tasks给任何感兴趣的人。

因此Entities将进入Mediator将找到已注册兴趣的ViewModel,并找到它需要运行的委托。Mediator将创建一个任务来封装要运行的委托以及该委托的新到达数据。Mediator将该任务传递给适当TaskScheduler的。LongRunningTaskScheduler将使用处理器关联,在单独的线程上运行所有任务。因此,所有服务仅在一个处理器上处理其数据,而将其他处理器留给PriorityTaskScheduler. 如果你还记得我们所说的实时系统知道它们运行的​​硬件吗?因此,在我的情况下,我知道我将使用四核机器并使用一系列测试,我确定当所有服务都在一个处理器上运行并且其他三个由PriorityTaskScheduler 运行定期和零星的任务。您可以自行调整应用程序和使用的硬件的性能。(因为 TS 是系统中所有数据流的单点入口,所以很容易在其中记录所有需要的统计信息)

 

PriorityTaskScheduler将启动三个线程(每个内核一个)来处理传入的任务。Tasks将在等待队列中排队,并且每个定义的时间段都移动到准备运行队列(如果需要,执行上下文切换)。现在,我们将运行的委托是在ViewModels注册他们对特定主题的兴趣时定义的方法。它将通知视图 Entity已更新。

传统上,它必须要求INotifyPropertyChanged这样做。这种方法存在问题。任何 Windows 老前辈都知道“UI 元素只能在拥有它们的线程上更新”。这实际上是不正确的,或者我们应该说不完全正确……应该改写为“GUI 框架开发人员希望您在拥有它们的线程上更新 UI 元素”。因此 WPF 开发人员并没有什么不同,因此他们强迫我们 INotifyPropertyChanged 从属性设置器调用并在 Dispatcher.BeginInvoke每次我们需要更新属性时执行。这显然对我们不起作用,因为Entity在许多不同的线程上不断实时更新,所以想象一下我们是否Dispatcher.BeginInvoke一直在调用?GUI 根本不会“非常敏感”。所以我们要让Entity在多个线程上进行更新,并移动INotifyPropertyChanged到一个单独的方法中,只要我们准备好通知View. 我可以看到你们中的一些人开始考虑潜在的竞争条件,这是真的(在一定程度上)我们显示的属性的值可能与属性的当前值不同(另一个线程进来了并更改它)......但是,由于我们保证 INotifyPropertyChanged在指定的实时约束(比如说 50 毫秒)内调用,因此我们保证用户将看到最新的数据,因为它以最大 50 毫秒的延迟到达我们。对 INPC 的实际调用是在内部完成的 NotifyCollection(我们稍后会更详细地了解它)所以 VM 开发人员需要做的就是调用AddOrUpdate方法,就是这样容易。

现在还记得我说过通用设计模式并不总是适用于实时应用程序吗?这肯定是这种情况,因为在纯 WPF UI 架构中,Entity不了解 UI 特定元素,即背景颜色等。但是由于我们不关心模式而是关心实时,我们可以节省大量 UI通过提前做大量工作来缩短线程时间。这就是为什么要Entity知道它的单元格内容、颜色、大小并在后台线程上分配它们只是为了稍后通知 UI 线程。因此,Entity保证 UI 有界元素和 GUI 的脉动不断更新的缓存能够在实时约束内得到最新状态的通知。

这就是数据在 WPF 实时框架中的流动方式。事件(来自服务器)变成实体(合并和限制),然后变成任务(保证实时执行)。

 

通过允许数据自由流动而不用担心锁但保证 RT 约束相反,我们能够实现“非常响应的 GUI”和“非常快速的更新”这几乎是更大的图景。我只想强调一些值得为您未来的项目提升的主要课程。他们来了:

关键类

阻塞循环缓冲区

我从 C++ BOOST 库CircularBuffer类中提取了这个,但将其用作BlockingCollection.NET 中的一个。所以它实际上类似于BlockingCollection。唯一的区别是,它不像BlockingCollection它使用内存效率更高的固定大小数组。

C#
...
        public void Enqueue(T item)
        {
            lock (_lock)
            {
                while (IsFull() || _writingSuspended)
                {
                    Monitor.Wait(_lock);
                }
                _buffer[_writerIndex] = item;
                _writerIndex++;
                _writerIndex %= _size;
                Monitor.Pulse(_lock);
            }
        }
...
        public T Dequeue()
        {
            T item;
            lock (_lock)
            {
                while (IsEmpty())
                {
                    Monitor.Wait(_lock);
                }
                item = _buffer[_readerIndex];
                _readerIndex++;
                _readerIndex %= _size;
                Monitor.Pulse(_lock);
            }
            return item;
        }
...

通知集合

这个类实现INotifyCollectionChanged了,所以我们可以直接OnCollectionChanged加入 NotifyCollectionChangedEventArgs以AddOrUpdate最小化更新次数。这使我们可以将数据存储在其中Dictionary(因此搜索起来更容易和更快)并且只抛出OnCollectionChanged而不搜索和更新集合的元素

C#
public class NotifyCollection : Collection, ITypedList, INotifyCollectionChanged where T : SelfExplained
{
     ...
    public void AddOrUpdate(IEnumerable items, bool isReplace)
    {
        foreach (var item in items)
        {
            if (!Contains(item))
            {
                Add(item);
                OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item));
            }
            else
            {
                if (isReplace)
                {
                    OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Replace, item, item));
                }
                else
                {
                    foreach (var property in item.GetPropertyNames())
                    {
                        item.OnPropertyChanged(property);
                    }
                }
            }
        }
    }
     ...
}

WpfGridColumn

这就是绑定魔法发生的地方

C#
...
        protected override FrameworkElement GenerateEditingElement(DataGridCell cell, object dataItem)
        {
            TextBox editElement = new TextBox {BorderThickness = new Thickness(0.0), Padding = new Thickness(0.0)};

            System.Windows.Data.Binding textBinding = new System.Windows.Data.Binding(cell.Column.Header + ".DisplayValue")
                                                          {Source = dataItem};
            editElement.SetBinding(TextBox.TextProperty, textBinding);

            System.Windows.Data.Binding backgroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Background")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBlock.BackgroundProperty, backgroundBinding);

            System.Windows.Data.Binding foreGroundBinding = new System.Windows.Data.Binding(cell.Column.Header + ".Foreground")
                                                                {Source = dataItem};
            editElement.SetBinding(TextBlock.ForegroundProperty, foreGroundBinding);

            return editElement;
        }
...

Mediator

关于这门课已经说得够多了。你知道它是做什么的......将任务分发给注册用户......并选择正确的TaskScheduler

C#
...
        public bool Broadcast(string key, params object[] message)
        {
            List wr;
            lock (_registeredHandlers)
            {
                if (!_registeredHandlers.TryGetValue(key, out wr))
                    return false;
            }

            foreach (var cb in wr)
            {
                Delegate action = cb.GetMethod();
                if (action == null) continue;
                switch (cb.TaskType)
                {
                    case TaskType.Background:
                        // check if already running
                        if (!_baseQueue.ContainsKey(cb.Target.Target))
                        {
                            var bgTask = _backgroundTaskFactory.StartNew(() => action.DynamicInvoke(message));
                            _baseQueue.Add(cb.Target.Target, bgTask);
                        }
                        break;

                    case TaskType.Periodic:
                        var periodicTask = new PeriodicTask(() => action.DynamicInvoke(message), _budget);
                        periodicTask.Start(_preemptiveScheduler);
                        break;

                    case TaskType.Sporadic:
                        // one Periodic to run all sporadics
                        var sporadicTask = new SporadicTask(() => action.DynamicInvoke(message), _budget);
                        sporadicTask.Start(_preemptiveScheduler);
                        break;
                    case TaskType.LongRunning:
                        //UI
                        _staTaskFactory.StartNew(() => action.DynamicInvoke(message));
                        break;
                }

            }

            lock (_registeredHandlers)
            {
                wr.RemoveAll(wa => wa.HasBeenCollected);
            }

            return true;
        }
...

优先任务计划程序

实现速率单调算法。等待队列,准备运行队列..我们不是已经讨论过了吗?

C#
...
       public PriorityTaskScheduler()
        {
            //TODO: create category
            //_messageCounter = new PerformanceCounter();
            _tsp = Convert.ToInt32(ConfigurationManager.AppSettings["TASK_SCHEDULER_PERIOD"]);
            _buferSize = Convert.ToInt32(ConfigurationManager.AppSettings["BUFFER_SIZE"]);
            _threshold = Convert.ToDouble(ConfigurationManager.AppSettings["SUSPENSION_THRESHOLD"]);
            _readyToRunQueue = new BlockingCircularBuffer(_buferSize);
            _waitingQueue = new ConcurrentQueue();

            var executor = new System.Timers.Timer(_tsp);
            executor.Elapsed += WaitingQueueToReadyToRun;
            executor.Start();

            for (int i = 0; i < Environment.ProcessorCount; i++)
            {
                _backgroundTaskFactory.StartNew(() =>
                {
                    while (true)
                    {
                        var task = _readyToRunQueue.Dequeue();
#if DEBUG
                        _log.Debug("Message Dequeued");
                        //_messageCounter.Decrement();
#endif    
                        if (TryExecuteTask(task))
                        {
                            var span = DateTime.UtcNow - task.TimeCreated;
                            if (DateTime.UtcNow > task.Deadline)
                            {
                                _log.Warn(String.Format("Real-time Deadline exceeded : {0}", span.TotalMilliseconds));
                                //throw new ApplicationException("Real-time Deadline exceeded");
                            }
#if DEBUG
                            _log.Debug("Message Done");
#endif
                        }
                    }
                });
            }    
        }

        private void WaitingQueueToReadyToRun(object sender, System.Timers.ElapsedEventArgs e)
        {
            Task task;
            while (_waitingQueue.TryDequeue(out task))
            {
                // check budget and invoke a context switch
                PeriodicTask headTask;
                if (task is PeriodicTask)
                {
                    headTask = (PeriodicTask)task;
                    var nextToRun = _readyToRunQueue.Peek();

                    if ((nextToRun != null) && (nextToRun.Status == TaskStatus.WaitingToRun)
                        && headTask.Deadline < nextToRun.Deadline)
                    {
                        _log.Info("Context switching at: " + DateTime.UtcNow);
                        var dequeuedTask = _readyToRunQueue.Dequeue();
                        _readyToRunQueue.Enqueue(headTask);
                        _readyToRunQueue.Enqueue(dequeuedTask);
                    }

                    _readyToRunQueue.Enqueue(headTask);
                }  
            }
        }

        protected override void QueueTask(Task task)
        {
#if DEBUG
            _log.Debug("Message Enqueued");
            //_messageCounter.Increment();
#endif
            // jump the queue
            if (task is SporadicTask)
            {
                TryExecuteTask(task);
                _log.Info("Sporadic jumped the queue at: " + DateTime.UtcNow);
                return;
            }
            
            _waitingQueue.Enqueue(task); 
        }
...

BaseServiceObserver

使用 Rx 合并数据

C#
...
        public virtual void AddServicesToObserve(IEnumerable services)
        {
             _entityIndex = new ConcurrentDictionary<ulong, Entity>();

            var observer = services.Select(s => 
                Observable..FromEvent<EventArgs<DataRecord>>(h => s.DataReceived += h, h => s.DataReceived -= h))
                .ToList()
                .Merge();

            observer.Select(x => x.EventArgs.Value)
                    .Where(x => x.DataRecordKey != null)
                    .BufferWithTime(TimeSpan.FromMilliseconds(_od))
                    .Where(x => x.Count > 0)
                    .Subscribe(DataReceived, LogError);
        }
...

基础视图模型

实现View且调用Initialise(),这样我们就可以摆脱 xaml.cs

C#
...
        protected BaseViewModel(string view, bool vmResolvesView, bool signedMaintenanceContract)
        {
            Log = LogManager.GetLogger(view);

            // ViewModel resolves the view
            if (vmResolvesView)
            {
                var attr = (ViewAttribute)GetType().GetCustomAttributes(typeof(ViewAttribute), true).FirstOrDefault();
                if (attr == null) throw new ApplicationException("ViewAttribute is missing");

                ViewReference = (FrameworkElement)Activator.CreateInstance(attr.ViewType);

                Uri resourceLocater = new Uri("/" + attr.ViewType.Module.ScopeName.Replace(".dll", "") + ";component/Views/" +
                                                attr.ViewType.Name + ".xaml", UriKind.Relative);
                Application.LoadComponent(ViewReference, resourceLocater);
                ViewReference.DataContext = this;
            }
            // View already resolved the viewModel

            if (ViewReference is Window)
            {
                ((Window)ViewReference).Left = Convert.ToDouble(ViewReference.GetValue(WindowProperties.LeftProperty));
                ((Window)ViewReference).Top = Convert.ToDouble(ViewReference.GetValue(WindowProperties.TopProperty));
                ((Window)ViewReference).Width = Convert.ToDouble(ViewReference.GetValue(WindowProperties.WidthProperty));
                ((Window)ViewReference).Height = Convert.ToDouble(ViewReference.GetValue(WindowProperties.HeightProperty));
            }

            if (signedMaintenanceContract)
                DispatcherFacade = new MaintenanceEngineer(view, Log, Dispatcher);

        }
...

维护引擎

监控调度程序队列并在队列增长时调用 DoEvents 扩展

C#
...
        private long _operationsQueueCount = 0;
        private readonly ConcurrentDictionary<DispatcherOperation, object> _operations = new ConcurrentDictionary<DispatcherOperation, object>();  

        private void Heartbeat(long l)
        {
            var heartbeat = new Heartbeat(_dynamicViewName, String.Format("{0} View heartbeat sent at: {1}", _dynamicViewName, DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, false);
            Action w = () => Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            AddToDispatcherQueue(w);
        }

        private void MonitorDispatcherQueue(long l)
        {
            if (_operationsQueueCount != 0)
                _log.Info(String.Format("Dispatcher Operations In Queue {0}, ", _operationsQueueCount));

            if (_operationsQueueCount > _qs)
            {
                _log.Info("Pushing all Dispatcher operations");
                Application.Current.DoEvents();
                _operations.Clear();
                Interlocked.Exchange(ref _operationsQueueCount, 0);
            }
        }


        #region IDispatcherFacade Members

        public void AddToDispatcherQueue(Delegate workItem)
        {
            var operation = _dispatcher.BeginInvoke(DispatcherPriority.Background, workItem);

            operation.Completed += (s, o) =>
            {
                Interlocked.Decrement(ref _operationsQueueCount);
                object t;
                _operations.TryRemove((DispatcherOperation)s, out t);
            };
            _operations.TryAdd(operation, null);
            Interlocked.Increment(ref _operationsQueueCount);

        }
        #endregion
...

实时服务

一些 WCF 的东西

C#
...
        public RealService()
        {
            // out channel
            _channelFactory = new ChannelFactory(
                new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/SubscriptionService");
            _proxy = _channelFactory.CreateChannel();

            // in channel
            ServiceHost host = new ServiceHost(new ConnectionListener(this));
            host.AddServiceEndpoint(typeof(IRemotePublishingService),
                 new NetNamedPipeBinding(NetNamedPipeSecurityMode.None) { MaxReceivedMessageSize = 5000000, MaxBufferSize = 5000000 },
                "net.pipe://WPFRealTime/Bond/PublishingService");
            host.Open();
        }

        private void SendData(DataRecord data)
        {
            EventHandler<EventArgs<DataRecord>> handler = DataReceived;
            if (handler != null)
            {
                handler(this, new EventArgs(data));
            }
        }

        #region IService Members
        [RegisterInterest(Topic.BondServiceGetData, TaskType.Background)]
        public void GetData()
        {
            try
            {
                _proxy.GetData(new RequestRecord() { AssetType = AssetType });
            }
            catch (Exception ex)
            {
                throw new ApplicationException("WCF channel is closed", ex);
            }
        }

        public event EventHandler<EventArgs<DataRecord>> DataReceived;
...

BondServiceObserver

请注意,在“GUI 拥有”实体传递给 Dispatcher 之前,我们可以更改实体

C#
[Export(typeof(BaseServiceObserver))]
public class BondServiceObserver : BaseServiceObserver
{
    public BondServiceObserver()
    {
        AddEventExploder(EventExploder);
    }

    public override void AddServicesToObserve(IEnumerable services)
    {
        //pass filter
        base.AddServicesToObserve(services.Where(s => s.AssetType == AssetType.Bond));
    }

    public void EventExploder(IEnumerable messages)
    {
        // apply rules
        RuleEngine.ApplyRules(messages);

        // broadcast news or updates using Mediator EventExploder
        Mediator.GetInstance.Broadcast(Topic.BondServiceDataReceived, messages);
    }
}

BondViewModel

示例视图模型

C#
...
        public BondViewModel() : base("Bond Module", true, true)
        {
            DynamicViewName = "Bond Module";
            _grid = GetRef("MainGrid");
 
            // InputManager.Current.PreProcessInput += new PreProcessInputEventHandler(Current_PreProcessInput);
            // InputManager.Current.PostProcessInput += new ProcessInputEventHandler(Current_PostProcessInput);

            Entities = new NotifyCollection<Entity>(EntityBuilder.LoadMetadata(AssetType.Common, AssetType.Bond));
                          CreateColumnsCommand = new SimpleCommand<Object, EventToCommandArgs>((parameter) => true, CreateColumns);  
		}

        [RegisterInterest(Topic.BondServiceDataReceived, TaskType.Periodic)]
        private void DataReceived(IEnumerable entities)
        {
            Action w = () => Entities.AddOrUpdate(entities, false);
            DispatcherFacade.AddToDispatcherQueue(w);
        }

        [RegisterInterest(Topic.BondModuleHang, TaskType.Sporadic)]
        private void Hang()
        {
            ProcessOnDispatcherThread(() => Thread.Sleep(10000));
        }

        [RegisterInterest(Topic.BondModuleOpen, TaskType.Sporadic)]
        private void OpenClose(bool  state)
        {
            ProcessOnDispatcherThread(() =>
            {
                if (state)
                    ((Window)ViewReference).Show();
                else
                    ((Window)ViewReference).Hide();
            });
        }
...

引导程序

使用 MEF 发现和加载组件

C#
...
        private readonly Action<Lazy<IDynamicViewModel>> _createView = ((t) =>
        {
            var vm = t.Value;

            Mediator.GetInstance.Register(vm);
            Window view = (Window)((BaseViewModel)vm).ViewReference;
            RunningWindows.TryAdd(vm.DynamicViewName, view);
            var heartbeat = new Heartbeat(vm.GetType().ToString(), String.Format("{0} View loaded at: {1}", vm.GetType().ToString(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

            Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            //view.Show();
            view.Closed += (sender, e) => view.Dispatcher.InvokeShutdown();
            Dispatcher.Run();
        });

        private void InjectDynamicViewModels(bool multiDispatchers)
        {
            foreach (var lazy in DynamicViewModels)
            {
                Lazy localLazy = lazy;
                if (multiDispatchers)
                {
                    Mediator.GetInstance.Broadcast(Topic.BootstrapperLoadViews, localLazy); 
                }
                else
                {
                    CreateView(localLazy);
                }
                
            }
        }

        private void InjectServices()
        {
            foreach (var lazy in Services)
            {
                var service = lazy.Value;
                Mediator.GetInstance.Register(service);
                var heartbeat = new Heartbeat(service.GetType().ToString(), String.Format("{0} Service loaded at: {1}", service.GetType(), DateTime.UtcNow.ToLongTimeString()), DateTime.UtcNow, true);

                Mediator.GetInstance.Broadcast(Topic.ShellStateUpdated, heartbeat);
            }
            //inject service observers
            foreach (var lazy in ServiceObservers)
            {
                lazy.Value.AddServicesToObserve(Services.Select(s => s.Value));
            }
        }

        private void InjectStaticViewModels(MainWindow mainWindow)
        {
            foreach (var vm in StaticViewModels)
            {
                Mediator.GetInstance.Register(vm);
                mainWindow.RibbonRegion.Items.Add(new TabItem { Content = ((BaseViewModel)vm).ViewReference, Header = vm.StaticViewName });
            }
        }
...

压轴

我的要求是构建一个“使用 WPF/MVVM 的实时低延迟多线程前台交易应用程序”,它“需要始终响应,非常快速地显示交易数据,永不挂起、冻结、崩溃或运行记不清”。

我通过使用 .NET Parallel Task Framework 实现速率单调调度算法实现了这一点,因此它是实时的。

我在不止一个 Dispatcher 上运行 GUI,因此它的响应速度非常快。

我允许数据自由流动并在非 UI 线程上修改“GUI 元素”,因此它可以非常快速地更新 GUI。

我无耻地从其他 WPF 开发人员那里窃取组件,即 Sacha Barber、Marlon Grech、Kent Boogaart

我认为我失败的唯一地方是提供一种通用机制来处理零星事件。因为它是由开发人员将事件标记为零星的。最好有一种机制,让每个用户操作在进入 WPF 事件路由并作为零星事件处理之前都会被自动拾取。我想用System.Windows.Input.InputManager这个但失败了。因此,如果有人对如何拦截 WPF 中的任何用户输入并将其作为零星事件在TaskScheduler里进行处理有任何想法,我想听听您的意见。

 

友情链接
版权所有 Copyright(c)2004-2021 锐英源软件
公司注册号:410105000449586 豫ICP备08007559号 最佳分辨率 1024*768
地址:郑州大学北校区院(文化路97号院)内