nameko / nameko
Python framework for building microservices
Home Page: https://www.nameko.io
License: Apache License 2.0
This is pretty bad UX. Can we do better?
Is there a way to receive websocket connect/disconnect events from the WebSocketHubProvider dependency? Just "experimenting".
here's one of the examples from the docs:
with ClusterRpcProxy(config) as cluster_rpc:
    hello_res = cluster_rpc.service_x.remote_method.async("hello")
    world_res = cluster_rpc.service_x.remote_method.async("world")
    # do work while waiting
    hello_res.result()  # "hello-x-y"
    world_res.result()  # "world-x-y"
It will fail inside the coroutine:
async def test(x):
    x.async()
will produce something like this:
    x.async()
      ^
SyntaxError: invalid syntax
Perhaps adding an alias for the async() method, or something like that, wouldn't be too hard; I'll take a look.
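In the meantime, one possible workaround (just a sketch, reusing the cluster_rpc proxy from the example above): look the attribute up with getattr so the async keyword never appears in the source.

# workaround sketch: avoids writing ".async" literally inside a coroutine
call_async = getattr(cluster_rpc.service_x.remote_method, "async")
hello_res = call_async("hello")  # same as remote_method.async("hello")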
I wrote a test_server and test_client to try out nameko simply; here is my repo: https://github.com/jiamo/nameko_research
The test_service_client.py takes some code from the cli module, but it can't get a response.
Is there some misunderstanding in my repo?
The following should not raise an error. Instead it should behave as if no sensitive_variables
were specified.
from nameko.containers import ServiceContainer
from nameko.testing.utils import get_extension
from nameko.utils import get_redacted_args
from nameko.web.handlers import HttpRequestHandler, http


class Service(object):
    name = 'foo'

    @http('GET', '/bar')
    def bar(self, request, arg):
        return 'bar'


container = ServiceContainer(Service, {})
entrypoint = get_extension(container, HttpRequestHandler)
print(get_redacted_args(entrypoint, "arg"))
Could we let extensions register argparse options (e.g. via setuptools entry points) for configuration when run via nameko run?
Could we also extend this to a nicer way to set options when hosting via code (i.e. something better than importing FOO_CONFIG_KEY)?
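A rough sketch of how the first idea might look (entirely hypothetical; neither the entry point group nor the hook exists today): an extension package exposes a callable that nameko run would discover via a setuptools entry point and invoke with its argparse parser.

# hypothetical hook in the extension package (myext/cli.py)
def register_cli_options(parser):
    parser.add_argument(
        '--foo-timeout', type=int, default=30,
        help='example option contributed by the extension',
    )

# hypothetical entry point in the extension's setup.py:
# entry_points={'nameko.cli_options': ['foo = myext.cli:register_cli_options']}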
Now that we're no longer on a coverage fork, we should switch on branch coverage (once we get that up to 100%)
Headers are created and passed to the standalone dispatcher implementation:
https://github.com/onefinestay/nameko/blob/2.0/nameko/events.py#L92
If headers change before dispatching an event (think auth, user-id, ...), the current implementation will not behave as one might expect. It does work using low-level messaging, and in nameko<2.0.
Conceptually it should probably look more like:
def get_dependency(self, worker_ctx):
    def dispatch(event_type, event_data):
        headers = self.get_message_headers(worker_ctx)
        kwargs = self.kwargs
        dispatcher = event_dispatcher(self.config, headers=headers, **kwargs)
        dispatcher(self.service_name, event_type, event_data)
    return dispatch
Would it be an idea to create a hook for HTTP middleware? I am trying to consume the service from the browser, and therefore need to set CORS headers. I got it working by returning custom Response objects instead of json, but doing this in middleware might make more sense.
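For reference, a minimal sketch of the workaround described above (assuming the standard nameko http entrypoint and werkzeug): return a Response carrying the CORS headers instead of a plain json string.

import json

from nameko.web.handlers import http
from werkzeug.wrappers import Response


class CorsExampleService(object):
    name = 'cors_example'

    @http('GET', '/data')
    def get_data(self, request):
        payload = json.dumps({'value': 42})
        # set the CORS header on the response explicitly
        return Response(
            payload,
            mimetype='application/json',
            headers={'Access-Control-Allow-Origin': '*'},
        )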
We should start using a random vhost for test runs rather than trying to clean up an existing one. reset_rabbit_connections doesn't work properly because changes in the rabbit management interface aren't reflected immediately, so if the vhost is dirty there's nothing we can do about it.
It would be better to use a random vhost (unless requested otherwise) and still fail the test at the end if there are connections left over. reset_rabbit_connections should be removed.
First of all, nameko is a great library! I want to replace my own creepy "microservice framework" with it.
It would be great if I could configure the rpc queues (rpc.RpcConsumer). I would need the following parameters:
What do you think?
cheers,
chris
c.f. #286
It's not helpful to show the stop() immediately after starting in https://github.com/onefinestay/nameko/blob/master/docs/examples/service_container.py#L16 and the runner example. Better to show wait() and point people towards the nameko.cli.run module for an importable function for running services programmatically.
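Something along these lines, perhaps (a sketch assuming the nameko 2.x runner API; the service class here is just a placeholder):

from nameko.runners import ServiceRunner


class Service(object):  # placeholder service
    name = 'example'


config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}
runner = ServiceRunner(config)
runner.add_service(Service)
runner.start()
runner.wait()  # block until the service stops, rather than stopping immediately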
Is there any way to suspend service execution at runtime? So it stops to process incoming messages?
Hi devs,
I'd like to make a client-server version of the library that won't need any broker (AMQP / RabbitMQ), but instead a simple client-server protocol like http://mprpc.readthedocs.org/
Is it possible? What should I do for this?
nameko looks like a very nice project and it could be very helpful in our application. That said, we use SQS. Is it feasible to support SQS in nameko, or does nameko require certain AMQP features? If it's feasible, we'd be happy to take a look at adding the support (or at building a nameko-sqs overlay).
The Timer claims to
# next time, sleep however long is left of our interval, taking
# off the time we took to run
but we only measure the time of spawn_worker, not the actual entrypoint run. Looks like this has been broken for a very long time.
try e.g.
import time

from nameko.timer import timer

clock = time.time()


class Service(object):
    name = 'timertest'

    @timer(1)
    def tick(self):
        print('tick', time.time() - clock)
        time.sleep(5)
we should either remove the broken "feature", or if we want this despite living without it for this long, fix it (can use a pattern similar to the http entrypoint handler https://github.com/onefinestay/nameko/blob/master/nameko/web/handlers.py#L48)
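For illustration, a minimal sketch (plain Python, not nameko's actual implementation) of the behaviour the comment promises: measure how long the handler itself takes and sleep only for the remainder of the interval.

import time


def run_periodically(interval, handler):
    while True:
        started = time.time()
        handler()  # run the entrypoint and wait for it to finish
        elapsed = time.time() - started
        # sleep only for whatever is left of the interval
        time.sleep(max(0, interval - elapsed))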
Running test/legacy/test_nova.py, the two assert not g assertions in test_send_rpc_errors and test_send_rpc_multi_message_reply_ignores_all_but_last have failed on rare occasions for me. There were no unusual log messages, but the tests were being run on an underpowered instance.
Hi! I would like it if we could think of a better solution for this.
https://github.com/onefinestay/nameko/blob/master/nameko/web/handlers.py#L85-L86
except Exception as exc:
    if (
        isinstance(exc, self.expected_exceptions) or
        isinstance(exc, BadRequest)
    ):
        status_code = 400
    else:
        status_code = 500
    error_dict = serialize(exc)
    payload = u'Error: {exc_type}: {value}\n'.format(**error_dict)

    response = Response(
        payload,
        status=status_code,
    )
    return response
Actually it's always catching all exceptions and returning a 500 error (except for BadRequest), but I think the handling of those errors shouldn't live there; it should be controlled by the developer, in case they want to catch an expected custom exception.
Because of this design decision I also find it very difficult to debug: I can't get the traceback right away while developing, just the 500. The full trace should appear in the log, IMHO. I know the error is in the response data, but a change in the log format is needed to get the exception trace printed right away on the command line.
Thank you for Nameko! 👍
I think there is an issue when using grequests with nameko: a simple import will cause the service to hang and not respond (actually the connection to amqp is not even established):
import grequests

from nameko.events import event_handler


class TestService(object):
    name = 'test'

    @event_handler('foo', 'bar')
    def baz(self):
        pass
Otherwise all works fine, so I guess it's maybe an issue upstream with gevent vs. amqp or something like that. Anyway, it might be worth documenting if that's a limitation.
The main goal of nameko is to make it easy for developers to write services. A secondary goal is to make it easy for developers to write injection and entry providers to support the first goal.
The current implementation has some shortcomings with respect to the first
goal. It just doesn't play well with code analysis tools due to the injection
magic.
class FoobarService(object):
    db = db_session(...)

    @rpc
    def shrub(self):
        # should auto-complete in editors that support it
        # should not complain when running flake8 or
        # similar code analysis tools
        self.db.quer...


# should show the same help as if being inside the shrub()
# method and calling help(self.db)
help(FoobarService.db)
I propose we remove the factory functions for injection providers and adopt a different protocol to declare and manage injections.
class FoobarService(object):
    db = DbSession(...)
    ...
The first step is to declare injections via prototypes.
The prototypes are instances of the same type as the objects being injected during a worker life-cycle. This will get rid of the issue we currently have with code analysis tools. Additionally, it is more consistent when reading the code.
We now have a new challenge: the life-cycle management of the injection. For each running container we require a provider instance which generates injection instances which are applied to worker instances of the service class.
We can associate an injection provider with the prototype via its type, using a decorator.
class DbSessionProvider(InjectionProvider):
    def __init__(self, prototype):
        self.connection_url = prototype.connection_url

    def acquire_injection(self, worker_ctx):
        return DbSession(self.connection_url, ...)


@injection(DbSessionProvider)
class DbSession(object):
    ...

    def query(self, query, *args, **kwargs):
        ...
We can now build a map to allow a container to inspect a service class and infer
what provider class to use for a particular attribute (injection).
When a provider is instantiated it will be given the prototype to allow proper
initialization. We could also make it part of an existing or additional life-cycle
method. From there on it will behave the same way providers behave now.
A minimal implementation of the injection provider and helper API would look something like:
injection_provider_map = {}


def injection(provider_cls):
    def injection_decorator(cls):
        injection_provider_map[cls] = provider_cls
        return cls
    return injection_decorator


def get_provider_class(prototype):
    return injection_provider_map[type(prototype)]
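A hypothetical usage sketch (names taken from the snippets above) of how a container could resolve providers for the prototypes declared on a service class:

# inspect the service class and build providers from the registered map
service_cls = FoobarService
providers = {
    attr: get_provider_class(prototype)(prototype)
    for attr, prototype in vars(service_cls).items()
    if type(prototype) in injection_provider_map
}
# providers == {'db': DbSessionProvider(<DbSession prototype>)}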
We should put a (universal) wheel on PyPI.
Is there anything like Flower for Celery, or is this not comparable? Or maybe you have a special setup of external tools?
did anybody take a swing at profiling nameko? Like how much time is consumed outside RabbitMQ operations? etc..
There are a few places in the docs where we're calling out gotchas or anticipating common problems (e.g. https://github.com/onefinestay/nameko/pull/290/files#diff-394c075d9b8535b91f211556731beeacR25)
Ideally these would all be in the same place so that:
The timeout in entrypoint_waiter doesn't trickle down into wait_for_worker_idle. This becomes a problem if the entrypoint_waiter timeout is higher than the one in wait_for_worker_idle, as only the lowest timeout value will apply. This causes the entrypoint waiter to time out too soon.
https://github.com/onefinestay/nameko/blob/master/nameko/testing/services.py#L100
When running nameko run helloworld on a Mac, I get an error:
File "/home/lihang/Buildout/eggs/amqp-1.4.6-py2.7.egg/amqp/transport.py", line 95, in __init__
raise socket.error(last_err)
socket.error: [Errno 61] ECONNREFUSED
And again ... nameko is great! :)
Is it possible to register an event_handler for more than just one service? Something like:
@event_handler(["service_a", "service_b"], "event")
def bla(self, payload):
    pass
I have a proof of concept:
# (imports added for completeness; paths assume nameko 2.x)
import types
from functools import partial

from nameko.events import EventHandler
from nameko.extensions import register_entrypoint


class MultiServiceEventHandler(EventHandler):

    @classmethod
    def decorator(cls, *args, **kwargs):

        def registering_decorator(fn, args, kwargs):
            if isinstance(args[0], list):
                args = list(args)
                services = args.pop(0)
                for service in services:
                    instance = cls(service, *args, **kwargs)
                    register_entrypoint(fn, instance)
            else:
                instance = cls(*args, **kwargs)
                register_entrypoint(fn, instance)
            return fn

        if len(args) == 1 and isinstance(args[0], types.FunctionType):
            # usage without arguments to the decorator:
            # @foobar
            # def spam():
            #     pass
            return registering_decorator(args[0], args=(), kwargs={})
        else:
            # usage with arguments to the decorator:
            # @foobar('shrub', ...)
            # def spam():
            #     pass
            return partial(registering_decorator, args=args, kwargs=kwargs)


event_handler = MultiServiceEventHandler.decorator
Do you plan something like this or should I stick with my own EventHandler?
cheers,
chris
can use git for-each-ref --sort=taggerdate --format '%(refname) %(taggerdate:short)' refs/tags
I start nameko with the demo like:
$ nameko run helloworld
or
$ nameko shell
Both 3.5 and 2.7 run into a socket error as below.
Is there extra config needed?
....
File "/Users/xxxxxx/lab/python3fornameko/lib/python3.5/site-packages/amqp/transport.py", line 95, in __init__
raise socket.error(last_err)
OSError: [Errno 61] ECONNREFUSED
....
return TCPTransport(host, connect_timeout)
File "D:\Python27\lib\site-packages\amqp\transport.py", line 95, in __init__
raise socket.error(last_err)
socket.error: [Errno 10061] WSAECONNREFUSED
I'm doing a bit of investigation into our handling of exceptions containing non-ascii chars, and it looks like there's a problem with deserializing the error on the client side. The client attempts to construct a RemoteError, fails, and so ends up raising a UnicodeEncodeError instead...
Could it be just a simple fix, like #281?
The example service on the index page encourages you to just try to run it, which fails if you haven't installed RabbitMQ. Make it more obvious you need to follow the installation process first.
This should perhaps get lumped into a larger piece of work to separate out the project page from the docs page.
we may want a more prominent link to github from the docs (e.g. in sidebar)
Let me take a scenario where I want to know the number of scheduled threads running for a given program. Is there a way to find that out, either from the CLI or from any core module that nameko has?
In the given example, let's say I have a function decorated with timer, where the interval is given as 10 seconds. Can the function access that interval variable from inside the function?
from nameko.timer import timer

@timer(interval=10)
def testing(self, interval):
    print interval
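The timer entrypoint doesn't pass the interval into the method, but a plain-Python workaround (just a sketch, not a nameko feature) is to keep the interval in a constant and reference it from both places:

from nameko.timer import timer

INTERVAL = 10


class TimedService(object):
    name = 'timed'

    @timer(interval=INTERVAL)
    def testing(self):
        print(INTERVAL)  # the same value the decorator was given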
We do not yet have a section of the docs that explains how expected_exceptions and sensitive_variables should be used. Furthermore, sensitive_variables are not yet supported on the HTTP entrypoint. We should also consider whether there's a better name than expected_exceptions.
nameko.web.server.WebServer doesn't implement a kill method, meaning that its socket and greenthread are left lying around.
Furthermore, the set of registered providers is not purged, although that should really be the responsibility of ProviderCollector.kill, rather than leaving that up to the subclasses. QueueConsumer currently empties the set during its own kill(), which would no longer be required.
Currently blocked on an eventlet bug: eventlet/eventlet#213
We should be able to add pypy to our tox config when the above is resolved.
Hi, first of all I would like to thank you - I really love the API; it's kind of brilliant.
That noted, I think there is an important piece of functionality lacking: abstraction over the serialization protocol.
I see that the RPC module has json hardcoded as the serialization mechanism.
That presents several problems:
Are there plans for abstracting it? It doesn't seem to be much work, but I only glanced over the rpc implementation, so I could be mistaken.
At least on Windows (haven't tested elsewhere), rabbitmq's default installation denies guest access to the nameko vhost. This means that the Nameko docs are slightly misleading - there is one configuration step required before anything works :)
As administrator, run rabbitmqctl add_vhost nameko followed by rabbitmqctl set_permissions -p nameko guest ".*" ".*" ".*" in order to address the issue. I'll create a PR against the docs.
In nameko/testing/pytest.py
@pytest.fixture
def empty_config(request):
    from nameko.constants import AMQP_URI_CONFIG_KEY
    return {
        AMQP_URI_CONFIG_KEY: ""
    }


@pytest.fixture
def mock_container(request, empty_config):
    from mock import Mock
    from nameko.constants import SERIALIZER_CONFIG_KEY, DEFAULT_SERIALIZER
    from nameko.containers import ServiceContainer

    container = Mock(spec=ServiceContainer)
    container.config = empty_config
    container.config[SERIALIZER_CONFIG_KEY] = DEFAULT_SERIALIZER
    container.serializer = container.config[SERIALIZER_CONFIG_KEY]
    container.accept = [DEFAULT_SERIALIZER]
    return container
empty_config is a fixture, so what is request?
I found somewhere we use it like this:
container.config = {}
So is the {} passed as the request?
I can't see what the use case is (the behaviour is pretty strange).
Unless we can think of a good example of why one might want this, let's drop it.
import json
from nameko.exceptions import serialize

class CustomError(Exception):  # assumed stand-in for a user-defined exception
    pass

def test_serialize_args():
    cause = Exception('oops')
    exc = CustomError('something went wrong', cause)
    assert json.dumps(serialize(exc))
fails. serialize probably needs to stringify all args, not just exc.
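A sketch of the suggested fix (illustrative only; the field names are assumed from what serialize currently returns): stringify every element of exc.args so the result always survives json.dumps.

def serialize_args_safe(exc):
    # assumed shape of the serialized error; every arg is coerced to str
    return {
        'exc_type': type(exc).__name__,
        'exc_path': '{}.{}'.format(type(exc).__module__, type(exc).__name__),
        'exc_args': [str(arg) for arg in exc.args],
        'value': str(exc),
    }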
Hi,
We want to experiment with your framework, and we have a number of questions:
Thank you!
If I read this correctly, https://github.com/onefinestay/nameko/blob/2.0/nameko/cli/run.py#L53 asks if there is another way to distinguish between not finding a module and finding a module but having an exception during loading of that module.
You could use https://docs.python.org/2/library/imp.html to first find the module using https://docs.python.org/2/library/imp.html#imp.find_module
If found you can then load it using https://docs.python.org/2/library/imp.html#imp.load_module
In Python 3 you probably want to use https://docs.python.org/3/library/importlib.html#module-importlib
If your test is adding multiple routes on the same topic and method, we are left to control the order in which they are looked up by the order we declare them in the test.
add_call_matching should accept a priority, for clarity, which add_routing will sort by.
There doesn't seem to be any way of overriding message delivery options per-service or per-call like there is with Celery. I will focus on our use cases, but I believe that's an important feature for many existing and potential users of Nameko.
Currently Nameko enforces the following while publishing messages through Kombu:
All messages are persistent - that allows RabbitMQ to be restarted without losing the messages. However, it is also much slower - 10x slower on average the last time we tested it.
In the majority of our use cases, performance is far more important than persistence of messages. We're processing millions of events every hour while RabbitMQ died only once in the last year.
All messages queue up in RabbitMQ until they're received by a target service.
Again, the majority of our use cases depend on microservices talking to each other in real time. If one of the services is down, or badly lagging, then we'd like to drop the messages after a while. If there is no expiry, all the queued-up messages that nobody is waiting on will have to be processed anyway, before getting to the actual real-time ones.
Example: if a "login" call is not processed within 10 seconds, then we would fail the main request. We'd like that task to expire from the queue if it hasn't been picked up in those 10 seconds.
No compression enabled.
Some task types may require larger amounts of data to be sent as parameters, or they may return a large response. In those cases, we'd like to be able to set the appropriate compression method where the transfer benefit outweighs the CPU cost.
Example: returning a significant list of database items
Please note, in my code example I only applied compression to parameters, but I'd like to do the same for responses.
Please see my initial commit, it's far from ready, but I'm unsure which way would be more appropriate:
astawiarski@e676f11
I am willing to deliver that functionality once we agree that it should be a part of Nameko and how it should be implemented.
Option A: Similar to how it works with Celery - sender can override delivery options for a particular call
$ nameko shell
>>> n.rpc.greeting_service.hello.send(args=[], kwargs={'name': "Matt"}, expire=10, delivery='transient', compression='gzip')
Option B: Allow to configure custom delivery options per-service
$ nameko shell
>>> n.rpc.greeting_service.set_delivery_options(expire=10, delivery='transient', compression='gzip')
>>> n.rpc.greeting_service.hello(name="Matt")
>>> n.rpc.greeting_service.hello.async(name="Matt")
The default AMQP URI is amqp://guest:guest@localhost/nameko. The examples in the docs don't work if there's no nameko vhost. We should change the default URI to use the root vhost so they work out of the box.
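For example (a sketch), the config would simply point at the root vhost so the docs examples work against a default RabbitMQ install:

config = {'AMQP_URI': 'amqp://guest:guest@localhost/'}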
Hi,
I was trying to debug a running service; obviously putting ipdb inside the service's rpc methods failed, so I ended up using logging for debugging.
But then I read about the backdoor and tried to use it.
In shell 1 I started the service:
nameko run myservice --broker mybroker --backdoor-port 5050
then in another shell:
nameko backdoor localhost:5050
I was expecting to connect to the running service to be able to inspect it (just like an ipdb session?), but got:
rochacbruno@CATHO:~$ nameko backdoor localhost:5050
/bin/netcat
Traceback (most recent call last):
File "/usr/local/bin/nameko", line 11, in <module>
sys.exit(main())
File "/usr/local/lib/python2.7/dist-packages/nameko/cli/main.py", line 26, in main
args.main(args)
File "/usr/local/lib/python2.7/dist-packages/nameko/cli/backdoor.py", line 39, in main
if call(cmd) != 0:
File "/usr/lib/python2.7/subprocess.py", line 522, in call
return Popen(*popenargs, **kwargs).wait()
File "/usr/lib/python2.7/subprocess.py", line 710, in __init__
errread, errwrite)
File "/usr/lib/python2.7/subprocess.py", line 1327, in _execute_child
raise child_exception
OSError: [Errno 2] No such file or directory
Maybe it is my misunderstanding about backdoors or how to use them.
Thanks so much
When I run the broadcast event example in docs/examples/event_broadcast.py, it dies with the following message:
nameko.events.EventHandlerConfigurationError: Broadcast event handlers cannot be configured with reliable delivery.
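For reference, a sketch (assuming the nameko 2.x event_handler keyword arguments) of a broadcast handler configured without reliable delivery, which avoids that error:

from nameko.events import BROADCAST, event_handler


class ListenerService(object):
    name = 'listener'

    @event_handler(
        'monitor', 'ping',
        handler_type=BROADCAST, reliable_delivery=False,
    )
    def ping(self, payload):
        print('ping received: {}'.format(payload))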