dehora / nakadi-java
Client library for the Nakadi Event Broker (examples: http://bit.ly/njc-examples, site: https://dehora.github.io/nakadi-java/)
License: MIT License
There are also other certificates that are not in the default trust store, such as those from Let's Encrypt, StartCom and GoDaddy.
It is not clear to me when startBlocking() should be preferred over start(). startBlocking() returns immediately even though the library is still consuming events. This is because event emissions and subscriptions are performed on the monoComputeScheduler defined in the nakadi.StreamProcessor class. From the name I expected the calling thread to block.
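If blocking semantics are needed today, a thin wrapper can provide them. A minimal sketch, assuming the consumer logic can be handed over as a plain Runnable; the class and method names here are illustrative, not part of the client API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BlockingStart {

  // Runs the consumer on a background thread and blocks the calling thread
  // until it exits or the timeout elapses; returns true if it finished.
  public static boolean startAndAwait(Runnable consumer, long timeout, TimeUnit unit) {
    CountDownLatch done = new CountDownLatch(1);
    Thread t = new Thread(() -> {
      try {
        consumer.run();     // stand-in for the client's stream processing
      } finally {
        done.countDown();   // signal the blocked caller on exit
      }
    });
    t.setDaemon(true);
    t.start();
    try {
      return done.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A variant could instead count down the latch from the StreamObserver's completion callback rather than on thread exit.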
The API should have a way to express the undefined and business categories. These are problematic because they leave no hook to hang custom data off. In the business case, there's also a magic field called metadata that's mixed in with the custom fields. The data change category doesn't have this problem as it defines a data field to hold custom information.
The readme says
If no offset observer is given, the default observer used is LoggingStreamOffsetObserver which simply logs when it is invoked.
This is true, but unwise. If a client connects to a subscription stream and does not set an observer, it will be configured with the logging observer plus the subscription API checkpointer, which it then calls (LoggingStreamObserver.java#L54). This is a problem if the client accidentally forgets to attach its intended observer: the stream will be logged and checkpointed, moving the position forward in the stream. Since the subscription API doesn't allow rewind, this effectively has the potential for silent data loss.
Filing as a bug, as this has operational impact.
#120 introduced an experimental option to post raw strings as events. The code hacks in the JSON array required by the server using string concatenation, but doesn't enforce UTF-8.
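A sketch of what enforcing the encoding could look like: join the pre-serialized events into the array literal and encode the result with an explicit UTF-8 charset rather than the platform default. The class and method names are illustrative, not the client's internals:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RawEventBatch {

  // Joins pre-serialized JSON objects into the array literal the server
  // expects and encodes it with an explicit charset rather than relying on
  // the platform default.
  static byte[] toJsonArrayUtf8(List<String> rawJsonEvents) {
    String array = "[" + String.join(",", rawJsonEvents) + "]";
    return array.getBytes(StandardCharsets.UTF_8);
  }
}
```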
Currently the client is published under net.dehora.nakadi to bintray/jcenter. This can be switched to org.zalando at some point.
To handle half-open connections, or cases where the server has gone away but the stream connection still exists, the client could disconnect if it doesn't see a keep-alive batch within an expected amount of time.
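A hypothetical watchdog sketch for this: record the time of the last batch (including empty keep-alive batches) and report the connection as stale when nothing has arrived within the expected window, so the caller can close and reconnect. The clock is injected as plain millis to keep it testable; names are illustrative, not client API:

```java
public class KeepAliveWatchdog {
  private final long maxQuietMillis;
  private volatile long lastBatchAt;

  public KeepAliveWatchdog(long maxQuietMillis, long nowMillis) {
    this.maxQuietMillis = maxQuietMillis;
    this.lastBatchAt = nowMillis;
  }

  // called for every batch seen on the stream, keep-alives included
  public void onBatch(long nowMillis) {
    lastBatchAt = nowMillis;
  }

  // true when the server has been quiet longer than the allowed window
  public boolean isStale(long nowMillis) {
    return nowMillis - lastBatchAt > maxQuietMillis;
  }
}
```

The check itself would be driven by a scheduled task that closes the connection when isStale returns true.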
Results in healthcheck calls throwing exceptions.
Using the Nakadi subscription-based API to consume events has resulted in the following error, whereby for some reason the stream_id is not found. Not sure if it's to do with some state being carried within the client, or an issue on the Nakadi server itself. This issue causes the client to crash completely, stopping any new events from being consumed.
Opening an issue here to allow for further discussion.
Stack trace:
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: nakadi.InvalidException: Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwProblem(OkHttpResource.java:318)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.handleError(OkHttpResource.java:299)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwIfError(OkHttpResource.java:290)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:173)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:143)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10238)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10205)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:444)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.first(BlockingObservable.java:167)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:145)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:35)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:23)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:28)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:26)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Try$.apply(Try.scala:192)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success.map(Try.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: 2017-01-12 09:30:54,209 INFO NakadiClient - StreamBatchRecordSubscriber.onError Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: 2017-01-12 09:30:54,209 ERROR e.DelegatingStreamObserverProvider - Nakadi observer error
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: nakadi.InvalidException: Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwProblem(OkHttpResource.java:318) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.handleError(OkHttpResource.java:299) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwIfError(OkHttpResource.java:290) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:173) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:143) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10238) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10205) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:444) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.first(BlockingObservable.java:167) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:145) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:35) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:23) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:28) ~[de.zalando.droichead-1.2.2-22-gee4ee1f.jar:1.2.2-22-gee4ee1f]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:26) ~[de.zalando.droichead-1.2.2-22-gee4ee1f.jar:1.2.2-22-gee4ee1f]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Try$.apply(Try.scala:192) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success.map(Try.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Problem
Integer is sent as Double with BusinessEventMapped
Description
Hey guys,
I've noticed that the client seems to send Integer values as Double within the BusinessEventMapped.
This is problematic because the generated JSON Schema from the ObjectMapper and the entity model declare the field as Integer and not as Double. Therefore the schema validation inside Nakadi fails.
For sure I could just switch all Integer fields to Double, but that's not the goal I guess.
I found the line that causes the problem:
https://github.com/zalando-incubator/nakadi-java/blob/master/nakadi-java-client/src/main/java/nakadi/EventMappedSupport.java#L33
The serialization of the Integer isn't the problem; the deserialization is, since the target type is just Object with no datatype specified. This seems to be a known problem with the Gson converter.
http://lmgtfy.com/?q=gson+integer+double
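Gson maps JSON numbers to Double whenever the declared target type is Object, so an integer like 42 round-trips as 42.0. A hedged sketch of a post-processing workaround (not the client's fix): collapse whole-valued doubles back to longs before the value is re-serialized or validated:

```java
public class NumberNormalizer {

  // Converts a Double that carries a whole number back to a Long; leaves
  // fractional doubles and non-Double values untouched.
  static Object normalize(Object value) {
    if (value instanceof Double) {
      double d = (Double) value;
      // only convert when the value is whole and fits a long without loss
      if (d == Math.floor(d) && !Double.isInfinite(d) && Math.abs(d) <= Long.MAX_VALUE) {
        return (long) d;
      }
    }
    return value;
  }
}
```

This would have to be applied recursively over maps and lists coming out of the mapper; registering a custom Gson type adapter is the cleaner long-term route.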
Need a data change observer, and the other two could do with a polish.
In JavaDoc it says:
/**
* Maximum number of Events in each batch of the stream. If 0 or unspecified
* will buffer Events indefinitely and flush on reaching of {@link #batchFlushTimeoutSeconds()}.
*/
My configuration:
val streamConfiguration = new StreamConfiguration()
.subscriptionId(subscriptionId)
.batchFlushTimeout(1, TimeUnit.SECONDS)
.batchLimit(batchLimit) // where batchLimit is 0
.maxUncommittedEvents(batchLimit) // this line doesn't matter. Tried without it.
The Problem created for the response below doesn't contain enough information to understand the failure:
{"error":"unauthorized","error_description":"Full authentication is required to access this resource"}
Problem{type=about:blank, title='null', status=0, detail='null', instance=null, data=null}
Perhaps the line
if (problem.status() == 0 && response.statusCode() == 400)
should accommodate 401 as well.
It would be useful to have a basic circuit breaker option on event sending. Any caller can do this today via Hystrix, but ideally they should not be required to use a third-party lib to break a circuit and maintain their own SLAs. Also, the client itself doesn't want to force a new dependency: Hystrix is awesome, but has a not-so-small set of transitive dependencies that make it unsuitable for the client to rely on, and the command pattern it uses has a heavyweight code footprint.
This doesn't need to be a full-fledged breaker. A simple timed/attempt break with fallback and onError support would do, along with some new metrics.
See also: #81
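As a sketch of how small such a breaker could be: a timed break with a consecutive-failure threshold, using injected clock values so it's easy to test. The names and thresholds are illustrative, not a proposed client API:

```java
public class TimedBreaker {
  private final int maxFailures;
  private final long openMillis;
  private int failures;
  private long openedAt = -1; // -1 means the breaker is closed

  public TimedBreaker(int maxFailures, long openMillis) {
    this.maxFailures = maxFailures;
    this.openMillis = openMillis;
  }

  public synchronized boolean allowRequest(long nowMillis) {
    if (openedAt < 0) return true;                        // closed: allow
    if (nowMillis - openedAt >= openMillis) return true;  // half-open: probe
    return false;                                         // open: reject
  }

  public synchronized void onSuccess() {
    failures = 0;
    openedAt = -1; // close the breaker again
  }

  public synchronized void onFailure(long nowMillis) {
    if (++failures >= maxFailures) openedAt = nowMillis; // trip the breaker
  }
}
```

The fallback and onError hooks mentioned above would wrap around allowRequest at the call site; metrics could hang off the three state transitions.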
When fetching a larger number of events, the stream processor fails with an IllegalStateException.
2016-12-06 14:24:10,156 {WARN} [nakadi-java-io] [] [NakadiClient] StreamConnectionRetry: not retryable, propagating error IllegalStateException, more items arrived than were requested
java.lang.IllegalStateException: more items arrived than were requested
at nakadi.shadow.rx.internal.producers.ProducerArbiter.produced(ProducerArbiter.java:98)
at nakadi.shadow.rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:246)
at nakadi.shadow.rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:244)
This happens on the event stream when setting a cursor value like BEGIN and being sent a few thousand messages immediately.
Make sure every public class has Javadoc, with no errors.
When a client handling a partition goes away, the other running clients will compete to grab it once they wake up and retry. They work by retrying every N seconds to see if a partition is available, then fall back to sleep if none are. The aim here is to avoid any need for special handling by the caller to manage down the number of client connections. It should be ok to run the client across your very large microservices cluster such that even for just a few partitions, the clients won't hammer the broker.
You can control the retry delay via maxRetryDelay on the StreamConfiguration, but it currently has no minimum bound: you could set it to 100 millis, or 10 millis, and potentially hurt the broker. Ideally the client won't accept a value here that's less than, say, one second.
Filing as a bug rather than an enhancement as this has operational impact if not addressed.
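The bound itself is a one-liner; a sketch, with the constant and method being illustrative rather than StreamConfiguration's actual API:

```java
import java.util.concurrent.TimeUnit;

public class RetryDelayBound {
  // floor on the configured retry delay so misconfigured clients
  // can't poll the broker every few milliseconds
  static final long MIN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(1);

  static long clampDelayMillis(long requestedMillis) {
    return Math.max(requestedMillis, MIN_DELAY_MILLIS);
  }
}
```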
The RetryPolicy and ExponentialRetry objects work based on a maximum number of attempts plus a maximum delay before exiting with a finished status. For some situations it's useful to be able to bound the maximum overall amount of time spent on a request before bailing, regardless of the number of attempts. Event producers that want to bound the total amount of time spent sending an event before giving up, and the subscription stream checkpointer, are the main examples.
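A sketch of what a time-bounded policy could look like, independent of the existing RetryPolicy API: a loop that gives up when either the attempt count or an overall time budget is exhausted, whichever comes first. The names are illustrative:

```java
import java.util.function.Supplier;

public class DeadlineRetry {

  // Retries the call until it succeeds, the attempt budget is spent, or the
  // overall time budget is exceeded; rethrows the last failure on giving up.
  static <T> T callWithDeadline(Supplier<T> call, int maxAttempts, long budgetMillis) {
    long start = System.currentTimeMillis();
    RuntimeException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      if (System.currentTimeMillis() - start > budgetMillis) {
        break; // time budget spent, stop even if attempts remain
      }
      try {
        return call.get();
      } catch (RuntimeException e) {
        last = e; // remember the failure and loop for another attempt
      }
    }
    throw last != null ? last : new IllegalStateException("no attempt made");
  }
}
```

A real policy would also sleep between attempts; that's elided here to keep the deadline logic visible.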
SubscriptionOffsetObserver doesn't handle retryable errors when posting a checkpoint. It could perform a small number of exponential retries before failing. What happens now is that the stream enters a retry, so it's not fatal, but it could be improved.
The stream processor executors can eat some exceptions, especially around bad stream serdes.
Filing as a bug, as this has operational impact.
val subscriptionDef = new Subscription()
.consumerGroup("")
.eventType("")
.owningApplication("")
val subscription = resource.tryFind(subscriptionDef.id()).fold(resource.create(subscriptionDef))(identity)
request() calls don't honor retries, which will probably surprise someone.
Prior to 0.0.7 the token provider assumed the UID scope. Since 0.0.7, the API has been changed to allow using the defined scopes for event posting and streaming. The rest of the API needs to be covered to use scopes as well. Also, it's a bit implicit, e.g. you can't state which scopes to use where. Maybe this needs to be changed to support custom scopes on the subscription API.
For example, allow Accept gzip encoding as an option.
NakadiClient client = NakadiClient.newBuilder()
.baseURI("http://localhost:9080")
.metricCollector(myMetricsCollector)
.resourceTokenProvider(myResourceTokenProvider)
.readTimeout(60, TimeUnit.SECONDS)
.connectTimeout(30, TimeUnit.SECONDS)
.build();
The client builder does not have a method called resourceTokenProvider (it should be just tokenProvider).
It's been reported that StreamObserver.requestBackPressure may not be working correctly.
Nakadi returns an array instead of a single object when schema violations are found. See the example below.
[{"publishing_status":"failed","detail":"#: 2 schema violations found\n#: ... }]
However, OkHttpResource.handleError() expects a single Problem object:
Problem problem = (Problem)Optional.ofNullable(this.jsonSupport.fromJson(raw, Problem.class)) ...
Hence, the deserialization with gson fails with the following exception:
Caused by: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was BEGIN_ARRAY at line 1 column 2 path $
at nakadi.shadow.com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385)
at nakadi.shadow.com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:213)
... 49 more
Not sure if it's a bug or just expected behavior. If it's expected, IMO it should be documented.
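A defensive sketch of coping with both shapes before deserialization (not the client's actual fix; the helper is illustrative): branch on the payload's first non-whitespace character, and when it's an array, parse it element-wise instead of as a single Problem:

```java
public class ProblemPayload {

  // True when the raw error body is a JSON array (the per-event batch-item
  // shape Nakadi returns for schema violations) rather than a single JSON
  // object (the Problem shape handleError currently assumes).
  static boolean isArrayPayload(String raw) {
    return raw.trim().startsWith("[");
  }
}
```

With that check in place, handleError could route array payloads to a BatchItemResponse-style mapping and keep the existing Problem path for objects.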
Maybe it would be useful to use the Zip File System Provider in case the user wants to load certificates from a JAR.
On creating a subscription, allow the Subscription object to be returned (makes it much easier to pluck out the subscription id).
Bug in 0.3.0 and below. JsonSupport is available as a getter on the client but the interface itself is package private.
Provide a convenience StreamOffsetObserver that works with the named event stream and allows checkpoint storage to a database. It should handle cross-process distribution of partitions, as well as lease stealing in the event partitions are detected as not making progress. The default db could/should be DynamoDB.
This could also act as a fallback alternative to the subscription service, but that doesn't allow resuming from a known offset, a limitation of the current API. In practice it can work, but it gets complicated. The client would need to process local checkpoints by submitting them to the server before resuming the stream to avoid duplicates, or drop messages that are known to have been seen by filtering them.
The observer should do the following
The collection classes don't handle retryable errors (eg 429). This could be done by wrapping them up in an observable with a retry handler as per StreamConnectionRetry. But based on a quick spike, Observable.using probably isn't the way to set things up, as we don't want to dispose of the Response via disposeAction ahead of it being used by the caller.
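A plain-loop alternative sketch, assuming the call can be reduced to something returning an HTTP status; the shape is illustrative, since the real collection resources return richer responses:

```java
import java.util.function.IntSupplier;

public class RetryOn429 {

  // Returns the first non-429 status, backing off exponentially between
  // retries, up to maxAttempts calls in total.
  static int fetchWithRetry(IntSupplier call, int maxAttempts) {
    long delayMillis = 100;
    int status = call.getAsInt();
    for (int attempt = 1; attempt < maxAttempts && status == 429; attempt++) {
      try {
        Thread.sleep(delayMillis); // back off before retrying
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // give up if interrupted
        return status;
      }
      delayMillis *= 2; // exponential growth between attempts
      status = call.getAsInt();
    }
    return status;
  }
}
```

This sidesteps the Observable.using disposal problem entirely because the response is only consumed by the caller after the loop exits.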
The API parameter is defined here. This would be very useful for customers that want to programmatically control subscriptions.
A wrapper API that uses the client as an engine would be useful.
Putting Scala around Java tends to be tricky. It might be necessary, for example, to remove wildcarded generic methods and any generic varargs from the client (this was already done for EventResource based on feedback). As another example, some methods use Optionals, which would require something like scala-java8-compat.
We would like to delete subscriptions given the consumer group. But the code below doesn't recognize the consumer_group parameter, only owning_application. It also doesn't return all the subscriptions we have for a given owning_application.
resource.list(new QueryParams()
.param("consumer_group", consumerGroup)
.param("owning_application", application))
.items()
Currently you have to do something like this:
TokenProviderZign zign = TokenProviderZign.newBuilder()
.refreshEvery(60, TimeUnit.SECONDS)
.waitFor(5, TimeUnit.SECONDS)
.scopes(
"gordian-blade-scope",
TokenProvider.NAKADI_CONFIG_WRITE,
TokenProvider.NAKADI_EVENT_STREAM_READ,
TokenProvider.NAKADI_EVENT_STREAM_WRITE,
TokenProvider.NAKADI_EVENT_TYPE_WRITE,
TokenProvider.UID
)
.build();
The inbuilt scopes should be loaded automatically.
It would be useful to have an extension jar that supports fetching Zign tokens.
It would be lovely to have a basic send-an-event and consume-a-stream example in Scala for the readme, or failing that, to add some Scala code to the examples project. The client seems to work with 2.11, but any Scala user has to convert the readme samples first to get going.
related: #77
The current EventResource takes a list. It might be interesting to accept a stream or iterable of events to send. Internally this would honor per partition ordering and backpressure/rate limits in the event the Nakadi server starts rejecting requests.
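A sketch of the batching side of that idea, assuming events arrive as a plain Iterable; per-partition ordering and rate limits are left out, and the class is illustrative rather than a proposed EventResource signature:

```java
import java.util.ArrayList;
import java.util.List;

public class EventBatcher {

  // Splits an iterable of events into bounded batches for sending, so the
  // caller never has to materialize one large list up front.
  static <E> List<List<E>> batches(Iterable<E> events, int batchSize) {
    List<List<E>> out = new ArrayList<>();
    List<E> current = new ArrayList<>(batchSize);
    for (E e : events) {
      current.add(e);
      if (current.size() == batchSize) { // flush a full batch
        out.add(current);
        current = new ArrayList<>(batchSize);
      }
    }
    if (!current.isEmpty()) {
      out.add(current); // trailing partial batch
    }
    return out;
  }
}
```

In a streaming variant each flushed batch would be posted immediately, with the loop pausing when the server signals backpressure.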
After an error committing an offset, the client could not reconnect and fell into a loop trying to do so forever.
May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - disposing connection on thread nakadi-java-io-0 2104337645 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] DEBUG d.z.w.l.CustomerItemCancellationRequestedListener - Completed reading events of event type fulfillment.shipment-order-item-cancellation-requested
22:18:54.894 cancellation-service-live /var/log/application.log May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - stream repeater invoked 0 restarts=1
22:18:54.894 cancellation-service-live /var/log/application.log May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - stream repeater will delay before restarting, delay=3 seconds, restarts=1
22:18:57.691 cancellation-service-live /var/log/application.log May 10 20:18:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:57 [nakadi-java-io-0] INFO NakadiClient - stream_connection details mode=subscriptionStream resolved_event_name=fulfillment.shipment-order-item-cancellation-requested url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1 scope=nakadi.event_stream.read
22:18:57.691 cancellation-service-live /var/log/application.log May 10 20:18:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:57 [nakadi-java-io-0] INFO NakadiClient - stream_connection opening 562090725 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
22:19:57.892 cancellation-service-live /var/log/application.log May 10 20:19:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:19:57 [nakadi-java-io-0] INFO NakadiClient - disposing connection on thread nakadi-java-io-0 562090725 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
22:19:57.892 cancellation-service-live /var/log/application.log May 10 20:19:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:19:57 [nakadi-java-io-0] DEBUG d.z.w.l.CustomerItemCancellationRequestedListener - Completed reading events of event type fulfillment.shipment-order-item-cancellation-requested
Initial error:
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:15 [nakadi-java-compute-0] ERROR NakadiClient - stream_processor_err_compute Thread[nakadi-java-compute-0,5,main], timeout; (400)
nakadi.NetworkException: timeout; (400)
    at nakadi.OkHttpResource.executeRequest(OkHttpResource.java:256)
    at nakadi.OkHttpResource.requestInner(OkHttpResource.java:193)
    at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:188)
    at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:147)
    at nakadi.shadow.io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
    at nakadi.shadow.io.reactivex.Observable.subscribe(Observable.java:10842)
    at nakadi.shadow.io.reactivex.Observable.blockingFirst(Observable.java:4727)
    at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:149)
    at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165)
    at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127)
    at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:94)
    at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:47)
    at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:32)
    at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:28)
    at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:21)
    at de.zalando.whip.listener.CustomerItemCancellationRequestedListener.onNext(CustomerItemCancellationRequestedListener.java:83)
    at nakadi.StreamBatchRecordSubscriber.onNext(StreamBatchRecordSubscriber.java:41)
    at nakadi.StreamBatchRecordSubscriber.onNext(StreamBatchRecordSubscriber.java:7)
    at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:400)
    at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
    at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260)
    at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:225)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: timeout
    at nakadi.shadow.okio.Okio$4.newTimeoutException(Okio.java:227)
    at nakadi.shadow.okio.AsyncTimeout.exit(AsyncTimeout.java:284)
    at nakadi.shadow.okio.AsyncTimeout$2.read(AsyncTimeout.java:240)
    at nakadi.shadow.okio.RealBufferedSource.indexOf(RealBufferedSource.java:344)
    at nakadi.shadow.okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:216)
    at nakadi.shadow.okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:210)
    at nakadi.shadow.okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
    at nakadi.shadow.okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at nakadi.shadow.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at nakadi.shadow.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at nakadi.shadow.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at nakadi.shadow.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at nakadi.shadow.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
    at nakadi.shadow.okhttp3.RealCall.execute(RealCall.java:69)
    at nakadi.OkHttpResource.okHttpCall(OkHttpResource.java:285)
    at nakadi.OkHttpResource.executeRequest(OkHttpResource.java:254)
    ... 24 common frames omitted
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.read(SocketInputStream.java:203)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at nakadi.shadow.okio.Okio$2.read(Okio.java:138)
    at nakadi.shadow.okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
    ... 45 common frames omitted
Calling StreamProcessor.stop() can result in an exception (example below). This can break outside callers that close consumers in shutdown hooks (e.g. Play or Dropwizard).
Example:
NakadiClient - stream_processor_err_io Thread[nakadi-java-io-0,5,main], java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
nakadi.shadow.io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at nakadi.shadow.io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at nakadi.shadow.io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:136)
at nakadi.shadow.io.reactivex.internal.schedulers.ComputationScheduler$EventLoopWorker.schedule(ComputationScheduler.java:211)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutTimedSubscriber.scheduleTimeout(FlowableTimeoutTimed.java:233)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutTimedSubscriber.onNext(FlowableTimeoutTimed.java:223)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:91)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUnsubscribeOn$UnsubscribeSubscriber.onNext(FlowableUnsubscribeOn.java:61)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onNext(FlowableSubscribeOn.java:97)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUsing$UsingSubscriber.onNext(FlowableUsing.java:104)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.drain(FlowableOnBackpressureBuffer.java:187)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:112)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.fastPath(FlowableFromIterable.java:181)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onSubscribe(FlowableOnBackpressureBuffer.java:91)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:12960)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUsing.subscribeActual(FlowableUsing.java:73)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:12960)
at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260)
at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:225)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:549)
at nakadi.shadow.io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:129)
... 32 common frames omitted
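The UndeliverableException above is RxJava surfacing a RejectedExecutionException from a scheduler whose backing pool has already terminated. The rejection itself can be reproduced with plain JDK executors, no library involved: scheduling onto a shut-down ScheduledThreadPoolExecutor throws the same exception that RxJava then wraps. A minimal sketch of that failure mode:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class StopAfterShutdown {

  // Returns true if scheduling on an already-shut-down pool was rejected.
  static boolean scheduleOnTerminatedPool() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    pool.shutdownNow(); // pool ends up Terminated, as in the log above
    try {
      pool.schedule(() -> {}, 10, TimeUnit.MILLISECONDS);
      return false;
    } catch (RejectedExecutionException e) {
      // the same exception RxJava wraps in UndeliverableException
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(scheduleOnTerminatedPool()); // prints true
  }
}
```

This suggests stop() is tearing down its executors and then letting in-flight stream work schedule onto them; until that ordering is fixed, callers may need to wrap stop() in a try/catch inside their shutdown hooks.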
Finder by-id/by-name calls throw not-found exceptions. It might be useful to have a softer "try" variant that returns an Optional.
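As a sketch of what such a variant could look like (NotFoundException, findByName, and tryFind are illustrative names, not the library's API), the throwing finder is wrapped and the not-found case mapped to Optional.empty():

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class TryFinder {

  // Stand-in for the library's not-found exception (hypothetical name).
  static class NotFoundException extends RuntimeException {}

  static final Map<String, String> EVENT_TYPES = Map.of("order.created", "et-1");

  // Existing style: throws when the name is unknown.
  static String findByName(String name) {
    String found = EVENT_TYPES.get(name);
    if (found == null) throw new NotFoundException();
    return found;
  }

  // Proposed "try" variant: not-found becomes Optional.empty(),
  // other exceptions still propagate.
  static <T, R> Optional<R> tryFind(Function<T, R> finder, T key) {
    try {
      return Optional.of(finder.apply(key));
    } catch (NotFoundException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryFind(TryFinder::findByName, "order.created").isPresent()); // true
    System.out.println(tryFind(TryFinder::findByName, "missing").isPresent());       // false
  }
}
```

Callers could then branch on presence instead of catching exceptions for the common "may not exist yet" lookup.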
It looks like recent Nakadi servers are refusing to accept application/x-json-stream
for consumer streaming, despite documenting that media type in the API definition.
This worked previously; I'll file a bug upstream and patch the client for now.
The library is currently hosted under dl.bintray.com.
The library ships with a shadowed GSON library, among others, for JSON processing.
If domain objects require additional configuration for proper serialisation, one has to use the shadowed version bundled with the library.
For example, if a field needs the @SerializedName annotation to work correctly, the annotation must be imported with the nakadi.shadow
prefix. This means client code takes a dependency on the shadowed code that comes with the library.
One potential solution would be to publish two versions of the jar, with and without shadowed dependencies, similar to how kryo manages its shadowed dependencies.
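To illustrate the problem, a domain class today has to import the relocated annotation rather than GSON's own (the exact relocated package name below is an assumption based on the nakadi.shadow prefix; this fragment only compiles against the shadowed nakadi-java jar):

```java
// Relocated import from inside nakadi-java's shadowed jar
// (package name assumed from the nakadi.shadow prefix above).
import nakadi.shadow.com.google.gson.annotations.SerializedName;

public class OrderPlaced {
  // Without the shadowed annotation, the library's internal GSON
  // would serialise this field as "orderNumber", not "order_number".
  @SerializedName("order_number")
  private String orderNumber;
}
```

Swapping the jar for an unshadowed build would silently break such imports, which is why publishing both variants needs a distinct artifact classifier or id.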