Skip to main content

· 11 min read
Adnan Rafiq
Start and Finish Image

Image by awcreativeut

A blog post is in progress.

A Channel by merriam-webster refers to among other meaings

the bed where a natural stream of water runs

I will start this post by adopting the water channel analogy to explore how it relates with C# System.Threading.Channels.

Analogy

Channel is the path where information flows from one point to another, like a water channel. If you observe the water channel, it is made up of multiple things. The first thing you will observe is water; second, water is produced by source(s), you can think of glaciers, Rain, etc., and thirdly the water ends up at the destination(s), you can think of the Sea, River, etc.

In .NET terminology, the source(s) are referred to as Producer(s), and the destination(s) are referred to as Consumer(s). And the water is referred to as Message(s) or the .NET object.

For a moment, think about the water channel that how it runs. You will observe that one or multiple consumers and producers are concurrently at work in the water channel meaning produced by rains, melting of galciers, and consumed by many in different forms.

Edge Case of flooding and Channel Types

The flood can happen in the water channel for many reasons, such as:

  • The glaciers are melting at a high pace.
  • The river is at capacity.

In .NET, you can map the analogies to Backpressure, which means the consumer is not consuming the messages at the same pace that the producer(s) are producing. The Backpressure is a technique to mitigate the flooding problem by configuring behavior on the producer to produce in a different way.

That brings us to mainly two types of channels in the .NET

  1. Bounded Channel The Bounded Channel has a defined capacity based on your application needs. It allows you to configure the behavior of what to do in case flooding happens.
  2. UnBounded Channel The UnBounded Channel has no defined capacity, but the limitation of memory availability of the process will apply.

Channel is made up of Channel Reader and Channel Writer. Channel class acts as a Facade to synchronize the access to the shared data structure of type T. Channel class offers two factory methods to create the typed channel. You can configure the channel behavior using options classes.

The below code snippet shows factory methods which are creating both types of channels.

Channel Factory Methods to create bounded or unbounded channels

//Create the bounded channel
Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 4)
{
// This behavior will be triggered when channel writer tries to write
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
});

//Create the unbounded channel
Channel.CreateUnbounded<int>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});

Bounded Channel Mode when its full and backpressure

The bounded channel:

  • Always have a capacity.
  • Allows you to configure the mode of writer when the capacity is full.
  • Allows you to register a callback function when an item is dropped as a result of BoundedChannelFullMode mode.

In the below code snippet, I have implemented BackgroundBoundedProcessingQueue to observe the different modes of the channel writer behavior in a unit test. The unit test ShouldDropItemsWhenChannelIsAtCapacity asserts the dropped items count in DropNewest, DropWrite, and DropOldest modes, and you will notice that there are items in the dropped list. But in Wait mode, the writer wait until the operation is cancelled or the channel capacity becomes available.

I encourage you to paste the whole snippet code in xunit project and observe the logs of the unit test.

A unit test which shows how the bounded channel handles the writes when its full

using System.Threading.Channels;
using Xunit.Abstractions;

namespace ChannelTests;

public class BackgroundBoundedProcessingQueue
{
private readonly Channel<int> _channel;
private readonly ITestOutputHelper _logger;
public readonly List<int> DroppedItems = new();

public BackgroundBoundedProcessingQueue(ITestOutputHelper logger, int capacity,
BoundedChannelFullMode boundedChannelFullMode)
{
_logger = logger;
_channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
{
FullMode = boundedChannelFullMode
},
droppedItem =>
{
_logger.WriteLine($"{droppedItem} has been dropped.");
DroppedItems.Add(droppedItem);
});
}

public async ValueTask QueueItem(int item, CancellationToken cancellationToken)
{
await _channel.Writer.WriteAsync(item, cancellationToken);
_logger.WriteLine($"{item} has been written to channel");
}

public ValueTask<int> DequeueItem(CancellationToken cancellationToken)
{
return _channel.Reader.ReadAsync(cancellationToken);
}

