Comments (19)
@atik7 https://github.com/async-graphql/examples/tree/c8219078a4b7aa6d84d22e9b79f033088897be4b/poem/subscription-redis
from async-graphql.
The problem has been solved.π
from async-graphql.
Okay, I will provide it later today. @atik7
from async-graphql.
Global stream are easy to implement, and I'll add a component tomorrow to do just that.
Creating the stream should be an asynchronous function, so I'm going to do that first.
from async-graphql.
Are you Chinese?
from async-graphql.
Are you Chinese?
No German π
Why?
from async-graphql.
I thought we were in the same time zone.π
I added SimpeBroker
and updated the example.
SimpleBroker
should be used for standalone testing, and later I will provide support for Kafka
, Redis
, etc.
from async-graphql.
Yeah thought so too. π Are you from china?
from async-graphql.
Yes, I'm Chinese.π
from async-graphql.
SimpleBroker panics at called `Option::unwrap()` on a `None` value
if used with more than one type. Is this intended? If so this should be reflected in the docs/example.
from async-graphql.
I didn't have a test because I was going to sleep. That should be fine.
from async-graphql.
Can you provide a βRedis-basedβ (https://github.com/mitsuhiko/redis-rs) example of global stream for subscription?
from async-graphql.
I would be interested in an redis-rs example as well.
from async-graphql.
May be helpful...
publishing
let message = serde_json::to_string(&order)?;
let channel = format!("user_{}_order", driver.user_id);
let _ = caching::cache_publish(ctx, channel, message).await?;
and consuming
async fn logistics_order(
&self,
ctx: &Context<'_>,
channel: String,
) -> impl Stream<Item = Order> {
let context = ctx.data::<MainContext>().unwrap();
let con = context.redis.get_async_connection().await.unwrap();
let mut pubsub = con.into_pubsub();
pubsub.subscribe(channel).await.unwrap();
stream! {
let mut msg_stream = pubsub.on_message();
while let Some(msg) = msg_stream.next().await {
let message: String = msg.get_payload().unwrap();
let order: Order = serde_json::from_str(&message).unwrap();
yield order;
}
}
}
from async-graphql.
Appreciated, actually just finished an similar implementation but using rust channels.
Because the above example creates an redis connection for each subscription. (You cannot share a pubsub channel accross threads)
So what i did was to fanout pubsub events to subscriptions with rust channels
from async-graphql.
Can u provide some code example, it would be greatly helpful for us.
from async-graphql.
...
lazy_static! {
pub static ref EVENT_BUS: tokio::sync::broadcast::Sender<Event> = {
let (tx, _) = tokio::sync::broadcast::channel(10000);
tx
};
}
...
#[Object]
impl Mutation {
async fn emit_event(&self, ctx: &Context<'_>, input: Input) -> Result<Output> {
let mut redis_conn = ctx
.data_unchecked::<redis::aio::MultiplexedConnection>()
.clone();
let _: () = redis_conn
.publish("events", serde_json::to_string(&input)?)
.await?;
Ok(input)
}
}
...
#[Subscription]
impl Subscription {
async fn receive_event(&self, ctx: &Context<'_>) -> impl Stream<Item = Event> {
BroadcastStream::new(EVENT_BUS.subscribe())
.filter(|msg| msg.is_ok())
.map(|msg| msg.unwrap())
}
}
....
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let redis_url = env::var("REDIS_URL").expect("REDIS_URL is not set");
let redis_client = redis::Client::open(redis_url).expect("client");
let mut redis_pubsub = redis_client
.clone()
.get_tokio_connection()
.await?
.into_pubsub();
let fanout_task = async move {
redis_pubsub
.subscribe("events")
.await
.unwrap();
let mut msg_stream = redis_pubsub.into_on_message();
while let Some(msg) = msg_stream.next().await {
let msg: String = msg.get_payload().unwrap();
let event: Event = serde_json::from_str(&msg).unwrap();
let _ = schema::EVENT_BUS.send(event);
}
};
// spawn the fanout task as a background task that will run forever.
tokio::spawn(fanout_task);
...
}
from async-graphql.
@sunli829 Please add a redis example, thank you!
from async-graphql.
@sunli829 thank you :-)
from async-graphql.
Related Issues (20)
- Access the value of "variable" arguments (`Value::Variable`)
- Has anyone does any benchmarks because im getting extremely poor results and I do not know why? HOT 8
- Get mutable referenece to the global data defined in the `Context` or `Schema` HOT 1
- Confusing `unused_mut` warning in `#[Object]` HOT 8
- How to handle both directions of one-to-many relation in federated graph
- Question: How to get server to send ping messages on subscriptions? HOT 2
- Parsing multiple operations in a file HOT 1
- Non nullable variables should allow default values HOT 3
- Object with single skipped field but with ComplexObject HOT 2
- As using proxy type
- Using flatten inside an impl with no other fields causes a compile error
- Subscription with MPSC receiver in context data
- Reduce clippy noise from #[Object] macro HOT 2
- Guard trait lifetime HOT 1
- Using generics with both SimpleObject and InputObject as field in output type fails HOT 1
- Subscription Authentication
- Stack overflow after upgrade to 7.0.2 HOT 5
- Does async-graphql validate responses?
- Error reading data from ExtensionContext after upgrade to 7.0.3 HOT 1
- Create a general error formateer
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
π Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. πππ
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google β€οΈ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from async-graphql.