minghuaw / azeventhubs
Unofficial Azure Event Hubs SDK over AMQP 1.0 for rust
I recently started using this library, and after a couple of days of consuming messages from Event Hub I was hit by the following errors:
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error closing connection during recovering: IllegalState
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error ending CBS session during recovering: IllegalState
[azeventhubs::amqp::amqp_cbs_link] [ERROR] CBS authorization refresh failed: Local error: ExpectImmediateDetach
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error closing connection during recovering: IllegalState
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error ending CBS session during recovering: IllegalState
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote
I've looked into the code that seems to cause this and found these lines:
azeventhubs/src/amqp/amqp_connection_scope.rs, lines 704 to 717 in 7b3e7f3
azeventhubs/src/amqp/amqp_connection_scope.rs, lines 667 to 689 in 7b3e7f3
If the connection is already closed, should the recover_connection function still try to close it? There's a similar situation with the CBS handle a couple of lines below the first snippet. However, this doesn't seem to be what causes the error; it's just something odd I found while investigating the problem.
Do you have any idea why this could be happening? Or is it perhaps something users should be handling manually?
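One reading of the question above is that recovery could check the connection state before closing it, so an already-closed connection is not reported as an IllegalState error. A minimal sketch of that guard, with a hypothetical Connection type and CloseError enum standing in for the crate's real AMQP types (these names are assumptions, not azeventhubs's API):

```rust
/// Errors a close call can report (simplified, hypothetical).
#[derive(Debug, PartialEq)]
pub enum CloseError {
    IllegalState,
}

/// Hypothetical stand-in for an AMQP connection handle.
pub struct Connection {
    closed: bool,
}

impl Connection {
    pub fn new() -> Self {
        Connection { closed: false }
    }

    pub fn is_closed(&self) -> bool {
        self.closed
    }

    /// Closing an already-closed connection fails, mirroring the IllegalState in the logs.
    pub fn close(&mut self) -> Result<(), CloseError> {
        if self.closed {
            return Err(CloseError::IllegalState);
        }
        self.closed = true;
        Ok(())
    }
}

/// Closes the stale connection during recovery only if it is still open,
/// so an already-closed connection is treated as success rather than an error.
pub fn close_for_recovery(conn: &mut Connection) -> Result<(), CloseError> {
    if conn.is_closed() {
        return Ok(());
    }
    conn.close()
}
```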
The underlying AMQP 1.0 implementation, fe2o3-amqp, is expected to have a breaking release (minghuaw/fe2o3-amqp#215), which would also upgrade the http crate to "1.0" (part of the hyper ecosystem). This could conflict with reqwest, which is a dependency of azure_core and is planning to upgrade hyper to "1.0" (seanmonstar/reqwest#2039).
Because of this potential conflict, fe2o3-amqp will remain on "0.8" versions until an upgraded reqwest is incorporated into azure_core.
[2023-12-07T00:42:26Z ERROR azeventhubs::amqp::amqp_cbs_link] CBS authorization refresh failed: Local error: IllegalSessionState
[2023-12-07T00:42:50Z INFO event_hub_producer_example] 60 minutes passed
thread 'tokio-runtime-worker' panicked at src\amqp\amqp_cbs_link.rs:322:38:
invalid key
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Error: Error { context: Custom(Custom { kind: Io, error: IllegalSessionState }) }
In the dotnet SDK, the link recovery procedure stops immediately if the link is stolen.
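A minimal sketch of that short-circuit, assuming the detach error condition is available as a string. amqp:link:stolen is the link-error condition defined by the AMQP 1.0 specification; RecoveryDecision and decide are hypothetical names, not part of azeventhubs or the dotnet SDK:

```rust
/// Link-error condition AMQP 1.0 defines for a link taken over by another attach.
const LINK_STOLEN: &str = "amqp:link:stolen";

/// What a recovery loop should do after a link is detached (hypothetical).
#[derive(Debug, PartialEq)]
pub enum RecoveryDecision {
    Retry,
    GiveUp,
}

/// Decides whether to keep recovering, given the detach error condition (if any)
/// and how many recovery attempts have already been made.
pub fn decide(condition: Option<&str>, attempts: u32, max_attempts: u32) -> RecoveryDecision {
    match condition {
        // Another receiver has taken over the link: stop instead of retrying.
        Some(c) if c == LINK_STOLEN => RecoveryDecision::GiveUp,
        // Any other failure is retried until the attempt budget runs out.
        _ if attempts >= max_attempts => RecoveryDecision::GiveUp,
        _ => RecoveryDecision::Retry,
    }
}
```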
The current constructor methods for EventHubConnection, EventHubConsumerClient, and EventHubProducerClient are named in the form from_xxx but return a Result. This does not strictly follow the convention that from_xxx methods should not return a Result; a constructor that returns a Result should instead be named in the form try_from_xxx.
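The convention in a minimal sketch: an infallible constructor is named from_* and returns Self, while a fallible one is named try_from_* (or implements std::convert::TryFrom) and returns a Result. ClientConfig, MissingEndpoint and the simplified connection-string parsing are hypothetical illustrations, not azeventhubs types:

```rust
use std::convert::TryFrom;

/// Error returned when a connection string has no Endpoint segment (hypothetical).
#[derive(Debug, PartialEq)]
pub struct MissingEndpoint;

/// Hypothetical client configuration used only to illustrate the naming convention.
#[derive(Debug, PartialEq)]
pub struct ClientConfig {
    pub endpoint: String,
}

impl ClientConfig {
    /// Cannot fail, so it is named `from_*` and returns `Self`.
    pub fn from_endpoint(endpoint: &str) -> Self {
        ClientConfig { endpoint: endpoint.to_string() }
    }

    /// Can fail, so it is named `try_from_*` and returns a `Result`.
    pub fn try_from_connection_string(s: &str) -> Result<Self, MissingEndpoint> {
        s.split(';')
            .find_map(|part| part.strip_prefix("Endpoint="))
            .map(Self::from_endpoint)
            .ok_or(MissingEndpoint)
    }
}

/// The same fallible conversion exposed through the standard `TryFrom` trait.
impl TryFrom<&str> for ClientConfig {
    type Error = MissingEndpoint;

    fn try_from(s: &str) -> Result<Self, Self::Error> {
        Self::try_from_connection_string(s)
    }
}
```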
EventHubTokenCredential is not accessible because the module containing it is not publicly exported from the library, and the type itself is not re-exported in lib.rs.
The variants of the enum should also be exported so they can be used by a consumer.
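A minimal sketch of the usual fix, assuming the type lives in a private module: a pub use at the crate root re-exports it, and because enum variants share their enum's visibility, re-exporting the enum also exposes every variant. The module name credential, the enum name TokenCredential and its variants are hypothetical stand-ins:

```rust
// A private module: code outside this crate cannot reach `credential::TokenCredential`.
mod credential {
    /// Hypothetical credential enum; the variant names are illustrative only.
    #[derive(Debug, PartialEq)]
    pub enum TokenCredential {
        SharedAccessKey { key_name: String, key: String },
        Token(String),
    }
}

// Re-exporting at the crate root makes the enum, and with it all of its variants,
// reachable as `<crate>::TokenCredential::...`.
pub use credential::TokenCredential;
```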
The is_inclusive field of EventPosition is always set to false in recover_consumer_by_creating_new_consumer(), regardless of whether the value is taken from consumer.current_event_position or consumer.initial_options. It should only be set to false if the value is taken from current_event_position.
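A minimal sketch of the intended behaviour, using simplified hypothetical types (Position and ConsumerState are not azeventhubs's EventPosition or consumer fields): only a position derived from the last delivered event is forced to exclusive, while the user's initial position is passed through unchanged.

```rust
/// Simplified, hypothetical position type (not azeventhubs's EventPosition).
#[derive(Clone, Debug, PartialEq)]
pub struct Position {
    pub sequence_number: i64,
    pub is_inclusive: bool,
}

/// Simplified, hypothetical consumer state used during recovery.
pub struct ConsumerState {
    /// Position of the last event delivered to the user, if any.
    pub current_event_position: Option<Position>,
    /// Position the user originally asked to start from.
    pub initial_position: Position,
}

/// Picks the position a recreated consumer should start from.
pub fn recovery_position(state: &ConsumerState) -> Position {
    match &state.current_event_position {
        // The last event was already delivered, so resume strictly after it.
        Some(current) => Position { is_inclusive: false, ..current.clone() },
        // Nothing received yet: keep the user's original inclusivity.
        None => state.initial_position.clone(),
    }
}
```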
A link that has been closed by the remote peer cannot be resumed, so a completely new link should be created.
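In AMQP 1.0 the detach frame carries a closed flag: a link detached with closed set to true has ended and cannot be resumed, whereas one detached without it can in principle be re-attached. A minimal sketch of that decision, with Detach and LinkRecovery as hypothetical simplified types:

```rust
/// The part of an AMQP 1.0 detach frame that matters here (simplified, hypothetical type).
pub struct Detach {
    /// `true` when the peer closed the link rather than only detaching it.
    pub closed: bool,
}

#[derive(Debug, PartialEq)]
pub enum LinkRecovery {
    /// Re-attach the same link (only possible if it was merely detached).
    Resume,
    /// Attach a brand-new link; the old one is gone for good.
    Recreate,
}

/// Chooses how to recover from a detach sent by the remote peer.
pub fn link_recovery(detach: &Detach) -> LinkRecovery {
    if detach.closed {
        LinkRecovery::Recreate
    } else {
        LinkRecovery::Resume
    }
}
```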
In my setup I create a single consumer per partition, which results in N EventHubClients. If I use the EventHubClient constructor, it looks as though I end up with N AmqpClients, each with its own connection. I also see that those clients spawn their own tasks.
I see EventHubConnection, which looks like it allows reusing a single connection. Would I get better performance using a single shared connection across all clients?
Additionally, it looks like EventHubConnection is generic over the connection type. However, I would be using AmqpClient. Is it right that the AmqpClient used in this crate comes from the amqp subdirectory and is not exposed?
Is there another way I can store an EventHubConnection<AmqpClients>?
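If the transport type really is not exported, one general Rust workaround is to let the compiler infer it: make the struct that stores the connection generic over the transport. A minimal sketch with hypothetical Connection, open_connection and App types standing in for EventHubConnection and the crate's AMQP client; whether azeventhubs's constructors permit exactly this pattern is an assumption:

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for a connection type that is generic over its transport.
pub struct Connection<C> {
    pub name: String,
    transport: C,
}

impl<C: Debug> Connection<C> {
    pub fn describe(&self) -> String {
        format!("{} over {:?}", self.name, self.transport)
    }
}

// Private transport type: callers never have to name it.
#[derive(Debug)]
struct HiddenTransport;

/// Returns a connection whose transport type is opaque to callers.
pub fn open_connection(name: &str) -> Connection<impl Debug> {
    Connection { name: name.to_string(), transport: HiddenTransport }
}

/// Caller-side struct: generic over `C`, so it can store the connection
/// without spelling out the transport type.
pub struct App<C> {
    pub connection: Connection<C>,
}
```

Usage: App { connection: open_connection("events") } compiles with C inferred, even though HiddenTransport is never written by the caller.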
Event Hub closes a producer client that has been idle for more than 30 minutes. The error message is carried in the detach frame; however, when the error is passed to the user, the session error takes priority, so the error carried in the detach frame is lost.
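A minimal sketch of the reversed priority, assuming both errors are available when the link goes down: the detach error is surfaced first because it carries the service's explanation, and the session error is only a fallback. AmqpError and error_to_report are hypothetical simplified names:

```rust
/// Simplified, hypothetical view of an AMQP error (condition plus optional description).
#[derive(Clone, Debug, PartialEq)]
pub struct AmqpError {
    pub condition: String,
    pub description: Option<String>,
}

/// Chooses which error to surface to the user: the link's detach error wins,
/// and the session-level error is reported only when there is no detach error.
pub fn error_to_report(
    detach_error: Option<AmqpError>,
    session_error: Option<AmqpError>,
) -> Option<AmqpError> {
    detach_error.or(session_error)
}
```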
tokio::task::JoinSet might be useful.
I am not sure how best to debug this, but I have 4 tokio tasks (one for each of 4 partitions) and a cancellation token that is used to stop reading from each partition. As part of cleanup, close is called on the EventStream. With 4 partitions, though, one of the close calls pretty reliably hangs and never completes.
Each partition uses an independent EventHubConsumerClient to start its partition stream.
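To narrow down which partition's close never completes, one option is to give every close a shared deadline and record the partitions that miss it. The sketch uses std threads and mpsc::Receiver::recv_timeout so it runs without extra crates; in the tokio setup described above, tokio::time::timeout around each close().await would play the same role. close_partition is a hypothetical stand-in for closing an EventStream:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for closing one partition's stream; `delay` simulates how long it takes.
fn close_partition(partition: usize, delay: Duration) -> usize {
    thread::sleep(delay);
    partition
}

/// Starts every close concurrently and returns the partitions whose close
/// had not finished by a shared deadline.
pub fn find_hanging_closes(close_delays: &[Duration], limit: Duration) -> Vec<usize> {
    let receivers: Vec<_> = close_delays
        .iter()
        .enumerate()
        .map(|(partition, &delay)| {
            let (tx, rx) = mpsc::channel();
            thread::spawn(move || {
                // Ignore send errors: the receiver may already have given up.
                let _ = tx.send(close_partition(partition, delay));
            });
            (partition, rx)
        })
        .collect();

    let deadline = Instant::now() + limit;
    receivers
        .into_iter()
        .filter(|(_, rx)| {
            // Wait only for whatever time is left until the shared deadline.
            let remaining = deadline.saturating_duration_since(Instant::now());
            rx.recv_timeout(remaining).is_err()
        })
        .map(|(partition, _)| partition)
        .collect()
}
```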
This is a new feature stabilized in Rust 1.75, and other crates have observed performance improvements by adopting it (e.g. async-graphql reported an almost 20% performance gain).
Sorry, I don't have a self-contained repro for this one, but I call read_events_from_partition with EventPosition::earliest() and read a few events. I then save the sequence number to start from and start a new reading stream, this time using EventPosition::from_sequence_number(pos, false).
```rust
// Returns the position to resume from; -1 means "no saved sequence number yet".
async fn get_start_position(partition_id: &str) -> EventPosition {
    // Some logic to determine the start pos
    let starting_pos: i64 = get_sequence_number_to_start_from(partition_id);
    match starting_pos {
        -1 => EventPosition::earliest(),
        // is_inclusive = false: start after the event with this sequence number
        pos => EventPosition::from_sequence_number(pos, false),
    }
}
// ...
let starting_pos = get_start_position(&partition_id).await;
// ...
let stream = client
    .read_events_from_partition(
        &partition_id,
        starting_pos,
        ReadEventOptions::default(),
    )
    .await;
```
The behavior I am seeing is that even though I pass an EventPosition that is later than the oldest event, I still read from the oldest event (the same event I first received when I started reading from `EventPosition::earliest()`).
Am I perhaps misconfiguring something?
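For comparison while debugging: Event Hubs selects a starting point through a filter on the receiver link, and the .NET SDK, for instance, expresses a sequence-number position as a string such as amqp.annotation.x-opt-sequence-number > '42' (exclusive) or with >= (inclusive). A minimal sketch of that mapping, with StartPosition and filter_expression as hypothetical names; that azeventhubs builds exactly these strings is an assumption:

```rust
/// Simplified, hypothetical starting position (not azeventhubs's EventPosition).
pub enum StartPosition {
    Earliest,
    SequenceNumber { value: i64, inclusive: bool },
}

/// Builds the selector-filter expression for a starting position.
pub fn filter_expression(pos: &StartPosition) -> String {
    match pos {
        // "-1" is the start-of-stream offset, so "> '-1'" selects everything.
        StartPosition::Earliest => "amqp.annotation.x-opt-offset > '-1'".to_string(),
        StartPosition::SequenceNumber { value, inclusive } => {
            let op = if *inclusive { ">=" } else { ">" };
            format!("amqp.annotation.x-opt-sequence-number {op} '{value}'")
        }
    }
}
```

If the filter actually sent for from_sequence_number(pos, false) does not look like the sequence-number form, that would explain reading from the oldest event.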
It was observed that creating a new producer or consumer client using azure_identity::DefaultCredential fails with the following error, while connecting with the connection string still works fine:
Error: Error { context: Custom(Custom { kind: Other, error: Status(StatusError { code: StatusCode(500), description: Some("The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101") }) }) }
The above error can leave behind an idle connection, which may then fail with another error:
Error { condition: ConnectionError(ConnectionForced), description: Some("The connection was inactive for more than the allowed 60000 milliseconds and is closed by container 'LinkTracker'. TrackingId:, SystemTracker:gateway5, Timestamp:2024-01-08T15:05:08"), info: None }
Although this problem was observed when upgrading the azure_core dependency to "0.19", I have experienced the same error with version "0.18" and with the dotnet SDK, both on my local machine (Mac) and on an Azure VM. The dotnet SDK seems to just hang forever, and the message is never sent to the Event Hub.
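Since the service's 500 response explicitly asks the caller to retry, a bounded retry with exponential backoff around client creation is one thing to try, though it will not help if the failure is persistent (as the hanging dotnet SDK suggests). A minimal synchronous sketch; retry_with_backoff is a hypothetical helper, and with tokio one would use tokio::time::sleep instead of thread::sleep:

```rust
use std::thread;
use std::time::Duration;

/// Retries `op` up to `max_attempts` times, doubling the delay after each failure.
/// `op` receives the 1-based attempt number. Returns the first success,
/// or the last error once the attempts run out.
pub fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= max_attempts => return Err(err),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```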