public async Task ProcessItems(CancellationToken cancellationToken)
{
var items = _channel.Reader.ReadAllAsync(cancellationToken);
await foreach (var item in items.WithCancellation(cancellationToken))
_logger.WriteLine($"{item} has been processed.");
}
}

public class BoundedChannelTests
{
private readonly ITestOutputHelper _testOutputHelper;

public BoundedChannelTests(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}

[Theory]
[InlineData(BoundedChannelFullMode.DropNewest)]
[InlineData(BoundedChannelFullMode.DropWrite)]
[InlineData(BoundedChannelFullMode.DropOldest)]
[InlineData(BoundedChannelFullMode.Wait)]
public async Task ShouldDropItemsWhenChannelIsAtCapacity(BoundedChannelFullMode dropMode)
{
var tokenSource = new CancellationTokenSource(2000);
_testOutputHelper.WriteLine($"Bounded Channel is operating in mode {dropMode}");
BackgroundBoundedProcessingQueue queue = new(_testOutputHelper, 3, dropMode);
if (dropMode == BoundedChannelFullMode.Wait)
{
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
foreach (var i in Enumerable.Range(1, 5))
{
//On fifth item it will wait until token is cancelled after 2000 ms
if (i == 5)
_testOutputHelper.WriteLine("5th item will never be written");
await queue.QueueItem(i, tokenSource.Token);
if (i == 3)
{
//Dequeuing the item so 4th item can be written
_testOutputHelper.WriteLine("Dequeuing the item so 4th item can be written");
await queue.DequeueItem(tokenSource.Token);
}
}
});
}
else
{
foreach (var i in Enumerable.Range(1, 5))
await queue.QueueItem(i, tokenSource.Token);
Assert.True(queue.DroppedItems.Count == 2);
}
}
}


UnBounded Channel has no capacity - A unit test to write for x amount of time to the unbounded channel
The Unbounded Channel:
  • Has no capacity but the available memory to the process.
  • It allows you to configure the number of readers and writers using UnboundedChannelOptions.

I encourage to try to run the code snippet to observe the unbounded channel capacity. The below code snippet is a unit test, you can run it by copying the whole thing in xunit project.


using System.Threading.Channels;
using Xunit.Abstractions;

namespace ChannelTests;

public class BackgroundUnBoundedProcessingQueue
{
private readonly Channel<int> _channel;
private readonly ITestOutputHelper _logger;
public BackgroundUnBoundedProcessingQueue(ITestOutputHelper logger)
{
_logger = logger;
_channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false,
AllowSynchronousContinuations = false
});
}
public async ValueTask QueueItem(int item, CancellationToken cancellationToken)
{
await _channel.Writer.WriteAsync(item, cancellationToken);
}
public ChannelReader<int> Reader => _channel.Reader;
}

public class UnBoundedChannelTests
{
private readonly ITestOutputHelper _testOutputHelper;

public UnBoundedChannelTests(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}

[Fact]
public async Task ShouldAlwaysAcceptItems()
{
var source = new CancellationTokenSource(100);
BackgroundUnBoundedProcessingQueue queue = new(_testOutputHelper);
//Throws Async because we are only writing for 100 ms to prove the capacity has no bounds
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
{
for (var i = 0; i < int.MaxValue; i++)
{
await queue.QueueItem(i, source.Token);
}
});
var totalItems = 0;
while (queue.Reader.TryRead(out var x))
{
totalItems++;
}
_testOutputHelper.WriteLine($"Total items on the channel are {totalItems}");
Assert.True(true);
}
}

The case of Decoupling

The sea (consumer) does not care about how the water is produced. It just consumes it metaphorically. Neither the sea knows about the production source, the glacier metaphorically.

Similarly, the producer and consumer in the .NET channel do not know about each other yet operate on the shared resource, the messages.

Many canals of the River and .NET Concurrency

