cascadium / openmama-zmq
OpenMAMA ZeroMQ Bridge
License: MIT License
Hi Frank:
I notice that you provide what looks like a complete implementation of the mamaIO_* functions in the zmq bridge, although it's not at all clear to me what those functions do, or what one would use them for.
I also notice that those functions appear to be completely unused.
Are these vestigial appendages that exist merely to allow the bridge to pass the Mama unit tests? Or do they serve some purpose that I'm not seeing?
Thanks!
Bill
It looks like there are some problems synchronizing subscription deletion with incoming messages for the subscription.
In a simple test program I started getting SEGV's, which resolved down to this stack trace:
#0 0x00007f9c9c5d37ce in zmqBridgeMamaTransportImpl_queueCallback (queue=0xce8d80, closure=0x110de00) at /home/bt/work/OpenMAMA-zmq/master/src/transport.c:914
status = MAMA_STATUS_OK
tmpMsg = 0x0
bridgeMsg = 0x0
pool = 0x0
node = 0x110de00
tmsg = 0x110de40
bufferSize = 115
buffer = 0x110de60
subject = 0x110de60 "_INBOX.centos65vm.08882"
subscription = 0xcf8be0
impl = 0x0
queueImpl = 0x0
#1 0x00007f9c9ca68499 in wombatQueue_dispatchInt (queue=0xce8f00, data=0x0, closure=0x0, isTimed=1 '\001', timout=500) at common/c_cpp/src/c/queue.c:319
impl = 0xce8f00
head = 0xce90d0
cb = 0x7f9c9c5d3726 <zmqBridgeMamaTransportImpl_queueCallback>
closure_ = 0x110de00
data_ = 0xce8d80
#2 0x00007f9c9ca6851a in wombatQueue_timedDispatch (queue=0xce8f00, data=0x0, closure=0x0, timeout=500) at common/c_cpp/src/c/queue.c:335
No locals.
#3 0x00007f9c9c5d0ff8 in zmqBridgeMamaQueue_dispatch (queue=0xce8e60) at /home/bt/work/OpenMAMA-zmq/master/src/queue.c:261
status = WOMBAT_QUEUE_OK
impl = 0xce8e60
#4 0x00007f9c9ca45641 in mamaQueue_dispatch (queue=0xce8d80) at mama/c_cpp/src/c/queue.c:824
impl = 0xce8d80
status = MAMA_STATUS_OK
#5 0x00007f9c9ca46138 in dispatchThreadProc (closure=0xce9c20) at mama/c_cpp/src/c/queue.c:1294
impl = 0xce9c20
#6 0x000000394c007aa1 in start_thread () from /lib64/libpthread.so.0
No symbol table info available.
#7 0x000000394b8e8aad in clone () from /lib64/libc.so.6
No symbol table info available.
(gdb) p *impl
Cannot access memory at address 0x0
(gdb) p *subscription
$1 = {mMamaCallback = {onCreate = 0x7f9c9d448410 <vtable for Transact::MamaMessageImpl+16>, onError = 0x4bb8e200, onMsg = 0x111dbd0, onQuality = 0x0, onGap = 0x1, onRecapRequest = 0x0,
onDestroy = 0x7f9c00000000}, mMamaSubscription = 0x0, mMamaQueue = 0xcf8c10, mZmqQueue = 0xcf8c10, mTransport = 0x0, mSymbol = 0x0, mSubjectKey = 0x0, mClosure = 0x0, mIsNotMuted = 0,
mIsValid = 1, mIsTportDisconnected = 0, mMsg = 0x8b, mEndpointIdentifier = 0x1120b60 "\220\304", <incomplete sequence \313>}
Note that impl is NULL, but subscription appears well-formed, and its mIsNotMuted is set to 0, indicating that the subscription has been destroyed on another thread.
So, I tried changing the order of the code to check whether the subscription is muted before calling endpointPool_isRegistedByContent, but that just made the crash more intermittent by shrinking the window. (I was hoping that subscription deletion was somehow synchronized with incoming subscription messages, and that the original crash was simply a bug in the order in which the conditions were checked, but that appears not to be the case.)
So, it looks to be a race condition between subscription deletion and the processing of incoming messages for the subscription.
Any hints or tips on how to fix it (or where to start looking) would be greatly appreciated!
I'm thinking mostly:
ZMQ_SNDHWM
ZMQ_RCVHWM
ZMQ_AFFINITY
ZMQ_IDENTITY
ZMQ_SNDBUF
ZMQ_RCVBUF
ZMQ_RECONNECT_IVL
ZMQ_RECONNECT_IVL_MAX
ZMQ_BACKLOG
ZMQ_MAXMSGSIZE
ZMQ_RCVTIMEO
ZMQ_SNDTIMEO
We need to be careful with the types each option expects, and should contribute convenience property-parsing options back to OpenMAMA.
After building from head, attempts to use the library produce the following error:
2017-05-12 14:41:20: mama_loadmamaPayload(): Could not open middleware bridge library [mamazmqimpl] [/build/share/OpenMAMA-zmq/master/lib/libmamazmqimpl.so: undefined symbol: uuid_unparse]
This doesn't happen with 1.0 branch.
When comparing the two builds, the 1.0 build reports the following:
$ readelf -d ./OpenMAMA-zmq-1.0/libmamazmqimpl.so | grep NEEDED
Dynamic section at offset 0xe688 contains 25 entries:
Tag Type Name/Value
0x0000000000000001 (NEEDED) Shared library: [libmama.so]
0x0000000000000001 (NEEDED) Shared library: [libuuid.so.1]
0x0000000000000001 (NEEDED) Shared library: [libzmq.so.3]
0x0000000000000001 (NEEDED) Shared library: [libevent-1.4.so.2]
0x0000000000000001 (NEEDED) Shared library: [libpthread.so.0]
0x0000000000000001 (NEEDED) Shared library: [libc.so.6]
The master build reports the following:
$ readelf -d ./OpenMAMA-zmq/build/src/libmamazmqimpl.so | grep NEEDED
Dynamic section at offset 0x1d6a8 contains 25 entries:
Tag Type Name/Value
0x0000000000000001 (NEEDED) Shared library: [libmama.so]
0x0000000000000001 (NEEDED) Shared library: [libzmq.so.5.1.1]
0x0000000000000001 (NEEDED) Shared library: [libevent-1.4.so.2]
0x0000000000000001 (NEEDED) Shared library: [libc.so.6]
Notice that libuuid.so.1 is listed as NEEDED for 1.0 branch, but not for master.
I'm not sure how the project can even build without libuuid, but it definitely won't run, at least in my environment.
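In case it helps: the symptom suggests the link line lost -luuid between 1.0 and master. A hypothetical SConscript change (the environment variable name is a guess at the project's build script):

```
# Explicitly link libuuid so uuid_unparse resolves at load time.
env.Append(LIBS=['uuid'])
```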
Thanks for any help!
Metadata frames would allow the bridge to inject metadata about the payload (topic, payload type, size, etc.) as separate frames, rather than at the start of the payload where it currently lives. Possibly 3 frames. Should also include the bridge protocol version.
Build system will still be scons. Dependencies will be:
My first impression is that this will be fairly awkward to do right. I did get it compiled already, but I'm not happy with the build paths and flags required.
Hi,
I am trying to set up multiple clients subscribing to a single publisher.
Does the zmq bridge support this?
If so how do I configure the mama.properties?
I know this is supported in qpid but that required an extra transport option (reply_url).
e.g. mama.qpid.transport.sub2.reply_url=amqp://qtdeva06:6666
Thanks
James
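For reference, the zmq bridge transports in these issues are configured with paired URL properties rather than a reply_url. A hypothetical TCP pub/sub setup might look like the following (transport names, hostnames, and ports are made up; whether each side binds or connects depends on the bridge defaults):

```
mama.zmq.transport.pub.outgoing_url_0=tcp://*:5656
mama.zmq.transport.pub.incoming_url_0=tcp://*:5657
mama.zmq.transport.sub.outgoing_url_0=tcp://pubhost:5657
mama.zmq.transport.sub.incoming_url_0=tcp://pubhost:5656
```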
Investigate what's required (if anything) to support http://api.zeromq.org/4-0:zmq-pgm
This is important for daemon-free fan-in and fan-out (e.g. subscribing to more than one publisher in a single transport).
This work has been completed but it depends on an OpenMAMA pull request to land first so I have staged the change in a new 'next' branch until it lands and the next OpenMAMA release is out.
This should include the number of nodes to allocate and the size of each node.
See http://api.zeromq.org/4-0:zmq-inproc for details. Will need performance tested against existing wombatQueue to see if it's an improvement.
Moving context of the memory pool to the queue to which the message is being passed would mean less contention on the dispatch thread for many-worker-thread applications.
I'm getting the error above when running a simple test program that creates multiple subscribers.
The full msg is:
2017-06-02 14:58:43: zmqBridgeMamaTransportImpl_dispatchThread(): Cannot currently handle multiple subscribers in same application
Curious if this is on the "roadmap", or if you have any hints/tips as to the issue. I'm happy to roll up my sleeves and take a whack at it...
Thanks!
I'm having an issue with the zmq bridge. For instance, I have a Java app that lets you stop and restart an OpenMAMA publisher at any moment without exiting the app. When started, the publisher runs in a thread other than main. When I stop the publisher thread, the JVM crashes (and the whole app crashes with it). It seems the crash is due to a null pointer in the JNI layer, but I'm not sure. I tested the same thing with the qpid bridge and it's fine: you can start/stop the connection whenever you want (using any transport). But when zmq is the middleware and the client stops, the crash described above happens.
It happens on both Windows and Ubuntu. The environments are: Windows 10 / Java 1.8 x86 / OpenMAMA 2.4.0 x86 / zmq bridge x86, and Ubuntu 14.04 / Java 1.8 / OpenMAMA 2.4.0 / zmq.
I have pushed a repo with an Eclipse project that recreates this scenario. You can find it here.
Curious about the configurations for pgm/epgm -- specifically, why not use the same endpoint for both pub and sub?
I've done some testing locally with the following configuration in mama.properties, and it all seems to work fine:
mama.zmq.transport.pubsub_epgm.outgoing_url_0=epgm://127.0.0.1;239.192.1.1:5657
mama.zmq.transport.pubsub_epgm.incoming_url_0=epgm://127.0.0.1;239.192.1.1:5657
Then passing -tport pubsub_epgm to both publisher and subscriber.
I'm able to start multiple publishers and subscribers, and all subscribers get all messages from all publishers.
That setup seems much simpler, esp. for applications which are both publishers and subscribers.
So, I'm wondering if I'm missing something here? Are there reasons why one would not want to do this?
TIA...
I've been spending some time working on request/reply with the 0mq bridge, and working through some problems. Based on that, I think the current approach is unworkable, but I'm curious to know if you agree, and if so, whether you have any thoughts on how to make it work.
The main problem appears to be that each inbox is associated with a unique topic name, which the inbox subscribes to in order to receive replies.
Most of the time that works, but since filtering in 0mq is (usually) done on the publisher, and since there is a window between the time a subscription is created and the time the publisher knows about it, occasionally replies just "go missing" -- the zmq_send call (zmqBridgeMamaPublisher_sendReplyToInboxHandle => zmqBridgeMamaPublisher_send) on the "server" succeeds, but the reply message is never put on the wire. (I was able to confirm this using the very handy zmtpdump program, and correlating its output with enhanced logs from both the client and server test programs.)
That also explains why I see the problem with TCP and IPC connections, but not with EPGM, since with EPGM filtering is done on the subscriber, rather than on the publisher.
So, I'm wondering if you have any thoughts on how best to deal with this? Off the top of my head, it seems like some kind of two-level subscription might work -- that is, where the subscriber subscribes to a broad topic (e.g., "_INBOX.{machine_name}.{process_id}") and then further filters incoming messages based on some additional fields in the message.
One nice thing about that approach is it could also be used to implement wildcard subscriptions, which are currently not supported at all with the 0mq bridge.
If you have time to give this some thought, I'd be very interested in any suggestions you might have. Thanks!
The zmq transport references the collection of subscription endpoints from two threads: the IO thread reads from the zmq socket (in zmqBridgeMamaTransportImpl_dispatchThread), checks the endpoint pool to see if there are any subscribers for the message, and if so enqueues it once for each subscriber. The messages are dequeued in zmqBridgeMamaTransportImpl_queueCallback, running on a different thread, which also queries the endpoint pool.
However, application code running in the zmqBridgeMamaTransportImpl_queueCallback thread can also modify the endpoint pool, typically by deleting subscriptions. Mama subscriptions must be deleted from the thread that created them (by calling mamaSubscription_destroy), or, if that is not possible, by calling mamaSubscription_destroyEx, which queues the destroy on the callback thread, where it is destroyed by mamaSubscription_DestroyThroughQueueCB.
Additionally, subscriptions can be created on any thread, either directly or by calling functions that create subscriptions "under the covers" (like creating an inbox).
It would seem that the endpoint pool would need to be synchronized with a mutex, but it is not.
Also, for some reason, this synchronization does not appear to be necessary with the qpid transport (why?), but it is definitely required with zmq.
I have a pull request ready to go -- in my case I've made the change in the OpenMAMA code, so it would affect both qpid and zmq. However, if the synchronization is really not required for qpid (why?), then it might make more sense to make the change in zmq. In that case, it would be necessary to come up with a way to ensure that the correct (thread-safe) version of the endpoint pool code gets loaded at runtime for zmq (and any other transports that require it).
Please let me know your thoughts.