Custom Kestrel Transports
In this post, a DZone contributor walks us through an experiment he ran, in which he tried to create transports for the .NET Core-based web server, Kestrel.
Join the DZone community and get the full member experience.
Join For FreeWith .NET Core 2.0 finally coming out of preview I felt like now was a good time to share one of my preview experiments with everyone. Custom Kestrel Transports!
Before we begin, the source code for this preview experiment can be found here:
What Is Kestrel?
Kestrel is a new web server written in .NET Core which runs on Windows, Mac, and a dozen Linux distros. It has been written from the ground up with performance in mind and hits over a million requests per second in some of their plaintext benchmarks.
What Are Kestrel Transports?
Because Kestrel has to run in a Linux environment where WinSock doesn't exist, the ASP.NET team has abstracted out how connections are made to the server so they can be implemented on top of sockets for Windows and libuv for Linux and Mac. In fact, libuv was so much faster than traditional Windows sockets that libuv is the default for Windows environments too!
Why Would I Want to Write a Custom One?
There aren't a huge amount of practical reasons why you may want to write your own transport. But the fact you can is AWESOME! You could write a transport that accepts an infrared signal through some custom hardware which allows you to make web requests to your ASP.NET Core application via Infrared. Or you could have an external process communicate with your ASP.NET Core application through various Interprocess Communication methods which don't rely on sockets.
In our case, we wanted to be able to generate emails as part of a scheduled background service. We use razor views to generate our emails and traditionally would use RazorEngine or make a fake HTTP request back to the site's internal loop back address in order to generate the HTML body of the email.
What Is This Experiment?
In this experiment, I chose to make a generic Stream-based transport. This would allow me to make further experiments using this as a base for things like passing AJAX requests through a websocket and processing them on the server like they were actual HTTP requests.
You can view the source for this project in the Kestrel.Transport.Streams GitHub repo.
If you just want to add the Stream Transport to your project, you can find it on NuGet.
If you want to build this project, you will need Visual Studio 2017 version 15.3 as well as the .NET Core 2.0 SDK
How Do You Use It?
First, we need to tell Kestrel to use the Stream Transport. This is done in your Program.cs file.
namespace WebApplication1
{
public class Program
{
public static void Main(string[] args)
{
BuildWebHost(args).Run();
}
public static IWebHost BuildWebHost(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>()
.UseStreamTransport() // <--------- Add this line to enable the StreamTransport
.Build();
}
}
Once Kestrel knows about the StreamTransport, we can create a connection. This is theoretically the same as when a web browser connects to the server in order to make a request, except we are creating the connection ourselves and it doesn't use any sockets. Once we have a connection, we can make a web request through it. This Get
helper method will create an HTTP web request string and write it to the Stream, then it will wait for Kestrel to complete the response and read the HTTP body of the response into a string and return it. From there we can send it out via SMTP, post it to a 3rd party mail sending web API, write it to disk or whatever we want to do with it.
var connection = StreamTransport.CreateConnection();
string emailHtmlBody = await connection.Get("/emails/welcomeemail");
How Does It Work?
Kestrel uses Dependency Injection to configure pretty much everything, including which Transport it uses. A transport is basically a set of classes which implement a set of interfaces. Here is a quick breakdown of the interfaces and what they do:
- ITransportFactory - Creates and configures an instance of an ITransport.
- ITransport - Handles connections and creates instances of IConnectionInformation.
- IConnectionInformation - Represents a single connection and contains 2 IPEndPoints. This class is responsible for the sending and receiving of data.
Let's take a look at the StreamTransport implementations.
StreamTransportFactory.cs
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using System;
namespace Atlas.AspNetCore.Server.Kestrel.Transport.Streams
{
public sealed class StreamTransportFactory : ITransportFactory
{
private readonly PipeFactory _pipeFactory = new PipeFactory();
public static string PairingToken;
public StreamTransportFactory()
{
}
public ITransport Create(IEndPointInformation endPointInformation, IConnectionHandler handler)
{
if (endPointInformation == null)
{
throw new ArgumentNullException(nameof(endPointInformation));
}
if (handler == null)
{
throw new ArgumentNullException(nameof(handler));
}
return new StreamTransport(this, endPointInformation, handler);
}
internal PipeFactory PipeFactory => _pipeFactory;
}
}
The PairingToken
string here is used to store the token that IIS needs to pass to Kestrel for the requests to be valid. This only really applies when Kestrel is being executed behind IIS or IIS Express, which is the case when pressing run in Visual Studio. This token is added to every request in the Get
helper mentioned earlier. Other than that, this class is fairly self-explanatory, it creates a StreamTransport instance.
StreamTransport.cs
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using System.Diagnostics;
using System.Threading.Tasks;
namespace Atlas.AspNetCore.Server.Kestrel.Transport.Streams
{
public sealed class StreamTransport : ITransport
{
private readonly StreamTransportFactory _transportFactory;
private readonly IEndPointInformation _endPointInformation;
private readonly IConnectionHandler _handler;
private static StreamTransport CurrentStreamTransport;
internal StreamTransport(StreamTransportFactory transportFactory, IEndPointInformation endPointInformation, IConnectionHandler handler)
{
Debug.Assert(transportFactory != null);
Debug.Assert(endPointInformation != null);
Debug.Assert(endPointInformation.Type == ListenType.IPEndPoint);
Debug.Assert(handler != null);
_transportFactory = transportFactory;
_endPointInformation = endPointInformation;
_handler = handler;
CurrentStreamTransport = this;
}
public Task BindAsync()
{
return Task.CompletedTask;
}
public Task UnbindAsync()
{
return Task.CompletedTask;
}
public Task StopAsync()
{
return Task.CompletedTask;
}
public static StreamConnection CreateConnection()
{
var connection = new StreamConnection(CurrentStreamTransport, CurrentStreamTransport._handler);
return connection;
}
internal StreamTransportFactory TransportFactory => _transportFactory;
}
}
The ITransport classes are responsible for handling incoming connections and would usually contain a lot more code in the Bind and Unbind methods. As we will be creating connections ourselves, this class is pretty much empty, but we have put the CreateConnection
method here because it is this class' responsibility.
StreamConnection.cs
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.Buffers;
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
namespace Atlas.AspNetCore.Server.Kestrel.Transport.Streams
{
public sealed class StreamConnection : IConnectionInformation
{
public readonly RequestStream RequestStream;
public readonly MemoryStream ResponseStream;
private readonly StreamTransport _transport;
private readonly IConnectionHandler _connectionHandler;
private IConnectionContext _connectionContext;
private IPipeWriter _input;
private IPipeReader _output;
private IList<ArraySegment<byte>> _sendBufferList;
private const int MinAllocBufferSize = 2048;
internal StreamConnection(StreamTransport transport, IConnectionHandler connectionHandler)
{
Debug.Assert(transport != null);
_transport = transport;
_connectionHandler = connectionHandler;
_connectionContext = _connectionHandler.OnConnection(this);
RequestStream = new RequestStream(_connectionContext.Input);
ResponseStream = new MemoryStream();
}
public async Task StartAsync()
{
try
{
_input = _connectionContext.Input;
_output = _connectionContext.Output;
await DoSend();
}
catch (Exception)
{
// TODO: Log
}
}
private void SetupSendBuffers(ReadableBuffer buffer)
{
Debug.Assert(!buffer.IsEmpty);
Debug.Assert(!buffer.IsSingleSpan);
if (_sendBufferList == null)
{
_sendBufferList = new List<ArraySegment<byte>>();
}
// We should always clear the list after the send
Debug.Assert(_sendBufferList.Count == 0);
foreach (var b in buffer)
{
_sendBufferList.Add(GetArraySegment(b));
}
}
private async Task DoSend()
{
Exception error = null;
try
{
while (true)
{
// Wait for data to write from the pipe producer
var result = await _output.ReadAsync();
var buffer = result.Buffer;
if (result.IsCancelled)
{
break;
}
try
{
if (!buffer.IsEmpty)
{
if (buffer.IsSingleSpan)
{
var segment = GetArraySegment(buffer.First);
await ResponseStream.WriteAsync(segment.Array, segment.Offset, segment.Count);
}
else
{
SetupSendBuffers(buffer);
try
{
foreach (var segment in _sendBufferList)
{
await ResponseStream.WriteAsync(segment.Array, segment.Offset, segment.Count);
}
}
finally
{
_sendBufferList.Clear();
}
}
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
_output.Advance(buffer.End);
}
}
}
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
{
error = null;
}
catch (ObjectDisposedException)
{
error = null;
}
catch (IOException ex)
{
error = ex;
}
catch (Exception ex)
{
error = new IOException(ex.Message, ex);
}
finally
{
_output.Complete(error);
}
}
private static ArraySegment<byte> GetArraySegment(Buffer<byte> buffer)
{
if (!buffer.TryGetArray(out var segment))
{
throw new InvalidOperationException();
}
return segment;
}
public IPEndPoint RemoteEndPoint => null;
public IPEndPoint LocalEndPoint => null;
public PipeFactory PipeFactory => _transport.TransportFactory.PipeFactory;
public IScheduler InputWriterScheduler => InlineScheduler.Default;
public IScheduler OutputReaderScheduler => TaskRunScheduler.Default;
}
}
This class handles the bulk of the work and the key parts to mention are the_connectionContext.Input
and _connectionContext.Output
. These are the pipes which we actually send the request to and read the response from. If this was using sockets we would need to wait for data to come in, then forward that to the _connectionContext.Input
and also wait for data to arrive from _connectionContext.Output
and send that back out to the socket.
As we don't have any sockets here, we can assume that the entire request has been sent before we tell Kestrel to begin processing the request. Then we only need to wait for Kestrel to complete writing the response to _connectionContext.Output
.
Now, all we need to do is register our new classes into the DI container like this:
services.AddSingleton<ITransportFactory, StreamTransportFactory>();
This, however, will cause us a massive issue. We can only have one implementation of ITransportFactory
registered. So this means that we won't be able to make any requests to the ASP.NET Core application using a web browser. Whoops.
To get around this, we need to make a class which aggregates multiple transports.
TransportFactoryAggregator.cs
using Microsoft.AspNetCore.Server.Kestrel.Internal.System.IO.Pipelines;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
namespace Atlas.AspNetCore.Server.Kestrel.Transport.Streams
{
public sealed class TransportFactoryAggregator<A, B> : ITransportFactory
where A : ITransportFactory
where B : ITransportFactory
{
private readonly PipeFactory _pipeFactory = new PipeFactory();
private ITransportFactory FactoryA;
private ITransportFactory FactoryB;
public TransportFactoryAggregator(A factoryA, B factoryB)
{
this.FactoryA = factoryA;
this.FactoryB = factoryB;
}
public ITransport Create(IEndPointInformation endPointInformation, IConnectionHandler handler)
{
return new TransportAggregator(this.FactoryA.Create(endPointInformation, handler), this.FactoryB.Create(endPointInformation, handler));
}
internal PipeFactory PipeFactory => _pipeFactory;
}
}
TransportAggregator.cs
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using System.Threading.Tasks;
namespace Atlas.AspNetCore.Server.Kestrel.Transport.Streams
{
public class TransportAggregator : ITransport
{
private ITransport TransportA;
private ITransport TransportB;
public TransportAggregator(ITransport A, ITransport B)
{
this.TransportA = A;
this.TransportB = B;
}
public async Task BindAsync()
{
await Task.WhenAll(this.TransportA.BindAsync(), this.TransportB.BindAsync());
}
public async Task StopAsync()
{
await Task.WhenAll(this.TransportA.StopAsync(), this.TransportB.StopAsync());
}
public async Task UnbindAsync()
{
await Task.WhenAll(this.TransportA.UnbindAsync(), this.TransportB.UnbindAsync());
}
}
}
Using these classes, we can now create a helper method to set up the Dependency Injection.
WebHostBuilderSocketExtensions.cs
using Atlas.AspNetCore.Server.Kestrel.Transport.Streams;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Abstractions.Internal;
using Microsoft.Extensions.DependencyInjection;
using System;
namespace Microsoft.AspNetCore.Hosting
{
public static class WebHostBuilderSocketExtensions
{
/// <summary>
/// Specify an additional transport to be used by Kestrel alongside the current transport.
/// </summary>
/// <param name="hostBuilder">
/// The Microsoft.AspNetCore.Hosting.IWebHostBuilder to configure.
/// </param>
/// <returns>
/// The Microsoft.AspNetCore.Hosting.IWebHostBuilder.
/// </returns>
public static IWebHostBuilder UseAdditionalTransportFactory<T>(this IWebHostBuilder hostBuilder)
where T : ITransportFactory
{
return hostBuilder.ConfigureServices(services =>
{
Type iTransportFactoryImplementationType = null;
foreach (var s in services)
{
if (s.ServiceType == typeof(ITransportFactory))
{
iTransportFactoryImplementationType = s.ImplementationType;
break;
}
}
if (iTransportFactoryImplementationType != null)
{
services.AddSingleton(iTransportFactoryImplementationType, iTransportFactoryImplementationType);
services.AddSingleton(typeof(T), typeof(T));
var type = typeof(TransportFactoryAggregator<,>).MakeGenericType(iTransportFactoryImplementationType, typeof(T));
services.AddSingleton(typeof(ITransportFactory), type);
}
});
}
/// <summary>
/// Adds StreamTransports as an additional transport to Kestrel alongside the current transport.
/// </summary>
/// <param name="hostBuilder">
/// The Microsoft.AspNetCore.Hosting.IWebHostBuilder to configure.
/// </param>
/// <returns>
/// The Microsoft.AspNetCore.Hosting.IWebHostBuilder.
/// </returns>
public static IWebHostBuilder UseStreamTransport(this IWebHostBuilder hostBuilder)
{
StreamTransportFactory.PairingToken = hostBuilder.GetSetting("TOKEN");
return hostBuilder.UseAdditionalTransportFactory<StreamTransportFactory>();
}
}
}
The UseStreamTransport
extension method gets the PairingToken
from the hostBuilder and saves it for use later as we need to include this token in all requests.
The UseAdditionalTransportFactory
extension method looks up which implementation is currently registered for ITransportFactory
and then creates a new TransportAggregatorFactory using that implementation with the new implementation passed in as a generic parameter.
Summary
If you got this far, well done! This post turned out to be a bit of a monster. If you clone the Kestrel.Transport.Streams GitHub repo repository you will find a project imaginatively named "WebApplication1." When you run this application, go to the About page. Here you will see the content of the Contact page set as the message that normally appears on the About page. This application uses the standard Razor Pages template. All that was changed was adding .UseStreamTransport()
in program.cs, Layout = "";
was added to contact.cshtml and the 'get' method in about.cshtml.cs was changed to:
public async Task OnGetAsync()
{
var connection = StreamTransport.CreateConnection();
this.Message = await connection.Get("/contact");
}
Now when you visit the about page, the application makes a second internal web request using the streams transport to the Contact page, then returns the result to the About page model.
Why would anyone want to do this? For science of course!
As always, you can find this project on GitHub and also on NuGet.
Published at DZone with permission of Dean North. See the original article here.
Opinions expressed by DZone contributors are their own.
Comments