Giter Club home page Giter Club logo

interprocess's Introduction

Interprocess

Publish Workflow Latest NuGet License: MIT .NET Platform .NET Core

Cloudtoid Interprocess is a cross-platform shared memory queue for fast communication between processes (Interprocess Communication or IPC). It uses a shared memory-mapped file for extremely fast and efficient communication between processes and it is used internally by Microsoft.

  • Fast: It is extremely fast.
  • Cross-platform: It supports Windows, and Unix-based operating systems such as Linux, MacOS, and FreeBSD.
  • API: Provides a simple and intuitive API to enqueue/send and dequeue/receive messages.
  • Multiple publishers and subscribers: It supports multiple publishers and subscribers to a shared queue.
  • Efficient: Sending and receiving messages is almost heap memory allocation free reducing garbage collections.
  • Developer: Developed by a guy at Microsoft.

NuGet Package

The NuGet package for this library is published here.

Note: To improve performance, this library only supports 64-bit CLR with 64-bit processor architectures. Attempting to use this library on 32-bit processors, 32-bit operating systems, or on WOW64 may throw a NotSupportedException.

Usage

This library supports .NET Core 3.1+ and .NET 6+. It is optimized for .NET dependency injection but can also be used without DI.

Usage without DI

Creating a message queue factory:

var factory = new QueueFactory();

Creating a message queue publisher:

var options = new QueueOptions(
    queueName: "my-queue",
    bytesCapacity: 1024 * 1024);

using var publisher = factory.CreatePublisher(options);
publisher.TryEnqueue(message);

Creating a message queue subscriber:

options = new QueueOptions(
    queueName: "my-queue",
    bytesCapacity: 1024 * 1024);

using var subscriber = factory.CreateSubscriber(options);
subscriber.TryDequeue(messageBuffer, cancellationToken, out var message);

Usage with DI

Adding the queue factory to the DI container:

services
    .AddInterprocessQueue() // adding the queue related components
    .AddLogging(); // optionally, we can enable logging

Creating a message queue publisher using an instance of IQueueFactory retrieved from the DI container:

var options = new QueueOptions(
    queueName: "my-queue",
    bytesCapacity: 1024 * 1024);

using var publisher = factory.CreatePublisher(options);
publisher.TryEnqueue(message);

Creating a message queue subscriber using an instance of IQueueFactory retrieved from the DI container:

var options = new QueueOptions(
    queueName: "my-queue",
    bytesCapacity: 1024 * 1024);

using var subscriber = factory.CreateSubscriber(options);
subscriber.TryDequeue(messageBuffer, cancellationToken, out var message);

Sample

To see a sample implementation of a publisher and a subscriber process, try out the following two projects. You can run them side by side and see them in action:

Please note that you can start multiple publishers and subscribers sending and receiving messages to and from the same message queue.

Performance

A lot has gone into optimizing the implementation of this library. For instance, it is mostly heap-memory allocation free, reducing the need for garbage collection induced pauses.

Summary: A full enqueue followed by a dequeue takes ~250 ns on Linux, ~650 ns on MacOS, and ~300 ns on Windows.

Details: To benchmark the performance and memory usage, we use BenchmarkDotNet and perform the following runs:

Method Description
Message enqueue Benchmarks the performance of enqueuing a message.
Message enqueue and dequeue Benchmarks the performance of sending a message to a client and receiving that message. It is inclusive of the duration to enqueue and dequeue a message.
Message enqueue and dequeue - no message buffer Benchmarks the performance of sending a message to a client and receiving that message. It is inclusive of the duration to enqueue and dequeue a message and memory allocation for the received message.

You can replicate the results by running the following command:

dotnet run Interprocess.Benchmark.csproj -c Release

You can also be explicit about the .NET SDK and Runtime(s) versions:

dotnet run Interprocess.Benchmark.csproj -c Release -f net7.0 --runtimes net7.0 net6.0 netcoreapp3.1

On Windows

Host:

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22000
Intel Core i9-10900X CPU 3.70GHz, 1 CPU, 20 logical and 10 physical cores
.NET SDK=6.0.201
  [Host]   : .NET 6.0.3 (6.0.322.12309), X64 RyuJIT
  .NET 6.0 : .NET 6.0.3 (6.0.322.12309), X64 RyuJIT

Results:

