samples's Introduction

This repo hosts samples showing the use of NetMQ, an implementation of ZeroMQ for .NET.

These sample patterns are mostly from the ZeroMQ Guide, which is recommended reading for working with NetMQ and ZeroMQ in general.

Following are the current samples:

  • Beacon
  • Brokerless Reliability (Freelance pattern)
  • Hello World
  • Inter Broker Worker
  • Load Balancing Pattern
  • Majordomo Pattern
  • Multithreading
  • Pirate Pattern
  • Titanic Pattern
  • Weather Update

Feel free to contribute your own samples by opening a pull request.

samples's People

Contributors

camuvingian, drewnoakes, nicolaspierre1990, somdoron, vxmark

samples's Issues

Majordomo Pattern: new async method for post-processing after a message is received

In MDPBroker.cs of the MajordomoPattern implementation, I feel there is a delay in responding to a message after it is received at the broker.

So I am creating an async version of one method. Please comment if anything in it is wrong with respect to the pattern implementation.

private void ProcessReceivedMessageAsync(object sender, NetMQSocketEventArgs e)
{
    try
    {
        var msg = e.Socket.ReceiveMultipartMessage();
        Debug.WriteLine($"Received: {msg}");
        var senderFrame = msg.Pop();               // [e][protocol header][service or command][data]
        var empty = msg.Pop();                     // [protocol header][service or command][data]
        var headerFrame = msg.Pop();               // [service or command][data]
        var header = headerFrame.ConvertToString();

        switch (header)
        {
            case MDPConstants.MDP_CLIENT_HEADER:
                {
                    Task.Factory.StartNew(() => ProcessClientMessage(senderFrame, msg), TaskCreationOptions.LongRunning);
                }
                break;
            case MDPConstants.MDP_WORKER_HEADER:
                ProcessWorkerMessage(senderFrame, msg);
                break;
            default:
                Debug.WriteLine("ERROR - message with invalid protocol header!");
                break;
        }
    }
    catch (Exception ex)
    {
        Logger.Fatal(ex.Message, ex);
    }
}

NetMQ doesn't appear to communicate with pyzmq?

When I use the ZeroMQ library or the ZMQ library (from NuGet), I can communicate perfectly fine between C# and Python (pyzmq).
However, when I use NetMQ, I cannot get any interoperability between them.

I've attempted PUB/SUB and REQ/REP. As mentioned, they work well with the other two C# libraries and Python (as initiator and as receiver, in both directions), but Python doesn't see anything whatsoever when using NetMQ. From the print statements in my code, it looks like NetMQ claims it has sent messages, but nothing is seen by Python, and the same holds in the other direction.

The code is the same for all three libraries (NetMQ as well as ZeroMQ/ZMQ). I really don't know what is going on. I am using Python 3.8.7 and the examples from GitHub for all three C# libraries.

Could you kindly investigate this? With thanks!

Assembly NetMQ, Version=3.3.2.2 Multiple Publishers Beacon Not working

BrokenBeaconExample.txt

The offending code is:
//publisher2.Publish("This is a message 2", TimeSpan.FromSeconds(2000));

This is the second publish. If I uncomment this line, I get the following error:
"An unhandled exception of type 'System.NullReferenceException' occurred in NetMQ.dll
Additional information: Object reference not set to an instance of an object."

The exception occurs when I step over the line:
string received = subscriber.ReceiveString(out peerName);

Any thoughts?

ClientAsync Ctor Overload Missing Call to Default Ctor

In the MajordomoProtocol project, file MDPClientAsync, the overloaded ctor

public MDPClientAsync([NotNull] string brokerAddress, string identity)
{
    if (string.IsNullOrWhiteSpace(brokerAddress))
        throw new ArgumentNullException(nameof(brokerAddress), "The broker address must not be null, empty or whitespace!");

    if (!string.IsNullOrWhiteSpace(identity))
        m_identity = Encoding.UTF8.GetBytes(identity);

    m_brokerAddress = brokerAddress;
}

is not calling this().
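As a minimal sketch of the fix being suggested, constructor chaining with `: this()` makes the overload run the parameterless constructor first. The class below is a hypothetical stand-in, not the real MDPClientAsync; the `IsInitialized` flag merely represents whatever setup the default ctor performs, which is an assumption made for illustration.

```csharp
using System;
using System.Text;

// Hypothetical stand-in for a client class; NOT the real MDPClientAsync.
public sealed class ChainedClient
{
    // Represents setup done only by the parameterless ctor (assumption).
    public bool IsInitialized { get; }

    public string BrokerAddress { get; }

    public byte[] Identity { get; }

    private ChainedClient()
    {
        IsInitialized = true;
    }

    // ": this()" runs the parameterless ctor before this body executes.
    public ChainedClient(string brokerAddress, string identity)
        : this()
    {
        if (string.IsNullOrWhiteSpace(brokerAddress))
            throw new ArgumentNullException(nameof(brokerAddress));

        if (!string.IsNullOrWhiteSpace(identity))
            Identity = Encoding.UTF8.GetBytes(identity);

        BrokerAddress = brokerAddress;
    }
}
```

Without the `: this()` clause, `IsInitialized` would stay false for instances created through the two-argument overload.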

Bug inside PPP Queue component: Available Worker Management

I think it is a bug to add workers to the list of available workers for every kind of network activity a queue/broker notices from them. And I'd say this is wrong by design.

Imagine a worker which is currently idle and sending heartbeat messages periodically: the implementation will add this worker over and over again to the list of available workers. The list would grow endlessly if there weren't expiration management which continuously removes older duplicates from the list.
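One way to avoid the duplicates described above is to key the bookkeeping by worker identity, so that a heartbeat refreshes an existing entry instead of appending a new one. The following is only a sketch under that assumption; the class and member names are hypothetical and not taken from the sample, and it deliberately ignores the ordering a real queue needs to pick the next worker.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical bookkeeping class; names are illustrative only.
public sealed class AvailableWorkers
{
    private readonly Dictionary<string, DateTime> m_expiryByIdentity =
        new Dictionary<string, DateTime>();

    private readonly TimeSpan m_timeToLive;

    public AvailableWorkers(TimeSpan timeToLive)
    {
        m_timeToLive = timeToLive;
    }

    public int Count => m_expiryByIdentity.Count;

    // Any sign of life from a worker refreshes its single entry.
    public void Touch(string identity, DateTime now)
    {
        m_expiryByIdentity[identity] = now + m_timeToLive;
    }

    // Removes workers whose expiry has passed; returns how many were removed.
    public int Purge(DateTime now)
    {
        var expired = m_expiryByIdentity
            .Where(pair => pair.Value <= now)
            .Select(pair => pair.Key)
            .ToList();

        foreach (var identity in expired)
            m_expiryByIdentity.Remove(identity);

        return expired.Count;
    }
}
```

With this shape, expiration only has to remove workers that have really gone silent, not older copies of workers that are still alive.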

message cutting

Hi all,
I would like to ask for help.
Sometimes I get a message that is incomplete, with zeroes at the end of the buffer.
Example:
System.FormatException: Deserialization of :[eyJDb3JyZWxhdGlvbklkIjoiMzM1MWNjNTktMzBmNi00NTk1LWFjYmQtMGRmYTZlMjIyYTRlIiwiRXhwaXJhdGlvblRpbWUiOm51bGwsIlNvdXJjZUFkZHJlc3MiOm51bGwsIlNlbnRUaW1lIjoiMjAxOS0xMS0wN1QxNTowOTowNC43NjQ4MzkzKzAxOjAwIiwiUGF5bG9hZFR5cGUiOiJLaXN0bGVyLkNkdC5Db250cmFjdHMuTG9nZ2VyLk1vZGVscy5JTG9nTWVzc2FnZSIsIlBheWxvYWQiOiJleUpNWlhabGJDSTZNaXdpVTI5MWNtTmxJam9pUzJsemRHeGxjaTVEWkhRdVdHMXNVbkJqVFdWemMyRm5aVUZrWVhCMFpYSk5hV1JrYkdWM1lYSmxMazFwWkdSc1pYZGhjbVV1VW1WeGRXVnpkRkpsYzNCdmJuTmxURzluWjJsdVowMXBaR1JzWlhkaGNtVWlMQ0pOWlhOellXZGxJam9pU0hSMGNDQlNaWE53YjI1elpTQkpibVp2Y20xaGRHbHZianBjWEhKY1hHNVRZMmhsYldFNmFIUjBjQ0JJYjNOME9pQXhNamN1TUM0d0xqRTZPREkzTlNCUVlYUm9PaUJjWEM5U1VFTXlJRkYxWlhKNVUzUnlhVzVuT2lBZ1VtVnpjRzl1YzJVZ1FtOWtlVG9nUEQ5NGJXd2dkbVZ5YzJsdmJqMWNYRndpTVM0d1hGeGNJaUJsYm1OdlpHbHVaejFjWEZ3aWRYUm1MVGhjWEZ3aVB6NWNYSEpjWEc0OGJXVjBhRzlrVW1WemNHOXVjMlUrWEZ4eVhGeHVJQ0E4Y0dGeVlXMXpQbHhjY2x4Y2JpQWdJQ0E4Y0dGeVlXMCtYRnh5WEZ4dUlDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ1BHRnljbUY1UGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0E4WkdGMFlUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBuTjVjM1JsYlM1c2FYTjBUV1YwYUc5a2N6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytjM2x6ZEdWdExtMWxkR2h2WkZOcFoyNWhkSFZ5WlR4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK2MzbHpkR1Z0TG0xbGRHaHZaRWhsYkhBOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtZGxkRjlyWlhsM2IzSmtYMjVoYldWelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXlkVzVmYTJWNWQyOXlaRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0w
zWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWjJWMFgydGxlWGR2Y21SZllYSm5kVzFsYm5SelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NW5aWFJmYTJWNWQyOXlaRjlrYjJOMWJXVnVkR0YwYVc5dVBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXpaWEoyYVdObExrZGxkRXhwYzNSUFpreHZZV1JsWkVWeGRXbHdiV1Z1ZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1kyRnNMbE5sZEUxdmJtbDBiM0pOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWpZV3d1UjJWMFRXOXVhWFJ2Y2tacFpXeGtjenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWTJGc0xsTmxkRU52Ym5ScGJuVnZkWE5QZFhSd2RYUThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbU5oYkM1VFpYUlFkV3h6WldSUGRYUndkWFE4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1OaGJDNVRaWFJUYVc1MWMyOXBaR0ZzVDNWMGNIVjBQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOXVQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOW1aanhjWEM5emR
ISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrZGxibVZ5WVhSbFJHTldiMngwWVdkbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtZWEV1VFdWaGMzVnlaVlp2YkhSaFoyVkVZenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrMWxZWE4xY21WTmFXNU5ZWGc4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNU5aV0Z6ZFhKbFZtOXNkR0ZuWlZCbFlXdFFaV0ZyUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1a1lYRXVVbVZ6WlhSRVpYWnBZMlU4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNVNaV0ZrUkdsbmFYUmhiRWx1Y0hWMFVHOXlkRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExsTmxkRVJwWjJsMFlXeFBkWFJ3ZFhSTWFXNWxVM1JoZEdVOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtUmhjUzVIWlhSVGFXZHVZV3hUWVcxd2JHVnpQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVrYlcwdVVtVmhaRlpoYkhWbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtiVzB1VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN
6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytaRzF0TGxObGRFbHVjSFYwUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1bVp5NVRkMmwwWTJoRGFHRnVibVZzVDI0OFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtWm5MbE4zYVhSamFFTm9ZVzV1Wld4UFptWThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbVpuTGxObGRFOTFkSEIxZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1ptY3VVMlYwUVhKaWFYUnlZWEo1VTJsbmJtRnNQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVtWnk1TWIyRmtRWEppYVhSeVlYSjVVMmxuYm1Gc1BGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytiM05qTGxOMGIzQThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbTl6WXk1VFpYUlNkVzVOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VjJGcGRGUnBiR3hCWTNGMWFYTnBkR2x2YmtOdmJYQnNaWFJsUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0l
DQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1dmMyTXVWMkZwZEZWdWRHbHNWSEpwWjJkbGNrRnliV1ZrUEZ4Y0wzTjBjbWx1Wn ] failed ---> System.FormatException: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.

The white space at the end of the message consists of null characters.

I create subscribers and publishers only when they are needed; they are wrapped in using blocks.

I am sending a JSON message that is encoded as a Base64 string.

My serializer methods:

public string SerializeToString(object _object)
{
    return Base64Encode(JsonConvert.SerializeObject(_object));
}

public object Deserialize(byte[] data, Type targetType)
{
    try
    {
        var json = Base64Decode(_encoding.GetString(data));
        return JsonConvert.DeserializeObject(json, targetType);
    }
    catch (Exception e)
    {
        throw new FormatException($"Deserialization of :[{_encoding.GetString(data)}] failed", e);
    }
}
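The sketch below reproduces the symptom in isolation: if the byte array handed to the deserializer is longer than the payload and the tail is zero-filled, decoding the whole array fails with a FormatException, while decoding only the payload length succeeds. Whether this is the actual cause here is not established. It assumes the receive buffer can be larger than the message (NetMQFrame exposes a MessageSize property separately from Buffer, which suggests the two can differ). The helper names are hypothetical and the code uses only the .NET base class library.

```csharp
using System;
using System.Text;

// Hypothetical helpers; they do not call NetMQ.
public static class FrameDecoding
{
    // Decodes only the first `length` bytes of `buffer` as Base64-encoded UTF-8 text.
    public static string DecodePayload(byte[] buffer, int length)
    {
        var base64 = Encoding.UTF8.GetString(buffer, 0, length);
        return Encoding.UTF8.GetString(Convert.FromBase64String(base64));
    }

    // Simulates a buffer that is larger than the payload; the unused tail stays zero.
    public static byte[] PadWithNulls(byte[] payload, int extra)
    {
        var buffer = new byte[payload.Length + extra];
        Buffer.BlockCopy(payload, 0, buffer, 0, payload.Length);
        return buffer;
    }
}
```

If this turns out to be the cause, passing the message length along with the buffer (or a correctly sized copy) to Deserialize would avoid feeding the trailing null characters to the Base64 decoder.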

I am using the XPUB/XSUB pattern. I have a router with logging:

public class PubSubMessageRouter : IPubSubMessageRouter
{
    private Proxy _proxy;
    private NetMQPoller _poller;
    private XSubscriberSocket _xSub;
    private XPublisherSocket _xPub;
    private PushSocket _controlInPush = new PushSocket();
    private PushSocket _controlOutPush = new PushSocket();
    private PullSocket _controlInPull = new PullSocket();
    private PullSocket _controlOutPull = new PullSocket();
    private Task _workerReceived;
    private Task _workerSent;
    private CancellationTokenSource _cancellationTokenSource;
    private CancellationToken _cancellationToken;
    private WeakAction<MessageBase> _subscriberReceived;
    private WeakAction<MessageBase> _subscriberSent;
    private WeakAction<Exception> _onExceptionHandler;
    private ISerializer _serializer;
    private bool _disposed;

    public string FrontendConnectionString { get; }

    public string BackendConnectionString { get; }

    public PubSubMessageRouter(
        string backendConnectionString,
        string frontendConnectionString,
        string controlInConnectionName = "controlIn",
        string controlOutConnectionName = "controlOut")
    {
        BackendConnectionString = backendConnectionString;
        FrontendConnectionString = frontendConnectionString;
        _serializer = new Serialize.Json.JsonObjectSerializer();
        _xSub = new XSubscriberSocket($"@{FrontendConnectionString}");
        _xPub = new XPublisherSocket($"@{BackendConnectionString}");
        _xSub.Options.ReceiveHighWatermark = 10000;
        _xSub.Options.ReceiveBuffer = 44000;
        _xSub.Options.SendBuffer = 44000;
        _xSub.Options.Linger = TimeSpan.FromSeconds(5);
        _xPub.Options.ReceiveHighWatermark = 10000;
        _xPub.Options.ReceiveBuffer = 44000;
        _xPub.Options.SendBuffer = 44000;
        _xPub.Options.Linger = TimeSpan.FromSeconds(5);
        _controlInPush.Bind($"inproc://{controlInConnectionName}");
        _controlInPull.Connect($"inproc://{controlInConnectionName}");
        _controlOutPush.Bind($"inproc://{controlOutConnectionName}");
        _controlOutPull.Connect($"inproc://{controlOutConnectionName}");
        _poller = new NetMQPoller
        {
            _xSub,
            _xPub
        };
        // proxy messages between frontend / backend
        _proxy = new Proxy(
            _xSub,
            _xPub,
            poller: _poller,
            controlIn: _controlInPush,
            controlOut: _controlOutPush
        );
    }

    public PubSubMessageRouter(
        string backendConnectionString,
        string frontendConnectionString,
        Action<MessageBase> handlerReceived,
        Action<MessageBase> handlerSent,
        Action<Exception> onException=null) : this(backendConnectionString, frontendConnectionString)
    {
        _subscriberReceived = new WeakAction<MessageBase>(handlerReceived);
        _subscriberSent = new WeakAction<MessageBase>(handlerSent);

        if (onException != null)
        {
            _onExceptionHandler = new WeakAction<Exception>(onException);
        }
    }


    private async void MessagePumpReceived(CancellationToken ct)
    {
        while (true)
        {
            try
            {
                if (_cancellationToken.IsCancellationRequested)
                    break;

                var mqMessage = new NetMQMessage();

                if (_controlInPull.TryReceiveMultipartMessage(ref mqMessage))
                {
                    if (mqMessage.FrameCount == 1) //subscribe notification
                    {
                        continue;
                    }

                    var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));

                    if (_subscriberReceived?.Target != null)
                    {
                        _subscriberReceived.Execute(message);
                    }
                }
                else
                {
                    await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
                }
            }
            catch (Exception ex)
            {
                OnExceptionHandler(ex);
            }
        }
    }

    private async void MessagePumpSent(CancellationToken ct)
    {
        while (true)
        {
            try
            {
                if (_cancellationToken.IsCancellationRequested)
                    break;

                var mqMessage = new NetMQMessage();

                if (_controlOutPull.TryReceiveMultipartMessage(ref mqMessage))
                {
                    if (mqMessage.FrameCount == 1) //subscribe notification
                    {
                        continue;
                    }


                    var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));

                    if (_subscriberSent?.Target != null)
                    {
                        _subscriberSent.Execute(message);
                    }
                }
                else
                {
                    await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
                }
            }
            catch (Exception ex)
            {
                OnExceptionHandler(ex);
            }
        }
    }

    private void OnExceptionHandler(Exception obj)
    {
        if(_onExceptionHandler != null && _onExceptionHandler.IsAlive)
        {
            _onExceptionHandler.Execute(obj);
        }
    }

    public void StartRouting()
    {
        _cancellationTokenSource = new CancellationTokenSource();
        _cancellationToken = _cancellationTokenSource.Token;

        _proxy.Start();
        _poller.RunAsync();

        _workerReceived = Task.Run(
            () => MessagePumpReceived(_cancellationToken),
            _cancellationToken);

        _workerSent = Task.Run(
            () => MessagePumpSent(_cancellationToken),
            _cancellationToken);
    }

    public async Task StopRouting()
    {
        _proxy.Stop();
        _poller.Stop();

        _cancellationTokenSource?.Cancel();
        await Task.WhenAny(Task.WhenAll(_workerSent, _workerReceived),
            Task.Delay(5000)); //wait when all thread finished, but max 5 seconds
    }

    public async void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            await StopRouting();
            _poller?.Dispose();
            _poller = null;
            _proxy = null;
            _xSub?.Dispose();
            _xSub = null;
            _xPub?.Dispose();
            _xPub = null;
            _controlInPush?.Dispose();
            _controlInPush = null;
            _controlInPull?.Dispose();
            _controlInPull = null;
            _controlOutPush?.Dispose();
            _controlOutPush = null;
            _controlOutPull?.Dispose();
            _controlOutPull = null;
            _workerSent = null;
            _workerReceived = null;
            _cancellationTokenSource?.Dispose();
            _cancellationTokenSource = null;
            _subscriberReceived.MarkForDeletion();
            _subscriberReceived = null;
            _subscriberSent.MarkForDeletion();
            _subscriberSent = null;
            _onExceptionHandler.MarkForDeletion();
            _onExceptionHandler = null;
            _serializer = null;
        }

        _disposed = true;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
        GC.Collect();
    }
}

I have async message handlers which are fired by the service API. The problem usually happens when I send two long messages one after the other:

public async Task Handle(MessageBase message, IApiContext context)
{
using (var logger = new MessageLogger(typeof(RequestSuiteMetaDataHandler).FullName, context.ApplicationSettings.FrontendConnection, context.LogTopic))
using (var adapterManager = new AdaptersManager.AdaptersManager(new WindowsRegistrySettingsProvider()))
{

            var response = new RequestSuiteMetaDataResponse();

            try
            {
                var request = new JsonObjectSerializer().Deserialize<RequestSuiteMetaData>(message.Payload);
                // some code skipped

                response.Succeed = true;
                response.SuiteName = queryResponse.SuiteName;
                response.TestPhases = queryResponse.TestPhases; // long content, around 8 kB
            }
            catch (Exception exp)
            {
                await logger.LogMessage(LogLevel.Error, exp.GetAllMessages());
                response.Error = exp.GetAllMessages();
                response.Succeed = false;
            }

            var payloadString = new JsonObjectSerializer().SerializeToString(response);

            var messageToSend = new MessageBase
            {
                CorrelationId = message.CorrelationId,
                PayloadType = typeof(IRequestSuiteMetaDataResponse).FullName,
                SentTime = DateTime.Now,
                Payload = payloadString
            };

            // problem source:
            await logger.LogMessage(LogLevel.Debug,
                $"Sending message: {Base64Decode(messageToSend.Payload)} of type: {messageToSend.PayloadType}");
            Thread.Sleep(50);
            await context.ResponsePublisher.SendMessage(context.ResponseTopic, messageToSend);  
        }
    }

I have a central async logger that sends log messages over ZeroMQ:

public class MessageLogger: ILogger
{
    const char TestBackSlash = '\\';
    const char TestSlash = '/';
    const char TestDblQuote = '"';

    private readonly string _source;
    private readonly string _logTopic;
    private ISerializer _serializer;
    private IPublisher _publisher;
    private bool _disposed;

    public MessageLogger(string source,
        string frontendConnectionString, 
        string logTopic = TopicsList.AllLogs, 
        ISerializer serializer = null, 
        SingleThreadTaskScheduler scheduler = null)
    {
        _source = source;
        _logTopic = logTopic;
        _serializer = serializer ?? new Serialize.Json.JsonObjectSerializer();
        _publisher = new Publisher($">{frontendConnectionString}", scheduler);
    }

    public Task LogMessage(LogLevel level, string message)
    {
        var messageToSend = new MessageBase
        {
            CorrelationId = Guid.NewGuid(),
            PayloadType = typeof(ILogMessage).FullName,
            SentTime = DateTime.Now,
            Payload = _serializer.SerializeToString(new LogMessage
            {
                Level = level,
                Message = RemoveSpecialChars(message),
                Source = _source
            })
        };

        return _publisher.SendMessage(_logTopic, messageToSend);
    }

    private string RemoveSpecialChars(string input)
    {
        
        var output = new StringBuilder(input.Length);
        foreach (var c in input)
        {
            switch (c)
            {
                case TestSlash:
                    output.AppendFormat("{0}{1}", TestBackSlash, TestSlash);
                    break;

                case TestBackSlash:
                    output.AppendFormat("{0}{0}", TestBackSlash);
                    break;

                case TestDblQuote:
                    output.AppendFormat("{0}{1}", TestBackSlash, TestDblQuote);
                    break;

                case '\b':
                    output.Append("\\b");
                    break;

                case '\f':
                    output.Append("\\f");
                    break;

                case '\n':
                    output.Append("\\n");
                    break;

                case '\r':
                    output.Append("\\r");
                    break;

                case '\t':
                    output.Append("\\t");
                    break;

                default:
                    output.Append(c);
                    break;
            }
        }

        return output.ToString();
    }


    public void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            _publisher?.Dispose();
            _publisher = null;
            _serializer = null;
        }

        _disposed = true;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
        GC.Collect();
    }
}

public class Publisher: IPublisher
{
    private PublisherSocket _pub;
    private ISerializer _serializer;
    private SingleThreadTaskScheduler _taskScheduler;
    private bool _disposed;

    public Publisher(string connectionString , SingleThreadTaskScheduler scheduler=null)
    {
        _pub = new PublisherSocket(connectionString);
        _pub.Options.SendHighWatermark = 1000;
        _pub.Options.ReceiveBuffer = 44000;
        _pub.Options.SendBuffer = 44000;
        _pub.Options.ReconnectInterval = TimeSpan.FromMilliseconds(50);
        _pub.Options.Linger = TimeSpan.FromSeconds(5);
        _serializer = new Serialize.Json.JsonObjectSerializer();
        _taskScheduler = scheduler ?? new SingleThreadTaskScheduler();
    }

    public void Dispose(bool disposing)
    {
        if (_disposed)
            return;

        if (disposing)
        {
            _pub?.Close();
            _pub?.Dispose();
            _pub = null;
            _serializer = null;
            _taskScheduler.StopSchedulerThread();
            _taskScheduler.Dispose();
            _taskScheduler = null;
        }

        _disposed = true;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
        GC.Collect();
    }

    public Task SendMessage(string topicName, MessageBase message)
    {
        return Task.Factory.StartNew(() =>
        {
            var data = _serializer.Serialize(message);
            var messToSend = new NetMQMessage();
            messToSend.Append(topicName);
            messToSend.Append(data);
            _pub.SendMultipartMessage(messToSend);
        }, CancellationToken.None, TaskCreationOptions.None, _taskScheduler);
    }
}

The message rate is not high (<20 messages/s): usually many short messages and occasionally a long one. It looks as if the message buffer is allocated with the correct size, but not all characters are received. Why?

Any ideas?

Question: Paranoid Pirate Pattern auto re-connect for workers and clients on queue restart

This question is related to the Paranoid Pirate Pattern sample project:

Is it intentional that the Paranoid Pirate Pattern does not recover from a restart of the Queue component, or is this an error in the sample implementation (or even in NetMQ core) that needs to be fixed?

Steps to reproduce:

  1. start queue
  2. start a worker
  3. start a client
  4. stop queue
  5. wait for a timeout message of worker and/or client printed to console
  6. restart queue

After these steps, messages from clients won't be processed anymore, even though all components (queue + worker + client) are up and running again after step 6.
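For context, the ZeroMQ Guide's description of the Paranoid Pirate worker has the worker close its socket when the queue's heartbeats stop, wait, and reconnect with a fresh socket, doubling the wait after each failed attempt up to a cap. Whether the NetMQ sample does the same is exactly the open question here. The sketch below covers only the backoff schedule; the class name is hypothetical and no NetMQ calls are involved.

```csharp
using System;

// Hypothetical helper: yields reconnect delays that double up to a maximum.
public sealed class ReconnectBackoff
{
    private readonly TimeSpan m_initial;
    private readonly TimeSpan m_max;
    private TimeSpan m_current;

    public ReconnectBackoff(TimeSpan initial, TimeSpan max)
    {
        if (initial <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(initial));
        if (max < initial)
            throw new ArgumentOutOfRangeException(nameof(max));

        m_initial = initial;
        m_max = max;
        m_current = initial;
    }

    // Returns the delay to wait before the next attempt, then doubles it up to the cap.
    public TimeSpan Next()
    {
        var delay = m_current;
        // Compare against the remaining headroom to avoid overflow when doubling.
        m_current = (m_max - m_current < m_current) ? m_max : m_current + m_current;
        return delay;
    }

    // Call after a heartbeat from the queue has been seen again.
    public void Reset()
    {
        m_current = m_initial;
    }
}
```

A worker loop would call Next() each time its liveness counter reaches zero, sleep for the returned delay, recreate its socket, and call Reset() once the queue responds again.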

PubSub sample (WeatherUpdate) doesn't use the recommended multi-part technique

Although the sample clearly uses a topic-based pub/sub system, the topic (in this case a zip code) is not sent using the pattern shown in the documentation:

"pub.SendMoreFrame("zipcode").SendFrame("some stuff");" where "zipcode" is the topic

i.e. SendMoreFrame is not used for the topic.

The publish call includes the zip code in the message itself, not as a separate topic frame. I guess it works because the subscriber is programmed to suit.
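One reason the single-frame form can work is that ZeroMQ subscriptions are prefix matches against the first frame of a message, so a subscription to a zip code matches a frame that merely begins with it. The sketch below models that matching rule with plain byte arrays; it is an illustration only, the class name is hypothetical, and it does not call NetMQ.

```csharp
using System;
using System.Text;

// Hypothetical model of a subscription filter: a byte-wise prefix match.
public static class TopicFilter
{
    public static bool Matches(byte[] firstFrame, byte[] subscription)
    {
        if (subscription.Length > firstFrame.Length)
            return false;

        for (var i = 0; i < subscription.Length; i++)
        {
            if (firstFrame[i] != subscription[i])
                return false;
        }

        return true;
    }

    // Convenience overload for UTF-8 text.
    public static bool Matches(string firstFrame, string subscription)
    {
        return Matches(Encoding.UTF8.GetBytes(firstFrame), Encoding.UTF8.GetBytes(subscription));
    }
}
```

Because the match is a prefix match, a subscription to "10001" also matches a first frame starting with "100010". With the single-frame form, including the delimiter in the subscription (for example "10001 ") avoids that overlap; with a separate topic frame the subscriber can instead compare the whole topic frame after receiving it.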

Regards

Rob

Broker disconnects after a certain interval with the following error

Environment

NetMQ Version: 4.0.0.207
Operating System: Windows 10
.NET Version: .NET Core 2.1.6

Expected behaviour

Broker/worker keep working for hours/days

Actual behavior

Got the following error in the logs:

System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
at MajordomoProtocol.MDPBroker.ProcessReceivedMessage(Object sender, NetMQSocketEventArgs e) in MDPBroker.cs:line 317
at NetMQ.NetMQSocket.InvokeEvents(Object sender, PollEvents events)
at NetMQ.NetMQPoller.RunPoller()
at NetMQ.NetMQPoller.Run(SynchronizationContext syncContext)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)

Steps to reproduce the behavior

Set up the worker/broker in the above-mentioned environment.

Majordomo: Timeout, Heartbeat, Disconnects

Can someone give me a hint on how to set the broker/worker timeouts to get a stable broker-worker connection in the Majordomo implementation?

I use 10-second heartbeats for broker and worker, both running on localhost, but every now and then I get a disconnect.

The first bug I found is in the broker constructor: if I pass a timeout, m_heartbeatExpiry is not adjusted accordingly. I fixed this but still get these disconnects.
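To illustrate the constructor bug described above: in the ZeroMQ Guide's Majordomo broker, the expiry is the heartbeat interval multiplied by a liveness count of 3, so the two values have to change together. The sketch below keeps them in sync by recomputing the expiry whenever the interval is set. The class and member names are hypothetical and are not the fields of MDPBroker.

```csharp
using System;

// Hypothetical settings holder; NOT the actual MDPBroker fields.
public sealed class HeartbeatSettings
{
    private TimeSpan m_interval;

    // Number of missed heartbeats tolerated before a peer is considered dead.
    public int Liveness { get; }

    // Always derived from Interval and Liveness, never set independently.
    public TimeSpan Expiry { get; private set; }

    public HeartbeatSettings(TimeSpan interval, int liveness = 3)
    {
        if (liveness < 1)
            throw new ArgumentOutOfRangeException(nameof(liveness));

        Liveness = liveness;
        Interval = interval;
    }

    public TimeSpan Interval
    {
        get { return m_interval; }
        set
        {
            if (value <= TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(value));

            m_interval = value;
            Expiry = TimeSpan.FromTicks(value.Ticks * Liveness);
        }
    }
}
```

With a 10-second interval and a liveness of 3, a peer is only treated as disconnected after 30 seconds of silence. If broker and worker use different intervals, each side's expiry should be derived from the interval the other side actually sends at.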
