Hi all,
i would like to ask for help.
sometimes i got message which is incomplete, zeroes on the end of the buffer.
Example:
System.FormatException: Deserialization of :[eyJDb3JyZWxhdGlvbklkIjoiMzM1MWNjNTktMzBmNi00NTk1LWFjYmQtMGRmYTZlMjIyYTRlIiwiRXhwaXJhdGlvblRpbWUiOm51bGwsIlNvdXJjZUFkZHJlc3MiOm51bGwsIlNlbnRUaW1lIjoiMjAxOS0xMS0wN1QxNTowOTowNC43NjQ4MzkzKzAxOjAwIiwiUGF5bG9hZFR5cGUiOiJLaXN0bGVyLkNkdC5Db250cmFjdHMuTG9nZ2VyLk1vZGVscy5JTG9nTWVzc2FnZSIsIlBheWxvYWQiOiJleUpNWlhabGJDSTZNaXdpVTI5MWNtTmxJam9pUzJsemRHeGxjaTVEWkhRdVdHMXNVbkJqVFdWemMyRm5aVUZrWVhCMFpYSk5hV1JrYkdWM1lYSmxMazFwWkdSc1pYZGhjbVV1VW1WeGRXVnpkRkpsYzNCdmJuTmxURzluWjJsdVowMXBaR1JzWlhkaGNtVWlMQ0pOWlhOellXZGxJam9pU0hSMGNDQlNaWE53YjI1elpTQkpibVp2Y20xaGRHbHZianBjWEhKY1hHNVRZMmhsYldFNmFIUjBjQ0JJYjNOME9pQXhNamN1TUM0d0xqRTZPREkzTlNCUVlYUm9PaUJjWEM5U1VFTXlJRkYxWlhKNVUzUnlhVzVuT2lBZ1VtVnpjRzl1YzJVZ1FtOWtlVG9nUEQ5NGJXd2dkbVZ5YzJsdmJqMWNYRndpTVM0d1hGeGNJaUJsYm1OdlpHbHVaejFjWEZ3aWRYUm1MVGhjWEZ3aVB6NWNYSEpjWEc0OGJXVjBhRzlrVW1WemNHOXVjMlUrWEZ4eVhGeHVJQ0E4Y0dGeVlXMXpQbHhjY2x4Y2JpQWdJQ0E4Y0dGeVlXMCtYRnh5WEZ4dUlDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ1BHRnljbUY1UGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0E4WkdGMFlUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBuTjVjM1JsYlM1c2FYTjBUV1YwYUc5a2N6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytjM2x6ZEdWdExtMWxkR2h2WkZOcFoyNWhkSFZ5WlR4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK2MzbHpkR1Z0TG0xbGRHaHZaRWhsYkhBOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtZGxkRjlyWlhsM2IzSmtYMjVoYldWelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXlkVzVmYTJWNWQyOXlaRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWjJWMFgydGxlWGR2Y21SZllYSm5kVzFsYm5SelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NW5aWFJmYTJWNWQyOXlaRjlrYjJOMWJXVnVkR0YwYVc5dVBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXpaWEoyYVdObExrZGxkRXhwYzNSUFpreHZZV1JsWkVWeGRXbHdiV1Z1ZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1kyRnNMbE5sZEUxdmJtbDBiM0pOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWpZV3d1UjJWMFRXOXVhWFJ2Y2tacFpXeGtjenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWTJGc0xsTmxkRU52Ym5ScGJuVnZkWE5QZFhSd2RYUThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbU5oYkM1VFpYUlFkV3h6WldSUGRYUndkWFE4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1OaGJDNVRaWFJUYVc1MWMyOXBaR0ZzVDNWMGNIVjBQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOXVQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOW1aanhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrZGxibVZ5WVhSbFJHTldiMngwWVdkbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtZWEV1VFdWaGMzVnlaVlp2YkhSaFoyVkVZenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrMWxZWE4xY21WTmFXNU5ZWGc4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNU5aV0Z6ZFhKbFZtOXNkR0ZuWlZCbFlXdFFaV0ZyUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1a1lYRXVVbVZ6WlhSRVpYWnBZMlU4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNVNaV0ZrUkdsbmFYUmhiRWx1Y0hWMFVHOXlkRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExsTmxkRVJwWjJsMFlXeFBkWFJ3ZFhSTWFXNWxVM1JoZEdVOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtUmhjUzVIWlhSVGFXZHVZV3hUWVcxd2JHVnpQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVrYlcwdVVtVmhaRlpoYkhWbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtiVzB1VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytaRzF0TGxObGRFbHVjSFYwUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1bVp5NVRkMmwwWTJoRGFHRnVibVZzVDI0OFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtWm5MbE4zYVhSamFFTm9ZVzV1Wld4UFptWThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbVpuTGxObGRFOTFkSEIxZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1ptY3VVMlYwUVhKaWFYUnlZWEo1VTJsbmJtRnNQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVtWnk1TWIyRmtRWEppYVhSeVlYSjVVMmxuYm1Gc1BGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytiM05qTGxOMGIzQThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbTl6WXk1VFpYUlNkVzVOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VjJGcGRGUnBiR3hCWTNGMWFYTnBkR2x2YmtOdmJYQnNaWFJsUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1dmMyTXVWMkZwZEZWdWRHbHNWSEpwWjJkbGNrRnliV1ZrUEZ4Y0wzTjBjbWx1Wn ] failed ---> System.FormatException: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
white space on the end of message is null characters.
i have situation when creating subscribers and publishers just when need it, they are wrapped in using.
I am sending a json message which is encoded into base64 string.
my serializer methods:
public string SerializeToString(object _object)
{
return Base64Encode(JsonConvert.SerializeObject(_object));
}
public object Deserialize(byte[] data, Type targetType)
{
try
{
var json = Base64Decode(_encoding.GetString(data));
return JsonConvert.DeserializeObject(json, targetType);
}
catch (Exception e)
{
throw new FormatException($"Deserialization of :[{_encoding.GetString(data)}] failed", e);
}
}
i am using xpub-sub pattern. I have router with logging:
public class PubSubMessageRouter : IPubSubMessageRouter
{
private Proxy _proxy;
private NetMQPoller _poller;
private XSubscriberSocket _xSub;
private XPublisherSocket _xPub;
private PushSocket _controlInPush = new PushSocket();
private PushSocket _controlOutPush = new PushSocket();
private PullSocket _controlInPull = new PullSocket();
private PullSocket _controlOutPull = new PullSocket();
private Task _workerReceived;
private Task _workerSent;
private CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellationToken;
private WeakAction<MessageBase> _subscriberReceived;
private WeakAction<MessageBase> _subscriberSent;
private WeakAction<Exception> _onExceptionHandler;
private ISerializer _serializer;
private bool _disposed;
public string FrontendConnectionString { get; }
public string BackendConnectionString { get; }
public PubSubMessageRouter(
string backendConnectionString,
string frontendConnectionString,
string controlInConnectionName = "controlIn",
string controlOutConnectionName = "controlOut")
{
BackendConnectionString = backendConnectionString;
FrontendConnectionString = frontendConnectionString;
_serializer = new Serialize.Json.JsonObjectSerializer();
_xSub = new XSubscriberSocket($"@{FrontendConnectionString}");
_xPub = new XPublisherSocket($"@{BackendConnectionString}");
_xSub.Options.ReceiveHighWatermark = 10000;
_xSub.Options.ReceiveBuffer = 44000;
_xSub.Options.SendBuffer = 44000;
_xSub.Options.Linger = TimeSpan.FromSeconds(5);
_xPub.Options.ReceiveHighWatermark = 10000;
_xPub.Options.ReceiveBuffer = 44000;
_xPub.Options.SendBuffer = 44000;
_xPub.Options.Linger = TimeSpan.FromSeconds(5);
_controlInPush.Bind($"inproc://{controlInConnectionName}");
_controlInPull.Connect($"inproc://{controlInConnectionName}");
_controlOutPush.Bind($"inproc://{controlOutConnectionName}");
_controlOutPull.Connect($"inproc://{controlOutConnectionName}");
_poller = new NetMQPoller
{
_xSub,
_xPub
};
// proxy messages between frontend / backend
_proxy = new Proxy(
_xSub,
_xPub,
poller: _poller,
controlIn: _controlInPush,
controlOut: _controlOutPush
);
}
public PubSubMessageRouter(
string backendConnectionString,
string frontendConnectionString,
Action<MessageBase> handlerReceived,
Action<MessageBase> handlerSent,
Action<Exception> onException=null) : this(backendConnectionString, frontendConnectionString)
{
_subscriberReceived = new WeakAction<MessageBase>(handlerReceived);
_subscriberSent = new WeakAction<MessageBase>(handlerSent);
if (onException != null)
{
_onExceptionHandler = new WeakAction<Exception>(onException);
}
}
private async void MessagePumpReceived(CancellationToken ct)
{
while (true)
{
try
{
if (_cancellationToken.IsCancellationRequested)
break;
var mqMessage = new NetMQMessage();
if (_controlInPull.TryReceiveMultipartMessage(ref mqMessage))
{
if (mqMessage.FrameCount == 1) //subscribe notification
{
continue;
}
var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));
if (_subscriberReceived?.Target != null)
{
_subscriberReceived.Execute(message);
}
}
else
{
await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
}
}
catch (Exception ex)
{
OnExceptionHandler(ex);
}
}
}
private async void MessagePumpSent(CancellationToken ct)
{
while (true)
{
try
{
if (_cancellationToken.IsCancellationRequested)
break;
var mqMessage = new NetMQMessage();
if (_controlOutPull.TryReceiveMultipartMessage(ref mqMessage))
{
if (mqMessage.FrameCount == 1) //subscribe notification
{
continue;
}
var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));
if (_subscriberSent?.Target != null)
{
_subscriberSent.Execute(message);
}
}
else
{
await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
}
}
catch (Exception ex)
{
OnExceptionHandler(ex);
}
}
}
private void OnExceptionHandler(Exception obj)
{
if(_onExceptionHandler != null && _onExceptionHandler.IsAlive)
{
_onExceptionHandler.Execute(obj);
}
}
public void StartRouting()
{
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
_proxy.Start();
_poller.RunAsync();
_workerReceived = Task.Run(
() => MessagePumpReceived(_cancellationToken),
_cancellationToken);
_workerSent = Task.Run(
() => MessagePumpSent(_cancellationToken),
_cancellationToken);
}
public async Task StopRouting()
{
_proxy.Stop();
_poller.Stop();
_cancellationTokenSource?.Cancel();
await Task.WhenAny(Task.WhenAll(_workerSent, _workerReceived),
Task.Delay(5000)); //wait when all thread finished, but max 5 seconds
}
public async void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
await StopRouting();
_poller?.Dispose();
_poller = null;
_proxy = null;
_xSub?.Dispose();
_xSub = null;
_xPub?.Dispose();
_xPub = null;
_controlInPush?.Dispose();
_controlInPush = null;
_controlInPull?.Dispose();
_controlInPull = null;
_controlOutPush?.Dispose();
_controlOutPush = null;
_controlOutPull?.Dispose();
_controlOutPull = null;
_workerSent = null;
_workerReceived = null;
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
_subscriberReceived.MarkForDeletion();
_subscriberReceived = null;
_subscriberSent.MarkForDeletion();
_subscriberSent = null;
_onExceptionHandler.MarkForDeletion();
_onExceptionHandler = null;
_serializer = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
}
I have ansyc message handlers which are fired by service api. Problem usually happens when i am sending 2 long messages one following other:
public async Task Handle(MessageBase message, IApiContext context)
{
using (var logger = new MessageLogger(typeof(RequestSuiteMetaDataHandler).FullName, context.ApplicationSettings.FrontendConnection, context.LogTopic))
using (var adapterManager = new AdaptersManager.AdaptersManager(new WindowsRegistrySettingsProvider()))
{
var response = new RequestSuiteMetaDataResponse();
try
{
var request = new JsonObjectSerializer().Deserialize<RequestSuiteMetaData>(message.Payload);
//some code skipped
response.Succeed = true;
response.SuiteName = queryResponse.SuiteName;
response.TestPhases = queryResponse.TestPhases;// long content around 8 kB
catch (Exception exp)
{
await logger.LogMessage(LogLevel.Error, exp.GetAllMessages());
response.Error = exp.GetAllMessages();
response.Succeed = false;
}
var payloadString = new JsonObjectSerializer().SerializeToString(response);
var messageToSend = new MessageBase
{
CorrelationId = message.CorrelationId,
PayloadType = typeof(IRequestSuiteMetaDataResponse).FullName,
SentTime = DateTime.Now,
Payload = payloadString
};
// problem source:
await logger.LogMessage(LogLevel.Debug,
$"Sending message: {Base64Decode(messageToSend.Payload)} of type: {messageToSend.PayloadType}");
Thread.Sleep(50);
await context.ResponsePublisher.SendMessage(context.ResponseTopic, messageToSend);
}
}
i have central async logger, sending log overzero mq.
public class MessageLogger: ILogger
{
const char TestBackSlash = '\';
const char TestSlash = '/';
const char TestDblQuote = '"';
private readonly string _source;
private readonly string _logTopic;
private ISerializer _serializer;
private IPublisher _publisher;
private bool _disposed;
public CdtMessageLogger(string source,
string frontendConnectionString,
string logTopic = TopicsList.AllLogs,
ISerializer serializer = null,
SingleThreadTaskScheduler scheduler = null)
{
_source = source;
_logTopic = logTopic;
_serializer = serializer ?? new Serialize.Json.JsonObjectSerializer();
_publisher = new Publisher($">{frontendConnectionString}", scheduler);
}
public Task LogMessage(LogLevel level, string message)
{
var messageToSend = new MessageBase
{
CorrelationId = Guid.NewGuid(),
PayloadType = typeof(ILogMessage).FullName,
SentTime = DateTime.Now,
Payload = _serializer.SerializeToString(new LogMessage
{
Level = level,
Message = RemoveSpecialChars(message),
Source = _source
})
};
return _publisher.SendMessage(_logTopic, messageToSend);
}
private string RemoveSpecialChars(string input)
{
var output = new StringBuilder(input.Length);
foreach (var c in input)
{
switch (c)
{
case TestSlash:
output.AppendFormat("{0}{1}", TestBackSlash, TestSlash);
break;
case TestBackSlash:
output.AppendFormat("{0}{0}", TestBackSlash);
break;
case TestDblQuote:
output.AppendFormat("{0}{1}", TestBackSlash, TestDblQuote);
break;
case '\b':
output.Append("\\b");
break;
case '\f':
output.Append("\\f");
break;
case '\n':
output.Append("\\n");
break;
case '\r':
output.Append("\\r");
break;
case '\t':
output.Append("\\t");
break;
default:
output.Append(c);
break;
}
}
return output.ToString();
}
public void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
_publisher?.Dispose();
_publisher = null;
_serializer = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
}
public class Publisher: IPublisher
{
private PublisherSocket _pub;
private ISerializer _serializer;
private SingleThreadTaskScheduler _taskScheduler;
private bool _disposed;
public Publisher(string connectionString , SingleThreadTaskScheduler scheduler=null)
{
_pub = new PublisherSocket(connectionString);
_pub.Options.SendHighWatermark = 1000;
_pub.Options.ReceiveBuffer = 44000;
_pub.Options.SendBuffer = 44000;
_pub.Options.ReconnectInterval = TimeSpan.FromMilliseconds(50);
_pub.Options.Linger = TimeSpan.FromSeconds(5);
_serializer = new Serialize.Json.JsonObjectSerializer();
_taskScheduler = scheduler ?? new SingleThreadTaskScheduler();
}
public void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
_pub?.Close();
_pub?.Dispose();
_pub = null;
_serializer = null;
_taskScheduler.StopSchedulerThread();
_taskScheduler.Dispose();
_taskScheduler = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
public Task SendMessage(string topicName, MessageBase message)
{
return Task.Factory.StartNew(() =>
{
var data = _serializer.Serialize(message);
var messToSend = new NetMQMessage();
messToSend.Append(topicName);
messToSend.Append(data);
_pub.SendMultipartMessage(messToSend);
}, CancellationToken.None, TaskCreationOptions.None, _taskScheduler);
}
}
Message rate is not so high <20 messages/s, usually a lot of short messages and sometimes a long ones. It is looking like message buffer is allocated in correct size, but not all characters are received, why?
any ideas?