Method Mean (ns) Error (ns) StdDev (ns) Allocated
Message enqueue 192.7 3.61 3.21 -
Message enqueue and dequeue 305.6 5.96 6.62 -
Message enqueue and dequeue - no message buffer 311.5 5.90 9.85 32 B

On MacOS

Host:

BenchmarkDotNet=v0.13.1, OS=macOS Big Sur 11.6 (20G165) [Darwin 20.6.0]
Intel Core i5-8279U CPU 2.40GHz (Coffee Lake), 1 CPU, 8 logical and 4 physical cores
.NET SDK=5.0.401
  [Host]        : .NET 5.0.10 (5.0.1021.41214), X64 RyuJIT
  .NET 5.0      : .NET 5.0.10 (5.0.1021.41214), X64 RyuJIT

Results:

Method Mean (ns) Error (ns) StdDev (ns) Allocated
Message enqueue 487.50 4.75 3.96 -
Message enqueue and dequeue 666.10 10.91 10.20 -
Message enqueue and dequeue - no message buffer 689.33 13.38 15.41 32 B

On Ubuntu (through WSL)

Host:

BenchmarkDotNet=v0.13.2, OS=ubuntu 20.04
Intel Core i9-10900X CPU 3.70GHz, 1 CPU, 20 logical and 10 physical cores
.NET SDK=6.0.403
  [Host]   : .NET 6.0.11 (6.0.1122.52304), X64 RyuJIT AVX2
  .NET 6.0 : .NET 6.0.11 (6.0.1122.52304), X64 RyuJIT AVX2

Results:

Method Mean (ns) Error (ns) StdDev (ns) Allocated
Message enqueue 5.3 - - -
Message enqueue and dequeue 169.9 3.08 4.01 -
Message enqueue and dequeue - no message buffer 179.4 1.91 1.60 32 B

Implementation Notes

This library relies on Named Semaphores To signal the existence of a new message to all message subscribers and to do it across process boundaries. Named semaphores are synchronization constructs accessible across processes.

.NET Core 3.1 and .NET 6/7 do not support named semaphores on Unix-based OSs (Linux, macOS, etc.). Instead we are using P/Invoke and relying on operating system's POSIX semaphore implementation. (Linux and MacOS implementations).

This implementation will be replaced with System.Threading.Semaphore once .NET adds support for named semaphores on all platforms.

How to Contribute

  • Create a branch from main.
  • Ensure that all tests pass on Windows, Linux, and MacOS.
  • Keep the code coverage number above 80% by adding new tests or modifying the existing tests.
  • Send a pull request.

Author

Pedram Rezaei is a software architect at Microsoft with years of experience building highly scalable and reliable cloud-native applications for Microsoft.

What is next

Here are a couple of items that we are working on.

  • Create a marketing/documentation website
  • Once .NET supports named semaphores on Linux, then start using them.

interprocess's People

Contributors

prezaei avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

interprocess's Issues

Unhandled exception when multi subscribers

under window 10 C# net6
version: 1.0.169

Created a simple publisher/subscriber test and working perfectly for 1P1C mode (P-producer, C- consumer)
however, when trying 1P2C mode, get the following exception after a short while in one of consumers

Unhandled exception. System.InvalidOperationException: This is unexpected and can be a serious bug. We take a lock on this message prior to this point which should ensure that the HeadOffset is left unchanged.
   at Cloudtoid.Interprocess.Subscriber.TryDequeueImpl(Nullable`1 resultBuffer, CancellationToken cancellation, ReadOnlyMemory`1& message)
   at Cloudtoid.Interprocess.Subscriber.TryDequeueCore(Nullable`1 resultBuffer, CancellationToken cancellation, ReadOnlyMemory`1& message)
   at Cloudtoid.Interprocess.Subscriber.TryDequeue(Memory`1 resultBuffer, CancellationToken cancellation, ReadOnlyMemory`1& message)

Got into "System.ArgumentException: The initial count cannot be greater than 32767." while bytesCapacity in options is not

Problem description

At first, I import Cloudtoid.Interprocess to my ASP.NET WEB API backend application, every time it run using var publisher = factory.CreatePublisher(options); it got into the exception below:

Unhandled exception. System.ArgumentException: The initial count cannot be greater than 32767. (Parameter 'initialCount')
   at Cloudtoid.Interprocess.Semaphore.MacOS.Interop.CreateOrOpenSemaphore(String name, UInt32 initialCount)
   at Cloudtoid.Interprocess.Semaphore.MacOS.SemaphoreMacOS..ctor(String name, Boolean deleteOnDispose)
   at Cloudtoid.Interprocess.InterprocessSemaphore.CreateReleaser(String name)
   at Cloudtoid.Interprocess.Publisher..ctor(QueueOptions options, ILoggerFactory loggerFactory)
   at Cloudtoid.Interprocess.QueueFactory.CreatePublisher(QueueOptions options)
   ...

