|
fn ping_consumer(&self) { |
|
let cloned_token = self.cancellation_token.clone(); |
|
let cloned_client = self.station.memphis_client.clone(); |
|
let cloned_partitions_data = self.partitions_list.clone(); |
|
let cloned_station = self.station.clone(); |
|
|
|
let consumer_name = self.get_name(); |
|
let durable_name = self.get_internal_name(); |
|
|
|
tokio::spawn(async move { |
|
async fn ping_stream_consumer( |
|
stream_name: &str, |
|
consumer_name: &str, |
|
client: &MemphisClient, |
|
) -> Result<(), async_nats::Error> { |
|
let stream = client |
|
.get_jetstream_context() |
|
.get_stream(stream_name) |
|
.await?; |
|
|
|
let _consumer = stream.consumer_info(consumer_name).await?; |
|
|
|
Ok(()) |
|
} |
|
while !cloned_token.is_cancelled() { |
|
tokio::time::sleep(Duration::from_secs(30)).await; |
|
match &cloned_partitions_data { |
|
None => { |
|
let res = ping_stream_consumer( |
|
&cloned_station.get_internal_name(None), |
|
&durable_name, |
|
&cloned_client, |
|
) |
|
.await; |
|
if let Err(e) = res { |
|
error!("Error pinging consumer. {}", e); |
|
continue; |
|
} else { |
|
trace!( |
|
"Consumer '{}' on station '{}' is still alive.", |
|
&consumer_name, |
|
&cloned_station.options.station_name |
|
) |
|
} |
|
} |
|
Some(data) => { |
|
for x in data { |
|
let res = ping_stream_consumer( |
|
&cloned_station.get_internal_name(Some(*x)), |
|
&durable_name, |
|
&cloned_client, |
|
) |
|
.await; |
|
if let Err(e) = res { |
|
error!("Error pinging consumer. {}", e); |
|
continue; |
|
} else { |
|
trace!( |
|
"Consumer '{}' on station '{}' is still alive.", |
|
&consumer_name, |
|
&cloned_station.options.station_name |
|
) |
|
} |
|
} |
|
} |
|
}; |
|
} |
|
}); |
|
} |