AsyncEnumerable 随笔

AsyncEnumerable 主要是伴随着对异步迭代器的需求而产生的。之前在编写WikiClientLibrary的时候,遇到了一个和分页相关的问题。比如我们要从维基服务器获取所有页面的列表。一个最简单、使用异步的想法如下所示

public async Task<IEnumerable<Page>> FetchAllPagesAsync(Site site)
{
    JToken json = await site.SendReuestAsync(/* ... */);    // 向服务器请求所有页面
    IList<Page> result = PagesFromJson();                   // 将获取到的JSON转换为页面列表
    return result;
}

这样,客户程序可以使用循环或者LINQ来使用这些条目序列。

但情况没有这么简单。服务器在一次请求中最多只能返回500条结果,那么对于大部分的维基而言,我们可能需要多次请求才能获取到所有的结果。那么,我们要怎么处理这种情况呢?

一个最直接的想法如下所示

public async Task<IEnumerable<Page>> FetchAllPagesAsync(Site site)
{
    var result = new List<Page>();
    do
    {
        JToken json = await site.SendReuestAsync(/* ... */);
        result.AddRange(PagesFromJson(json));
    } while (NeedContinue(json));
    return result;
}

是的,这样整个函数还是异步的,但如果用户仅仅对前10条记录感兴趣呢?比如……

var pages = await FetchAllPagesAsync();
var largePages = pages.Take(10).Count(p => p.ContentLength > 1024);

看起来,问题得到了解决。

但这不是延迟调用(Lazy evaluation)。

使用上面的这种实现方法,我们会将整个列表中的所有条目全部加载,然后返回。例如,如果整个维基站点有10,000个页面条目,假设我们每次请求会得到500个条目,那么在`FetchAllPagesAsync`中需要20次请求才能返回结果,而其中后19次请求的结果实际上会被丢弃。

也许我们可以使用迭代器。假设我们采用阻塞调用来向服务器请求数据:

public IEnumerable<Page> FetchAllPages(Site site)
{
    do
    {
        JToken json = site.SendReuest(/* ... */);          // 同步/阻塞调用
        foreach (var p in PagesFromJson(json))
            yield return p;
    } while (NeedContinue(json));
}

如此一来,如果客户端仅仅使用了返回的序列中的前20项,那么此迭代器就不会向服务器请求更多的结果。

但这是阻塞调用。

我们之所以要使用TPL,是因为在等待服务器返回结果的这段时间中,我们完全可以去做其他的事情。但在上面的实现中,`SendRequest`是阻塞函数。这不是我们想要的。

也许我们可以这样

public async IEnumerable<Task<Page>> FetchAllPagesAsync(Site site);

但我实在想不出来可以怎么写……

也许我们需要一个异步迭代器、异步的IEnumerable

问题的提出

以上的尴尬局面归结到现有的IEnumerator接口,具体来说,IEnumerator.MoveNext是同步的。

/// <summary>支持对非泛型集合的简单迭代。</summary>
public interface IEnumerator
{
    /// <summary>获取集合中的当前元素。</summary>
    /// <returns>集合中的当前元素。</returns>
    object Current { get; }

    /// <summary>将枚举数推进到集合的下一个元素。</summary>
    /// <returns>如果枚举数已成功地推进到下一个元素,则为 true;如果枚举数传递到集合的末尾,则为 false。</returns>
    /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception>
    bool MoveNext();

    /// <summary>将枚举数设置为其初始位置,该位置位于集合中第一个元素之前。</summary>
    /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception>
    void Reset();
}

注:Reset的存在感几乎为0。此方法仅为COM兼容性而设置。目前主流的实现方法是直接扔一个NotSupportedException

显然,对于我们这里的情况,在调用MoveNext时,我们需要产生并设置Current为序列的下一个元素。对于异步调用,我们期望能出现类似于Task<bool> MoveNextAsync(); 这样的函数,来异步地产生下一个元素。基于这样的想法,我们有了下面的接口

  /// <summary>
  ///     Asynchronous version of the IEnumerator&lt;T&gt; interface, allowing elements to be
  ///     retrieved asynchronously.
  /// </summary>
  /// <typeparam name="T">Element type.</typeparam>
  public interface IAsyncEnumerator<out T> : IDisposable
  {
    /// <summary>Gets the current element in the iteration.</summary>
    T Current { get; }

    /// <summary>
    ///     Advances the enumerator to the next element in the sequence, returning the result asynchronously.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token that can be used to cancel the operation.</param>
    /// <returns>
    ///     Task containing the result of the operation: true if the enumerator was successfully advanced
    ///     to the next element; false if the enumerator has passed the end of the sequence.
    /// </returns>
    Task<bool> MoveNext(CancellationToken cancellationToken);
  }

是的,这就是我们期盼中的异步迭代器。与之配套的是可以返回IAsyncEnumerator的`IAsyncEnumerable`。

假设我们已经实现了一个函数IAsyncEnumerable<Page> FetchAllPagesAsync();,那么,也许我们可以这样来使用

var pages = FetchAllPagesAsync();
var largePages = await pages.Take(10).Count(p => p.ContentLength > 1024);

注意到我们拿到的是一个异步迭代器,在向此迭代器请求第一个元素之前,是不会有任何请求发生的。因为此时`IAsyncEnumerator`还没有被创建,更不用说调用`IAsyncEnumerator.MoveNext`了。因此,第一行代码肯定是同步返回的。

事情在第二行变得有意思了。我们假定库为`IAsyncEnumerable`也实现了一套LINQ扩展方法,就像`System.Linq.Enumerable`一样,那么与之对应的TakeCount函数的声明应当如下所示

public static IAsyncEnumerable<TSource> Take<TSource>(this IAsyncEnumerable<TSource> source, int count);

public static Task<int> Count<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate);

别忘了迭代器的延迟计算特性。直到Count被调用之前,实际的计算都不会发生。而此处的Count函数是异步的。这也很好理解,因为从迭代器中产生序列的过程是异步的,因此计算汇总过程也肯定是异步的。而且因为在前面的代码中,我们仅提取序列中的前10项,如果`IAsyncEnumerator`的实现得当的话,那么后面的结果是不会被产生的,回到最初的案例,也就不会出现后19次的网络请求了。

实际上,异步迭代器已经有现成的库可用了,那就是Ix-Async,或者System.Interactive.Async。使用Visual Studio的同学可以使用NuGet进行下载。

[TBC]

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

ERROR: si-captcha.php plugin: GD image support not detected in PHP!

Contact your web host and ask them to enable GD image support for PHP.

ERROR: si-captcha.php plugin: imagepng function not detected in PHP!

Contact your web host and ask them to enable imagepng for PHP.

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据

Content is available under CC BY-SA 3.0 unless otherwise noted.