AsyncEnumerable
主要是伴随着对异步迭代器的需求而产生的。之前在编写WikiClientLibrary的时候,遇到了一个和分页相关的问题。比如我们要从维基服务器获取所有页面的列表。一个最简单、使用异步的想法如下所示
public async Task<IEnumerable<Page>> FetchAllPagesAsync(Site site) { JToken json = await site.SendReuestAsync(/* ... */); // 向服务器请求所有页面 IList<Page> result = PagesFromJson(); // 将获取到的JSON转换为页面列表 return result; }
这样,客户程序可以使用循环或者LINQ来使用这些条目序列。
但情况没有这么简单。服务器在一次请求中最多只能返回500条结果,那么对于大部分的维基而言,我们可能需要多次请求才能获取到所有的结果。那么,我们要怎么处理这种情况呢?
一个最直接的想法如下所示
public async Task<IEnumerable<Page>> FetchAllPagesAsync(Site site) { var result = new List<Page>(); do { JToken json = await site.SendReuestAsync(/* ... */); result.AddRange(PagesFromJson(json)); } while (NeedContinue(json)); return result; }
是的,这样整个函数还是异步的,但如果用户仅仅对前10条记录感兴趣呢?比如……
var pages = await FetchAllPagesAsync(); var largePages = pages.Take(10).Count(p => p.ContentLength > 1024);
看起来,问题得到了解决。
但这不是延迟调用(Lazy evaluation)。
使用上面的这种实现方法,我们会将整个列表中的所有条目全部加载,然后返回。例如,如果整个维基站点有10,000个页面条目,假设我们每次请求会得到500个条目,那么在`FetchAllPagesAsync`中需要20次请求才能返回结果,而其中后19次请求的结果实际上会被丢弃。
也许我们可以使用迭代器。假设我们采用阻塞调用来向服务器请求数据:
public IEnumerable<Page> FetchAllPages(Site site) { do { JToken json = site.SendReuest(/* ... */); // 同步/阻塞调用 foreach (var p in PagesFromJson(json)) yield return p; } while (NeedContinue(json)); }
如此一来,如果客户端仅仅使用了返回的序列中的前20项,那么此迭代器就不会向服务器请求更多的结果。
但这是阻塞调用。
我们之所以要使用TPL,是因为在等待服务器返回结果的这段时间中,我们完全可以去做其他的事情。但在上面的实现中,`SendRequest`是阻塞函数。这不是我们想要的。
也许我们可以这样
public async IEnumerable<Task<Page>> FetchAllPagesAsync(Site site);
但我实在想不出来可以怎么写……
也许我们需要一个异步迭代器、异步的IEnumerable
。
问题的提出
以上的尴尬局面归结到现有的IEnumerator
接口,具体来说,IEnumerator.MoveNext
是同步的。
/// <summary>支持对非泛型集合的简单迭代。</summary> public interface IEnumerator { /// <summary>获取集合中的当前元素。</summary> /// <returns>集合中的当前元素。</returns> object Current { get; } /// <summary>将枚举数推进到集合的下一个元素。</summary> /// <returns>如果枚举数已成功地推进到下一个元素,则为 true;如果枚举数传递到集合的末尾,则为 false。</returns> /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception> bool MoveNext(); /// <summary>将枚举数设置为其初始位置,该位置位于集合中第一个元素之前。</summary> /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception> void Reset(); }
注:Reset
的存在感几乎为0。此方法仅为COM兼容性而设置。目前主流的实现方法是直接扔一个NotSupportedException
。
显然,对于我们这里的情况,在调用MoveNext
时,我们需要产生并设置Current
为序列的下一个元素。对于异步调用,我们期望能出现类似于Task<bool> MoveNextAsync(); 这样的函数,来异步地产生下一个元素。基于这样的想法,我们有了下面的接口
/// <summary> /// Asynchronous version of the IEnumerator<T> interface, allowing elements to be /// retrieved asynchronously. /// </summary> /// <typeparam name="T">Element type.</typeparam> public interface IAsyncEnumerator<out T> : IDisposable { /// <summary>Gets the current element in the iteration.</summary> T Current { get; } /// <summary> /// Advances the enumerator to the next element in the sequence, returning the result asynchronously. /// </summary> /// <param name="cancellationToken">Cancellation token that can be used to cancel the operation.</param> /// <returns> /// Task containing the result of the operation: true if the enumerator was successfully advanced /// to the next element; false if the enumerator has passed the end of the sequence. /// </returns> Task<bool> MoveNext(CancellationToken cancellationToken); }
是的,这就是我们期盼中的异步迭代器。与之配套的是可以返回IAsyncEnumerator的`IAsyncEnumerable`。
假设我们已经实现了一个函数IAsyncEnumerable<Page> FetchAllPagesAsync();,那么,也许我们可以这样来使用
var pages = FetchAllPagesAsync(); var largePages = await pages.Take(10).Count(p => p.ContentLength > 1024);
注意到我们拿到的是一个异步迭代器,在向此迭代器请求第一个元素之前,是不会有任何请求发生的。因为此时`IAsyncEnumerator`还没有被创建,更不用说调用`IAsyncEnumerator.MoveNext`了。因此,第一行代码肯定是同步返回的。
事情在第二行变得有意思了。我们假定库为`IAsyncEnumerable`也实现了一套LINQ扩展方法,就像`System.Linq.Enumerable`一样,那么与之对应的Take
和Count
函数的声明应当如下所示
public static IAsyncEnumerable<TSource> Take<TSource>(this IAsyncEnumerable<TSource> source, int count); public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate);
别忘了迭代器的延迟计算特性。直到Count
被调用之前,实际的计算都不会发生。而此处的Count
函数是异步的。这也很好理解,因为从迭代器中产生序列的过程是异步的,因此计算汇总过程也肯定是异步的。而且因为在前面的代码中,我们仅提取序列中的前10项,如果`IAsyncEnumerator`的实现得当的话,那么后面的结果是不会被产生的,回到最初的案例,也就不会出现后19次的网络请求了。
实际上,异步迭代器已经有现成的库可用了,那就是Ix-Async,或者System.Interactive.Async。使用Visual Studio的同学可以使用NuGet进行下载。
[TBC]