aio_nameko_proxy's Introduction

aio-nameko-proxy

A standalone nameko RPC proxy for asyncio, plus wrappers for using the nameko RPC proxy with asynchronous web frameworks (Sanic, FastAPI).

This project is based on aio-pika and draws on the source code of the official nameko project and aio-pika.

install

pip install aio-nameko-proxy

examples:

standalone AIOClusterRpcProxy

If you want most of your messages to be persistent (the default), set the delivery_mode parameter to DeliveryMode.PERSISTENT, and call sw_dlm_call when you need to send a non-persistent message.

import ssl
import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode

config = {
    "AMQP_URI": "amqp://guest:[email protected]:5672",  # Required
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5, 
    "delivery_mode": DeliveryMode.PERSISTENT,
    "serializer": "my_serializer",
    "ACCEPT": ["pickle", "json", "my_serializer"],
    "SERIALIZERS": {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    },
    # If SSL is configured, remember to change the URI to the TLS port, e.g. "amqps://guest:[email protected]:5671"
    "AMQP_SSL": {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
}

async def run():

    async with AIOClusterRpcProxy(config) as rpc:
        # time_out: how long to wait for the result of the remote method.
        # con_time_out: how long to wait when connecting to the RabbitMQ server,
        # binding the queue, consuming, and so on.

        # persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
        result = await reply_obj.result()

        # non-persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
        result = await reply_obj.result()


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
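
The config above points at "my_slizer.dumps" and "my_slizer.loads" without showing them. The following is a minimal sketch of what such a module could look like. The module name my_slizer comes from the config; everything else is an assumption, in particular that the encoder is called with one Python object and returns a string, and that the decoder receives a string or bytes and returns the object. The datetime handling is only there to show why a custom serializer might be wanted.

```python
# my_slizer.py: hypothetical module matching the "encoder"/"decoder" paths
# in the config above. The call signatures are an assumption, not taken
# from aio-nameko-proxy itself.
import json
from datetime import datetime


def _default(obj):
    # Encode datetimes as a tagged dict so they can be restored on decode.
    if isinstance(obj, datetime):
        return {"__datetime__": obj.isoformat()}
    raise TypeError("Cannot serialize {!r}".format(type(obj)))


def _object_hook(d):
    # Turn tagged dicts back into datetime objects.
    if "__datetime__" in d:
        return datetime.fromisoformat(d["__datetime__"])
    return d


def dumps(obj):
    """Encoder: Python object -> JSON text."""
    return json.dumps(obj, default=_default)


def loads(data):
    """Decoder: JSON text (str or UTF-8 bytes) -> Python object."""
    if isinstance(data, bytes):
        data = data.decode("utf-8")
    return json.loads(data, object_hook=_object_hook)
```

With a module like this on the import path, the "serializer" key selects it by the name it is registered under in "SERIALIZERS", and that name also needs to appear in "ACCEPT", as in the config above.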

If you want most of your messages to be non-persistent (persistent is the default), set the delivery_mode parameter to DeliveryMode.NOT_PERSISTENT, and call sw_dlm_call when you need to send a persistent message.

import asyncio
from aio_nameko_proxy import AIOClusterRpcProxy
from aio_pika import DeliveryMode
config = {
    "AMQP_URI": "pyamqp://guest:[email protected]:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5, 
    "delivery_mode": DeliveryMode.NOT_PERSISTENT
}

async def run():
    async with AIOClusterRpcProxy(config) as rpc:
        # non-persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
        result = await reply_obj.result()

        # persistent msg call
        result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

        reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
        result = await reply_obj.result()


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
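
The two examples above suggest that sw_dlm_call sends a message with the opposite of the configured delivery mode. A small sketch of that rule follows. It assumes "sw_dlm" stands for "switch delivery mode", which is a reading of the examples and not something the README states. The enum is a local stand-in, so the snippet runs without aio-pika; its values follow the AMQP 0-9-1 delivery-mode field (1 for non-persistent, 2 for persistent).

```python
from enum import IntEnum


class DeliveryMode(IntEnum):
    # Local stand-in for aio_pika.DeliveryMode (assumption: same member names).
    NOT_PERSISTENT = 1
    PERSISTENT = 2


def switched(default_mode):
    """Return the delivery mode opposite to the configured default.

    Hypothetical helper that only illustrates the behaviour described above.
    """
    if default_mode is DeliveryMode.PERSISTENT:
        return DeliveryMode.NOT_PERSISTENT
    return DeliveryMode.PERSISTENT


def mode_for_call(default_mode, use_switched):
    # A plain call keeps the default; a sw_dlm_* call flips it.
    return switched(default_mode) if use_switched else default_mode
```

Under this reading, a proxy configured with DeliveryMode.NOT_PERSISTENT sends persistent messages only through sw_dlm_call and sw_dlm_call_async.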

AIOPooledClusterRpcProxy

import asyncio
from aio_nameko_proxy import AIOPooledClusterRpcProxy
from aio_pika import DeliveryMode

config = {
    "AMQP_URI": "pyamqp://guest:[email protected]:5672",
    "rpc_exchange": "nameko-rpc",
    "time_out": 30, 
    "con_time_out": 5,
    "pool_size": 10,
    "initial_size": 2,
    "delivery_mode": DeliveryMode.NOT_PERSISTENT
}


async def run():

    async with AIOPooledClusterRpcProxy(config) as proxy_pool:

        async with proxy_pool.acquire() as rpc:
            result = await rpc.rpc_demo_service.normal_rpc("demo")


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
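
The pooled example takes "pool_size" and "initial_size" but does not say how they interact. The sketch below shows one common interpretation: initial_size proxies are created up front, and further ones are created on demand until pool_size is reached. TinyPool, make_proxy and demo are hypothetical names, and this is not the library's implementation, only an illustration of the acquire pattern used above.

```python
import asyncio
from contextlib import asynccontextmanager


class TinyPool:
    """Hypothetical pool; illustrates pool_size / initial_size semantics only."""

    def __init__(self, factory, pool_size, initial_size):
        self._factory = factory            # coroutine function creating one proxy
        self._pool_size = pool_size        # upper bound on proxies ever created
        self._initial_size = initial_size  # proxies created eagerly in start()
        self._idle = asyncio.Queue()
        self.created = 0

    async def start(self):
        # Pre-create initial_size proxies so the first callers do not wait.
        for _ in range(self._initial_size):
            self.created += 1
            await self._idle.put(await self._factory())

    @asynccontextmanager
    async def acquire(self):
        if self._idle.empty() and self.created < self._pool_size:
            # Nothing idle and still below the cap: create a new proxy.
            self.created += 1
            item = await self._factory()
        else:
            # Reuse an idle proxy, waiting if all are currently in use.
            item = await self._idle.get()
        try:
            yield item
        finally:
            # Hand the proxy back for the next caller.
            self._idle.put_nowait(item)


async def demo():
    counter = 0

    async def make_proxy():
        nonlocal counter
        counter += 1
        return "proxy-{}".format(counter)

    pool = TinyPool(make_proxy, pool_size=3, initial_size=2)
    await pool.start()
    async with pool.acquire() as a, pool.acquire() as b, pool.acquire() as c:
        in_use = sorted([a, b, c])
    return in_use, pool.created
```

Running asyncio.run(demo()) acquires three proxies at once: two come from the pre-created set and the third is created on demand, which is the situation a larger "pool_size" than "initial_size" is meant to cover.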

Sanic Wrapper

import ssl
from sanic import Sanic
from sanic.response import json
from aio_pika import DeliveryMode
from aio_nameko_proxy.wrappers import SanicNamekoClusterRpcProxy

class Config(object):
    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:[email protected]:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # other supported properties of aio-pika.Message, the key name format is "NAMEKO_{}".format(property_name.upper())
    # ...


app = Sanic("App Name")
app.config.from_object(Config)

# rpc_cluster = SanicNamekoClusterRpcProxy(app)

# or

# from aio_nameko_proxy.wrappers import rpc_cluster  # contextvars required in py36
# SanicNamekoClusterRpcProxy(app)

# or
rpc_cluster = SanicNamekoClusterRpcProxy()
rpc_cluster.init_app(app)


@app.route("/")
async def test(request):
    
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return json({"hello": "world"})


@app.websocket('/ws')
async def ws(request, ws):
    
    rpc = await rpc_cluster.get_proxy()
    
    for i in range(3):
        _ = await ws.recv()
        result = await rpc.rpc_demo_service.normal_rpc("demo")
        await ws.send(result)
    await ws.close()

    # in websocket handlers, call remove() explicitly at the end
    rpc_cluster.remove()



if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
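
The comments in the Config class pair each standalone option with a setting name, following the stated format "NAMEKO_{}".format(property_name.upper()). A short sketch of that naming rule is given here. The helper names to_settings_key and collect_nameko_settings are hypothetical, and how the wrapper actually reads the settings is not shown in this README.

```python
PREFIX = "NAMEKO_"


def to_settings_key(name, prefix=PREFIX):
    """Map an option or message property name to its prefixed settings key."""
    return "{}{}".format(prefix, name.upper())


def collect_nameko_settings(config_obj, prefix=PREFIX):
    """Gather every upper-case attribute that carries the prefix.

    Hypothetical helper: it only shows which attributes the naming rule selects.
    """
    return {
        key: getattr(config_obj, key)
        for key in dir(config_obj)
        if key.isupper() and key.startswith(prefix)
    }


class ExampleConfig(object):
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    NAMEKO_TIME_OUT = 30
    DEBUG = True  # no prefix, so the rule above does not select it
```

For example, the aio-pika Message property content_type would be configured as NAMEKO_CONTENT_TYPE under this rule.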

FastAPI Wrapper

import ssl
from fastapi import FastAPI, WebSocket
from aio_pika import DeliveryMode
from pydantic import BaseSettings

from aio_nameko_proxy.wrappers import FastApiNamekoProxyMiddleware, rpc_cluster  # contextvars required in py36



class Settings(BaseSettings):

    # AMQP_URI: Required
    NAMEKO_AMQP_URI = "pyamqp://guest:[email protected]:5672"
    # rpc_exchange
    NAMEKO_RPC_EXCHANGE = "nameko-rpc"
    # pool_size
    NAMEKO_POOL_SIZE = 60
    # initial_size
    NAMEKO_INITIAL_SIZE = 60
    # time_out
    NAMEKO_TIME_OUT = 30
    # con_time_out
    NAMEKO_CON_TIME_OUT = 5
    # serializer
    NAMEKO_SERIALIZER = "json"
    # ACCEPT
    NAMEKO_ACCEPT = ["pickle", "json"]
    # SERIALIZERS: custom serializers
    NAMEKO_SERIALIZERS = {
        "my_serializer": {
            "encoder": "my_slizer.dumps",
            "decoder": "my_slizer.loads",
            "content_type": "my-content-type",
            "content_encoding": "utf-8"
        }
    }
    # AMQP_SSL: ssl configs
    NAMEKO_AMQP_SSL = {
        'ca_certs': 'certs/ca_certificate.pem',  # or 'cafile': 'certs/ca_certificate.pem',
        'certfile': 'certs/client_certificate.pem',
        'keyfile': 'certs/client_key.pem',
        'cert_reqs': ssl.CERT_REQUIRED
    }
    # delivery_mode
    NAMEKO_DELIVERY_MODE = DeliveryMode.PERSISTENT
    # other supported properties of aio-pika.Message, the key name format is "NAMEKO_{}".format(property_name.upper())
    # ...

settings = Settings()

app = FastAPI()

app.add_middleware(FastApiNamekoProxyMiddleware, config=settings)

@app.get("/")
async def test():
    
    rpc = await rpc_cluster.get_proxy()

    result = await rpc.rpc_demo_service.normal_rpc("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.call_async("demo")
    result = await reply_obj.result()

    result = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call("demo")

    reply_obj = await rpc.rpc_demo_service.normal_rpc.sw_dlm_call_async("demo")
    result = await reply_obj.result()

    return {"hello": "world"}


@app.websocket("/ws")
async def ws(ws: WebSocket):
    await ws.accept()
    rpc = await rpc_cluster.get_proxy()
        
    for i in range(3):
        _ = await ws.receive()
        result = await rpc.rpc_demo_service.normal_rpc("demo")
        # send_text expects a str; this assumes the RPC result is a string
        await ws.send_text(result)
    await ws.close()

    # in websocket handlers, call remove() explicitly at the end
    rpc_cluster.remove()

aio_nameko_proxy's People

Contributors

laiyongtao, liug-lynx

aio_nameko_proxy's Issues

What will be new?

I would love to contribute to this repo, but I don't know where to start. Can you give me some help? Thank you.

fastapi not working

Hi, I'm trying to use this project with fastapi following the example provided but no matter what I try, I always get

RuntimeError: Please initialize your cluster before using

could you please point me in the right direction to fix this issue?