I move the code into a vary simple project which just contains a single Program.cs:

using Cloudtoid.Interprocess;
using Microsoft.Extensions.Logging;

var factory = new QueueFactory();
var options = new QueueOptions(
    queueName: "sample-queue", bytesCapacity: 24);// just 24
using var publisher = factory.CreatePublisher(options);

It got into the same exception.

Reproduction

I pushed that simple project to this GitHub repository

Additional information

Cloudtoid.Interprocess version 1.0.175

My OS information:

# uname -a
Darwin hostname.lan 22.4.0 Darwin Kernel Version 22.4.0: Mon Mar  6 20:59:28 PST 2023; root:xnu-8796.101.5~3/RELEASE_ARM64_T6000 arm64

My dotnet information:

# dotnet --info
.NET SDK:
 Version:   7.0.203
 Commit:    5b005c19f5

Runtime Environment:
 OS Name:     Mac OS X
 OS Version:  13.3
 OS Platform: Darwin
 RID:         osx.13-arm64
 Base Path:   /usr/local/share/dotnet/sdk/7.0.203/

Host:
  Version:      7.0.5
  Architecture: arm64
  Commit:       8042d61b17

.NET SDKs installed:
  7.0.203 [/usr/local/share/dotnet/sdk]

.NET runtimes installed:
  Microsoft.AspNetCore.App 7.0.5 [/usr/local/share/dotnet/shared/Microsoft.AspNetCore.App]
  Microsoft.NETCore.App 7.0.5 [/usr/local/share/dotnet/shared/Microsoft.NETCore.App]

Other architectures found:
  None

Environment variables:
  DOTNET_ROOT       [/opt/homebrew/opt/dotnet/libexec]

global.json file:
  Not found

Learn more:
  https://aka.ms/dotnet/info

Download .NET:
  https://aka.ms/dotnet/download

Benchmarks are incorrect

You're using GlobalSetup/Cleanup instead of IterationSetup/Cleanup to create the Publisher, so after the first few runs TryEnqueue just starts returning false without doing any work.

Once you fix that, you'll start getting warnings that the op count is too low to get legitimately valid results.

Once everything is fixed, you'll probably get something like this, if you use a message size of 128, queue capacity of 33554432 bytes, and 246723 operations per iteration. (e.g. [Benchmark(Description = "Enqueue messages", OperationsPerInvoke = 246723)] and for (var i = 0; i < 246723; ++i))

Of course, per machine, YMMV...

|                                                    Method |     Mean |    Error |   StdDev |  Gen 0 | Allocated |
|---------------------------------------------------------- |---------:|---------:|---------:|-------:|----------:|
|                                        'Enqueue messages' | 543.8 ns |  4.19 ns |  3.50 ns |      - |         - |
|                            'Enqueue messages (zero-copy)' | 547.2 ns |  2.78 ns |  2.32 ns |      - |         - |
|             'Enqueue messages (zero-copy, func pointers)' | 557.0 ns |  3.71 ns |  3.29 ns |      - |         - |
|      'Enqueue and dequeue messages (buffered allocating)' | 724.7 ns |  5.08 ns |  4.50 ns | 0.0284 |     152 B |
|             'Enqueue and dequeue messages (buffer reuse)' | 712.3 ns |  6.32 ns |  5.27 ns |      - |         - |
|                'Enqueue and dequeue messages (zero-copy)' | 709.4 ns | 13.67 ns | 15.20 ns |      - |         - |
| 'Enqueue and dequeue messages (zero-copy, func pointers)' | 718.9 ns |  7.67 ns |  6.80 ns |      - |         - |

You don't have the zero-copy ops yet, but you can review a future PR at https://github.com/StirlingLabs/interprocess

It's still quite fast, but 7ns/op is 28 CPU cycles/op at 4ghz (between 7 and 36 GP instructions), which is a dead giveaway that no work was being done; it's literally impossibly fast, it has to just be checking a value and returning false. Putting the benchmark into debug mode (BenchmarkRunner.Run(typeof(Program).Assembly, new DebugInProcessConfig());) and stepping into it confirmed it.

