
nakadi-java's Issues

StreamProcessor.startBlocking() is misleading

It is not clear to me when startBlocking() should be preferred over start().

startBlocking() returns immediately even though the library is still consuming events. This is because event emissions and subscriptions are performed on the monoComputeScheduler defined in the nakadi.StreamProcessor class. From the name I expected the calling thread to block.
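Until the naming is resolved, a workaround sketch for callers that do want to block, assuming only the start() and stop() methods named in this tracker:

import java.util.concurrent.CountDownLatch;

import nakadi.StreamProcessor;

class BlockingConsumer {
  // Block the calling thread explicitly, given that startBlocking()
  // returns immediately as described above.
  static void runAndBlock(StreamProcessor processor) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      processor.stop();   // stop consuming
      latch.countDown();  // release the blocked caller
    }));
    processor.start();    // non-blocking: events flow on the client's schedulers
    latch.await();        // hold the calling thread until shutdown
  }
}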

Support undefined and business categories in the API

The API should have a way to express undefined and business categories. These are problematic because they leave no hook to hang custom data off. In the business case, there's also a magic field called metadata that's mixed in with the custom fields. The data change category doesn't have this problem as it defines a data field to hold custom information.

Don't default to LoggingStreamOffsetObserver for the Subscription API

The readme says

If no offset observer is given, the default observer used is LoggingStreamOffsetObserver which simply logs when it is invoked.

This is true, but unwise. If a client connects to a subscription stream and does not set the observer, it will be configured with the logging observer, which in turn calls the subscription API checkpointer (LoggingStreamObserver.java#L54). This is a problem if the client accidentally forgets to attach its intended observer: the stream will be logged and checkpointed, moving the position forward in the stream. Since the subscription API doesn't allow rewind, this effectively has the potential for silent data loss.

Filing as a bug, as this has operational impact.

Publish under org.zalando

Currently the client is published under net.dehora.nakadi to Bintray/JCenter. This can be switched to org.zalando at some point.

Stream ID Not Found

Using the Nakadi subscription-based API to consume events has resulted in the following error, whereby for some reason the stream_id is not found. It's not clear whether this is down to some state carried over within the client, or an issue on the Nakadi server itself. The issue causes the client to crash completely, stopping any new events from being consumed.

Opening an issue here to allow for further discussion.

Stack trace:

Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: nakadi.InvalidException: Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwProblem(OkHttpResource.java:318)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.handleError(OkHttpResource.java:299)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwIfError(OkHttpResource.java:290)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:173)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:143)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10238)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10205)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:444)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.first(BlockingObservable.java:167)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:145)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:35)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:23)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:28)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:26)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Try$.apply(Try.scala:192)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success.map(Try.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: 2017-01-12 09:30:54,209 INFO  NakadiClient  - StreamBatchRecordSubscriber.onError Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: 2017-01-12 09:30:54,209 ERROR e.DelegatingStreamObserverProvider  - Nakadi observer error
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: nakadi.InvalidException: Unprocessable Entity; Session with stream id b975be3e-ba1b-4279-adc1-14a9e6358514 not found (422)
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwProblem(OkHttpResource.java:318) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.handleError(OkHttpResource.java:299) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.throwIfError(OkHttpResource.java:290) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:173) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:143) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10238) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.Observable.subscribe(Observable.java:10205) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:444) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.shadow.rx.observables.BlockingObservable.first(BlockingObservable.java:167) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:145) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:35) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:23) ~[net.dehora.nakadi.nakadi-java-client-0.7.2.jar:0.7.2]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:28) ~[de.zalando.droichead-1.2.2-22-gee4ee1f.jar:1.2.2-22-gee4ee1f]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at events.DelegatingStreamObserverProvider$$anon$1$$anonfun$1.apply(DelegatingStreamObserverProvider.scala:26) ~[de.zalando.droichead-1.2.2-22-gee4ee1f.jar:1.2.2-22-gee4ee1f]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Try$.apply(Try.scala:192) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.util.Success.map(Try.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:405) ~[com.typesafe.akka.akka-actor_2.11-2.4.2.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[org.scala-lang.scala-library-2.11.8.jar:na]
Jan 12 09:30:54 ip-172-31-146-28 docker/e1317d56b85c[813]: #011at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[org.scala-lang.scala-library-2.11.8.jar:na]

Client sends Integer as Float

Problem
Integer is sent as Double with BusinessEventMapped

Description
Hey guys,

I've noticed that the client seems to send Integer values as Float within BusinessEventMapped.
This is problematic because the generated JSON Schema from the ObjectMapper and the entity model declare the field as Integer and not as Double, so the schema validation inside Nakadi fails.
Of course I could just switch all Integer fields to Float, but I guess that's not the goal.

I found the line which causes the problem:
https://github.com/zalando-incubator/nakadi-java/blob/master/nakadi-java-client/src/main/java/nakadi/EventMappedSupport.java#L33

The serialization of the Integer isn't the problem, but the deserialization is, since the target is just Object with no datatype specified. This seems to be a known problem with the Gson converter:
http://lmgtfy.com/?q=gson+integer+double
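For reference, a minimal standalone reproduction of the Gson behaviour (using plain Gson rather than the client's shadowed copy):

import com.google.gson.Gson;

import java.util.Map;

class GsonNumberDemo {
  public static void main(String[] args) {
    // Without a concrete target type, Gson binds every JSON number to
    // Double, which reproduces the Integer-arriving-as-Float behaviour.
    Map<?, ?> fields = new Gson().fromJson("{\"count\": 42}", Map.class);
    System.out.println(fields.get("count")); // prints 42.0, not 42
  }
}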

Using `.batchLimit(0)` stops the stream observer from getting any events from Nakadi

The JavaDoc says:

/**
 * Maximum number of Events in each batch of the stream. If 0 or unspecified
 * will buffer Events indefinitely and flush on reaching of {@link #batchFlushTimeoutSeconds()}.
 */

My configuration:

val streamConfiguration = new StreamConfiguration()
  .subscriptionId(subscriptionId)
  .batchFlushTimeout(1, TimeUnit.SECONDS)
  .batchLimit(batchLimit) // where batchLimit is 0
  .maxUncommittedEvents(batchLimit) // this line doesn't matter. Tried without it.

Unauthorized (401) response not processed correctly

The Problem created for the response below doesn't contain enough information to understand what went wrong:

{"error":"unauthorized","error_description":"Full authentication is required to access this resource"}

Problem{type=about:blank, title='null', status=0, detail='null', instance=null, data=null}

Perhaps the line

    if (problem.status() == 0 && response.statusCode() == 400) 

should accommodate 401 as well.

Support circuit breaking on event sending

It would be useful to have a basic circuit breaker option on event sending. Any caller can do this today via Hystrix, but ideally they should not be required to use a third-party library to break a circuit and maintain their own SLAs. The client itself also doesn't want to force a new dependency: Hystrix is excellent, but it has a not-so-small set of transitive dependencies that make it unsuitable for the client to rely on, and the command pattern it uses has a heavyweight code footprint.

This doesn't need to be a full-fledged breaker. A simple timed/attempt break with fallback and onError support would do (sketched below), along with some new metrics.

See also: #81
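A minimal sketch of the kind of timed break described above (not a proposal for the client's actual API):

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// After a failure the circuit stays open for openMillis and callers get
// the fallback instead of hitting the broker again.
class TimedBreaker<T> {
  private final long openMillis;
  private final AtomicLong openedAt = new AtomicLong(0);

  TimedBreaker(long openMillis) { this.openMillis = openMillis; }

  T call(Supplier<T> action, Supplier<T> fallback) {
    if (System.currentTimeMillis() - openedAt.get() < openMillis) {
      return fallback.get(); // circuit open: short-circuit the call
    }
    try {
      return action.get();
    } catch (RuntimeException e) {
      openedAt.set(System.currentTimeMillis()); // trip the breaker
      return fallback.get();
    }
  }
}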

StreamProcessor rejects large initial volumes of events

When fetching a larger number of events, the stream processor fails with an IllegalStateException:

2016-12-06 14:24:10,156 {WARN} [nakadi-java-io] [] [NakadiClient] StreamConnectionRetry: not retryable, propagating error IllegalStateException, more items arrived than were requested
java.lang.IllegalStateException: more items arrived than were requested
	at nakadi.shadow.rx.internal.producers.ProducerArbiter.produced(ProducerArbiter.java:98)
	at nakadi.shadow.rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:246)
	at nakadi.shadow.rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:244)

This happens on the event stream when setting a cursor value like BEGIN and being sent a few thousand messages immediately.

Set a min time on stream subscription retries

When a client handling a partition goes away, the other running clients will compete to grab it once they wake up and retry. They work by retrying every N seconds to see if a partition is available, and fall back to sleep if none are. The aim here is to avoid any need for special handling by the caller to manage down the number of client connections. It should be OK to run the client across a very large microservices cluster such that, even with just a few partitions, the clients won't hammer the broker.

You can control the retry delay via maxRetryDelay on the StreamConfiguration, but it currently has no minimum bound: you could set it to 100 millis, or 10 millis, and potentially hurt the broker. Ideally the client won't accept a value here that's less than, say, one second.
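The proposed bound is simple to sketch: clamp whatever the caller configures to a floor of one second.

import java.util.concurrent.TimeUnit;

class RetryDelays {
  static final long MIN_RETRY_DELAY_MS = TimeUnit.SECONDS.toMillis(1);

  // Whatever delay the caller asks for, never retry faster than this.
  static long clamp(long requestedDelayMs) {
    return Math.max(requestedDelayMs, MIN_RETRY_DELAY_MS);
  }
}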

Filing as a bug rather than an enhancement as this has operational impact if not addressed.

Extend RetryPolicy to support max time backoff

The RetryPolicy and ExponentialRetry objects work based on a max attempts plus a max delay before exiting with a finished status. For some situations it's useful to be able to bound the maximum overall amount of time spent on a request before bailing, regardless of the number of attempts. Event producers that want to bound the total time spent sending an event before giving up, and the subscription stream checkpointer, are the main examples.
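A sketch of the wall-clock bound being asked for (not the client's RetryPolicy API):

import java.util.function.Supplier;

class TimeBoundedRetry {
  // Give up once maxElapsedMs has passed, regardless of attempt count.
  static <T> T run(Supplier<T> action, long maxElapsedMs, long initialDelayMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxElapsedMs;
    long delay = initialDelayMs;
    while (true) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        if (System.currentTimeMillis() + delay > deadline) {
          throw e; // out of time: propagate the last failure
        }
        Thread.sleep(delay);
        delay = Math.max(1, delay * 2); // exponential backoff between tries
      }
    }
  }
}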

Handle retryable errors on subscription consumer checkpointer

SubscriptionOffsetObserver doesn't handle retryable errors when posting a checkpoint. It could perform a small number of exponential retries before failing. What happens now is that the stream enters a retry, so it's not fatal, but it could be improved.

Is it possible to find or create subscription?

val subscriptionDef = new Subscription()
  .consumerGroup("")
  .eventType("")
  .owningApplication("")

val subscription = resource.tryFind(subscriptionDef.id()).fold(resource.create(subscriptionDef))(identity)

Improve use of OAuth scopes

Prior to 0.0.7 the token provider assumed the UID scope. Since 0.0.7, the API's been changed to allow using the defined scopes for event posting and streaming. The rest of the API needs to be covered to use scopes as well. Also, it's a bit implicit, e.g. you can't state which scopes to use where. Maybe this needs to be changed to support using custom scopes on the subscription API.

Mistake in example in README.md

NakadiClient client = NakadiClient.newBuilder()
  .baseURI("http://localhost:9080")
  .metricCollector(myMetricsCollector)
  .resourceTokenProvider(myResourceTokenProvider)
  .readTimeout(60, TimeUnit.SECONDS)
  .connectTimeout(30, TimeUnit.SECONDS)
  .build();

The client builder does not have a method called resourceTokenProvider (it should be just tokenProvider).

Error message is lost when publishing invalid events

Nakadi returns an array instead of a single object when schema violations are found. See the example below.

[{"publishing_status":"failed","detail":"#: 2 schema violations found\n#: ... }]

However, OkHttpResource.handleError() expects a single Problem object:

Problem problem = (Problem)Optional.ofNullable(this.jsonSupport.fromJson(raw, Problem.class)) ...

Hence, deserialization with Gson fails with the following exception:

Caused by: java.lang.IllegalStateException: Expected BEGIN_OBJECT but was BEGIN_ARRAY at line 1 column 2 path $
	at nakadi.shadow.com.google.gson.stream.JsonReader.beginObject(JsonReader.java:385)
	at nakadi.shadow.com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:213)
	... 49 more
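A sketch of the kind of shape check handleError() could make before binding the body to Problem (plain Gson shown; the client uses a shadowed copy):

import com.google.gson.JsonParser;

class ProblemBodies {
  // Batch publishing errors arrive as a JSON array rather than a single
  // Problem object, so test the shape before deserializing.
  static boolean isBatchItemResponse(String raw) {
    return new JsonParser().parse(raw).isJsonArray();
  }
}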

Make JsonSupport visible

Bug in 0.3.0 and below. JsonSupport is available as a getter on the client, but the interface itself is package private. 😷

Provide an extension StreamOffsetObserver for named event streams

Provide a convenience StreamOffsetObserver that works with the named event stream and allows checkpoint storage to a database. It should handle cross-process distribution of partitions, as well as lease stealing in the event partitions are detected as not making progress. The default db could/should be DynamoDB.

This could also act as a fallback alternative to the subscription service, but that doesn't allow resuming from a known offset, a limitation of the current API. In practice it can work, but it gets complicated. The client would need to process local checkpoints by submitting them to the server before resuming the stream to avoid duplicates, or drop messages that are known to have been seen by filtering them.

The observer should do the following (a minimal sketch follows the list):

  • track progress keyed by partition with checkpoint and lease data
  • automatically create the backing tablestore
  • require naming a consumer group as per the sub API
  • bound the number of held connections to the number of partitions for a given consumer group
  • have other connections running a background check the way the current sub stream does with 409
  • be friendly to the underlying data store (amortized updates)
  • allow background-checking clients to "steal" a partition if it's deemed not to make progress
  • have a policy to not auto-checkpoint Nakadi if upstream from the observer is failing
  • export some alarmable metrics
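A minimal in-memory sketch of the first bullet; the observer interface shape (onNext taking a cursor context with partition() and offset()) is assumed from the stack traces elsewhere in this tracker, and a real version would write checkpoint and lease rows to DynamoDB:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import nakadi.StreamCursorContext;
import nakadi.StreamOffsetObserver;

class TableBackedOffsetObserver implements StreamOffsetObserver {
  private final Map<String, String> checkpoints = new ConcurrentHashMap<>();

  @Override public void onNext(StreamCursorContext context) {
    // track progress keyed by partition with checkpoint data
    checkpoints.put(context.cursor().partition(), context.cursor().offset());
  }
}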

Handle retryable errors on auto-paginators

The collection classes don't handle retryable errors (e.g. 429).

This could be done by wrapping them up in an observable with a retry handler, as per StreamConnectionRetry. But based on a quick spike, Observable.using probably isn't the way to set things up, as we don't want to dispose of the Response via disposeAction ahead of it being used by the caller.

Add a Scala API

A wrapper API that uses the client as an engine would be useful.

Putting Scala around Java tends to be tricky. It might be necessary, for example, to remove wildcarded generic methods and any generic varargs from the client (this was already done for EventResource based on feedback). As another example, some methods use Optionals, which would require something like scala-java8-compat.

consumer_group param doesn't restrict the subscriptions returned

We would like to delete subscriptions given the consumer group. But the code below doesn't recognize the consumer_group parameter, only owning_application. It also doesn't return all the subscriptions we have for a given owning_application.

resource.list(new QueryParams()
    .param("consumer_group", consumerGroup)
    .param("owning_application", application))
    .items()

TokenProviderZign should add known scopes by default

Currently you have to do something like this:

    TokenProviderZign zign = TokenProviderZign.newBuilder()
        .refreshEvery(60, TimeUnit.SECONDS)
        .waitFor(5, TimeUnit.SECONDS)
        .scopes(
            "gordian-blade-scope",
            TokenProvider.NAKADI_CONFIG_WRITE,
            TokenProvider.NAKADI_EVENT_STREAM_READ,
            TokenProvider.NAKADI_EVENT_STREAM_WRITE,
            TokenProvider.NAKADI_EVENT_TYPE_WRITE,
            TokenProvider.UID
            )
        .build();

The built-in scopes should be loaded automatically.
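The desired shape would then be something like this (a sketch of the same builder, with the built-in scopes implied):

    TokenProviderZign zign = TokenProviderZign.newBuilder()
        .refreshEvery(60, TimeUnit.SECONDS)
        .waitFor(5, TimeUnit.SECONDS)
        .scopes("gordian-blade-scope") // custom scopes only; built-ins applied by default
        .build();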

Document Scala use

It would be lovely to have basic send-an-event and consume-a-stream examples in Scala for the readme, or failing that, to add some Scala code to the examples project. The client seems to work with 2.11, but any Scala user has to convert the readme samples first to get going.

related: #77

Add streaming producer

The current EventResource takes a list. It might be interesting to accept a stream or iterable of events to send. Internally this would honor per-partition ordering and backpressure/rate limits in the event the Nakadi server starts rejecting requests.

Failing to reconnect to subscription after disconnect

After an error committing an offset, the client could not reconnect and fell into a loop, retrying forever.

May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - disposing connection on thread nakadi-java-io-0 2104337645 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] DEBUG d.z.w.l.CustomerItemCancellationRequestedListener - Completed reading events of event type fulfillment.shipment-order-item-cancellation-requested
22:18:54.894 cancellation-service-live /var/log/application.log May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - stream repeater invoked 0 restarts=1
22:18:54.894 cancellation-service-live /var/log/application.log May 10 20:18:54 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:54 [nakadi-java-io-0] INFO NakadiClient - stream repeater will delay before restarting, delay=3 seconds, restarts=1
22:18:57.691 cancellation-service-live /var/log/application.log May 10 20:18:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:57 [nakadi-java-io-0] INFO NakadiClient - stream_connection details mode=subscriptionStream resolved_event_name=fulfillment.shipment-order-item-cancellation-requested url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1 scope=nakadi.event_stream.read
22:18:57.691 cancellation-service-live /var/log/application.log May 10 20:18:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:57 [nakadi-java-io-0] INFO NakadiClient - stream_connection opening 562090725 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
22:19:57.892 cancellation-service-live /var/log/application.log May 10 20:19:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:19:57 [nakadi-java-io-0] INFO NakadiClient - disposing connection on thread nakadi-java-io-0 562090725 HttpResponse{response=Response{protocol=http/1.1, code=200, message=OK, url=/subscriptions/2309d9c0-261b-43d6-9a40-b7d391bf1575/events?batch_limit=5&batch_flush_timeout=3&max_uncommitted_events=1}}
22:19:57.892 cancellation-service-live /var/log/application.log May 10 20:19:57 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:19:57 [nakadi-java-io-0] DEBUG d.z.w.l.CustomerItemCancellationRequestedListener - Completed reading events of event type fulfillment.shipment-order-item-cancellation-requested

Initial error:

22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: 2017-05-10 20:18:15 [nakadi-java-compute-0] ERROR NakadiClient - stream_processor_err_compute Thread[nakadi-java-compute-0,5,main], timeout; (400)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: nakadi.NetworkException: timeout; (400)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.executeRequest(OkHttpResource.java:256)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.requestInner(OkHttpResource.java:193)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.requestThrowingInner(OkHttpResource.java:188)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.lambda$requestThrowing$5(OkHttpResource.java:147)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.internal.operators.observable.ObservableDefer.subscribeActual(ObservableDefer.java:32)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.Observable.subscribe(Observable.java:10842)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.Observable.blockingFirst(Observable.java:4727)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.requestThrowing(OkHttpResource.java:149)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:165)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionResourceReal.checkpoint(SubscriptionResourceReal.java:127)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:94)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:47)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionOffsetCheckpointer.checkpoint(SubscriptionOffsetCheckpointer.java:32)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionOffsetObserver.checkpoint(SubscriptionOffsetObserver.java:28)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.SubscriptionOffsetObserver.onNext(SubscriptionOffsetObserver.java:21)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at de.zalando.whip.listener.CustomerItemCancellationRequestedListener.onNext(CustomerItemCancellationRequestedListener.java:83)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.StreamBatchRecordSubscriber.onNext(StreamBatchRecordSubscriber.java:41)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.StreamBatchRecordSubscriber.onNext(StreamBatchRecordSubscriber.java:7)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(FlowableObserveOn.java:400)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:225)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at java.lang.Thread.run(Thread.java:745)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: Caused by: java.net.SocketTimeoutException: timeout
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.Okio$4.newTimeoutException(Okio.java:227)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.AsyncTimeout.exit(AsyncTimeout.java:284)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.AsyncTimeout$2.read(AsyncTimeout.java:240)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.RealBufferedSource.indexOf(RealBufferedSource.java:344)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:216)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:210)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
22:18:18.674 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okhttp3.RealCall.execute(RealCall.java:69)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.okHttpCall(OkHttpResource.java:285)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.OkHttpResource.executeRequest(OkHttpResource.java:254)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #11... 24 common frames omitted
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: Caused by: java.net.SocketException: Socket closed
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at java.net.SocketInputStream.read(SocketInputStream.java:203)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at java.net.SocketInputStream.read(SocketInputStream.java:141)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at sun.security.ssl.InputRecord.read(InputRecord.java:503)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.Okio$2.read(Okio.java:138)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #011at nakadi.shadow.okio.AsyncTimeout$2.read(AsyncTimeout.java:236)
22:18:18.675 cancellation-service-live /var/log/application.log May 10 20:18:15 ip-172-31-159-69 docker/4f1edd2aed0e[812]: #11... 45 common frames omitted

StreamProcessor.stop() not shutting down cleanly

Calling StreamProcessor.stop() can result in an exception (example below). This can break external callers that are closing consumers in shutdown hooks (e.g. Play or Dropwizard). A caller-side mitigation is sketched after the trace.

Example:

NakadiClient - stream_processor_err_io Thread[nakadi-java-io-0,5,main], java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
nakadi.shadow.io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
        at nakadi.shadow.io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
        at nakadi.shadow.io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:136)
        at nakadi.shadow.io.reactivex.internal.schedulers.ComputationScheduler$EventLoopWorker.schedule(ComputationScheduler.java:211)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutTimedSubscriber.scheduleTimeout(FlowableTimeoutTimed.java:233)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableTimeoutTimed$TimeoutTimedSubscriber.onNext(FlowableTimeoutTimed.java:223)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:91)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUnsubscribeOn$UnsubscribeSubscriber.onNext(FlowableUnsubscribeOn.java:61)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.onNext(FlowableSubscribeOn.java:97)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUsing$UsingSubscriber.onNext(FlowableUsing.java:104)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:69)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.drain(FlowableOnBackpressureBuffer.java:187)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:112)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.fastPath(FlowableFromIterable.java:181)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onSubscribe(FlowableOnBackpressureBuffer.java:91)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer.subscribeActual(FlowableOnBackpressureBuffer.java:46)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:38)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:12960)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableUsing.subscribeActual(FlowableUsing.java:73)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:13013)
        at nakadi.shadow.io.reactivex.Flowable.subscribe(Flowable.java:12960)
        at nakadi.shadow.io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
        at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:260)
        at nakadi.shadow.io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:225)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@25a7229c rejected from java.util.concurrent.ScheduledThreadPoolExecutor@620b71d3[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
        at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:549)
        at nakadi.shadow.io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:129)
        ... 32 common frames omitted
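Until stop() shuts down cleanly, a caller-side mitigation sketch (this guards the direct call path only; errors surfaced on the client's own threads still need a fix in the client):

import nakadi.StreamProcessor;

class SafeShutdown {
  // Don't let a late task rejection escape and break the rest of the
  // shutdown hook chain.
  static void stopQuietly(StreamProcessor processor) {
    try {
      processor.stop();
    } catch (RuntimeException e) {
      // schedulers are already terminating; swallow and carry on
    }
  }
}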

Add non-throwing finders

Finder calls (by id or by name) throw not-found exceptions. It might be useful to have a softer "try" variant that returns an Optional.
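A caller-side sketch of the proposed variant; NotFoundException here stands in for whichever exception the client's finders actually throw:

import java.util.Optional;
import java.util.function.Supplier;

class Finders {
  static class NotFoundException extends RuntimeException {}

  // Wrap a throwing finder call and turn not-found into Optional.empty().
  static <T> Optional<T> tryFind(Supplier<T> finder) {
    try {
      return Optional.ofNullable(finder.get());
    } catch (NotFoundException e) {
      return Optional.empty();
    }
  }
}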

Version of the library without shadowed dependencies

The library comes with, among others, a shadowed Gson library for JSON processing.
If domain objects require additional configuration for proper serialisation, one needs to use the shadowed version from within the library.

For example, if a field requires the @SerializedName annotation to work correctly, we need to import that annotation with the nakadi.shadow prefix. This means we introduce a dependency on the shadowed code that comes with the library.

One potential solution would be to publish two versions of the jar, with and without shadowed dependencies, similar to how Kryo manages shadowed dependencies.
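The coupling described above, made concrete (the nakadi.shadow relocation prefix is visible in the stack traces elsewhere in this tracker):

import nakadi.shadow.com.google.gson.annotations.SerializedName;

class Order {
  // Annotating a domain field means importing Gson under the shadow prefix.
  @SerializedName("order_number") String orderNumber;
}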
