
nameko's Introduction

Nameko

[nah-meh-koh]

A microservices framework for Python that lets service developers concentrate on application logic and encourages testability.

A nameko service is just a class:

# helloworld.py

from nameko.rpc import rpc

class GreetingService:
    name = "greeting_service"

    @rpc
    def hello(self, name):
        return "Hello, {}!".format(name)

You can run it in a shell:

$ nameko run helloworld
starting services: greeting_service
...

And play with it from another:

$ nameko shell
>>> n.rpc.greeting_service.hello(name="ナメコ")
'Hello, ナメコ!'

Features

  • AMQP RPC and Events (pub-sub)
  • HTTP GET, POST & websockets
  • CLI for easy and rapid development
  • Utilities for unit and integration testing

Getting Started

Support

For help, comments or questions, please go to https://discourse.nameko.io/.

For enterprise

Available as part of the Tidelift Subscription.

The maintainers of Nameko and thousands of other packages are working with Tidelift to deliver commercial support and maintenance for the open source dependencies you use to build your applications. Save time, reduce risk, and improve code health, while paying the maintainers of the exact dependencies you use. Learn more.

Security contact information

To report a security vulnerability, please use the Tidelift security contact. Tidelift will coordinate the fix and disclosure.

Contribute

  • Fork the repository
  • Raise an issue or make a feature request

License

Apache 2.0. See LICENSE for details.


nameko's Issues

timer

The Timer claims to

# next time, sleep however long is left of our interval, taking
# off the time we took to run

but we only measure the time taken by spawn_worker, not the time the entrypoint actually takes to run.

It looks like this has been broken for a very long time.

Try, e.g.:

import time

from nameko.timer import timer

clock = time.time()


class Service(object):
    name = 'timertest'

    @timer(1)
    def tick(self):
        # compare the gap between successive prints with the configured 1s interval
        print('tick', time.time() - clock)
        time.sleep(5)

We should either remove the broken "feature", or, if we want it despite having lived without it for this long, fix it (we can use a pattern similar to the http entrypoint handler: https://github.com/onefinestay/nameko/blob/master/nameko/web/handlers.py#L48).
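
A minimal sketch of the behaviour the comment describes (this is not nameko's implementation; it assumes the timer waits for the entrypoint to finish before measuring elapsed time):

import time


def run_forever(interval, fire_entrypoint):
    while True:
        started = time.time()
        fire_entrypoint()  # block until the worker has actually finished
        elapsed = time.time() - started
        # sleep for however long is left of the interval, taking off the run time
        time.sleep(max(interval - elapsed, 0))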

configurable kombu queues

First of all, nameko is a great library! I want to replace my own creepy "microservice framework" with it.

It would be great if I could configure the RPC queues (rpc.RpcConsumer). I would need the following parameters (see the kombu sketch below):

  • auto_delete, like reliable_delivery for event_handler
  • queue_arguments as a dictionary
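
For illustration, this is roughly the kind of configuration I mean, expressed with kombu directly (this is kombu's Queue/Exchange API; the queue name, routing key and arguments are only examples):

from kombu import Exchange, Queue

rpc_queue = Queue(
    'rpc-greeting_service',
    exchange=Exchange('nameko-rpc', type='topic', durable=True),
    routing_key='greeting_service.*',
    auto_delete=True,                          # remove the queue when no longer used
    queue_arguments={'x-message-ttl': 60000},  # extra per-queue broker arguments
)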

What do you think?

cheers,
chris

By default, rabbitmq doesn't allow nameko to connect

At least on Windows (haven't tested elsewhere), rabbitmq's default installation denies guest access to the nameko vhost. This means that the Nameko docs are slightly misleading - there is one configuration step required before anything works :)

As administrator, run the following in order to address the issue (I'll create a PR against the docs):

$ rabbitmqctl add_vhost nameko
$ rabbitmqctl set_permissions -p nameko guest ".*" ".*" ".*"

async is a keyword in python3.5 coroutines

here's one of the examples from the docs:

with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.async("hello")
    world_res = cluster_rpc.service_x.remote_method.async("world")
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"

It will fail inside the coroutine:

async def test(x):
    x.async()

will produce something like this:

    x.async()
          ^
SyntaxError: invalid syntax

Perhaps adding an alias for the async() method, or something like that, wouldn't be that hard; I'll take a look.
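
For comparison, a sketch of how the call would look with a non-reserved alias (call_async is just the name I'd suggest; whether your nameko version already provides it needs checking):

with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.call_async("hello")
    world_res = cluster_rpc.service_x.remote_method.call_async("world")
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"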

Broadcast event example fails

When I run the broadcast event example in docs/examples/event_broadcast.py it dies with the following message:

nameko.events.EventHandlerConfigurationError: Broadcast event handlers cannot be configured with reliable delivery.
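
For reference, the error suggests the broadcast handler has to opt out of reliable delivery explicitly; a sketch of what that might look like, assuming the keyword arguments of the 2.x event_handler (service and event names are illustrative):

from nameko.events import BROADCAST, event_handler


class ListenerService:
    name = "listener"

    @event_handler(
        "monitor", "ping",
        handler_type=BROADCAST, reliable_delivery=False
    )
    def ping(self, payload):
        # every running instance receives the event, not just one per service
        print("ping received by {}".format(self.name))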

Timer Decorator and other Functionalities

Take a scenario where I want to know the number of scheduled timer threads running for a given program. Is there a way to find that out, either from the CLI or from any core module that nameko has?

In the example below, I have a function decorated with timer and an interval of 10 seconds. Can the function access that interval variable from inside itself?

from nameko.timer import timer


class Service:
    # wrapped in a service class so the entrypoint can actually run
    name = 'timertest'

    @timer(interval=10)
    def testing(self, interval):
        # can the configured interval be passed in / accessed like this?
        print(interval)

comment on module loading in nameko.cli.run

If I read this correctly, https://github.com/onefinestay/nameko/blob/2.0/nameko/cli/run.py#L53 asks if there is another way to distinguish between not finding a module and finding a module but having an exception during loading of that module.

You could use the imp module (https://docs.python.org/2/library/imp.html) to first find the module, via imp.find_module (https://docs.python.org/2/library/imp.html#imp.find_module).
If found, you can then load it using imp.load_module (https://docs.python.org/2/library/imp.html#imp.load_module).
In Python 3 you probably want to use importlib (https://docs.python.org/3/library/importlib.html#module-importlib) instead.
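
A sketch of what the Python 3 / importlib approach could look like (not what nameko currently does; the function name is just for illustration):

import importlib
import importlib.util


def import_service_module(name):
    # find_spec returns None when the module simply does not exist...
    if importlib.util.find_spec(name) is None:
        raise ImportError("no module named {!r}".format(name))
    # ...so any exception raised past this point comes from executing the module
    return importlib.import_module(name)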

exception serialisation bug

import json

from nameko.exceptions import serialize


class CustomError(Exception):  # a plain user-defined exception
    pass


def test_serialize_args():
    cause = Exception('oops')
    exc = CustomError('something went wrong', cause)

    assert json.dumps(serialize(exc))

fails. serialize probably needs to stringify all of the exception's args, not just the exception itself.
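
A sketch of the kind of fix I mean (illustrative only, not nameko's code): make every value in exc.args JSON-safe before it goes into the serialized payload.

import json


def json_safe_args(exc):
    safe = []
    for arg in exc.args:
        try:
            json.dumps(arg)        # already serializable, keep as-is
            safe.append(arg)
        except TypeError:
            safe.append(str(arg))  # fall back to the string representation
    return safe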

nameko and grequests?

I think there is an issue when using grequests with nameko: a simple import causes the service to hang and stop responding (in fact the connection to AMQP is never even established):

import grequests
from nameko.events import event_handler


class TestService(object):
    name = 'test'

    @event_handler('foo', 'bar')
    def baz(self):
        pass

Otherwise everything works fine, so I guess it's an upstream issue with gevent vs. amqp or something like that (grequests monkey-patches the standard library with gevent at import time, while nameko runs on eventlet). Anyway, it might be worth documenting if this is a known limitation.

nameko's performance

Did anybody take a swing at profiling nameko? For example, how much time is consumed outside of RabbitMQ operations?

Nova legacy tests rarely fail

Running test/legacy/test_nova.py, the two assert not g assertions in test_send_rpc_errors and test_send_rpc_multi_message_reply_ignores_all_but_last have failed on rare occasions for me. There were no unusual log messages, but the tests were being run on an underpowered instance.

event_handler for multiple services

And again ... nameko is great! :)

Is it possible to register an event_handler for more than one service? Something like:

@event_handler(["service_a", "service_b"], "event")
def bla(self, payload):
    pass

I have a proof of concept:

import types
from functools import partial

# import paths assume nameko 2.x
from nameko.events import EventHandler
from nameko.extensions import register_entrypoint


class MultiServiceEventHandler(EventHandler):
    @classmethod
    def decorator(cls, *args, **kwargs):

        def registering_decorator(fn, args, kwargs):
            if args and isinstance(args[0], list):
                args = list(args)
                services = args.pop(0)
                for service in services:
                    instance = cls(service, *args, **kwargs)
                    register_entrypoint(fn, instance)
            else:
                instance = cls(*args, **kwargs)
                register_entrypoint(fn, instance)
            return fn

        if len(args) == 1 and isinstance(args[0], types.FunctionType):
            # usage without arguments to the decorator:
            # @foobar
            # def spam():
            #     pass
            return registering_decorator(args[0], args=(), kwargs={})
        else:
            # usage with arguments to the decorator:
            # @foobar('shrub', ...)
            # def spam():
            #     pass
            return partial(registering_decorator, args=args, kwargs=kwargs)

event_handler = MultiServiceEventHandler.decorator

Do you plan something like this or should I stick with my own EventHandler?

cheers,
chris

Make it more obvious the example needs rabbitmq

The example service on the index page encourages you to just try to run it, which fails if you haven't installed RabbitMQ. Make it more obvious you need to follow the installation process first.

This should perhaps get lumped into a larger piece of work to separate out the project page from the docs page.

Exceptions with non-ascii text are not supported by RemoteError

I'm doing a bit of investigation into our handling of exceptions containing non-ASCII chars, and it looks like there's a problem with deserializing the error on the client side. The client attempts to construct a RemoteError, fails, and so ends up raising a UnicodeEncodeError instead...

Could be just a simple fix like: #281 ?

How and why to use backdoor? (or how to debug services)

Hi,

I was trying to debug a running service; putting ipdb inside the service's RPC methods obviously failed, so I ended up using logging for debugging.

But then I read about the backdoor and tried to use it.

In shell 1 I started the service:

nameko run myservice --broker mybroker --backdoor-port 5050

then in another shell

nameko backdoor localhost:5050

I was expecting to connect to the running service to be able to inspect (just like an ipdb session?)

but got:

rochacbruno@CATHO:~$  nameko backdoor localhost:5050                                                                                                                
/bin/netcat
Traceback (most recent call last):
  File "/usr/local/bin/nameko", line 11, in <module>
    sys.exit(main())
  File "/usr/local/lib/python2.7/dist-packages/nameko/cli/main.py", line 26, in main
    args.main(args)
  File "/usr/local/lib/python2.7/dist-packages/nameko/cli/backdoor.py", line 39, in main
    if call(cmd) != 0:
  File "/usr/lib/python2.7/subprocess.py", line 522, in call
    return Popen(*popenargs, **kwargs).wait()
  File "/usr/lib/python2.7/subprocess.py", line 710, in __init__
    errread, errwrite)
  File "/usr/lib/python2.7/subprocess.py", line 1327, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory

Maybe I'm misunderstanding what the backdoor is, or how to use it.

  1. What is a backdoor and what is it for?
  2. What is the recommended way to debug running services? (Is there an eager mode? A verbose mode? Redirecting stdout?)

Thanks so much

Cannot use custom exceptions in a http web handler

Hi! I'd like us to think about a better solution for this.

https://github.com/onefinestay/nameko/blob/master/nameko/web/handlers.py#L85-L86

        except Exception as exc:
            if (
                isinstance(exc, self.expected_exceptions) or
                isinstance(exc, BadRequest)
            ):
                status_code = 400
            else:
                status_code = 500
            error_dict = serialize(exc)
            payload = u'Error: {exc_type}: {value}\n'.format(**error_dict)

            response = Response(
                payload,
                status=status_code,
            )
        return response

Actually it's always catching all exceptions and returning a 500 error (except BadRequest and expected exceptions, which get a 400), but I think the handling of those errors shouldn't be there; it should be controlled by the developer, who may want to catch an expected custom exception themselves.

Because of this design decision I also find it very difficult to debug: while developing I can't get the traceback right away, just the 500, although IMHO the full trace should appear in the log. I know the error is in the response data, but a change in the log format is needed to get the exception trace printed right away on the command line.
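
One possible direction, sketched below: subclass the entrypoint and map custom exceptions to responses yourself. This assumes HttpRequestHandler exposes a response_from_exception hook; check whether your nameko version provides it before relying on this (the exception and paths here are illustrative):

from werkzeug.wrappers import Response

from nameko.web.handlers import HttpRequestHandler


class NotFound(Exception):
    pass


class HttpEntrypoint(HttpRequestHandler):
    def response_from_exception(self, exc):
        # map application exceptions to whatever status code makes sense
        if isinstance(exc, NotFound):
            return Response("Not found\n", status=404)
        return super(HttpEntrypoint, self).response_from_exception(exc)


http = HttpEntrypoint.decorator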

Thank you for Nameko! 👍

add_call_matching needs to accept `priority`

If a test adds multiple routes on the same topic and method, the only way to control the order in which they are looked up is the order in which we declare them in the test.

For clarity, add_call_matching should accept a priority, which add_routing will sort by.

2.0 event dispatchers should generate headers at dispatch time not at injection time

Headers are created and passed to the standalone dispatcher implementation:
https://github.com/onefinestay/nameko/blob/2.0/nameko/events.py#L92

If headers change before an event is dispatched (think auth, user-id, ...), the current implementation will not behave as one might expect. It does, however, work using low-level messaging, and in nameko<2.0.

Conceptually it should probably look more like:

# event_dispatcher below refers to nameko.standalone.events.event_dispatcher
def get_dependency(self, worker_ctx):
    def dispatch(event_type, event_data):
        headers = self.get_message_headers(worker_ctx)
        kwargs = self.kwargs
        dispatcher = event_dispatcher(self.config, headers=headers, **kwargs)
        dispatcher(self.service_name, event_type, event_data)
    return dispatch

docs link to github

We may want a more prominent link to GitHub from the docs (e.g. in the sidebar).

what extra config needed for nameko demo?

I start nameko with the demo like:
$ nameko run helloworld
or
$ nameko shell
Both Python 3.5 and 2.7 run into a socket error as below.
Is there extra config needed?

....
File "/Users/xxxxxx/lab/python3fornameko/lib/python3.5/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
OSError: [Errno 61] ECONNREFUSED
....
   return TCPTransport(host, connect_timeout)
  File "D:\Python27\lib\site-packages\amqp\transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: [Errno 10061] WSAECONNREFUSED

Harmonise and document expected_exceptions and sensitive_variables

We do not yet have a section of the docs that explains how expected_exceptions and sensitive_variables should be used. Furthermore, sensitive_variables are not yet supported on the HTTP entrypoint. We should also consider whether there's a better name than expected_exceptions

abstracting out serializer

Hi, first of all - I would like to thank you - I really love the API, it's kind of brilliant.

That noted, I think there is an important piece of functionality missing: abstraction over the serialization protocol.
I see that the RPC module has JSON hardcoded as the serialization mechanism.

That presents several problems:

  • it's hard to replace json with, for example, ujson, which is much faster (or maybe even messagepack / cap'n'proto, etc.)
  • it disallows serializing types like Decimal, which json does support through a custom encoder/decoder (ujson unfortunately doesn't)

Are there plans to abstract it? It doesn't seem to be much work, but I only glanced over the RPC implementation, so I could be mistaken.
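
To make the idea concrete, a sketch using kombu's pluggable serializers: registering a custom encoder is kombu's real API, while picking it up through the serializer config key is an assumption about how nameko would expose it (the serializer name is made up):

import json
from decimal import Decimal

from kombu.serialization import register


def _default(obj):
    if isinstance(obj, Decimal):
        return str(obj)
    raise TypeError("{!r} is not JSON serializable".format(obj))


register(
    "decimaljson",
    lambda obj: json.dumps(obj, default=_default),
    json.loads,
    content_type="application/x-decimaljson",
    content_encoding="utf-8",
)

# and then, hypothetically, in the nameko config:
# serializer: decimaljson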

Examples require a nameko vhost in RabbitMQ

The default AMQP URI is amqp://guest:guest@localhost/nameko. The examples in the docs don't work if there's no nameko vhost. We should change the default URI to use the root vhost so they work out of the box.

Possible to support SQS?

nameko looks like a very nice project and it could be very helpful in our application. That said, we use SQS. Is it feasible to support SQS in nameko, or does nameko require certain AMQP features? If it's feasible, we'd be happy to take a look at adding the support (or at building a nameko-sqs overlay).

understand the pytest in mock_container

In nameko/testing/pytest.py

@pytest.fixture
def empty_config(request):
    from nameko.constants import AMQP_URI_CONFIG_KEY
    return {
        AMQP_URI_CONFIG_KEY: ""
    }


@pytest.fixture
def mock_container(request, empty_config):
    from mock import Mock
    from nameko.constants import SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER
    from nameko.containers import ServiceContainer

    container = Mock(spec=ServiceContainer)
    container.config = empty_config
    container.config[SERIALIZER_CONFIG_KEY] = DEFAULT_SERIALIZER
    container.serializer = container.config[SERIALIZER_CONFIG_KEY]
    container.accept = [DEFAULT_SERIALIZER]
    return container

empty_config is a fixture, so what is request?

I found somewhere that we use it like this:
container.config = {}. So is the {} what gets passed as the request?

`get_redacted_args` should be tolerant of entrypoints without `sensitive_variables`

The following should not raise an error. Instead it should behave as if no sensitive_variables were specified.

from nameko.containers import ServiceContainer
from nameko.testing.utils import get_extension
from nameko.utils import get_redacted_args
from nameko.web.handlers import HttpRequestHandler, http


class Service(object):
    name = 'foo'

    @http('GET', '/bar')
    def bar(self, request, arg):
        return 'bar'

container = ServiceContainer(Service, {})
entrypoint = get_extension(container, HttpRequestHandler)

print(get_redacted_args(entrypoint, "arg"))

Custom delivery options

Problem

There doesn't seem to be any way of overriding message delivery options per-service or per-call like there is with Celery. I will focus on our use cases, but I believe that's an important feature for many existing and potential users of Nameko.

Currently Nameko enforces the following while publishing messages through Kombu:

1. Persistence

All messages are persistent, which allows RabbitMQ to be restarted without losing them. However, it is also much slower: around 10x slower on average when we last tested it.

In the majority of our use cases, performance is far more important than message persistence. We're processing millions of events every hour, while RabbitMQ has died only once in the last year.

2. No expiry

All messages queue up in RabbitMQ until they're received by the target service.

Again, the majority of our use cases depend on microservices talking to each other in real time. If one of the services is down, or badly lagging, we'd like to drop the messages after a while. With no expiry, all the queued-up messages that nobody is waiting on have to be processed anyway before getting to the actual real-time ones.

Example: if "login" call is not processed within 10 seconds, then we would fail the main request. We'd like that task to be expire from the queue if it hasn't been picked up in those 10 seconds.

3. Compression

No compression enabled.

Some task types may require larger amounts of data to be sent as parameters, or may return a large response. In those cases, we'd like to be able to set an appropriate compression method where the transfer benefit outweighs the CPU cost.

Example: returning a significant list of database items

Please note: in my code example I only applied compression to parameters, but I'd like to do the same for responses.
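
For context, the underlying kombu publish call already supports these knobs; a sketch (kombu's API, with illustrative exchange and routing-key names) of the options the POC would expose:

from kombu import Connection

with Connection("amqp://guest:guest@localhost//") as conn:
    producer = conn.Producer()
    producer.publish(
        {"name": "Matt"},
        exchange="nameko-rpc",
        routing_key="greeting_service.hello",
        delivery_mode=1,      # transient: don't persist the message to disk
        expiration=10,        # drop the message if not consumed within 10 seconds
        compression="gzip",   # compress larger payloads
    )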

Solution

POC Implementation

Please see my initial commit; it's far from ready, and I'm unsure which approach would be more appropriate:
astawiarski@e676f11

I am willing to deliver that functionality once we agree that it should be a part of Nameko and how it should be implemented.

POC Usage

Option A: Similar to how it works with Celery - sender can override delivery options for a particular call

$ nameko shell
>>> n.rpc.greeting_service.hello.send(args=[], kwargs={'name': "Matt"}, expire=10, delivery='transient', compression='gzip')

Option B: Allow custom delivery options to be configured per service

$ nameko shell
>>> n.rpc.greeting_service.set_delivery_options(expire=10, delivery='transient', compression='gzip')
>>> n.rpc.greeting_service.hello(name="Matt")
>>> n.rpc.greeting_service.hello.async(name="Matt")

socket.error: [Errno 61] ECONNREFUSED?

When running nameko run helloworld on a Mac, I get an error:

File "/home/lihang/Buildout/eggs/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: [Errno 61] ECONNREFUSED

Service suspension

Is there any way to suspend service execution at runtime, so that it stops processing incoming messages?

Configuration and general questions

Hi,

We want to experiment with your framework, and we have a number of questions:

  1. Why is the default max_workers argument set to 10 and not the default GreenPool value (1000)?
    Is there a reason for that? We are planning on using it in a high-concurrency environment where a few dozen to a few hundred concurrent calls are executed at any given time.
  2. Has the framework been tested for memory leaks?
  3. Is the framework stable/production ready?
  4. How do you monitor and control nameko? (supervisord/monit/etc). Since it does not run as a service, how do you implement start/stop/restart commands?

Thank you!

Configuration management

Could we let extensions register argparse options (e.g. via setuptools entry points) for configuration when running via nameko run?

Could we extend this into a nicer way to set options when hosting via code (i.e. something better than importing FOO_CONFIG_KEY)?
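
A sketch of the entry-point idea; the "nameko.cli_options" group name and the register_options hook are invented for illustration and are not an existing nameko API:

# setup.py of a hypothetical extension package
from setuptools import setup

setup(
    name="nameko-foo",
    entry_points={
        "nameko.cli_options": [
            "foo = nameko_foo.cli:register_options",
        ],
    },
)

# nameko_foo/cli.py
def register_options(parser):
    # called by `nameko run` with its argparse parser (hypothetical hook)
    parser.add_argument("--foo-url", help="connection URL for the foo extension")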

Missing ``kill`` behaviour

nameko.web.server.WebServer doesn't implement a kill method, meaning that its socket and greenthread are left lying around.

Furthermore, the set of registered providers is not purged, although that should really be the responsibility of ProviderCollector.kill, rather than leaving that up to the subclasses.

QueueConsumer currently empties the set during its own kill(), which would no longer be required.

Random vhost for testing

We should start using a random vhost for test runs rather than trying to clean up an existing one.

reset_rabbit_connections doesn't work properly because changes in the rabbit management interface aren't reflected immediately, so if the vhost is dirty there's nothing we can do about it.

It would be better to use a random vhost (unless requested otherwise) and still fail the test at the end if there are connections left over. reset_rabbit_connections should be removed.

injection with less magic

The main goal of nameko is to make it easy for developers to write services. A secondary goal is to make it easy for developers to write injection and entry providers to support the first goal.

The current implementation has some shortcomings with respect to the first
goal. It just doesn't play well with code analysis tools due to the injection
magic.

class FoobarService(object):
    db = db_session(...)

    @rpc
    def shrub(self):
        # should auto-complete in editors that support it
        # should not complain when running flake8 or 
        # similar code analysis tools
        self.db.quer...


# should show the same help as if being inside the shrub() 
# method and calling help(self.db)
help(FoobarService.db)

I propose we remove the factory functions for injection providers and adopt
a different protocol to declare and manage injections.

class FoobarService(object):
    db = DbSession(...)

    ...

The first step is to declare injections via prototypes.
The prototypes are instances of the same type as the objects being injected
during a worker life-cycle. This will get rid of the issue we currently have
with code analysis tools. Additionally it is more consistent when reading
the code.

We now have a new challenge: the life-cycle management of the injection.
For each running container we require a provider instance, which generates
injection instances, which are applied to worker instances of the service
class.

We can associate an injection provider with the prototype via its type,
using a decorator.

class DbSessionProvider(InjectionProvider):
    def __init__(self, prototype):
        self.connection_url = prototype.connection_url

    def acquire_injection(self, worker_ctx):
        return DbSession(self.connection_url, ...)


@injection(DbSessionProvider)
class DbSession(object):
    ...
    def query(self, query, *arg, **kwargs):
        ...

We can now build a map to allow a container to inspect a service class and infer
what provider class to use for a particular attribute (injection).
When a provider is instantiated it will be given the prototype to allow proper
initialization. We could also make it part of an existing or additional life-cycle
method. From there on it will behave the same way providers behave now.

A minimal implementation of the injection provider and helper API would look something like:

injection_provider_map = {}

def injection(provider_cls):

    def injection_decorator(cls):
        injection_provider_map[cls] = provider_cls
        return cls

    return injection_decorator


def get_provider_class(prototype):
    return injection_provider_map[type(prototype)]
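
A hypothetical usage sketch, following the proposal above, of how a container could go from the prototype on the class to a per-worker injection:

prototype = FoobarService.db                  # the DbSession prototype
provider_cls = get_provider_class(prototype)  # -> DbSessionProvider
provider = provider_cls(prototype)            # initialised from the prototype
injection = provider.acquire_injection(worker_ctx=None)  # a fresh DbSession per worker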

branch coverage

Now that we're no longer on a coverage fork, we should switch on branch coverage (once we get that up to 100%)

Middleware hook

Would it be an idea to create a hook for HTTP middleware? I am trying to consume the service from the browser, and therefore need to set CORS headers. I got it working by returning custom Response objects instead of JSON, but doing this in middleware might make more sense.
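
Roughly, the Response-based workaround looks like this (a sketch with illustrative names; the point is the explicit CORS header on the werkzeug Response):

import json

from werkzeug.wrappers import Response

from nameko.web.handlers import http


class BrowserFacingService:
    name = "browser_facing"

    @http("GET", "/things")
    def list_things(self, request):
        return Response(
            json.dumps(["a", "b"]),
            mimetype="application/json",
            headers={"Access-Control-Allow-Origin": "*"},
        )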
