memphis-rust-community's Introduction

Hi, I'm Tim

  • ๐ŸŒย  German developer ๐Ÿ‡ฉ๐Ÿ‡ช
  • ๐Ÿ“– Studying Data Science
  • ๐Ÿค– Sometimes teaching Computers how to do things.


memphis-rust-community's People

Contributors

turulix, zargornet

memphis-rust-community's Issues

ping_consumer does not properly check for canceled token.

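This should use the tokio::select! macro to speed up cancellation and prevent race conditions. As written, the loop only checks the token after each 30-second sleep and the ping that follows it, so a cancelled consumer can keep running for up to 30 seconds and still send one more ping.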

fn ping_consumer(&self) {
    let cloned_token = self.cancellation_token.clone();
    let cloned_client = self.station.memphis_client.clone();
    let cloned_partitions_data = self.partitions_list.clone();
    let cloned_station = self.station.clone();
    let consumer_name = self.get_name();
    let durable_name = self.get_internal_name();
    tokio::spawn(async move {
        async fn ping_stream_consumer(
            stream_name: &str,
            consumer_name: &str,
            client: &MemphisClient,
        ) -> Result<(), async_nats::Error> {
            let stream = client
                .get_jetstream_context()
                .get_stream(stream_name)
                .await?;
            let _consumer = stream.consumer_info(consumer_name).await?;
            Ok(())
        }
        while !cloned_token.is_cancelled() {
            tokio::time::sleep(Duration::from_secs(30)).await;
            match &cloned_partitions_data {
                None => {
                    let res = ping_stream_consumer(
                        &cloned_station.get_internal_name(None),
                        &durable_name,
                        &cloned_client,
                    )
                    .await;
                    if let Err(e) = res {
                        error!("Error pinging consumer. {}", e);
                        continue;
                    } else {
                        trace!(
                            "Consumer '{}' on station '{}' is still alive.",
                            &consumer_name,
                            &cloned_station.options.station_name
                        )
                    }
                }
                Some(data) => {
                    for x in data {
                        let res = ping_stream_consumer(
                            &cloned_station.get_internal_name(Some(*x)),
                            &durable_name,
                            &cloned_client,
                        )
                        .await;
                        if let Err(e) = res {
                            error!("Error pinging consumer. {}", e);
                            continue;
                        } else {
                            trace!(
                                "Consumer '{}' on station '{}' is still alive.",
                                &consumer_name,
                                &cloned_station.options.station_name
                            )
                        }
                    }
                }
            };
        }
    });
}

Example of how it should be implemented:

let handle = tokio::spawn(async move {
    while !cancellation_token.is_cancelled() {
        let msg_handler = consumer
            .batch()
            .max_messages(options.batch_size)
            .expires(Duration::from_millis(options.batch_max_time_to_wait_ms))
            .messages()
            .await;
        let mut batch = match msg_handler {
            Ok(batch) => batch,
            Err(e) => {
                error!("Error while receiving messages from Memphis. {}", e);
                continue;
            }
        };
        while let Some(Ok(msg)) = batch.next().await {
            trace!(
                "Message received from Memphis. (Subject: {}, Sequence: {})",
                msg.subject,
                match msg.info() {
                    Ok(info) => info.stream_sequence,
                    Err(_e) => 0,
                }
            );
            let memphis_message = MemphisMessage::new(
                msg,
                client.clone(),
                options.consumer_group.clone(),
                options.max_ack_time_ms,
            );
            let res = sender.send(MemphisEvent::MessageReceived(memphis_message));
            if res.is_err() {
                error!(
                    "Error while sending message to the receiver. {:?}",
                    res.err()
                );
            }
        }
    }
});
tokio::spawn(async move {
    tokio::select! {
        _ = handle => {
            warn!("Consumer '{}' on group '{}' stopped consuming.", &cloned_options.consumer_name, &cloned_options.consumer_group);
        },
        _ = cancel_token.cancelled() => {
            debug!("Consumer '{}' on group '{}' was cancelled.", &cloned_options.consumer_name, &cloned_options.consumer_group);
        }
    }
});
Ok(())
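
To show why racing the wait against cancellation helps the ping loop, here is a minimal sketch of the same idea using only the standard library, so it runs without tokio. It is a synchronous analogue, not the library's implementation: a channel stands in for the cancellation token, and `recv_timeout` plays the role that `tokio::select!` over `cancelled()` and `sleep` would play in the async code. The names `Wait`, `wait_or_cancel` and `ping_loop` are hypothetical and do not exist in the crate.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Outcome of one wait between pings (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Wait {
    /// The interval elapsed without a cancel signal; time to ping.
    Tick,
    /// A cancel signal arrived, or the sending side was dropped.
    Cancelled,
}

/// Blocks for at most `interval`, returning early as soon as a cancel
/// signal arrives. This replaces "sleep, then check the flag".
fn wait_or_cancel(cancel: &mpsc::Receiver<()>, interval: Duration) -> Wait {
    match cancel.recv_timeout(interval) {
        Err(RecvTimeoutError::Timeout) => Wait::Tick,
        Ok(()) | Err(RecvTimeoutError::Disconnected) => Wait::Cancelled,
    }
}

/// Calls `ping` once per elapsed interval until cancelled and returns
/// how many pings were sent. No ping is sent after cancellation.
fn ping_loop(cancel: mpsc::Receiver<()>, interval: Duration, mut ping: impl FnMut()) -> u32 {
    let mut pings = 0;
    while wait_or_cancel(&cancel, interval) == Wait::Tick {
        ping();
        pings += 1;
    }
    pings
}

fn main() {
    let (cancel_tx, cancel_rx) = mpsc::channel();
    let started = Instant::now();
    // Same 30-second interval as the issue; the ping itself is a no-op here.
    let worker = thread::spawn(move || ping_loop(cancel_rx, Duration::from_secs(30), || {}));
    // Cancelling wakes the worker immediately instead of after the full interval.
    cancel_tx.send(()).unwrap();
    let pings = worker.join().unwrap();
    println!("stopped after {:?} with {} pings", started.elapsed(), pings);
}
```

The point of the design is that waiting and checking for cancellation happen in one step, so there is no window in which the loop has already been cancelled but still sleeps out the interval and pings once more.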