Without going into the chemistry of water molecules, the producers put water into the river (Channel), a distinct function performed concurrently. Similarly, the river is divided into canals (consumers) that consume the water concurrently.

The important thing to notice here, the same water drop is not shared among any involved parties; this complexity is hidden behind the .NET Channel abstraction.

Background processing the work using Channels and Hosted Service

A web application API is a multithreading process with which many users interact concurrently. If an API logic involves an operation to perform a long-running or directly unrelated task, that will not impact the response returned to the user. You would like to offload or perform the task in the background. This leads us to find a mechanism that enables offloading. Channel is the answer.

If you have worked with .NET Framework, such background tasks were usually queued using HostingEnvironment.QueueBackgroundItem method or queue on the Thread Pool using ThreadPool.QueueUserWorkItem. Both these approaches are hard to test and does not provide you any control on when these will be processed.

You can consider the below scenarios if you have yet to work with framework applications.

There are some commonly known requirements in traditional applications, such as:

  • Send out an email.
  • Send an event to the analytics service.
  • Sync the data to a third-party service.

In the below example, I have defined a simple API end point, and a hosted service. Suppose, you have an API call which should complete immediately but queue the work which can eventually complete at later time. To achieve this, I have implemented the IBackgroundTaskQueue to create an abstraction over the UnBoundedChannel, now this abstraction is injected into API endpoint which uses it to queue the work. And the background service is continuously trying to read any available items to process. And as soon as it receives an item, it will start processing.

An exmple of background processing in the hosted service using the UnBounded Channel

using System.Threading.Channels;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton<IBackgroundTaskQueue, LongRunningTaskBackgroundQueue>();
builder.Services.AddHostedService<LongRunningTaskBackgroundService>();

var app = builder.Build();
app.UseHttpsRedirection();

app.MapGet("dowork", async Task<IResult>(IBackgroundTaskQueue backgroundTaskQueue) =>
{
Console.WriteLine($"I was written to queue {DateTime.Now}");
await backgroundTaskQueue.QueueBackgroundWorkItemAsync(_ =>
{
//This wont run untill the reader reads and calls it
return Task.Run(async () =>
{
await Task.Delay(1000);
Console.WriteLine($"I ran {DateTime.Now}");

});
});
return Results.Ok();
});
app.Run();


public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, Task> workItem);

ValueTask<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken);
}

public sealed class LongRunningTaskBackgroundQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, Task>> _queue;

public LongRunningTaskBackgroundQueue()
{
UnboundedChannelOptions options = new()
{
SingleReader = true,
SingleWriter = false
};
_queue = Channel.CreateUnbounded<Func<CancellationToken, Task>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, Task> workItem)
{
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, Task>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem =
await _queue.Reader.ReadAsync(cancellationToken);

return workItem;
}
}

public sealed class LongRunningTaskBackgroundService : BackgroundService
{
private readonly ILogger<LongRunningTaskBackgroundQueue> _logger;
private readonly IBackgroundTaskQueue _taskQueue;

public LongRunningTaskBackgroundService(
IBackgroundTaskQueue taskQueue,
ILogger<LongRunningTaskBackgroundQueue> logger)
{
(_taskQueue, _logger) = (taskQueue, logger);
}

public override async Task StartAsync(CancellationToken cancellationToken)
{
var serviceName = nameof(LongRunningTaskBackgroundService);
_logger.LogInformation("{ServiceName} is starting", serviceName);
await base.StartAsync(cancellationToken);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var serviceName = nameof(LongRunningTaskBackgroundService);
_logger.LogInformation("{ServiceName} is running", serviceName);

await ProcessTaskQueueAsync(stoppingToken);
}

private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
//Let the server start
//https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-7.0&tabs=visual-studio&viewFallbackFrom=aspnetcore-3.0#backgroundservice-base-class
await Task.Yield();

while (!stoppingToken.IsCancellationRequested)
try
{
//Waiting for 5 seconds for each pass - you can remove it
await Task.Delay(5000, stoppingToken);

var task = await _taskQueue.DequeueAsync(stoppingToken);

await task(stoppingToken);
}
catch (OperationCanceledException operationCanceledException)
{
_logger.LogInformation(operationCanceledException,
"Operation was cancelled because host is shutting down");
}
catch (AggregateException aggregateException)
{
_logger.LogError(aggregateException.Flatten(), "Aggregate exception occurred");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing task work item");
}
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
//Dump all the queue tasks in local file or log it.
var serviceName = nameof(LongRunningTaskBackgroundService);
_logger.LogInformation("{ServiceName} is stopping", serviceName);

await base.StopAsync(cancellationToken);
}
}

