Start the driver:
$ java -cp aeron-samples/build/libs/samples.jar -Dagrona.disable.bounds.checks=true uk.co.real_logic.aeron.samples.LowLatencyMediaDriver
Start the first receiver, receiver A:
$ java -cp aeron-samples/build/libs/samples.jar -Dagrona.disable.bounds.checks=true -Daeron.sample.frameCountLimit=256 uk.co.real_logic.aeron.samples.RateSubscriber
Start a second receiver, receiver B:
$ java -cp aeron-samples/build/libs/samples.jar -Dagrona.disable.bounds.checks=true -Daeron.sample.frameCountLimit=256 uk.co.real_logic.aeron.samples.RateSubscriber
Start a publisher:
$ java -cp aeron-samples/build/libs/samples.jar -Daeron.sample.messageLength=40 -Daeron.sample.messages=500000000 -Dagrona.disable.bounds.checks=true uk.co.real_logic.aeron.samples.StreamingPublisher
Now suspend receiver B with control-z. After a second or two, the send and receive rates drop to 0 on the source and on receiver A. Wait a few more seconds, and the send rate recovers and messages flow again. Resume receiver B with ‘fg’, and it should start receiving messages again. Then kill receiver B with control-c.
At this point, sending and receiving appear to stop altogether: the source reports no messages sent, and receiver A (up the whole time) receives no further messages. That’s not good…
Now kill receiver A with control-c as well. Uh oh, it now reports this:
Shutting down...
Exception in thread "main" uk.co.real_logic.aeron.exceptions.RegistrationException: Could not find stream Id to decrement: 10
at uk.co.real_logic.aeron.ClientConductor.onError(ClientConductor.java:257)
at uk.co.real_logic.aeron.DriverListenerAdapter.onError(DriverListenerAdapter.java:135)
at uk.co.real_logic.aeron.DriverListenerAdapter.onMessage(DriverListenerAdapter.java:120)
at uk.co.real_logic.agrona.concurrent.broadcast.CopyBroadcastReceiver.receive(CopyBroadcastReceiver.java:84)
at uk.co.real_logic.aeron.DriverListenerAdapter.receiveMessages(DriverListenerAdapter.java:56)
at uk.co.real_logic.aeron.ClientConductor.doWork(ClientConductor.java:104)
at uk.co.real_logic.aeron.common.AgentRunner.run(AgentRunner.java:85)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
and then hangs forever. Here is a thread dump of the hung receiver A process:
2015-02-24 14:03:02
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode):
"Attach Listener" #14 daemon prio=9 os_prio=31 tid=0x00007fa1f505c800 nid=0x380f waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" #13 prio=5 os_prio=31 tid=0x00007fa1f4002000 nid=0xe07 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007fa1f3820000 nid=0x5703 waiting on condition [0x0000000136305000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076abb9ff8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007fa1f381f800 nid=0x5503 waiting on condition [0x0000000136202000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000076abb9ff8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fa1f5024800 nid=0x5103 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fa1f3023000 nid=0x4f03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fa1f3022800 nid=0x4d03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fa1f3021800 nid=0x4b03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fa1f4001000 nid=0x4903 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fa1f283e000 nid=0x3c17 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fa1f5809000 nid=0x3503 in Object.wait() [0x000000012bf2d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076ab062f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
- locked <0x000000076ab062f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fa1f5808000 nid=0x3303 in Object.wait() [0x000000012be2a000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076ab05d68> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
- locked <0x000000076ab05d68> (a java.lang.ref.Reference$Lock)
"VM Thread" os_prio=31 tid=0x00007fa1f5805800 nid=0x3103 runnable
"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fa1f280e000 nid=0x140b runnable
"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fa1f281d000 nid=0x2313 runnable
"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fa1f281d800 nid=0x2503 runnable
"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fa1f281e800 nid=0x2703 runnable
"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fa1f281f000 nid=0x2903 runnable
"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fa1f281f800 nid=0x2b03 runnable
"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fa1f2820000 nid=0x2d03 runnable
"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fa1f2821000 nid=0x2f03 runnable
"VM Periodic Task Thread" os_prio=31 tid=0x00007fa1f5025800 nid=0x5303 waiting on condition
JNI global references: 326
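The dump above has no "main" thread, a "DestroyJavaVM" thread, and two non-daemon "pool-1" workers parked in LinkedBlockingQueue.take. That pattern is what a JVM looks like when main has finished but an executor was never shut down, so one plausible reading is that the exception skipped the client's normal shutdown path. The sketch below only illustrates that general JVM behaviour; LingeringPoolDemo and its methods are made-up names and are not Aeron code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Aeron or its samples.
class LingeringPoolDemo
{
    // Builds a fixed pool of two workers backed by a LinkedBlockingQueue.
    // The default thread factory creates non-daemon threads.
    static ThreadPoolExecutor newPool()
    {
        return new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // Runs one trivial task and returns how many workers are still alive
    // once the pool is idle again.
    static int idleWorkersAfterTask(final ThreadPoolExecutor pool)
    {
        try
        {
            pool.submit(() -> { }).get(5, TimeUnit.SECONDS);
        }
        catch (final Exception ex)
        {
            throw new IllegalStateException(ex);
        }

        return pool.getPoolSize();
    }

    // Shuts the pool down and reports whether its workers exited in time.
    static boolean shutdownAndWait(final ThreadPoolExecutor pool)
    {
        pool.shutdown();
        try
        {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (final InterruptedException ex)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args)
    {
        final ThreadPoolExecutor pool = newPool();
        try
        {
            System.out.println("idle workers: " + idleWorkersAfterTask(pool));
            throw new IllegalStateException("simulated failure during shutdown");
        }
        catch (final IllegalStateException ex)
        {
            // Caught here only so the demo exits cleanly.
            System.out.println("caught: " + ex.getMessage());
        }
        finally
        {
            // Without this call the idle workers would keep the JVM alive.
            System.out.println("terminated: " + shutdownAndWait(pool));
        }
    }
}
```

If the shutdown call in the finally block is removed, the process stays up after main returns, with its workers parked in the same take() frames as in the dump.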
Meanwhile, at the moment receiver A is control-c’d, the driver reports this:
[44947.428765] EXCEPTION [591/591]: java.lang.IllegalStateException(Could not find stream Id to decrement: 10)
uk.co.real_logic.aeron.driver.ReceiveChannelEndpoint.decRefToStream ReceiveChannelEndpoint.java:114
uk.co.real_logic.aeron.driver.DriverConductor.onRemoveSubscription DriverConductor.java:513
uk.co.real_logic.aeron.driver.DriverConductor.onClientCommand DriverConductor.java:306
uk.co.real_logic.aeron.driver.DriverConductor$$Lambda$38/1327763628.onMessage null:-1
uk.co.real_logic.agrona.concurrent.ringbuffer.ManyToOneRingBuffer.read ManyToOneRingBuffer.java:144
and starts using an awful lot of CPU. Here’s a thread dump of the driver taken after this exception, while it’s burning CPU:
2015-02-24 14:04:41
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.31-b07 mixed mode):
"Attach Listener" #13 daemon prio=9 os_prio=31 tid=0x00007fea39a8c000 nid=0x3a0b waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"driver-conductor" #12 prio=5 os_prio=31 tid=0x00007fea39adb800 nid=0x5903 runnable [0x00000001290a6000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.DatagramChannelImpl.receive0(Native Method)
at sun.nio.ch.DatagramChannelImpl.receiveIntoNativeBuffer(DatagramChannelImpl.java:429)
at sun.nio.ch.DatagramChannelImpl.receive(DatagramChannelImpl.java:407)
at sun.nio.ch.DatagramChannelImpl.receive(DatagramChannelImpl.java:360)
- locked <0x000000076b5ed2e8> (a java.lang.Object)
at uk.co.real_logic.aeron.driver.UdpChannelTransport.receive(UdpChannelTransport.java:294)
at uk.co.real_logic.aeron.driver.UdpChannelTransport.pollFrames(UdpChannelTransport.java:241)
at uk.co.real_logic.aeron.driver.TransportPoller.pollTransports(TransportPoller.java:146)
at uk.co.real_logic.aeron.driver.DriverConductor.doWork(DriverConductor.java:201)
at uk.co.real_logic.aeron.common.AgentRunner.run(AgentRunner.java:85)
at java.lang.Thread.run(Thread.java:745)
"receiver" #11 prio=5 os_prio=31 tid=0x00007fea39a61000 nid=0x5703 runnable [0x0000000128fa3000]
java.lang.Thread.State: RUNNABLE
at uk.co.real_logic.aeron.common.AgentRunner.run(AgentRunner.java:85)
at java.lang.Thread.run(Thread.java:745)
"sender" #10 prio=5 os_prio=31 tid=0x00007fea39a60800 nid=0x5503 runnable [0x0000000128ea0000]
java.lang.Thread.State: RUNNABLE
at uk.co.real_logic.aeron.driver.Sender.doSend(Sender.java:123)
at uk.co.real_logic.aeron.driver.Sender.doWork(Sender.java:50)
at uk.co.real_logic.aeron.common.AgentRunner.run(AgentRunner.java:85)
at java.lang.Thread.run(Thread.java:745)
"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007fea3a805000 nid=0x5103 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007fea3b80b000 nid=0x4f03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007fea3b80a800 nid=0x4d03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007fea3b809800 nid=0x4b03 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007fea3988e800 nid=0x4903 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007fea3988e000 nid=0x3d0b runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007fea3986f800 nid=0x3503 in Object.wait() [0x000000011e8f0000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076ab062f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:142)
- locked <0x000000076ab062f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:158)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007fea3986f000 nid=0x3303 in Object.wait() [0x000000011e7ed000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x000000076ab05d68> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
- locked <0x000000076ab05d68> (a java.lang.ref.Reference$Lock)
"main" #1 prio=5 os_prio=31 tid=0x00007fea39812800 nid=0xe07 waiting on condition [0x0000000100da0000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at uk.co.real_logic.aeron.common.concurrent.SigIntBarrier.await(SigIntBarrier.java:54)
at uk.co.real_logic.aeron.samples.LowLatencyMediaDriver.main(LowLatencyMediaDriver.java:39)
"VM Thread" os_prio=31 tid=0x00007fea3986c000 nid=0x3103 runnable
"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007fea3981e000 nid=0x150b runnable
"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007fea3a800000 nid=0x1407 runnable
"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007fea3a003000 nid=0x2503 runnable
"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007fea3a801000 nid=0x2703 runnable
"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007fea3a004000 nid=0x2903 runnable
"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007fea3a801800 nid=0x2b03 runnable
"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007fea3a802000 nid=0x2d03 runnable
"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007fea3981e800 nid=0x2f03 runnable
"VM Periodic Task Thread" os_prio=31 tid=0x00007fea3a83e000 nid=0x5303 waiting on condition
JNI global references: 473
At this point, starting a new receiver does get messages flowing again: the original source, which has been up the whole time, resumes sending, and the brand new receiver receives them.
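The "Could not find stream Id to decrement: 10" message, seen on both the client and the driver, reads like an unbalanced reference count on stream 10: more decrements than increments. Which code path performs the extra decrement is not established by anything above. The sketch below is a toy model of per-stream reference counting that fails the same way; StreamRefCounts, incRef and decRef are hypothetical names and this is not the ReceiveChannelEndpoint implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-stream reference counting; not Aeron's actual code.
class StreamRefCounts
{
    private final Map<Integer, Integer> refCountByStreamId = new HashMap<>();

    // Adds one reference to the stream and returns the new count.
    int incRef(final int streamId)
    {
        return refCountByStreamId.merge(streamId, 1, Integer::sum);
    }

    // Drops one reference and returns the new count. The entry is removed
    // when the count reaches zero, so a further decrement cannot find it.
    int decRef(final int streamId)
    {
        final Integer count = refCountByStreamId.get(streamId);
        if (null == count)
        {
            throw new IllegalStateException("Could not find stream Id to decrement: " + streamId);
        }

        if (1 == count)
        {
            refCountByStreamId.remove(streamId);
            return 0;
        }

        refCountByStreamId.put(streamId, count - 1);
        return count - 1;
    }

    public static void main(final String[] args)
    {
        final StreamRefCounts counts = new StreamRefCounts();
        counts.incRef(10); // receiver A subscribes
        counts.incRef(10); // receiver B subscribes
        counts.decRef(10); // receiver B goes away
        counts.decRef(10); // assumed extra decrement, for illustration only

        try
        {
            counts.decRef(10); // receiver A is killed
        }
        catch (final IllegalStateException ex)
        {
            System.out.println(ex.getMessage());
        }
    }
}
```

In this model, two subscribers followed by three decrements is enough to hit the error on the last one, which matches the point in the sequence above where receiver A is killed.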