Comments (7)
I've found that asyncio.Lock doing job. I added lock around callbacks which should run in ordered way.
from nats.py.
Protocol wise the message will be delivered with respective to the order in which the messages were published and asyncio will dispatch for the callback that is ready.
In your example the B messages will be delivered before C and A. If the callbacks relinquish control so that other tasks can process messages then you would see that the messages are processed in that order:
import asyncio
import logging
import time
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("nats://demo.nats.io:4222", loop=loop)
async def message_handler_A(msg):
print(f'message_handler_A: {msg}')
await asyncio.sleep(0)
async def message_handler_B(msg):
print(f'message_handler_B: {msg}')
await asyncio.sleep(0)
async def message_handler_C(msg):
print(f'message_handler_C: {msg}')
await asyncio.sleep(0)
await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')
if __name__ == '__main__':
nc = NATS()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
message_handler_B: <Msg: subject='message_handler_B' reply='' data='0...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='0...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='0...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='1...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='1...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='1...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='2...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='2...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='2...'>
from nats.py.
But if we add one more sleep to one of callbacks, situation changes:
import asyncio
import logging
import time
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("nats://nats:4222", loop=loop)
async def message_handler_A(msg):
print(f'message_handler_A')
await asyncio.sleep(0)
async def message_handler_B(msg):
print(f'message_handler_B')
await asyncio.sleep(0)
await asyncio.sleep(0)
async def message_handler_C(msg):
print(f'message_handler_C')
await asyncio.sleep(0)
await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')
if __name__ == '__main__':
nc = NATS()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
then log looks like:
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_B
message_handler_B
message_handler_B
message_handler_B
This is more common situation because callbacks in real case have more returns into loop.
from nats.py.
Right, because when the B handler starts processing the messages it will continue to do so until it the event loop allows other callback to execute (https://github.com/nats-io/asyncio-nats/blob/master/nats/aio/client.py#L529-L549). But internally the messages have been received in the same order as the source, if you relinquishing control as part of the callback using await asyncio.sleep(0)
then you can opt to allow other callbacks to be able to have time to process the messages.
from nats.py.
I found, that socket receives messages in right order, but in real case callbacks comes in random order, and it completely unusable to publish messages with different topics in loop - for example if I have two subjects - create_user and set_user_password - then often second topic comes before first, and trying to setup password to user which not created yet. Btw, for same subject callbacks always work in right order.
from nats.py.
For that type of usage I'd recommend using request/response functionality from the client as it would give you a stronger ordering guarantee and better decoupling with the subscribers, sharing an example below:
Subscribers:
import asyncio
import time
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("demo.nats.io", loop=loop)
async def create_user(msg):
print(f'create_user : {msg}')
await nc.publish(msg.reply, b'user created')
async def set_user_password(msg):
print(f'set_user_password: {msg}')
await nc.publish(msg.reply, b'password set')
await nc.subscribe("create_user", cb=create_user)
await nc.subscribe("set_user_password", cb=set_user_password)
if __name__ == '__main__':
nc = NATS()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Publish:
import asyncio
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("demo.nats.io", loop=loop)
for i in range(10):
response = await nc.request("create_user", f"user:user-{i}".encode())
print(f"Result [{i}]: {response.data.decode()}")
response = await nc.request("set_user_password", f"password:foo".encode())
print(f"Result [{i}]: {response.data.decode()}")
if __name__ == '__main__':
nc = NATS()
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Result:
Result [0]: user created
Result [0]: password set
Result [1]: user created
Result [1]: password set
Result [2]: user created
Result [2]: password set
...
from nats.py.
Closing as that is how the client is intended to work right now (similar behavior as in the Go client).
from nats.py.
Related Issues (20)
- KV's `Entry.operation` should have a literal type
- Nats object store image with header data
- Ability to delete a consumer HOT 1
- nats.errors.TimeoutError: nats: timeout HOT 4
- Getting returning True from eof_received() has no effect when using ssl HOT 1
- Add filtering to KV method returning all keys HOT 1
- nats.errors.TimeoutError: nats: timeout
- test and list Python 3.12 and 3.13 compatibility HOT 1
- Impossible to watch forever
- KeyValue.keys method should not raise NoKeysError
- KeyValue.watch inactive_threshold parameter has no effect
- KeyValue.watchall include_history ignored
- Subject should implement AsyncContextManager for auto-unsubscribe HOT 1
- ObjectStore.list() should not raise NotFoundError if no objects found HOT 1
- Usage of mutable value in function signature of connect HOT 1
- Write a `CONTRIBUTING.md` document
- `streams_info` Method Only Returns First Page of Streams
- Currently errors display lower case subjects but should be case sensitive
- Memory leak in ObjectStore.get due to uncancelled tasks
- Client got network error after a while
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from nats.py.