It was the most thrilling improvement I have felt when compared to .NET Framework. I asked myself, how would I test the above, and I came up with the below test.

The below test:

  • Is starting the web server.
  • Is starting the hosted service which continuously tries to read from the channel.
  • Verifies that work item is written to channel, and then read by hosted service, and it starts the work.

That is beautiful .NET


using Microsoft.AspNetCore.Mvc.Testing;
using Microsoft.Extensions.DependencyInjection;
using Xunit.Abstractions;

namespace ChannelTests;


public class BackgroundProcessingTest : IClassFixture<WebApplicationFactory<Program>>
{
private readonly WebApplicationFactory<Program> _applicationFactory;
private readonly ITestOutputHelper _testOutputHelper;

public BackgroundProcessingTest(ITestOutputHelper testOutputHelper,
WebApplicationFactory<Program> applicationFactory)
{
_testOutputHelper = testOutputHelper;
_applicationFactory = applicationFactory;
}

[Fact]
public async Task ShouldProcessTaskInBackgroundWhenWrittenToQueue()
{
var backgroundTaskQueue = _applicationFactory.Services.GetRequiredService<IBackgroundTaskQueue>();
var expectedCount = 2;
var countdownEvent = new CountdownEvent(expectedCount);
for (var i = 0; i < 2; i++)
await backgroundTaskQueue.QueueBackgroundWorkItemAsync(token =>
{
//decrement the count
countdownEvent.Signal();
_testOutputHelper.WriteLine(
$"I was called by the background service, should be called {countdownEvent.CurrentCount} times more.");
return Task.CompletedTask;
});

//Waiting for 12 seconds because we intentionally waiting 5 secs in background service
//These waits can be removed its only to show you how its working
var countReached = countdownEvent.Wait(TimeSpan.FromSeconds(12));
Assert.True(countReached);
}
}

Feedback

I would love to hear your feedback, feel free to share it on Twitter.

· 4 min read
Adnan Rafiq
Start and Finish Image

Image by @claybanks

Overview

I am working on migrating the .NET Framework application to the .NET6. Since the application was initially written in the .NET Framework 2.0 thus it contains the legacy approaches to get the data from the database. We were using the old version of Microsoft Enterprise Library Data Access package to get the data from the database which is not compatible with the .NET Standard 2.0. So I decided to generate the code for stored procedures using the Dapper and Handlebars templates.

I faced two problems:

  • SQL Server system tables does not know about stored procedure parameter nullability
  • When you are using conditional logic inside the stored procedure, sql server does not give you correct count of the result sets.

· 3 min read
Adnan Rafiq
Start and Finish Image

Image by awcreativeut

CORS are best described on MDN

Cross-Origin Resource Sharing (CORS) is an HTTP-header based mechanism that allows a server to indicate any origins (domain, scheme, or port) other than its own from which a browser should permit loading resources. CORS also relies on a mechanism by which browsers make a "preflight" request to the server hosting the cross-origin resource, in order to check that the server will permit the actual request. In that preflight, the browser sends headers that indicate the HTTP method and headers that will be used in the actual request.

How to configure CORS in .NET6 API?

CORS in .NET6 API can be configured using CORS policies.

· 2 min read
Adnan Rafiq
Start and Finish Image

