Comments (6)
OK some updates here and have now a wip example of python socket.io + sanic + asyncio working together more or less: https://gist.github.com/wallyqs/c8d16cf2fa5aa2e4f4e49b714f96575d
Not very familiar with the python socket.io framework so feedback welcome, but so far it does not look like there are any blockers in using these tools together, (would be nice to have an implementation of the class AsyncNatsManager(AsyncPubSubManager):
and a more complete example though).
import asyncio
from sanic import Sanic
from sanic.response import html
import socketio
# --- NATS PubSub Manager ---
import nats
from nats.aio.client import Client as NATS
from socketio.asyncio_pubsub_manager import AsyncPubSubManager
import json
class AsyncNatsManager(AsyncPubSubManager):
name = 'asyncionats'
def __init__(self,
servers=None,
channel='socketio',
write_only=False,
loop=asyncio.get_event_loop(),
):
if servers is None:
servers = ["nats://127.0.0.1:4222"]
self.servers = servers
self.queue = asyncio.Queue()
# Establish single connection to NATS for the client.
self.nc = None
super().__init__(channel=channel, write_only=write_only)
async def _publish(self, data):
print("Socket.io <<- NATS :", data)
# Send the client events through NATS
if self.nc is None:
self.nc = NATS()
await self.nc.connect(servers=self.servers)
# Skip broadcasted messages that were received from NATS.
if data['event'] != 'event':
payload = json.dumps(data['data']).encode()
await self.nc.publish("socketio.{}".format(data['event']), payload)
async def _listen(self):
if self.nc is None:
self.nc = NATS()
await self.nc.connect(servers=self.servers)
# Close over the socketio to be able to emit within
# the NATS callback.
sio = self
async def message_handler(msg):
nonlocal sio
print("NATS ->> Socket.io:", msg.data.decode())
data = json.loads(msg.data.decode())
# Broadcast the bare message received via NATS as a Socket.io event
await sio.emit('nats', data, namespace='/test')
await self.queue.put(data)
await self.nc.subscribe(self.channel, cb=message_handler)
return await self.queue.get()
# --- Sanic + Socket.io based Application with attached PubSub Manager ---
app = Sanic()
mgr = AsyncNatsManager()
sio = socketio.AsyncServer(client_manager=mgr, async_mode='sanic')
sio.attach(app)
@app.route('/')
async def index(request):
with open('app.html') as f:
return html(f.read())
@sio.on('event', namespace='/test')
async def test_message(sid, message):
await sio.emit('response', {'data': message['data']}, room=sid,
namespace='/test')
@sio.on('nats', namespace='/test')
async def test_nats_message(sid, message):
print("NATS msg!!!!!!!!", message, sid)
await sio.emit('response', {'data': message['data']}, room=sid,
namespace='/test')
@sio.on('connect', namespace='/test')
async def test_connect(sid, environ):
print("Client connected", sid)
await sio.emit('response', {'data': 'Connected', 'count': 0}, room=sid,
namespace='/test')
@sio.on('disconnect', namespace='/test')
def test_disconnect(sid):
print('Client disconnected')
app.static('/static', './static')
if __name__ == '__main__':
app.run()
<!DOCTYPE HTML>
<html>
<head>
<title>NATS + SocketIO </title>
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function(){
namespace = '/test';
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace);
socket.on('connect', function() {
$('#log').append('<br />Connected!');
socket.emit('event', {data: 'Connection started...'});
});
socket.on('disconnect', function() {
socket.emit('event', {data: 'Disconnecting...'});
$('#log').append('<br />Disconnected!');
});
socket.on('response', function(msg) {
$('#log').append('<br />Received: ' + msg.data);
});
socket.on('nats', function(msg) {
console.log("NATS???");
$('#log').append('<br />NATS: ' + msg.data);
});
// event handler for server sent data
$('form#emit').submit(function(event) {
socket.emit('event', {data: $('#emit_data').val()});
return false;
});
});
</script>
</head>
<body>
<h1>NATS + SocketIO</h1>
<h2>Publish to NATS</h2>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Publish">
</form>
<h2>Receive:</h2>
<div><p id="log"></p></div>
</body>
</html>
from nats.py.
I will check this one today.
from nats.py.
@wallyqs Any news?
from nats.py.
@Negashev - Any progress on this. I was also thinking to replace rabbitmq with NATS, in my case as well the data is serialized via pickle before it is finally send to the messaging server.
Is it possible or is it stuck because of some bug ?
from nats.py.
@wallyqs Any news?
from nats.py.
Closing for now since no blockers, would be good to have AsyncPubSubManager
implementation for NATS but that may be done on a new repo.
from nats.py.
Related Issues (20)
- Publishers on same subject don't get processed in prallel HOT 3
- Can't install version 2.2.0 HOT 9
- Unable to connect using Bcrypted password HOT 1
- Add compression field for stream config
- ping/pong is not working HOT 1
- msg.ack_sync(timeout=15.0) but acknowledgment floor stays at 8 HOT 2
- CERTIFICATE_VERIFY_FAILED HOT 2
- Message publish failure after reconnection
- Jetstream no available responders HOT 1
- published wheels would be really helpful
- [JetStream] Publishing to a subject without stream will fail in nats.errors.TimeoutError
- Object Store replication from LeafNode to Cluster nats: error: nats: no stream matches subject HOT 2
- Placement setting has no effect when creating a new key-value bucket HOT 1
- Using the async iterator is significantly slower compared to using callbacks in a subscription
- Implement Consumer Pause
- Upon reconnect in k8s, python nats client does not receive messages
- Client does not handle connect URL containing multiple servers HOT 4
- nats-py broken while running opentelemetry instrumentation 0.44b0
- Add Service API HOT 3
- KV should raise an error if the key contains unacceptable characters
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from nats.py.