Asynchronous Streams

C# 8.0 introduced async streams: the ability to combine the async pattern with iterators. As discussed in Chapter 15, collections in C# are all built on the IEnumerable<T> and IEnumerator<T> interfaces, the former with a single GetEnumerator() method that returns an IEnumerator<T> over which you can iterate. And, when building an iterator with yield return, the method needs to return IEnumerable<T> or IEnumerator<T>. In contrast, valid async return types must support a GetAwaiter() method (see note 10), just as Task, Task<T>, and ValueTask<T> do. The conflict, therefore, is that a method can’t be both an async method and an iterator. When invoking an async method while iterating over a collection, for example, you can’t yield the results to a calling function prior to the completion of all expected iterations.

To address this conflict, the C# team added support for asynchronous streams (async streams) in C# 8.0. This feature is specifically designed to enable asynchronous iteration and the building of asynchronous collections and enumerable-type methods using yield return.
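To make the conflict concrete, the following minimal sketch contrasts the two shapes. The names StreamingComparison, ProcessAsync(), ProcessAllAsync(), ProcessEachAsync(), and FirstResultAsync() are hypothetical and used only for illustration; ProcessAsync() stands in for real asynchronous work such as encryption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StreamingComparison
{
    // Hypothetical stand-in for real asynchronous work.
    private static async Task<string> ProcessAsync(string item)
    {
        await Task.Delay(10);
        return item.ToUpperInvariant();
    }

    // Without async streams: the caller receives nothing
    // until every item has been processed.
    public static async Task<List<string>> ProcessAllAsync(
        IEnumerable<string> items)
    {
        List<string> results = new();
        foreach (string item in items)
        {
            results.Add(await ProcessAsync(item));
        }
        return results;
    }

    // With async streams: each result is yielded to the caller
    // as soon as it is available.
    public static async IAsyncEnumerable<string> ProcessEachAsync(
        IEnumerable<string> items)
    {
        foreach (string item in items)
        {
            yield return await ProcessAsync(item);
        }
    }

    // Consumes only the first result; because the stream is lazy,
    // the remaining items are never processed.
    public static async Task<string> FirstResultAsync(
        IEnumerable<string> items)
    {
        await foreach (string result in ProcessEachAsync(items))
        {
            return result;
        }
        return string.Empty;
    }
}
```

Calling FirstResultAsync() shows the practical difference: the consumer can act on (or stop after) the first result, whereas ProcessAllAsync() forces it to wait for the whole list.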

For example, imagine encrypting content with an async method, EncryptFilesAsync(), given a directory (defaulting to the current directory). Listing 20.6 provides the code.

Listing 20.6: Async Streams
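using System;
using System.IO;
// Zip() and ToAsyncEnumerable() for async streams come from
// the System.Linq.Async NuGet package (System.Linq namespace).
using System.Linq;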
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Threading;
using System.Runtime.CompilerServices;
using AddisonWesley.Michaelis.EssentialCSharp.Shared;
 
public static class Program
{
    public static async Task Main(params string[] args)
    {
        string directoryPath = Directory.GetCurrentDirectory();
        string searchPattern = "*";
        // ...
        using Cryptographer cryptographer = new();
 
        IEnumerable<string> files = Directory.EnumerateFiles(
            directoryPath, searchPattern);
 
        // Create a cancellation token source to cancel 
        // if the operation takes more than a minute.
        using CancellationTokenSource cancellationTokenSource =
            new(1000*60);
 
        // Zip() pairs each encrypted name (first) with its source name (second).
        await foreach ((string encryptedFileName, string fileName)
            in EncryptFilesAsync(files, cryptographer)
            .Zip(files.ToAsyncEnumerable())
            .WithCancellation(cancellationTokenSource.Token)
            )
        {
            Console.WriteLine($"{fileName}=>{encryptedFileName}");
        }
    }
 
    public static async IAsyncEnumerable<string> EncryptFilesAsync(
        IEnumerable<string> files, Cryptographer cryptographer,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (string fileName in files)
        {
            yield return await EncryptFileAsync(fileName, cryptographer);
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
 
    private static async ValueTask<string> EncryptFileAsync(
        string fileName, Cryptographer cryptographer)
    {
        string encryptedFileName = $"{fileName}.encrypt";
        await using FileStream outputFileStream =
            new(encryptedFileName, FileMode.Create);
 
        string data = await File.ReadAllTextAsync(fileName);
 
        await cryptographer.EncryptAsync(data, outputFileStream);
 
        return encryptedFileName;
    }
    // ...
}

Listing 20.6 begins with a Main() method, inside of which there is a C# 8.0–introduced await foreach statement iterating over the results of an asynchronous method, EncryptFilesAsync(). (We will address the WithCancellation() invocation shortly.) The EncryptFilesAsync() method iterates over each of the specified files with a foreach loop, awaiting EncryptFileAsync() for each one. EncryptFileAsync() in turn makes two async method invocations. The first is a call to File.ReadAllTextAsync(), which reads in all the content of the file. Once the content is available in memory, the code invokes the EncryptAsync() method to encrypt it. EncryptFilesAsync() then returns the encrypted filename via a yield return statement. Thus, the method demonstrates the need to provide an asynchronous iterator to the caller. The key to making this possible is declaring EncryptFilesAsync() with async and having it return IAsyncEnumerable<T> (where T is a string in this case).

Given a method returning IAsyncEnumerable<T>, you can consume it using an await foreach statement as demonstrated in the Main method of Listing 20.6. Thus, this listing is both producing and consuming an async stream.

The signature for GetAsyncEnumerator() includes a CancellationToken parameter. Because the await foreach loop generates the code that calls GetAsyncEnumerator(), the way to inject a cancellation token and provide cancellation is via the WithCancellation() extension method. (As Figure 20.2 shows, there’s no WithCancellation() method on IAsyncEnumerable<T> directly.) To support cancellation in an async stream method, add an optional CancellationToken parameter decorated with EnumeratorCancellationAttribute; the token supplied through WithCancellation() is then passed to that parameter. The EncryptFilesAsync() method declaration demonstrates this:

public static async IAsyncEnumerable<string> EncryptFilesAsync(
    IEnumerable<string> files, Cryptographer cryptographer,
    [EnumeratorCancellation] CancellationToken
        cancellationToken = default)
{ ... }
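As a minimal sketch of how the token travels from WithCancellation() into the iterator, consider the following. The CancellationFlow class and its CountAsync() and CountUntilCanceledAsync() methods are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationFlow
{
    // Hypothetical endless stream. The token can be passed as an
    // argument or supplied by the consumer via WithCancellation().
    public static async IAsyncEnumerable<int> CountAsync(
        [EnumeratorCancellation]
        CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            // Throws an OperationCanceledException (specifically
            // TaskCanceledException) once the token is canceled.
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }

    // Returns how many items were received before cancellation
    // ended the stream.
    public static async Task<int> CountUntilCanceledAsync(int limit)
    {
        using CancellationTokenSource cancellationTokenSource = new();
        int received = 0;
        try
        {
            // CountAsync() is called without a token; the token
            // reaches the iterator through WithCancellation().
            await foreach (int value in CountAsync()
                .WithCancellation(cancellationTokenSource.Token))
            {
                received++;
                if (received == limit)
                {
                    cancellationTokenSource.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the iterator observed the canceled token.
        }
        return received;
    }
}
```

Without the EnumeratorCancellation attribute, the token given to WithCancellation() would not reach the cancellationToken parameter, and the loop in this sketch would never end.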

Figure 20.2: IAsyncEnumerable<T> and related interfaces

In Listing 20.6, you provide an async stream method that returns the IAsyncEnumerable<T> interface. As with the non-async iterators, however, you can also implement the IAsyncEnumerable<T> interface with its GetAsyncEnumerator() method. Of course, any class implementing the interface can then be iterated over with an await foreach statement, as shown in Listing 20.7.

Listing 20.7: Async Streams Invocation with await foreach
public class AsyncEncryptionCollection : IAsyncEnumerable<string>
{
    public async IAsyncEnumerator<string> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        // ...
    }
 
    public static async Task Main()
    {
        AsyncEncryptionCollection collection =
            new();
        // ...
 
        await foreach (string fileName in collection)
        {
            Console.WriteLine(fileName);
        }
    }
}
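Listing 20.7 elides the body of GetAsyncEnumerator(). One way to fill it in, sketched here under the assumption that the data is already in memory, is to write the method itself as an async iterator. The AsyncNameCollection class and its CollectAsync() helper are hypothetical names, not part of the listing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical collection that exposes its items as an async stream.
public class AsyncNameCollection : IAsyncEnumerable<string>
{
    private IReadOnlyList<string> Names { get; }

    public AsyncNameCollection(params string[] names)
    {
        Names = names;
    }

    // An async iterator may return IAsyncEnumerator<T> directly.
    // The token is an ordinary parameter here, so no
    // EnumeratorCancellation attribute is needed.
    public async IAsyncEnumerator<string> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        foreach (string name in Names)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // Stand-in for real asynchronous work.
            await Task.Yield();
            yield return name;
        }
    }

    // Helper that drains any async stream of strings into a list.
    public static async Task<List<string>> CollectAsync(
        IAsyncEnumerable<string> source)
    {
        List<string> results = new();
        await foreach (string item in source)
        {
            results.Add(item);
        }
        return results;
    }
}
```

Because the class implements IAsyncEnumerable<string>, an instance can be passed anywhere an async stream is expected, including the await foreach inside CollectAsync().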

One point of caution: Remember that declaring an async method doesn’t automatically cause the execution to run in parallel. Just because the EncryptFilesAsync() method is asynchronous, that doesn’t mean iterating over each file and invoking File.ReadAllTextAsync() and Cryptographer.EncryptAsync() will happen in parallel. Each await completes before the loop moves on to the next file. To get parallel behavior, you need to start the work explicitly, for example with a Task invocation or something like System.Threading.Tasks.Parallel.ForEach() (Chapter 21).
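The difference can be sketched as follows. This is an illustration only: ConcurrencyComparison, ProcessAsync(), ProcessSequentiallyAsync(), and ProcessConcurrentlyAsync() are hypothetical names, and Task.WhenAll() is just one way to overlap the work (it runs the operations concurrently, which is not necessarily on multiple threads).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrencyComparison
{
    // Hypothetical stand-in for real asynchronous work.
    private static async Task<string> ProcessAsync(string item)
    {
        await Task.Delay(100);
        return $"{item}.done";
    }

    // Sequential: each item is awaited before the next one starts,
    // so total time grows with the number of items.
    public static async IAsyncEnumerable<string>
        ProcessSequentiallyAsync(IEnumerable<string> items)
    {
        foreach (string item in items)
        {
            yield return await ProcessAsync(item);
        }
    }

    // Concurrent: all operations are started first and then awaited
    // together. Results come back in the original order.
    public static async Task<string[]> ProcessConcurrentlyAsync(
        IEnumerable<string> items)
    {
        Task<string>[] tasks =
            items.Select(item => ProcessAsync(item)).ToArray();
        return await Task.WhenAll(tasks);
    }
}
```

The trade-off is that the concurrent version gives up streaming: the caller receives the results only after every operation has finished.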

The IAsyncEnumerable<T> interface and its partner, IAsyncEnumerator<T>, were added alongside C# 8.0 and, as Figure 20.2 shows, mirror their synchronous equivalents. Note that both the IAsyncDisposable.DisposeAsync() and IAsyncEnumerator<T>.MoveNextAsync() methods are asynchronous versions of the equivalent IEnumerator<T> methods, Dispose() and MoveNext(). The Current property isn’t asynchronous. (Also, there’s no Reset() method in the asynchronous implementations.)
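To see how these members fit together, the following sketch walks an async stream by hand, roughly approximating what an await foreach statement does on your behalf. The ManualEnumeration class and its GetNumbersAsync() and SumAsync() methods are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualEnumeration
{
    // Hypothetical async stream that yields 1, 2, and 3.
    public static async IAsyncEnumerable<int> GetNumbersAsync()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Iterates without await foreach to expose the interface members.
    public static async Task<int> SumAsync(
        IAsyncEnumerable<int> source)
    {
        int sum = 0;
        IAsyncEnumerator<int> enumerator =
            source.GetAsyncEnumerator();
        try
        {
            // MoveNextAsync() is awaited...
            while (await enumerator.MoveNextAsync())
            {
                // ...but Current is a plain synchronous property.
                sum += enumerator.Current;
            }
        }
        finally
        {
            // DisposeAsync() is the asynchronous counterpart
            // of Dispose().
            await enumerator.DisposeAsync();
        }
        return sum;
    }
}
```

In practice, await foreach is preferable; the manual form is shown only to make the roles of MoveNextAsync(), Current, and DisposeAsync() visible.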

________________________________________

10. Or void.