Image by @claybanks

Authorization Policy

Authorizing the resource access is essential part of any API. The .NET provides you a perfect mental model which is easier to reason about. It has this flow:

  1. What is the name of your policy as string.
  2. What requirement the user must satisfy to qualify which implements the IAuthorizationRequirement interface.
  3. What is your handler responsible to evaluate, which inherits the AuthorizationHandler<UniqueIdHeaderRequirement> and register it.

Then Authorize attribute allows you to set a policy name when used on controller or action method. But if you are fan of Minimal API then fluent style is the way to go using RequireAuthorization.

· 3 min read
Adnan Rafiq
An image of alphabets
Image by @linharex

Problem

When the application expects Unicode characters as input from the user, it is best to normalize it before storing it in the database, especially when you plan to use the information for comparison.

Suppose the application asks the user to upload a file with the same name as their first name, which contains the character é. If you validate the file name using the string comparison (===) operator or comparing length, it will fail if different Unicode code points represent the input.

You validate the client-side and server-side using C# as a best practice. It would be best to normalize the string before comparing; otherwise, the validation will fail either at the server or client side.

Browser's behavior is different for the Unicode characters; some do the normalization, and some do not. I recently had to fix an issue where string comparison without normalization only failed when the user uploaded the file using Chrome or Firefox on Mac. One such example is on here.

Normalize unicode strings for correct comparison

Some unicode character like ñ can be represented by using one code point (\u00F1) or two code points (\u006E\u0303). Such characters visually looks exactly the same but will have different string length. Thus string equality comparison and length tests will fail. This MDN and .NET article(s) describe it beautifully. If you are expecting unicode characters as an input from the user, store it after normalizing.

· 6 min read
Adnan Rafiq

What is an Adapter Pattern?

The Adapter Pattern connects two incompatible objects by exposing an interface compatible with the Client. The object refers to a class, web service, REST API, process, or physical device depending upon your context.

Consider a C# Web Application displaying Weather Updates on its landing by utilizing the third-party REST API.

In this application, there are three participating objects in the Adapter Pattern:

  1. A Weather REST API (Adaptee) has the functionality of weather updates. The Weather REST API only understands JSON.
  2. The C# MVC Web Application (Client) displays weather updates on its landing page using the Weather Web Service.
  3. The Adapter enables the Client to utilize the Adaptee functionality. It does that by doing two things:
    1. It converts the Client's input to the format acceptable to Adaptee i.e. JSON to C# Object(s).
    2. It converts the Adaptee's output to the format acceptable to the Client i.e. C# Object(s) to JSON.

· 15 min read
Adnan Rafiq
Start and Finish Image

Image by awcreativeut

The word Host will repeatedly appear in the post so let's briefly understand what it means?

What is Host?

The Host is a container which offers rich built-in services such as Dependency Injection, Configuration, Logging, Host Services and others. The NET 6 offers Generic DefaultHost which can be configured to handle the activities as per your use case. Two major variations of the Host are:

  • Console Host - CLI based applications.
  • Web Host - Web API & Applications.

Think of it as Airbnb Host who keeps the property ready to serve when the guests arrive. The property offers a different set of services and allows you to bring your own services. The lifetime of such services depends upon the contract, which the Host controls.

Basic Host Example : Create, configure, build, and run the Host
var host = Host.CreateDefaultBuilder(args) //WebHost.CreateDefaultBuilder(args)  
.ConfigureLogging( (context, builder) => builder.AddConsole())
.Build(); // Build the host, as per configurations.

await host.RunAsync();

What is Hosted Service?

A service that performs the work in the background mostly does not offer an interface to interact. In technical terms, any reference type object which implements the IHostedService interface is a background/hosted/worker service.

Terms such as Worker, Windows Service, and Background Task refer to HostedService based on context. In Windows Server, Widows Service is how you deploy a Hosted Service. Background Task or Hosted service runs as part of .NET Web Host, and it runs in the same operating system process.