
alibaba-rsocket-broker's Issues

An error message: RST-600500: Failed to parse composite metadata

2020-04-28 00:02:11.427 INFO 1 --- [tor-tcp-epoll-4] b.r.b.r.RSocketBrokerHandlerRegistryImpl : RST-500200: Succeed to accept connection from rsocket-user-service
2020-04-28 00:02:11.460 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.CompositeMetadataRSocket : RST-600500: Failed to parse composite metadata

java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203) ~[na:1.8.0_242]
at io.micrometer.core.instrument.ImmutableTag.<init>(ImmutableTag.java:35) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tag.of(Tag.java:29) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tags.of(Tags.java:254) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.MeterRegistry.counter(MeterRegistry.java:363) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Metrics.counter(Metrics.java:76) ~[micrometer-core-1.3.2.jar:1.3.2]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.metrics(CompositeMetadataRSocket.java:155) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:70) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

2020-04-28 00:02:11.463 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.impl.RSocketListenerImpl : RST-200501: Exception during rsocket call

io.rsocket.exceptions.InvalidException: RST-600500: Failed to parse composite metadata
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:76) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
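
The first trace shows the NullPointerException being raised by Objects.requireNonNull inside Micrometer's ImmutableTag constructor, reached from CompositeMetadataRSocket.metrics, which suggests that a metric tag key or value was null. A minimal sketch of one defensive approach, assuming the tag values come from optional routing metadata. The class TagValues, its methods, and the "unknown" placeholder are hypothetical and belong neither to the project nor to Micrometer:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: replaces null or empty tag values before they reach a metrics library. */
class TagValues {
    static final String UNKNOWN = "unknown";

    /** Returns the value itself, or a placeholder when it is null or empty. */
    static String orUnknown(String value) {
        return (value == null || value.isEmpty()) ? UNKNOWN : value;
    }

    /** Copies a tag map, substituting the placeholder for missing values. */
    static Map<String, String> sanitize(Map<String, String> tags) {
        Map<String, String> safe = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            safe.put(entry.getKey(), orUnknown(entry.getValue()));
        }
        return safe;
    }
}
```

Sanitizing values at the point where tags are assembled keeps a missing group or version from turning a metrics call into a failed request.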

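Load-balancing architecture and stability between applications and the Broker cluster

Load balancing in RSocket currently comes down to managing multiple connections from an application to the Broker cluster. Alibaba RSocket Broker uses a share-nothing architecture: brokers in the cluster do not communicate with each other and do not forward messages between themselves. The current client SDK design is that the cluster pushes topology changes to the SDK, and the SDK handles reconnection and route updates. The code currently provides only the basic functionality and needs further refinement to improve stability.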
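
The SDK-side behaviour described above can be sketched as a small pool that swaps its broker list on every topology push and hands out brokers in round-robin order. This is an illustration only: the class BrokerPool, its method names, and the use of plain URI strings instead of live connections are assumptions, not the SDK's actual design.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical client-side pool of broker addresses with round-robin selection. */
class BrokerPool {
    // Replaced as a whole on each topology change so readers always see a consistent list.
    private volatile List<String> brokers = Collections.emptyList();
    private final AtomicInteger cursor = new AtomicInteger();

    /** Called when the cluster pushes a new topology. */
    void updateTopology(Collection<String> brokerUris) {
        brokers = Collections.unmodifiableList(new ArrayList<>(brokerUris));
    }

    /** Picks the next broker in round-robin order. */
    String next() {
        List<String> snapshot = brokers;
        if (snapshot.isEmpty()) {
            throw new IllegalStateException("no broker available");
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(cursor.getAndIncrement(), snapshot.size());
        return snapshot.get(index);
    }
}
```

A real implementation would also have to close connections to brokers that left the topology and reconnect on failure, which this sketch leaves out.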

Broker console reports an error when a Responder-side service does not specify a version

1. Stack

	at com.alibaba.rsocket.broker.web.ui.ServicesView.lambda$services$0(ServicesView.java:50) ~[classes/:na]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_231]
	at java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3566) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[na:1.8.0_231]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_231]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_231]
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_231]

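Docker image for Alibaba RSocket Broker

Provide a Docker image of RSocket Broker for development environments, so that developers using Docker or Docker Compose can quickly start RSocket Broker for testing. RSocket Broker images are already built with Jib, but they have not yet been published to Docker Hub.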

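Scalecube-cluster support for broker-to-broker RPC calls

In some cases, direct broker-to-broker RPC calls are needed, for example to read some of a broker's configuration. One current scenario is that the broker must support applications connecting from outside. Such applications cannot reach the broker's internal IP, for example an office machine accessing an RSocket Broker cluster inside a cloud VPC. In that case the broker has two IPs, so pushes to external applications must use the external IP or domain name, while pushes to internal applications use the internal IP.

Proposed approach:

  • Add JSON-RPC support. This is relatively simple: scalecube-cluster already supports request/response, so only a convention needs to be added.
  • Add a custom scalecube-jackson-codec to support object serialization between brokers. The main requirement for now is JSON serialization of CloudEvents.

A broker RPC feature would make it easier to extend broker functionality later, and would keep broker-to-broker communication simple.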

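Use a state machine to manage application state

Application states currently include connected, serving, paused, and stopped, and each triggers different logic. Managing this logic with a state machine would likely be clearer and make the code easier to maintain. Spring Statemachine's Flux support has not been released yet, but some testing can be done now. Performance is not a concern; the main consideration is the size of the StateMachine object.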
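
To make the idea concrete, here is a minimal hand-rolled sketch using a plain enum and a transition table rather than Spring Statemachine. The four states come from the description above; the allowed transitions and the names AppState and AppStateMachine are assumptions chosen for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** The four application states named in the issue. */
enum AppState { CONNECTED, SERVING, PAUSED, STOPPED }

/** Hypothetical state machine; the transition table below is an assumption. */
class AppStateMachine {
    private static final Map<AppState, Set<AppState>> ALLOWED = new EnumMap<>(AppState.class);
    static {
        ALLOWED.put(AppState.CONNECTED, EnumSet.of(AppState.SERVING, AppState.STOPPED));
        ALLOWED.put(AppState.SERVING, EnumSet.of(AppState.PAUSED, AppState.STOPPED));
        ALLOWED.put(AppState.PAUSED, EnumSet.of(AppState.SERVING, AppState.STOPPED));
        ALLOWED.put(AppState.STOPPED, EnumSet.noneOf(AppState.class));
    }

    private AppState current = AppState.CONNECTED;

    synchronized AppState current() {
        return current;
    }

    /** Moves to the target state if the table allows it; returns false otherwise. */
    synchronized boolean transitionTo(AppState target) {
        if (!ALLOWED.get(current).contains(target)) {
            return false;
        }
        current = target;
        return true;
    }
}
```

Each instance holds a single enum reference, which bears on the object-size concern raised above; the logic triggered by each state change could hang off transitionTo.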

spring-projects/spring-statemachine#397

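Kotlin Coroutines & Flow support

RSocket service interfaces currently support Reactor, RxJava 2, and RxJava 3, but not yet Kotlin Coroutines and Flow. Kotlin Coroutines and Reactive types can be converted to each other. This is a survey: is anyone using Kotlin, and would you want to use Kotlin's native interfaces?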

interface UserService {
    suspend fun getAdmin(): String
    suspend fun getNickById(id: Int): String
    fun getAllNames(): Flow<String>
}

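Custom data type support for RSocket Payload

In the RSocket Broker scenario, a single long-lived RSocket connection carries many kinds of Payload. Although an RSocket connection supports a connection-wide data MIME type for payloads, in practice different business scenarios use different data encodings. RPC calls and Kafka messages, for example, may use different data types. Each Payload then needs its own data encoding. In the RPC scenario there may also be a requirement on the encoding of the returned data, as with the following signature: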

User findUserByNick(String nick);

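The request might be encoded as text/plain while the returned object must be encoded as JSON. Such differing encoding requirements mainly come from customization and performance needs. The metadata extension proposed in rsocket/rsocket#281 adds two metadata types, data encoding and accept data encoding, which cover this scenario.

In RSocket Broker, service providers are advised to support multiple data types, currently mainly:

  • Numeric types: Integer, long, double, and so on
  • Strings: String
  • Byte arrays: for example, in file upload scenarios
  • Custom serialization formats: JSON, Hessian, Protobuf, and Avro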

Composite Metadata Extension: data Mime/type definition per-stream: rsocket/rsocket#281
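
As a rough illustration of handling the simple types in the list above, the sketch below encodes and decodes numbers, strings, and byte arrays with JDK classes only. The class SimplePayloadCodec and the big-endian/UTF-8 choices are assumptions; the actual wire encodings used by the broker are not stated here, and JSON, Hessian, Protobuf, and Avro would each need their own library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec for the primitive payload types; encodings are assumptions. */
class SimplePayloadCodec {

    /** Encodes a supported value into bytes (big-endian numbers, UTF-8 strings). */
    static byte[] encode(Object value) {
        if (value instanceof Integer) {
            return ByteBuffer.allocate(4).putInt((Integer) value).array();
        } else if (value instanceof Long) {
            return ByteBuffer.allocate(8).putLong((Long) value).array();
        } else if (value instanceof Double) {
            return ByteBuffer.allocate(8).putDouble((Double) value).array();
        } else if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        } else if (value instanceof byte[]) {
            return (byte[]) value;
        }
        throw new IllegalArgumentException("unsupported type: " + value.getClass());
    }

    /** Decodes bytes back into the requested type. */
    static Object decode(byte[] data, Class<?> type) {
        if (type == Integer.class) {
            return ByteBuffer.wrap(data).getInt();
        } else if (type == Long.class) {
            return ByteBuffer.wrap(data).getLong();
        } else if (type == Double.class) {
            return ByteBuffer.wrap(data).getDouble();
        } else if (type == String.class) {
            return new String(data, StandardCharsets.UTF_8);
        } else if (type == byte[].class) {
            return data;
        }
        throw new IllegalArgumentException("unsupported type: " + type);
    }
}
```

With per-payload data encoding metadata, a provider would pick the decoder from the request's declared encoding and the encoder from its accepted encoding instead of from the Java type alone.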

Build error: npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

[INFO] Added 40 dependencies to 'D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json'
[INFO] Updated npm D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json.
[INFO] Running npm install ...
npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'

npm ERR! A complete log of this run can be found in:
npm ERR! E:\Program Files\nodejs\node_cache_logs\2019-12-16T05_49_17_435Z-debug.log
[ERROR] >>> Dependency ERROR. Check that all required dependencies are deployed in npm repositories.

Maven version: 3.63
Node.js version: v12.13.1

Could it be that my local network is slow and the packages were not downloaded completely?

Alibaba RSocket Broker 1.0.0.M2

Preparing the 1.0.0.M2 release:

  • RSocket
    • RSocket Java SDK 1.0.0
    • Spring Boot 2.3.0
  • Development
    • Performance testing for memory leak
  • Deployment
    • Maven Central Deployment
  • Documents
    • M2 Release Note

Alpha Release Roadmap

Preparing the Alpha1 release. Core issues to resolve:

  • RSocket
    • RSocket Java SDK 1.0.0
  • Development
    • Deploy artifacts to Maven central repository (in progress)
    • External apps integration from outside of the cluster network
    • Integration testing for all features
    • JDK 8, 11 & 14 compatible testing
  • Deployment
    • Dev deployment
    • Gossip cluster deployment
  • Documents
    • Alibaba RSocket Broker Website
    • Alibaba RSocket Broker Wiki

If there are other issues you care about, please leave a comment here.

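Hessian serialization: add Java 8 data type support and stay compatible with Dubbo Hessian Lite

Add support for Java 8 data types to Hessian serialization, such as the Java time types and Optional.

Stay compatible with Dubbo Hessian Lite. The relevant code comes from Dubbo Hessian Lite with the Java reflection logic removed: RSocket supports Java 8+, so reflection is not needed to support Java 8 data types.

Implementation: register the corresponding Serializer classes in META-INF/hessian/serializers.

Other types, such as Joda-Time, require adding the corresponding Serializer classes yourself.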

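Running the Example code manually: 500 error when calling the service again after a long idle period

The code is the code from the Example. After following the steps in the documentation, access works normally.
After six hours without activity, calling the API again makes the server return a 500 error. After manually restarting the Broker, Responder, and Requester, access works normally again.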

2019-12-11 10:27:20.625  INFO 97481 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8181
2019-12-11 10:27:20.628  INFO 97481 --- [           main] c.a.s.b.r.demo.RSocketRequesterApp       : Started RSocketRequesterApp in 1.702 seconds (JVM running for 2.302)
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115)
	at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110)
	at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146)
	at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-12-11 16:14:48.833 ERROR 97481 --- [ctor-http-nio-4] a.w.r.e.AbstractErrorWebExceptionHandler : [fc5bd7d9]  500 Server Error for HTTP GET "/user/1"

io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.alibaba.spring.boot.rsocket.demo.PortalController#user(Integer) [DispatcherHandler]
	|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP GET "/user/1" [ExceptionHandlingWebHandler]
Stack trace:
		at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146) ~[rsocket-core-1.0.0-RC5.jar:na]
		at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54) ~[rsocket-core-1.0.0-RC5.jar:na]
		at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
		at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]

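Service routing performance improvement

The current routing algorithm takes group + service name + version, computes a hash code from it, and then performs a bitmap lookup. We could have the caller hash the routing information in advance and do the bitmap match on the integer hash code, so that routing only requires reading the first 8 bytes (1 + 3 + 4) of the composite metadata. To keep hash codes consistent across languages, the MurmurHash3 algorithm is used.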
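
The proposal can be sketched with the 32-bit x86 variant of MurmurHash3 and an 8-byte header. Several details here are assumptions made for the example: the ":" separator in the routing key, seed 0, the reading of "1 + 3 + 4" as a 1-byte metadata type id, a 3-byte length, and a 4-byte hash, and all names in the class RouteHash.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical routing-hash helper built on MurmurHash3 (x86, 32-bit). */
class RouteHash {

    /** MurmurHash3 x86_32 over the given bytes. */
    static int murmur3_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h = seed;
        int len = data.length;
        int tail = len & ~3; // index where the trailing 0-3 bytes start
        for (int i = 0; i < tail; i += 4) {
            int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
            k *= c1;
            k = Integer.rotateLeft(k, 15);
            k *= c2;
            h ^= k;
            h = Integer.rotateLeft(h, 13);
            h = h * 5 + 0xe6546b64;
        }
        int k = 0;
        switch (len & 3) { // intentional fall-through over the tail bytes
            case 3: k ^= (data[tail + 2] & 0xff) << 16;
            case 2: k ^= (data[tail + 1] & 0xff) << 8;
            case 1: k ^= (data[tail] & 0xff);
                    k *= c1;
                    k = Integer.rotateLeft(k, 15);
                    k *= c2;
                    h ^= k;
        }
        h ^= len;
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Hashes group + service name + version; the ":" separator is an assumption. */
    static int routeHash(String group, String service, String version) {
        String key = group + ":" + service + ":" + version;
        return murmur3_32(key.getBytes(StandardCharsets.UTF_8), 0);
    }

    /** Builds the assumed 8-byte header: 1-byte type id, 3-byte length, 4-byte hash. */
    static byte[] header(int typeId, int hash) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put((byte) (0x80 | (typeId & 0x7f))); // high bit marks a well-known type id
        int length = 4;                            // length of the hash that follows
        buf.put((byte) (length >> 16)).put((byte) (length >> 8)).put((byte) length);
        buf.putInt(hash);
        return buf.array();
    }

    /** Reads the hash back from the last four bytes of the header. */
    static int readHash(byte[] header) {
        return ByteBuffer.wrap(header).getInt(4);
    }
}
```

The broker would then call readHash on the first 8 bytes and use the result directly for the bitmap lookup, without decoding the rest of the metadata.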

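Support Mono<ByteBuffer> to avoid repeated serialization

In many scenarios we call a remote service, obtain a resource object, and then output that object. In the code below, we call a remote service to fetch a user and then expose the user through a REST API. This involves redundant serialization: the RPC network call performs serialization and deserialization twice, and the REST API output then performs JSON serialization again. The code is as follows: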

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<User> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

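Could there be a mechanism where the service side produces the target data format directly, such as JSON, so that no repeated serialization happens along the way and the bytes produced by the service side are passed straight through to the final caller? The code would look like this: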

    @GetMapping(value = "/user/{id}", produces = "application/json")
    public Mono<ByteBuffer> jsonBytes(@PathVariable Integer id) {
        return userService.findUserById(id);
    }

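Add Interceptor support to RSocketRemoteServiceBuilder

RSocket supports an interceptor mechanism by default: you can add a custom RSocketInterceptor through ClientRSocketFactory.addRequesterPlugin(). These interceptors are fairly low-level, however. To get specific information you still have to parse the ByteBuf, especially the metadata. The current Zipkin tracing implementation, for example, is somewhat cumbersome, and would be much simpler if implemented as an Interceptor.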
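
One possible shape for such a higher-level interceptor is sketched below: it receives already-decoded call information instead of a raw ByteBuf. Everything in it (InterceptorSketch, Invocation, ServiceInterceptor, invoke) is hypothetical and is not the RSocketRemoteServiceBuilder API.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical service-level interceptor chain working on decoded call information. */
class InterceptorSketch {

    /** Decoded description of one remote call. */
    static final class Invocation {
        final String service;
        final String method;
        final Object[] args;

        Invocation(String service, String method, Object... args) {
            this.service = service;
            this.method = method;
            this.args = args;
        }
    }

    /** An interceptor sees the invocation and decides when to call the next step. */
    interface ServiceInterceptor {
        Object intercept(Invocation invocation, Function<Invocation, Object> next);
    }

    /** Runs the interceptors in list order, ending with the target call. */
    static Object invoke(List<ServiceInterceptor> chain, Invocation invocation,
                         Function<Invocation, Object> target) {
        Function<Invocation, Object> next = target;
        // Wrap from the last interceptor backwards so the first one runs outermost.
        for (int i = chain.size() - 1; i >= 0; i--) {
            ServiceInterceptor interceptor = chain.get(i);
            Function<Invocation, Object> downstream = next;
            next = inv -> interceptor.intercept(inv, downstream);
        }
        return next.apply(invocation);
    }
}
```

A tracing interceptor in this style could read the service and method names directly, which is the simplification the issue is asking for.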

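Vaadin to Vue or React?

The RSocket Broker console is currently built with Vaadin, mainly because it is convenient for Java programmers and avoids the problems of calling various REST APIs. It is unclear how familiar people are with other JS frameworks. This is a survey: should the console UI move to a framework such as Vue or React? Note that a Node environment and basic front-end development skills would be required either way.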

RSocket Broker Banner

The default Spring Boot banner is currently used. Change it to a pink "Alibaba RSocket Broker" banner.

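gRPC-to-RSocket protocol conversion gateway

As with the HTTP REST API to RSocket conversion, should a gateway for gRPC-to-RSocket protocol conversion be added? Leave a comment if you need this. A prototype already exists internally.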

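Reactive compatibility with RxJava 3

RSocket Broker currently supports RxJava 2. RxJava 3 is expected to be released at the end of January, and it needs to be supported. The changes are mainly in RSocketRequestRpcProxy and RSocketResponderSupport.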

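Upgrade to RSocket 1.0.0-RC7

There are still some changes, mainly that UriHandler.java and UriTransportRegistry.java have been removed. RSocket Broker currently identifies the transport layer from the URI scheme, so this code may need to be moved into the broker. RSocket-CLI has added this code itself.

Changes:

  • Keep UriHandler and UriTransportRegistry
  • Move from RSocketFactory to RSocketServer and RSocketConnector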

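Adjusting the Java Proxy mechanism

RSocket services are currently based on Java interfaces, so RSocket calls have to go through a proxy.
Comparing JDK Proxy, Javassist, and Byte Buddy shows that Byte Buddy has the highest performance. Data from a simple performance test is shown below. Byte Buddy also handles default methods on Java interfaces well.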

ByteBuddyProxyTest.testOperation   thrpt    2  1391028488.716         ops/s
JavassistProxyTest.testOperation   thrpt    2  582515791.244          ops/s
JdkProxyTest.testOperation         thrpt    2  233792969.032          ops/s

Therefore the Java proxy mechanism is being moved to Byte Buddy.
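
For readers unfamiliar with interface proxying, the sketch below shows the JDK Proxy baseline from the comparison, not the Byte Buddy implementation. The interface UserService and the class ProxySketch are made up for the example; a real handler would turn the method call into an RSocket request instead of returning a string.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical example of proxying a service interface with java.lang.reflect.Proxy. */
class ProxySketch {

    /** Sample service interface; only the interface exists on the caller side. */
    interface UserService {
        String findNickById(int id);
    }

    /** Creates a proxy whose method calls are all routed to the given handler. */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> serviceInterface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] {serviceInterface},
                handler);
    }
}
```

Usage: `ProxySketch.create(ProxySketch.UserService.class, (proxy, method, args) -> method.getName() + ":" + args[0])` returns an object whose findNickById(7) call reaches the handler with the method name and arguments.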

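Feedback on alibaba-rsocket-spring-boot-starter-1.0.0.M1

  1. In the auto-configuration class RSocketListenerAutoConfiguration, when rsocket.port is not configured in the properties, the following expression evaluates to true, so the application starts the listener logic. Suggested change: ${rsocket.port:0}!=0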
@Configuration
@ConditionalOnExpression("'${rsocket.port}'!='0'")
public class RSocketListenerAutoConfiguration {
  2. The spring-autoconfigure-metadata.properties file is not generated, so the @ConditionalOnClass in RSocketAutoConfiguration loads the class and fails
    @Bean
    @ConditionalOnClass(PrometheusMeterRegistry.class)
    public MetricsService metricsService(PrometheusMeterRegistry meterRegistry) {
        return new MetricsServicePrometheusImpl(meterRegistry);
    }

The documentation recommends adding the following dependency

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure-processor</artifactId>
            <optional>true</optional>
        </dependency>

Reference documentation
The error message is as follows

java.lang.IllegalStateException: Failed to introspect Class [com.alibaba.spring.boot.rsocket.RSocketAutoConfiguration] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:743) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[na:1.8.0_241]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:681) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:649) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1605) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:494) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:616) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:608) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1242) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
	at com.example.spring.webflux.SpringWebfluxDemoApplication.main(SpringWebfluxDemoApplication.java:29) [classes/:na]
Caused by: java.lang.NoClassDefFoundError: io/micrometer/prometheus/PrometheusMeterRegistry
	at java.lang.Class.getDeclaredMethods0(Native Method) ~[na:1.8.0_241]
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[na:1.8.0_241]
	at java.lang.Class.getDeclaredMethods(Class.java:1975) ~[na:1.8.0_241]
	at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
	... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.micrometer.prometheus.PrometheusMeterRegistry
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_241]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_241]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_241]
	... 25 common frames omitted

Disconnected from the target VM, address: '127.0.0.1:1064', transport: 'socket'

Process finished with exit code 1

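Prometheus Service Discovery support

RSocket Broker already supports collecting Prometheus metrics from each application, mainly through the MetricsService RSocket service. It also provides MetricsScrapeController, which outputs the Prometheus format.

Implement a Prometheus Service Discovery based on RSocket Broker, so that Prometheus can scrape metrics from every application connected to RSocket Broker.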

https://prometheus.io/blog/2018/07/05/implementing-custom-sd/

https://github.com/prometheus/prometheus/tree/master/discovery
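
One way such a discovery could feed Prometheus is by emitting target groups in the JSON shape used by file-based service discovery (a list of objects with "targets" and "labels"). The sketch below only renders that JSON from an in-memory map. The class FileSdWriter, the "app" label, and the idea of keying targets by application name are assumptions, and how the broker would expose per-application scrape addresses is left open.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical writer that renders target groups in the file_sd JSON shape. */
class FileSdWriter {

    /** Renders one target group per application, labelled with the application name. */
    static String toJson(Map<String, List<String>> targetsByApp) {
        StringJoiner groups = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, List<String>> entry : targetsByApp.entrySet()) {
            StringJoiner targets = new StringJoiner(",", "[", "]");
            for (String target : entry.getValue()) {
                targets.add(quote(target));
            }
            groups.add("{\"targets\":" + targets
                    + ",\"labels\":{\"app\":" + quote(entry.getKey()) + "}}");
        }
        return groups.toString();
    }

    /** Minimal JSON string quoting: escapes backslashes and double quotes only. */
    static String quote(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }
}
```

A production version would use a JSON library and refresh the output whenever applications connect to or disconnect from the broker.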

Add Travis CI support

Use Travis CI for continuous integration, and add a build status badge to README.md.

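Adjustments to the RSocket HTTP Gateway

For ease of development and deployment, RSocket Broker Server has built-in support for accessing RSocket services over HTTP, in the following format. Note the "/api" prefix.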

POST http://127.0.0.1:9998/api/com.alibaba.user.UserService/findById
Authorization: Bearer jwt_token
Content-Type: application/json

[
  1
]
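
The same request can be assembled from Java with the JDK 11+ HTTP client. This sketch only builds the request object and does not send it; the class GatewayRequestSketch and its parameter names are made up for the example, while the URL layout, headers, and JSON array body follow the format shown above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds a gateway call in the "/api/{service}/{method}" format. */
class GatewayRequestSketch {

    /** Builds a POST request whose body is the JSON array of method arguments. */
    static HttpRequest build(String baseUrl, String service, String method,
                             String jwtToken, String jsonArgs) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/api/" + service + "/" + method))
                .header("Authorization", "Bearer " + jwtToken)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonArgs))
                .build();
    }
}
```

Sending it would be a matter of passing the result to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` against a running broker.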

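There is one small issue: the Broker Console is currently built with Vaadin, and Vaadin does not yet support WebFlux, so the RSocket REST API cannot make good use of WebFlux's reactive features. Performance and the threading model are still those of Spring MVC.
Vaadin's reactive support is tracked here: vaadin/spring#565

alibaba-broker-http-gateway is still kept. It is fully reactive and makes it easy for developers to integrate other protocols such as gRPC and Dubbo. If there is demand for gRPC -> RSocket, we will consider adding gRPC integration to the HTTP gateway.

Better suggestions are welcome. Please leave a comment.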
