Giter Club home page Giter Club logo

Comments (7)

nxsofsys avatar nxsofsys commented on August 24, 2024 1

I've found that asyncio.Lock doing job. I added lock around callbacks which should run in ordered way.

from nats.py.

wallyqs avatar wallyqs commented on August 24, 2024

Protocol wise the message will be delivered with respective to the order in which the messages were published and asyncio will dispatch for the callback that is ready.

In your example the B messages will be delivered before C and A. If the callbacks relinquish control so that other tasks can process messages then you would see that the messages are processed in that order:

import asyncio
import logging
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://demo.nats.io:4222", loop=loop)

    async def message_handler_A(msg):
        print(f'message_handler_A: {msg}')
        await asyncio.sleep(0)

    async def message_handler_B(msg):
        print(f'message_handler_B: {msg}')
        await asyncio.sleep(0)

    async def message_handler_C(msg):
        print(f'message_handler_C: {msg}')
        await asyncio.sleep(0)

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
message_handler_B: <Msg: subject='message_handler_B' reply='' data='0...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='0...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='0...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='1...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='1...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='1...'>
message_handler_B: <Msg: subject='message_handler_B' reply='' data='2...'>
message_handler_C: <Msg: subject='message_handler_C' reply='' data='2...'>
message_handler_A: <Msg: subject='message_handler_A' reply='' data='2...'>

from nats.py.

nxsofsys avatar nxsofsys commented on August 24, 2024

But if we add one more sleep to one of callbacks, situation changes:

import asyncio
import logging
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("nats://nats:4222", loop=loop)

    async def message_handler_A(msg):
        print(f'message_handler_A')
        await asyncio.sleep(0)

    async def message_handler_B(msg):
        print(f'message_handler_B')
        await asyncio.sleep(0)
        await asyncio.sleep(0)

    async def message_handler_C(msg):
        print(f'message_handler_C')
        await asyncio.sleep(0)

    await nc.subscribe("message_handler_A", cb=message_handler_A)
    await nc.subscribe("message_handler_B", cb=message_handler_B)
    await nc.subscribe("message_handler_C", cb=message_handler_C)
    print('receiving')


if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

then log looks like:

message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_C
message_handler_A
message_handler_C
message_handler_A
message_handler_B
message_handler_B
message_handler_B
message_handler_B
message_handler_B

This is more common situation because callbacks in real case have more returns into loop.

from nats.py.

wallyqs avatar wallyqs commented on August 24, 2024

Right, because when the B handler starts processing the messages it will continue to do so until it the event loop allows other callback to execute (https://github.com/nats-io/asyncio-nats/blob/master/nats/aio/client.py#L529-L549). But internally the messages have been received in the same order as the source, if you relinquishing control as part of the callback using await asyncio.sleep(0) then you can opt to allow other callbacks to be able to have time to process the messages.

from nats.py.

nxsofsys avatar nxsofsys commented on August 24, 2024

I found, that socket receives messages in right order, but in real case callbacks comes in random order, and it completely unusable to publish messages with different topics in loop - for example if I have two subjects - create_user and set_user_password - then often second topic comes before first, and trying to setup password to user which not created yet. Btw, for same subject callbacks always work in right order.

from nats.py.

wallyqs avatar wallyqs commented on August 24, 2024

For that type of usage I'd recommend using request/response functionality from the client as it would give you a stronger ordering guarantee and better decoupling with the subscribers, sharing an example below:

Subscribers:

import asyncio
import time
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("demo.nats.io", loop=loop)

    async def create_user(msg):
        print(f'create_user      : {msg}')
        await nc.publish(msg.reply, b'user created')

    async def set_user_password(msg):
        print(f'set_user_password: {msg}')
        await nc.publish(msg.reply, b'password set')

    await nc.subscribe("create_user", cb=create_user)
    await nc.subscribe("set_user_password", cb=set_user_password)

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Publish:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    await nc.connect("demo.nats.io", loop=loop)

    for i in range(10):
        response = await nc.request("create_user", f"user:user-{i}".encode())
        print(f"Result [{i}]: {response.data.decode()}")
        response = await nc.request("set_user_password", f"password:foo".encode())
        print(f"Result [{i}]: {response.data.decode()}")

if __name__ == '__main__':
    nc = NATS()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Result:

Result [0]: user created
Result [0]: password set
Result [1]: user created
Result [1]: password set
Result [2]: user created
Result [2]: password set
...

from nats.py.

wallyqs avatar wallyqs commented on August 24, 2024

Closing as that is how the client is intended to work right now (similar behavior as in the Go client).

from nats.py.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.