The previous post covered basic sorting, part of StreamInsight's basic query operations. This post covers grouped ranking (TopK) in StreamInsight queries.

Preparing test data

To make the queries easy to test, we first prepare a static test data source:

var weatherData = new[]
{
    new { Timestamp = new DateTime(2010, 1, 1, 0, 00, 00, DateTimeKind.Utc), Temperature = -9.0, StationCode = 71395, WindSpeed = 4}, 
    new { Timestamp = new DateTime(2010, 1, 1, 0, 30, 00, DateTimeKind.Utc), Temperature = -4.5, StationCode = 71801, WindSpeed = 41},
    new { Timestamp = new DateTime(2010, 1, 1, 1, 00, 00, DateTimeKind.Utc), Temperature = -8.8, StationCode = 71395, WindSpeed = 6}, 
    new { Timestamp = new DateTime(2010, 1, 1, 1, 30, 00, DateTimeKind.Utc), Temperature = -4.4, StationCode = 71801, WindSpeed = 39},
    new { Timestamp = new DateTime(2010, 1, 1, 2, 00, 00, DateTimeKind.Utc), Temperature = -9.7, StationCode = 71395, WindSpeed = 9}, 
    new { Timestamp = new DateTime(2010, 1, 1, 2, 30, 00, DateTimeKind.Utc), Temperature = -4.6, StationCode = 71801, WindSpeed = 59},
    new { Timestamp = new DateTime(2010, 1, 1, 3, 00, 00, DateTimeKind.Utc), Temperature = -9.6, StationCode = 71395, WindSpeed = 9},
};

weatherData represents a series of weather readings (timestamp, temperature, weather station code, and wind speed).

Next, convert weatherData into a complex event stream of point events:

var weatherStream = weatherData.ToPointStream(Application,
    t => PointEvent.CreateInsert(t.Timestamp, t),
    AdvanceTimeSettings.IncreasingStartTime);

Grouped ranking (TopK)

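Question 1: How do you find the event group with the highest average over the past 2 hours (here, the station with the highest average temperature in each 2-hour window)?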

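This query takes two steps: first, group the events and compute each group's average; second, find the group with the highest average within each time period. The queries are as follows.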

Step 1: group the events and compute the averages:

var averageQuery = from e in weatherStream
                   group e by e.StationCode into stationGroups
                   from win in stationGroups.TumblingWindow(
                        TimeSpan.FromHours(2), HoppingWindowOutputPolicy.ClipToWindowEnd)
                   select new
                   {
                       StationCode = stationGroups.Key,
                       AverageTemperature = win.Avg(e => e.Temperature)
                   };

Dumping the contents of averageQuery gives the following:

[Figure: contents of averageQuery]
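To make the semantics of step 1 concrete, here is a minimal LINQ-to-Objects sketch of averageQuery in plain C#. It does not use StreamInsight at all: the class TumblingAverageSketch and its methods are hypothetical, and it assumes tumbling windows start at multiples of the window size counted from DateTime.MinValue, so 2-hour windows begin at midnight, 02:00, and so on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical LINQ-to-Objects sketch of averageQuery. NOT the StreamInsight API;
// it only mimics "group by station, then average per tumbling window".
public static class TumblingAverageSketch
{
    // The same readings as weatherData in the post.
    static readonly (DateTime Timestamp, double Temperature, int StationCode, int WindSpeed)[] WeatherData =
    {
        (new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc), -9.0, 71395, 4),
        (new DateTime(2010, 1, 1, 0, 30, 0, DateTimeKind.Utc), -4.5, 71801, 41),
        (new DateTime(2010, 1, 1, 1, 0, 0, DateTimeKind.Utc), -8.8, 71395, 6),
        (new DateTime(2010, 1, 1, 1, 30, 0, DateTimeKind.Utc), -4.4, 71801, 39),
        (new DateTime(2010, 1, 1, 2, 0, 0, DateTimeKind.Utc), -9.7, 71395, 9),
        (new DateTime(2010, 1, 1, 2, 30, 0, DateTimeKind.Utc), -4.6, 71801, 59),
        (new DateTime(2010, 1, 1, 3, 0, 0, DateTimeKind.Utc), -9.6, 71395, 9),
    };

    // Assumption: window boundaries are multiples of the window size,
    // counted in ticks from DateTime.MinValue (so midnight is a boundary).
    static DateTime WindowStart(DateTime t, TimeSpan size) =>
        new DateTime(t.Ticks - t.Ticks % size.Ticks, t.Kind);

    public static List<(DateTime WindowStart, int StationCode, double AverageTemperature)> Run(TimeSpan size) =>
        WeatherData
            // One bucket per (station, window) pair, like "group ... into" followed by TumblingWindow.
            .GroupBy(e => (Station: e.StationCode, Start: WindowStart(e.Timestamp, size)))
            .Select(g => (WindowStart: g.Key.Start, StationCode: g.Key.Station,
                          AverageTemperature: g.Average(e => e.Temperature)))
            .OrderBy(r => r.WindowStart).ThenBy(r => r.StationCode)
            .ToList();

    public static void Main()
    {
        foreach (var r in Run(TimeSpan.FromHours(2)))
            Console.WriteLine($"{r.WindowStart:HH:mm} station {r.StationCode}: avg {r.AverageTemperature:F2}");
    }
}
```

Under these assumptions, station 71395 averages -8.9 and station 71801 averages -4.45 in the window starting at 00:00, and -9.65 and -4.6 in the window starting at 02:00.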

Step 2: find the group with the highest average within each time period:

var topKGroupQuery1 = (from win in averageQuery.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       from e in win
                       orderby e.AverageTemperature descending
                       select e).Take(1);

The result of step 2:

[Figure: output of topKGroupQuery1]
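Step 2 relies on snapshot windows, whose boundaries are the start and end times of the incoming events. The sketch below re-implements that rule with LINQ to Objects to show what topKGroupQuery1 computes; it is not the StreamInsight API. SnapshotTopKSketch, Sample, and TopK are hypothetical names, and Sample holds averageQuery-shaped rows hand-computed from weatherData under the assumption that 2-hour windows start at midnight, with each row lasting for its whole window.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of TopK over snapshot windows, modelled with LINQ to Objects.
// Not the StreamInsight API; the snapshot rule is re-implemented by hand.
public static class SnapshotTopKSketch
{
    // Hypothetical input: averageQuery's rows as interval events whose lifetime
    // is their 2-hour window, hand-computed from weatherData.
    public static List<(DateTime Start, DateTime End, int StationCode, double Value)> Sample()
    {
        var d = new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return new List<(DateTime Start, DateTime End, int StationCode, double Value)>
        {
            (d, d.AddHours(2), 71395, -8.9),
            (d, d.AddHours(2), 71801, -4.45),
            (d.AddHours(2), d.AddHours(4), 71395, -9.65),
            (d.AddHours(2), d.AddHours(4), 71801, -4.6),
        };
    }

    // Cuts the timeline at every event start and end. Each interval between two
    // consecutive cuts is a snapshot; events alive for the whole snapshot are ranked.
    public static List<(DateTime Start, DateTime End, List<(int StationCode, double Value)> Top)> TopK(
        IEnumerable<(DateTime Start, DateTime End, int StationCode, double Value)> events, int k)
    {
        var list = events.ToList();
        var cuts = list.SelectMany(e => new[] { e.Start, e.End }).Distinct().OrderBy(t => t).ToList();
        var result = new List<(DateTime Start, DateTime End, List<(int StationCode, double Value)> Top)>();
        for (int i = 0; i + 1 < cuts.Count; i++)
        {
            DateTime snapStart = cuts[i], snapEnd = cuts[i + 1];
            var alive = list.Where(e => e.Start <= snapStart && e.End >= snapEnd);
            // OrderByDescending is stable, so ties keep input order;
            // StreamInsight's own tie handling is not modelled here.
            var top = alive.OrderByDescending(e => e.Value).Take(k)
                           .Select(e => (e.StationCode, e.Value)).ToList();
            if (top.Count > 0) result.Add((snapStart, snapEnd, top));
        }
        return result;
    }

    public static void Main()
    {
        foreach (var snap in TopK(Sample(), 1))
            foreach (var e in snap.Top)
                Console.WriteLine($"[{snap.Start:HH:mm}, {snap.End:HH:mm}) top station {e.StationCode}: {e.Value}");
    }
}
```

Because both stations' results share the same 2-hour lifetimes, the snapshots here coincide with the tumbling windows, and station 71801 ranks first in both.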

Question 2: Every 4 hours, how do you find the event group with the highest maximum value over the past 8 hours?

As with Question 1, this takes two steps.

Step 1 groups the events and computes the maximums:

var maxValueQuery = from e in weatherStream
                    group e by e.StationCode into stationGroups
                    from win in stationGroups.HoppingWindow(
                        TimeSpan.FromHours(8), TimeSpan.FromHours(4),
                        HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new
                    {
                        StationCode = stationGroups.Key,
                        MaxTemperature = win.Max(e => e.Temperature),
                        MaxWindspeed = win.Max(e => e.WindSpeed)
                    };

The result in LINQPad:

[Figure: output of maxValueQuery in LINQPad]
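Hopping windows differ from tumbling windows in that one event can fall into several windows: with an 8-hour window and a 4-hour hop, each point event lands in two. The LINQ-to-Objects sketch below (not StreamInsight; HoppingMaxSketch and its members are hypothetical) assigns every reading to each window whose span contains it, assuming window starts are multiples of the hop counted from DateTime.MinValue, and then takes per-station maxima the way maxValueQuery does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical LINQ-to-Objects sketch of maxValueQuery (8-hour windows, 4-hour hop).
// Not the StreamInsight API.
public static class HoppingMaxSketch
{
    // The same readings as weatherData in the post.
    static readonly (DateTime Timestamp, double Temperature, int StationCode, int WindSpeed)[] WeatherData =
    {
        (new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc), -9.0, 71395, 4),
        (new DateTime(2010, 1, 1, 0, 30, 0, DateTimeKind.Utc), -4.5, 71801, 41),
        (new DateTime(2010, 1, 1, 1, 0, 0, DateTimeKind.Utc), -8.8, 71395, 6),
        (new DateTime(2010, 1, 1, 1, 30, 0, DateTimeKind.Utc), -4.4, 71801, 39),
        (new DateTime(2010, 1, 1, 2, 0, 0, DateTimeKind.Utc), -9.7, 71395, 9),
        (new DateTime(2010, 1, 1, 2, 30, 0, DateTimeKind.Utc), -4.6, 71801, 59),
        (new DateTime(2010, 1, 1, 3, 0, 0, DateTimeKind.Utc), -9.6, 71395, 9),
    };

    // Yields every window start s (assumed to be a multiple of hop, counted from
    // DateTime.MinValue) with s <= t < s + size.
    static IEnumerable<DateTime> WindowStarts(DateTime t, TimeSpan size, TimeSpan hop)
    {
        long latest = t.Ticks - t.Ticks % hop.Ticks; // latest window start <= t
        for (long s = latest; s >= 0 && s + size.Ticks > t.Ticks; s -= hop.Ticks)
            yield return new DateTime(s, t.Kind);
    }

    public static List<(DateTime WindowStart, int StationCode, double MaxTemperature, int MaxWindSpeed)> Run(
        TimeSpan size, TimeSpan hop) =>
        WeatherData
            // Duplicate each reading into every window that contains it.
            .SelectMany(e => WindowStarts(e.Timestamp, size, hop), (e, start) => (Event: e, Start: start))
            .GroupBy(x => (Station: x.Event.StationCode, x.Start))
            .Select(g => (WindowStart: g.Key.Start, StationCode: g.Key.Station,
                          MaxTemperature: g.Max(x => x.Event.Temperature),
                          MaxWindSpeed: g.Max(x => x.Event.WindSpeed)))
            .OrderBy(r => r.WindowStart).ThenBy(r => r.StationCode)
            .ToList();

    public static void Main()
    {
        foreach (var r in Run(TimeSpan.FromHours(8), TimeSpan.FromHours(4)))
            Console.WriteLine($"{r.WindowStart:yyyy-MM-dd HH:mm} station {r.StationCode}: " +
                              $"max temp {r.MaxTemperature}, max wind {r.MaxWindSpeed}");
    }
}
```

Under that alignment, all seven readings fall into both the window starting 2009-12-31 20:00 and the one starting 2010-01-01 00:00, so each station gets the same maxima in both windows (71395: temperature -8.8, wind speed 9; 71801: temperature -4.4, wind speed 59).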

Step 2 finds the group with the highest maximum within each time period:

var topKGroupQuery2 = (from win in maxValueQuery.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       from e in win
                       orderby e.MaxTemperature descending
                       select e).Take(1);

The result of step 2:

[Figure: output of topKGroupQuery2]
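Because consecutive hopping windows overlap, the snapshot windows in topKGroupQuery2 need not line up with the hopping windows. Assuming each maxValueQuery result lives for its whole 8-hour window, and that the sample readings fall into the windows starting 2009-12-31 20:00 and 2010-01-01 00:00 (window starts at multiples of 4 hours), those two results produce three snapshots, and the middle one sees results from both windows. The hypothetical OverlappingSnapshotSketch below checks this with LINQ to Objects; its Sample data is hand-derived from weatherData under those assumptions, and it is not StreamInsight code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch: how snapshot windows cut overlapping hopping-window results.
// Not the StreamInsight API.
public static class OverlappingSnapshotSketch
{
    // Hypothetical input: maxValueQuery's rows as interval events lasting 8 hours,
    // hand-derived from weatherData assuming window starts at multiples of 4 hours.
    public static List<(DateTime Start, DateTime End, int StationCode, double MaxTemperature)> Sample()
    {
        var w1 = new DateTime(2009, 12, 31, 20, 0, 0, DateTimeKind.Utc);
        var w2 = new DateTime(2010, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        return new List<(DateTime Start, DateTime End, int StationCode, double MaxTemperature)>
        {
            (w1, w1.AddHours(8), 71395, -8.8),
            (w1, w1.AddHours(8), 71801, -4.4),
            (w2, w2.AddHours(8), 71395, -8.8),
            (w2, w2.AddHours(8), 71801, -4.4),
        };
    }

    // For each snapshot (interval between consecutive start/end times): how many
    // results are alive, and which station has the highest MaxTemperature.
    public static List<(DateTime Start, DateTime End, int AliveCount, int TopStation)> Top1(
        List<(DateTime Start, DateTime End, int StationCode, double MaxTemperature)> events)
    {
        var cuts = events.SelectMany(e => new[] { e.Start, e.End }).Distinct().OrderBy(t => t).ToList();
        var result = new List<(DateTime Start, DateTime End, int AliveCount, int TopStation)>();
        for (int i = 0; i + 1 < cuts.Count; i++)
        {
            var alive = events.Where(e => e.Start <= cuts[i] && e.End >= cuts[i + 1]).ToList();
            if (alive.Count == 0) continue;
            // Stable sort: when two windows report the same maximum, the first in input order wins.
            var top = alive.OrderByDescending(e => e.MaxTemperature).First();
            result.Add((cuts[i], cuts[i + 1], alive.Count, top.StationCode));
        }
        return result;
    }

    public static void Main()
    {
        foreach (var s in Top1(Sample()))
            Console.WriteLine($"[{s.Start:MM-dd HH:mm}, {s.End:MM-dd HH:mm}) alive {s.AliveCount}, top station {s.TopStation}");
    }
}
```

The snapshots come out as [20:00, 00:00) with 2 live results, [00:00, 04:00) with 4, and [04:00, 08:00) with 2, with station 71801 ranked first in each.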

The next post covers the ranking and tiebreaking part of StreamInsight's basic query operations.

Author: StreamInsight. Posted 2011-08-23 22:50.
