Running LINQ Queries in Parallel

Just as it is possible to execute a loop in parallel using Parallel.For(), so it is also possible to execute LINQ queries in parallel using the Parallel LINQ API (PLINQ, for short). An example of a simple nonparallel LINQ expression is shown in Listing 21.6; in Listing 21.7, we modify it to run in parallel.

Listing 21.6: LINQ Select()
using System.Collections.Generic;
using System.Linq;
 
public class Program
{
    // ...
    public static List<string>
      Encrypt(IEnumerable<string> data)
    {
        return data.Select(
            item => Encrypt(item)).ToList();
    }
    // ...
}

In Listing 21.6, a LINQ query uses the Select() standard query operator to encrypt each string within a sequence of strings and convert the resultant sequence to a list. This seems like an embarrassingly parallel operation; each encryption is likely to be a high-latency processor-bound operation that could be farmed out to a worker thread on another CPU.

Listing 21.7 shows how to modify Listing 21.6 so that the code that encrypts the strings is executed in parallel.

Listing 21.7: Parallel LINQ Select()
using System.Linq;
 
public class Program
{
    // ...
    public static List<string>
      Encrypt(IEnumerable<string> data)
    {
        return data.AsParallel().Select(
            item => Encrypt(item)).ToList();
    }
    // ...
}

As Listing 21.7 shows, the change to enable parallel support is extremely small! All that it involves is a standard query operator, AsParallel(), which can be found on the static class System.Linq.ParallelEnumerable. This simple extension method tells the runtime that it can execute the query in parallel. The result is that on machines with multiple available CPUs, the total time taken to execute the query can be significantly shorter.
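
How much shorter depends on the workload, so it is worth measuring. The following sketch is not one of the chapter's listings; the SlowOperation() helper and the item count are arbitrary stand-ins for a CPU-bound operation such as Encrypt(). It times the sequential and parallel forms of the same query with a Stopwatch:

using System;
using System.Diagnostics;
using System.Linq;

public class ParallelTimingDemo
{
    // Hypothetical CPU-bound operation standing in for Encrypt().
    private static int SlowOperation(int value)
    {
        double result = value;
        for (int i = 0; i < 100_000; i++)
        {
            result = Math.Sqrt(result + i);
        }
        return (int)result;
    }

    public static void Main()
    {
        int[] data = Enumerable.Range(0, 1_000).ToArray();

        Stopwatch stopwatch = Stopwatch.StartNew();
        data.Select(item => SlowOperation(item)).ToList();
        Console.WriteLine($"Sequential: {stopwatch.Elapsed}");

        stopwatch.Restart();
        data.AsParallel().Select(item => SlowOperation(item)).ToList();
        Console.WriteLine($"Parallel:   {stopwatch.Elapsed}");
    }
}

On a machine with multiple cores the second measurement is typically a fraction of the first; on a single core the parallel version can even be slightly slower because of the partitioning overhead.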

System.Linq.ParallelEnumerable, the engine introduced in Microsoft .NET Framework 4.0 to enable PLINQ, includes a superset of the query operators available on System.Linq.Enumerable. Thus, it provides potentially parallelized implementations of all of the common query operators, including those used for sorting (OrderBy()), filtering (Where()), projecting (Select()), joining, grouping, and aggregating. Listing 21.8 shows how to do a parallel sort.

Listing 21.8: Parallel LINQ with Standard Query Operators
//...
OrderedParallelQuery<string> orderedData =
    data.AsParallel().OrderBy(item => item);

// Show that the total count of items still
// matches the original count
if (data.Count() != orderedData.Count())
{
    throw new Exception(
        "data.Count() != orderedData.Count()");
}
// ...

As Listing 21.8 shows, invoking the parallel version simply involves a call to the AsParallel() extension method. Notice that the type of the result returned by the parallel standard query operators is either ParallelQuery<T> or OrderedParallelQuery<T>; both inform the compiler that it should continue to use the parallel versions of the standard query operators that are available.
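
If only part of a query benefits from parallelism, the ParallelQuery<T> can be converted back to an ordinary sequential query with the AsSequential() operator; the operators that follow it then bind to System.Linq.Enumerable again. Here is a minimal sketch, assuming an illustrative filter-then-sort query (the FilterThenSort() name and the particular filter are not from the chapter's listings):

using System.Collections.Generic;
using System.Linq;

public static class SequentialTailExample
{
    public static List<string> FilterThenSort(IEnumerable<string> data)
    {
        // The Where() filter runs in parallel; after AsSequential(),
        // the OrderBy() and ToList() execute sequentially.
        return data
            .AsParallel()
            .Where(item => item.Length > 10)
            .AsSequential()
            .OrderBy(item => item)
            .ToList();
    }
}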

Given that query expressions are simply syntactic sugar for the method call form of the query used in Listing 21.5 and Listing 21.6, you can just as easily use AsParallel() with the expression form. Listing 21.9 shows an example of executing a grouping operation in parallel using query expression syntax.

Listing 21.9: Parallel LINQ with Query Expressions
//...
ParallelQuery<IGrouping<char, string>> parallelGroups;
parallelGroups =
    from text in data.AsParallel()
    orderby text
    group text by text[0];
 
// Show that the total count of items still
// matches the original count
if (data.Count() != parallelGroups.Sum(
        item => item.Count()))
{
    throw new Exception("data.Count() != parallelGroups" +
        ".Sum(item => item.Count()");
}
//...

As you saw in the previous examples, converting a query or iteration loop to execute in parallel is simple. There is one significant caveat, however: As we will discuss in depth in Chapter 22, you must take care not to allow multiple threads to inappropriately access and modify the same memory simultaneously. Doing so will cause a race condition.
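
As a simple illustration of the problem, the following sketch (the shared counters are contrived for demonstration and are not from the chapter's listings) increments one counter with the non-atomic ++ operator and another with Interlocked.Increment(); only the second is guaranteed to end up with the correct total:

using System;
using System.Linq;
using System.Threading;

public static class RaceConditionExample
{
    public static void Main()
    {
        int unsafeCount = 0;
        int safeCount = 0;

        Enumerable.Range(0, 1_000_000).AsParallel().ForAll(_ =>
        {
            // BUG: ++ is a read-modify-write sequence; concurrent
            // threads can overwrite each other's updates.
            unsafeCount++;

            // Correct: the increment is performed atomically.
            Interlocked.Increment(ref safeCount);
        });

        // unsafeCount is frequently less than 1,000,000; safeCount never is.
        Console.WriteLine($"unsafe: {unsafeCount}, safe: {safeCount}");
    }
}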

As we saw earlier in this chapter, the Parallel.For() and Parallel.ForEach<T>() methods gather up any exceptions thrown during the parallel iterations and then throw one aggregating exception containing all of the original exceptions. PLINQ operations are no different: They, too, can produce multiple exceptions for exactly the same reason. When the query logic runs on each element in parallel, the code executing on each element can independently throw an exception. Unsurprisingly, PLINQ deals with this situation in exactly the same way as parallel loops and the TPL do: Exceptions thrown during a parallel query are accessible via the InnerExceptions property of the AggregateException. Therefore, wrapping a PLINQ query in a try/catch block that catches System.AggregateException handles any exceptions thrown within the individual iterations that would otherwise go unhandled.
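
A minimal sketch of that pattern follows; the ParseAll() method and the use of int.Parse() as the failure-prone operation are illustrative, not part of the chapter's listings:

using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqExceptionExample
{
    public static List<int> ParseAll(IEnumerable<string> data)
    {
        try
        {
            // int.Parse() throws FormatException for malformed items;
            // PLINQ gathers the failures from all of the worker threads.
            return data.AsParallel().Select(item => int.Parse(item)).ToList();
        }
        catch (AggregateException exception)
        {
            // Each failing element contributes one inner exception.
            foreach (Exception inner in exception.InnerExceptions)
            {
                Console.WriteLine(inner.Message);
            }
            throw;
        }
    }
}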

Canceling a PLINQ Query

As expected, the cancellation request pattern is also available on PLINQ queries. Listing 21.10 (with Output 21.3) provides an example. Like the parallel loops, canceled PLINQ queries throw a System.OperationCanceledException. Also like the parallel loops, executing a PLINQ query is a synchronous operation on the invoking thread. Thus, a common technique is to wrap the parallel query in a task that runs on another thread so that the current thread can cancel it if necessary—the same solution used in Listing 21.5.

Listing 21.10: Canceling a PLINQ Query
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
 
public static class Program
{
    public static List<string> ParallelEncrypt(
        List<string> data,
        CancellationToken cancellationToken)
    {
        int governor = 0;
        return data.AsParallel().WithCancellation(
            cancellationToken).Select(
                (item) =>
                {
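                    // Print a progress dot roughly once per 100 items;
                    // CompareExchange() resets the counter when it reaches 100.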
                    if (Interlocked.CompareExchange(
                        ref governor, 0, 100) % 100 == 0)
                    {
                        Console.Write('.');
                    }
                    Interlocked.Increment(ref governor);
                    return Encrypt(item);
                }).ToList();
    }
 
    public static async Task Main()
    {
        ConsoleColor originalColor = Console.ForegroundColor;
        List<string> data = Utility.GetData(100000).ToList();
 
        using CancellationTokenSource cts = new();
 
        Task task = Task.Run(() =>
        {
            data = ParallelEncrypt(data, cts.Token);
        }, cts.Token);
 
        Console.WriteLine("Press any key to Exit.");
        Task<int> cancelTask = ConsoleReadAsync(cts.Token);
 
        try
        {
            Task.WaitAny(task, cancelTask);
            // Cancel whichever task has not finished.
            cts.Cancel();
            await task;
 
            Console.ForegroundColor = ConsoleColor.Green;
            Console.WriteLine("\nCompleted successfully");
        }
        catch (OperationCanceledException taskCanceledException)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine(
                $"\nCancelled: { taskCanceledException.Message }");
        }
        finally
        {
            Console.ForegroundColor = originalColor;
        }
    }
 
    private static async Task<int> ConsoleReadAsync(
        CancellationToken cancellationToken = default)
    {
        return await Task.Run(async () =>
        {
            const int maxDelay = 1025;
            int delay = 0;
            while (!cancellationToken.IsCancellationRequested)
            {
                if (Console.KeyAvailable)
                {
                    return Console.Read();
                }
                else
                {
                    await Task.Delay(delay, cancellationToken);
                    if (delay < maxDelay) delay = (delay * 2) + 1;
                }
            }
            cancellationToken.ThrowIfCancellationRequested();
            throw new InvalidOperationException(
                "Previous line should throw preventing this from ever executing");
        }, cancellationToken);
    }
 
    private static string Encrypt(string item)
    {
        Cryptographer cryptographer = new();
        return System.Text.Encoding.UTF8.GetString(cryptographer.Encrypt(item));
    }
}

Output 21.3
Press any key to Exit.
.............................................................................................................................................................................................................
.....................................
Cancelled: The query has been canceled via the token supplied to WithCancellation.

As with a parallel loop or task, canceling a PLINQ query requires a CancellationToken, which is available from a CancellationTokenSource. However, rather than overloading every PLINQ query operator to support the cancellation token, the ParallelQuery<T> object returned by IEnumerable’s AsParallel() method includes a WithCancellation() extension method that simply takes a CancellationToken. As a result, calling Cancel() on the CancellationTokenSource object requests cancellation of the parallel query, because PLINQ checks the IsCancellationRequested property on that CancellationToken.

As mentioned, canceling a PLINQ query throws an exception rather than returning the complete result. One common technique for dealing with a possibly canceled PLINQ query is therefore to wrap the query in a try block and catch the OperationCanceledException. A second common technique, used in Listing 21.10, is to pass the CancellationToken both to ParallelEncrypt() and as a second parameter on Task.Run(). With this approach, calling task.Wait() would throw an AggregateException whose InnerException property is set to a TaskCanceledException, whereas awaiting the task, as Listing 21.10 does, throws an OperationCanceledException directly rather than the aggregating wrapper. Either way, the exception can be caught, just as you would catch any other exception from a parallel operation.
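
The first technique can also be used without wrapping the query in a task at all, for example when cancellation is triggered by a timeout rather than by user input. Here is a minimal sketch; the TransformWithTimeout() name, the two-second timeout, and the ToUpperInvariant() stand-in for the real work are illustrative choices, not part of the chapter's listings:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class TimedCancellationExample
{
    public static List<string> TransformWithTimeout(List<string> data)
    {
        using CancellationTokenSource cts = new();
        // Request cancellation automatically after two seconds.
        cts.CancelAfter(TimeSpan.FromSeconds(2));

        try
        {
            return data
                .AsParallel()
                .WithCancellation(cts.Token)
                .Select(item => item.ToUpperInvariant()) // stand-in for a slow operation
                .ToList();
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Query canceled before completion.");
            return new List<string>();
        }
    }
}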
