azeventhubs's Issues

Error while recovering connection

I recently started using this library, and after a couple of days of consuming messages from Event Hub I was hit by the following errors:

[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error closing connection during recovering: IllegalState
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error ending CBS session during recovering: IllegalState
[azeventhubs::amqp::amqp_cbs_link] [ERROR] CBS authorization refresh failed: Local error: ExpectImmediateDetach
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error closing connection during recovering: IllegalState
[azeventhubs::amqp::amqp_connection_scope] [ERROR] Error ending CBS session during recovering: IllegalState
[message_processor] [ERROR] Link closed by remote
[message_processor] [ERROR] Link closed by remote

I've looked into the code that seems to cause this and found these lines:

// Recover connection if it is closed
if self.connection.is_closed().await {
    match &mut self.connection {
        Sharable::Owned(connection) => {
            recover_connection(
                connection,
                &self.service_endpoint,
                &self.connection_endpoint,
                self.transport,
                &self.id,
                self.connection_idle_timeout,
            )
            .await?
        }
        // ...

async fn recover_connection(
    connection: &mut AmqpConnection,
    service_endpoint: &Url,
    connection_endpoint: &Url,
    transport_type: EventHubsTransportType,
    id: &str,
    idle_timeout: StdDuration,
) -> Result<(), AmqpConnectionScopeError> {
    if let Err(err) = connection.handle.close().await {
        log::error!("Error closing connection during recovering: {:?}", err);
    }
    let connection_handle = AmqpConnectionScope::open_connection(
        service_endpoint,
        connection_endpoint,
        transport_type,
        id,
        idle_timeout,
    )
    .await?;
    *connection = AmqpConnection::new(connection_handle);
    Ok(())
}

If the connection is already closed, should the recover_connection function still try to close it? There's a similar scenario with the CBS handle a couple of lines below the first snippet. However, this doesn't seem to be what causes the error; it's just something odd I found while investigating the problem.
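To make the question concrete, here is a minimal sketch of a recovery step that skips the close when the handle already reports itself as closed. It is a simplified, synchronous model rather than the crate's code: MockHandle, CloseError, and recover are hypothetical names, and the mock's close is only assumed to fail with IllegalState on an already-closed handle, mirroring what the log above suggests.

```rust
// Simplified, synchronous stand-in for the real async types.
// Every name here is hypothetical and not part of the azeventhubs API.
#[derive(Debug, PartialEq)]
enum CloseError {
    IllegalState,
}

struct MockHandle {
    closed: bool,
}

impl MockHandle {
    fn is_closed(&self) -> bool {
        self.closed
    }

    // Assumption: closing an already-closed handle fails with IllegalState,
    // mirroring the error seen in the log.
    fn close(&mut self) -> Result<(), CloseError> {
        if self.closed {
            return Err(CloseError::IllegalState);
        }
        self.closed = true;
        Ok(())
    }
}

// Close only when the handle is still open, then swap in a fresh handle.
// Returns the close error (if any) so the caller can decide how to log it.
fn recover(handle: &mut MockHandle) -> Option<CloseError> {
    let close_err = if handle.is_closed() {
        None
    } else {
        handle.close().err()
    };
    // Stands in for opening a new connection.
    *handle = MockHandle { closed: false };
    close_err
}

fn main() {
    let mut handle = MockHandle { closed: true };
    let err = recover(&mut handle);
    println!(
        "close error: {:?}, closed after recovery: {}",
        err,
        handle.is_closed()
    );
}
```

With a guard like this, the spurious "Error closing connection during recovering" log line would not appear for a handle that is already closed, although it would not by itself explain the repeated "Link closed by remote" errors.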

Do you have any idea why this could be happening? Or is it perhaps something users should be handling manually?

Upcoming breaking update in dependency `fe2o3-amqp`

The underlying AMQP 1.0 implementation, fe2o3-amqp, is expected to have a breaking release (minghuaw/fe2o3-amqp#215), which will also upgrade the http crate (part of the hyper ecosystem) to "1.0". This could conflict with reqwest, which is a dependency of azure_core and is planning to upgrade hyper to "1.0" (seanmonstar/reqwest#2039).

Due to the potential conflict, fe2o3-amqp will remain on the "0.8" versions until the upgraded reqwest is incorporated into azure_core.

Producer client panics upon retrying CBS auth after idling

[2023-12-07T00:42:26Z ERROR azeventhubs::amqp::amqp_cbs_link] CBS authorization refresh failed: Local error: IllegalSessionState
[2023-12-07T00:42:50Z INFO  event_hub_producer_example] 60 minutes passed
thread 'tokio-runtime-worker' panicked at src\amqp\amqp_cbs_link.rs:322:38:
invalid key
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Error: Error { context: Custom(Custom { kind: Io, error: IllegalSessionState }) }

Rename `azeventhubs` constructor methods to reflect that a `Result` is returned

The current constructor methods for EventHubConnection, EventHubConsumerClient, and EventHubProducerClient are named in the form from_xxx but return a Result. This does not strictly follow the convention that from_xxx methods should be infallible; if a Result is returned, the methods should be renamed to the form try_from_xxx.
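As an illustration of the proposed naming, the sketch below shows a fallible constructor with a try_ prefix alongside the standard TryFrom trait, which expresses the same contract. Connection, ParseError, and try_from_connection_string are hypothetical stand-ins, not the crate's actual types or method names, and the parsing is deliberately minimal.

```rust
use std::convert::TryFrom;

// Hypothetical stand-in type; not the real EventHubConnection.
#[derive(Debug, PartialEq)]
struct Connection {
    endpoint: String,
}

#[derive(Debug, PartialEq)]
struct ParseError(String);

impl Connection {
    // Fallible constructor: the `try_` prefix signals that a Result is returned.
    fn try_from_connection_string(s: &str) -> Result<Self, ParseError> {
        let endpoint = s
            .split(';')
            .find_map(|part| part.strip_prefix("Endpoint="))
            .ok_or_else(|| ParseError(format!("missing Endpoint in {:?}", s)))?;
        Ok(Connection {
            endpoint: endpoint.to_string(),
        })
    }
}

// The standard library's TryFrom trait carries the same "may fail" meaning.
impl<'a> TryFrom<&'a str> for Connection {
    type Error = ParseError;

    fn try_from(s: &'a str) -> Result<Self, Self::Error> {
        Connection::try_from_connection_string(s)
    }
}

fn main() {
    let ok = Connection::try_from_connection_string(
        "Endpoint=sb://example.servicebus.windows.net/;EntityPath=hub",
    );
    let bad = Connection::try_from("EntityPath=hub");
    println!("{:?}\n{:?}", ok, bad);
}
```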

Storing and reusing a single `EventHubConnection`

In my setup I create a single consumer per partition, which results in N EventHubClients. If I use the EventHubClient constructor then it looks as though I end up with N AmqpClients, each with their own connection. I also see that those clients spawn their own tasks.

I see that there is an EventHubConnection type, which looks like it allows reusing a single connection. Would I get better performance using a single shared connection across all clients?

Additionally, it looks like EventHubConnection is generic on the connection. However, I would be using AmqpClient. It looks like the AmqpClient being used in this crate is coming from the amqp subdirectory and is not exposed?

Is there another way I can store an EventHubConnection<AmqpClients>?

Improve error reporting for idling client

Event Hub will close the producer client if it has been idling for more than 30 minutes. The error message is carried in the detach frame. However, when the error is passed to the user, the session error is prioritized, so the error carried in the detach frame is lost.
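One possible shape for the improvement is to prefer the detach error whenever both errors are present. The sketch below uses hypothetical types (DetachError, SessionError, ClientError) and a hypothetical helper pick_error; the crate's real error types differ, so this only illustrates the ordering.

```rust
// Hypothetical error types for illustration; the real crate's types differ.
#[derive(Debug, Clone, PartialEq)]
struct DetachError {
    description: String,
}

#[derive(Debug, Clone, PartialEq)]
struct SessionError {
    description: String,
}

#[derive(Debug, PartialEq)]
enum ClientError {
    Detached(DetachError),
    Session(SessionError),
}

// Prefer the detach error because it carries the service's explanation;
// fall back to the session error only when no detach error was received.
fn pick_error(
    detach: Option<DetachError>,
    session: Option<SessionError>,
) -> Option<ClientError> {
    detach
        .map(ClientError::Detached)
        .or_else(|| session.map(ClientError::Session))
}

fn main() {
    let detach = DetachError {
        description: "link idled for too long".to_string(),
    };
    let session = SessionError {
        description: "illegal session state".to_string(),
    };
    println!("{:?}", pick_error(Some(detach), Some(session)));
}
```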

Hanging on close EventStream

I am not sure how best to debug this, but I have 4 tokio tasks (one for each of 4 partitions) and a cancellation token that is used to stop reading from each partition. As part of cleanup, close is called on the EventStream. With 4 partitions, though, one of the close calls pretty reliably hangs and never completes.

Each partition is using an independent EventHubConsumerClient to start its partition stream.
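While debugging, one way to keep cleanup from blocking forever is to bound each close call with a timeout and log which partition timed out. In async code this is typically done with tokio::time::timeout around the close future. The sketch below shows the same idea using only the standard library, so it stays self-contained; close_with_timeout is a hypothetical helper, and the sleeping closure merely stands in for a hanging close.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Runs `close` on a helper thread and waits at most `limit` for its result.
// Returns None on timeout; the helper thread keeps running in that case.
fn close_with_timeout<T, F>(close: F, limit: Duration) -> Option<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that error.
        let _ = tx.send(close());
    });
    rx.recv_timeout(limit).ok()
}

fn main() {
    let fast = close_with_timeout(|| "closed", Duration::from_millis(500));
    let slow = close_with_timeout(
        || {
            // Stands in for a close call that hangs.
            thread::sleep(Duration::from_secs(2));
            "closed"
        },
        Duration::from_millis(50),
    );
    println!("fast: {:?}, slow: {:?}", fast, slow);
}
```

A timeout does not fix the underlying hang, but it lets the other partitions finish cleanup and identifies which close call is stuck.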

`EventPosition::SequenceNumber` seems to still start from the beginning

Sorry, I don't have a self-contained repro for this one. I call read_events_from_partition, pass EventPosition::earliest(), and read a few events. I then save the sequence number to start from and start a new reading stream, this time using EventPosition::from_sequence_number(pos, false).

async fn get_start_position(partition_id: &str) -> EventPosition {
    // Some logic to determine the start pos
    let starting_pos: i64 = get_sequence_number_to_start_from(&partition_id);
    match starting_pos {
        -1 => EventPosition::earliest(),
        pos => EventPosition::from_sequence_number(pos, false),
    }
}

// ...

let starting_pos = get_start_position(&checkpoint, &partition_id).await;
// ...
let stream = client
    .read_events_from_partition(
        &partition_id,
        starting_pos,
        ReadEventOptions::default(),
    )
    .await;

The behavior I am seeing is that even though I am passing an EventPosition that is later than the oldest event, I am still reading from the oldest event (the same event I first received when I started reading from `EventPosition::earliest()`).

Am I misconfiguring something perhaps?

Cannot create either producer or consumer client with azure_identity `DefaultCredential`

It was observed that creating a new producer or consumer client using azure_identity::DefaultCredential fails with the following error, while connecting with the connection string still works fine.

Error: Error { context: Custom(Custom { kind: Other, error: Status(StatusError { code: StatusCode(500), description: Some("The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101") }) }) }

The above error could leave the connection idle, which could then end with another error:

Error { condition: ConnectionError(ConnectionForced), description: Some("The connection was inactive for more than the allowed 60000 milliseconds and is closed by container 'LinkTracker'. TrackingId:, SystemTracker:gateway5, Timestamp:2024-01-08T15:05:08"), info: None }

While this problem was observed when upgrading the dependency azure_core to "0.19", I have experienced the same error with version "0.18" and with the dotnet SDK, both on my local machine (mac) and on an Azure VM. The dotnet SDK seems to just hang forever, and the message is never sent to the Event Hub.