Leaking named semaphores on Linux

I was trying to figure out a way to make this even faster and found that I am leaking named semaphores on Linux. I'll work on a fix but one is not coming to mind immediately.

System.UnauthorizedAccessException: 'Access to the path is denied.' When Create Publisher

Hi
On windows, there is System.UnauthorizedAccessException: 'Access to the path is denied.' if the publisher and subscriber are created by different accounts in different processes.
for example, a windows service process create a subscriber, later, when an user application try to create the publisher, the exception will be thrown. Can we create the memory mapped file that can be accessed by all everyone?

Thanks

This library don't support multiple subscribers.

I test the sample publisher and subscriber on windows10 OS, then find that it doesn't support multiple subscribers. I start a publisher process and two same subscriber processes (called A and B), but subscriber A receive different data compared with subscriber B, so maybe this is a bug. Some screenshots are as follows.
image
image
image

Start Subscriber at Head Instead of Tail

Hi, thanks for this nice library! ๐Ÿ™

I was playing with the sample, and noticed that when the Subscriber starts, it reads back any existing queued messages first. What I'd like is for the Subscriber to only read new messages from that point onwards. Would that be possible?

Add support for setting MMF security [feature request]

If the 2 processes are running as the same user, there is no problem with security. But if you want 2 processes with different users to communicate, you will get "access denied".

In .NET Framework this was possible using MemoryMappedFileSecurity - but, bizarelly, Microsoft have chosen not to add support for this in .NET Core/.NET 5/6.

The request here is to add the interop plumbing to allow setting an ACL for MMFs (I did something similar a while back, when Microsoft made the same bizare decision for named pipes; not sure if they ever did implement ACLs there).

Interaction with Kestrel in self-hosted console app

Hi! I have a working monitoring process using this great library. Today I decided that I was going to add a self-hosted API through Kestrel...something very simple and something I have done many times before. However, I am finding a couple of really odd issues: First, the processing works fine with these additions but Kestrel just returns no data. The controllers are getting called but no data is returned. So I pulled my hair figuring it all out and ended up commenting out everything but the WebHostBuilder configuration. When I remove this nuget package, and that is the only change, the controller behaves perfectly fine - returns data. Simply adding this package back into the project and the weird behavior begins.

Any ideas?

Update: I'm actually getting 504s in Fiddler. That's a gateway timeout - this is really weird to me. I appreciate any insight.

Ubuntu - System.DllNotFoundException: Unable to load shared library 'librt'

Hi,

Can you please help me with that error. I just created a sample on TFM .NET 5.0 and get that exception on Ubuntu WSL (WIndows Subsystem for Linux) on Window 10.

Sample: https://[email protected]/zone-eric/NET%20Core%20Crossplattform%20IPC/_git/NETCoreXplatIPC

Description: Ubuntu 20.04.1 LTS
Release: 20.04
Codename: focal

System.DllNotFoundException: Unable to load shared library 'librt' or one of its dependencies. In order to help diagnose loading problems, consider setting the LD_DEBUG environment variable: liblibrt: cannot open shared object file: No such file or directory
at Cloudtoid.Interprocess.Semaphore.Linux.Interop.sem_open(String name, Int32 oflag, UInt32 mode, UInt32 value)
at Cloudtoid.Interprocess.Semaphore.Linux.Interop.CreateOrOpenSemaphore(String name, UInt32 initialCount)
at Cloudtoid.Interprocess.Semaphore.Linux.SemaphoreLinux..ctor(String name, Boolean deleteOnDispose)
at Cloudtoid.Interprocess.InterprocessSemaphore.CreateWaiter(String name)
at Cloudtoid.Interprocess.Subscriber..ctor(QueueOptions options, ILoggerFactory loggerFactory)
at Cloudtoid.Interprocess.QueueFactory.CreateSubscriber(QueueOptions options)
at Sunify.WorkerService.Worker.ExecuteAsync(CancellationToken stoppingToken) in /home/eric/source/repos/NETCoreXplatIPC/xPlat-IPC/Sunify.WorkerService/Worker.cs:line 26
at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
at Sunify.WorkerService.Program.Main(String[] args) in /home/eric/source/repos/NETCoreXplatIPC/xPlat-IPC/Sunify.WorkerService/Program.cs:line 15

Full command line log is here: https://gist.github.com/ericbrunner/cd2122fad75920cdb758735cce0bb220

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.