Giter Club home page Giter Club logo

newlife.rocketmq's Introduction

NewLife.RocketMQ - RocketMQ客户端组件

GitHub top language GitHub License Nuget Downloads Nuget Nuget (with prereleases)

RocketMQ,支持.Net Framework/.netstandard/Mono

教程:https://newlifex.com/core/rocketmq

新生命项目矩阵

各项目默认支持net8.0/net7.0/netstandard2.1/netstandard2.0/net4.61/net4.5,旧版(2023.0401)支持net4.0/net2.0

项目 年份 说明
基础组件 支撑其它中间件以及产品项目
NewLife.Core 2002 核心库,日志、配置、缓存、网络、序列化、APM性能追踪
NewLife.XCode 2005 大数据中间件,单表百亿级,MySql/SQLite/SqlServer/Oracle/TDengine/达梦,自动分表
NewLife.Net 2005 网络库,单机千万级吞吐率(2266万tps),单机百万级连接(400万Tcp)
NewLife.Remoting 2011 RPC通信框架,内网高吞吐或物联网硬件设备场景
NewLife.Cube 2010 魔方快速开发平台,集成了用户权限、SSO登录、OAuth服务端等,单表100亿级项目验证
NewLife.Agent 2008 服务管理组件,把应用安装成为操作系统守护进程,Windows服务、Linux的Systemd
NewLife.Zero 2020 Zero零代脚手架,基于NewLife组件生态的项目模板,Web、WebApi、Service
中间件 对接知名中间件平台
NewLife.Redis 2017 Redis客户端,微秒级延迟,百万级吞吐,丰富的消息队列,百亿级数据量项目验证
NewLife.RocketMQ 2018 RocketMQ纯托管客户端,支持Apache RocketMQ和阿里云消息队列,十亿级项目验
NewLife.MQTT 2019 物联网消息协议,MqttClient/MqttServer,客户端支持阿里云物联网
NewLife.IoT 2022 IoT标准库,定义物联网领域的各种通信协议标准规范
NewLife.Modbus 2022 ModbusTcp/ModbusRTU/ModbusASCII,基于IoT标准库实现,支持IoT平台和IoTEdge
NewLife.Siemens 2022 西门子PLC协议,基于IoT标准库实现,支持IoT平台和IoTEdge
NewLife.Map 2022 地图组件库,封装百度地图、高德地图和腾讯地图
NewLife.Audio 2023 音频编解码库,PCM/ADPCMA/G711A/G722U/WAV/AAC
产品平台 产品平台级,编译部署即用,个性化自定义
Stardust 2018 星尘,分布式服务平台,节点管理、APM监控中心、配置中心、注册中心、发布中心
AntJob 2019 蚂蚁调度,分布式大数据计算平台(实时/离线),蚂蚁搬家分片**,万亿级数据量项目验证
NewLife.ERP 2021 企业ERP,产品管理、客户管理、销售管理、供应商管理
CrazyCoder 2006 码神工具,众多开发者工具,网络、串口、加解密、正则表达式、Modbus
EasyIO 2023 简易文件存储,支持分布式系统中文件集中存储。
XProxy 2005 产品级反向代理,NAT代理、Http代理
HttpMeter 2022 Http压力测试工具
GitCandy 2015 Git源代码管理系统
SmartOS 2014 嵌入式操作系统,完全独立自主,支持ARM Cortex-M芯片架构
SmartA2 2019 嵌入式工业计算机,物联网边缘网关,高性能.NET6主机,应用于工业、农业、交通、医疗
FIoT物联网平台 2020 物联网整体解决方案,建筑、环保、农业,软硬件及大数据分析一体化,单机十万级点位项目验证
UWB高精度室内定位 2020 厘米级(10~20cm)高精度室内定位,软硬件一体化,与其它系统联动,大型展厅项目验证

新生命开发团队

XCode

新生命团队(NewLife)成立于2002年,是新时代物联网行业解决方案提供者,致力于提供软硬件应用方案咨询、系统架构规划与开发服务。
团队主导的开源NewLife系列组件已被广泛应用于各行业,Nuget累计下载量高达60余万次。
团队开发的大数据核心组件NewLife.XCode、蚂蚁调度计算平台AntJob、星尘分布式平台Stardust、缓存队列组件NewLife.Redis以及物联网平台NewLife.IoT,均成功应用于电力、高校、互联网、电信、交通、物流、工控、医疗、文博等行业,为客户提供了大量先进、可靠、安全、高质量、易扩展的产品和系统集成服务。

我们将不断通过服务的持续改进,成为客户长期信赖的合作伙伴,通过不断的创新和发展,成为国内优秀的IT服务供应商。

新生命团队始于2002年,部分开源项目具有20年以上漫长历史,源码库保留有2010年以来所有修改记录
网站:https://newlifex.com
开源:https://github.com/newlifex
QQ群:1600800/1600838
微信公众号:
智能大石头

newlife.rocketmq's People

Contributors

christan avatar guma-ark avatar inversionhourglass avatar leeveel avatar lindexi avatar niuzhaosen avatar nnhy avatar t18724723333 avatar thxu avatar xi3892 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

newlife.rocketmq's Issues

Consumer的FromLastOffset问题

请问Consumer.FromLastOffset的设定是不是对应java原版里的ConsumeFromWhere,看名字应该是这个意思,但是看源码实现没有达到这个效果,并且没有ConsumeFromWhere的相关实现。

// 查询偏移量,可能首次启动-1
if (st.Offset < 0 && FromLastOffset)
{
var p = QueryOffset(mq);
if (SkipOverStoredMsgCount > 0)
{
// 设置了跳过积压的消息,此时判断积压的消息条数,若消息条数大于设定的数量,则强制从消费最大偏移量的位置消费
var maxOffset = QueryMaxOffset(mq);
if (maxOffset >= p + SkipOverStoredMsgCount) p = maxOffset;
}
//if (p == -1) p = 0;
//第一次消费新的队列,强制从消费最大偏移量的位置消费(避免由于第一次从最小偏移量消费而导致的数据大量积压问题)
//if (p <= 0) p = QueryMaxOffset(mq);
st.Offset = st.CommitOffset = p;
if (st.Offset >= 0) WriteLog("开始消费[{0}@{1}] Offset={2:n0}", mq.BrokerName, mq.QueueId, st.Offset);
}
// 拉取一批,阻塞等待
var offset = st.Offset >= 0 ? st.Offset : 0;

上面这段代码截取自DoPull方法,首先明确一点,在DoPull首次被调用时QueueStore.Offset取到的值必定是-1。然后根据上面这部分代码:
FromLastOffset=false

首次运行offset取值必定为0。这样应用每次重启就必须重新消费,但ConsumeFromWhere是针对首次消费的配置,也就是group创建后的首次消费,重启应用后不再判断该配置,应该在之前的offset继续消费

FromLastOffset=true & SkipOverStoredMsgCount<=0

首次运行时offset从broker取当前消费offset。这个对之前有过消费记录的没有问题,但对首次消费来说QueryOffset查询到的一定是0,这样也就需要重头开始消费,这和FromLastOffset=true不符,理论应该FromLastOffset=true时首次消费从最大偏移量开始消费,反之从0开始

FromLastOffset=true & SkipOverStoredMsgCount>0

首次运行时将offset + SkipOverStoredMsgCountmaxOffset进行对比,这样存在两种情况
1.首次消费,如果最大偏移量大于最大积压数量,那么就从最大偏移量开始消费,反之从0开始消费
2.非首次消费,如果最大偏移量大于当前偏移量+最大积压数量,那么就从最大偏移量开始消费,反之从当前偏移量开始消费
第一种情况有可能从0开始消费,与FromLastOffset=true定义不符,第二种情况按照你SkipOverStoredMsgCount的设定是没问题的

按照“FromLastOffset=true时首次消费从最大偏移量开始消费,反之从0开始消费,非首次消费从上一次消费的偏移量继续消费”的逻辑,我自己重新改了一下代码,完成后准备提交一个pullrequest,现在有一个问题需要跟作者讨论一下,SkipOverStoredMsgCount这个配置应该是你自己定义的吧,官方应该没有这个配置,我认为在已经消费过的情况下,即使有积压也不应该通过一个配置进行自动跳过,应该由人工判断后手动跳过,自动的风险太大,所以我更改后的逻辑是只在首次消费且FromLastOffset=false时才判断积压情况,这样就是在首次消费时,如果期望从头开始收,但发现从头开始数据量过大,那么就从尾开始收,同时因为这个配置是自定义的,我认为默认应该保持与官方一致,避免使用的人在不知情的情况下做出了与官方不一样的操作,所以我认为将SkipOverStoredMsgCount的默认值设置为0更合适。作者你看看SkipOverStoredMsgCount按我上面说的这个逻辑改是否可行,如果可行我就准备提交pullrequest了,如果有不妥的地方你说说你的想法,确定后我再提交pullrequest

Void DoWork(System.Object) 耗时过长 59,800ms

一直跟踪不到,这个错误
System.AggregateException: One or more errors occurred. (A task was canceled.)
....Void DoWork(System.Object) 耗时过长 59,800ms
是哪个操作产生的?
感觉是创建生产者(Producer)的时候触发的。。

10:10:00.648 30 N Worker #14 发现Broker[broker-clz]: 192.168.10.201:10911
10:10:00.648 30 N Worker #14 SELECT
p.*
FROM ProductTemp AS p
inner join Product jzjpp on jzjpp.Id = p.Pid and jzjpp.Status = 0
order by p.Pid
10:10:09.555 41 N T System.AggregateException: One or more errors occurred. (A task was canceled.)
---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError) in \NewLife.RocketMQ\ClusterClient.cs:line 211
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic) in \NewLife.RocketMQ\NameClient.cs:line 66
at NewLife.RocketMQ.NameClient.DoWork(Object state) in \NewLife.RocketMQ\NameClient.cs:line 58
at NewLife.Threading.TimerScheduler.Execute(Object state) in \NewLife.Core\Threading\TimerScheduler.cs:line 273
10:10:09.555 41 N T 任务 [7043]Void DoWork(System.Object) 耗时过长 59,800ms,建议使用异步任务Async=true
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53799)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53800)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53802)[0]
10:10:11.256 52 N CT3 [broker-clz]=> PULL_MESSAGE(53803)[0]
10:10:11.256 9 Y - [broker-clz]<= #PULL_NOT_FOUND(53801)[0]
10:10:11.256 49 N CT0 [broker-clz]=> PULL_MESSAGE(53804)[0]
10:10:11.256 51 N CT2 [broker-clz]=> PULL_MESSAGE(53805)[0]
10:10:11.256 50 N CT1 [broker-clz]=> PULL_MESSAGE(53806)[0]

调用阿里云(tcp) Producer和 Consumer 都报topic route info in name server for the topic: guanzhu

14:39:23.710 1 N - [Name]集群地址:Http://****aliyuncs.com:80
14:39:23.720 1 N - [Name]正在连接[Http://****aliyuncs.com:80]
14:39:23.733 1 N - Tcp.Open Tcp://0.0.0.0:51925=>118.190.213.147:80
14:39:23.954 1 N - NewLife.Core v8.10.2021.0101 Build 2021-01-01 19:31:46 .NET Framework 4.6
14:39:23.955 1 N - X组件核心库 ?2002-2020 NewLife
14:39:23.956 1 N - MqProducer v1.0.0.0 Build 2000-01-01 .NET Framework 4.7.2
14:39:23.957 1 N - MqProducer Copyright ? 2021
14:39:23.983 1 N - NewLife.RocketMQ.Protocol.ResponseException: 17: ### No topic route info in name server for the topic: guanzhu
See http://rocketmq.apache.org/docs/faq/ for further details.
在 NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError) 位 置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\ClusterClient.cs:行号 216
在 NewLife.RocketMQ.NameClient.GetRouteInfo(String topic) 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\NameClient.cs: 行号 66
在 NewLife.RocketMQ.Client.MqBase.Start() 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\MqBase.cs:行号 154
在 NewLife.RocketMQ.Producer.Start() 位置 D:\X\NewLife.RocketMQ\NewLife.RocketMQ\Producer.cs:行号 36
在 MqProducer.Program.Main(String[] args) 位置 D:\工作\MQ\Demo\rocketMQ\rocketMQDemo\MqProducer\Program.cs:行号 34
14:39:23.984 1 N - 异常退出!

consumer.Start();出现问题

微信截图_20211209175813

System.AggregateException
HResult=0x80131500
Message=One or more errors occurred. (A task was canceled.)
Source=System.Private.CoreLib
StackTrace:
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task1.GetResultCore(Boolean waitCompletionNotification) at System.Threading.Tasks.Task1.get_Result()
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError)
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic)
at NewLife.RocketMQ.Client.MqBase.Start()
at NewLife.RocketMQ.Consumer.Start()
at MQConsumer.Program.Main(String[] args) in C:\Users\lenovo\Downloads\RocketMQTest-main\MQConsumer\Program.cs:line 39

内部异常 1:
TaskCanceledException: A task was canceled.

@hipeace86 @annuo1111 你好,麻烦请问一下为什么会出现这种问题,十分感谢。

集群所有地址连接失败!

是我的端口没打开吗?
可是我已经打开了啊
-A INPUT -p tcp -m tcp --dport 10911 -j ACCEPT
-A INPUT -p tcp -m tcp --dport 9876 -j ACCEPT
-A FORWARD -j DOCKER-USER
-A FORWARD -j DOCKER-ISOLATION-STAGE-1
-A FORWARD -o docker0 -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
-A FORWARD -o docker0 -j DOCKER
-A FORWARD -i docker0 ! -o docker0 -j ACCEPT
-A FORWARD -i docker0 -o docker0 -j ACCEPT
-A DOCKER-ISOLATION-STAGE-1 -i docker0 ! -o docker0 -j DOCKER-ISOLATION-STAGE-2
-A DOCKER-ISOLATION-STAGE-1 -j RETURN
-A DOCKER-ISOLATION-STAGE-2 -o docker0 -j DROP
-A DOCKER-ISOLATION-STAGE-2 -j RETURN

10911 9876 这两个端口我都打开了

.NET5 Could not load file or assembly 'NewLife.Core, Version=9.0.8098.19786

.NET5 环境,已引用
NewLife.Core 9.0.2022.304
NewLife.RocketMQ 2.0.2022.304

启动包找不到引用
System.IO.FileLoadException:“Could not load file or assembly 'NewLife.Core, Version=9.0.8098.19786, Culture=neutral, PublicKeyToken=null'. The located assembly's manifest definition does not match the assembly reference. (0x80131040)”

请问有实现 OneWay 发送的计划吗?

目前发送方确实不关心消息是否在 Broker 落地,测试发现没有 OneWay 的情况下每次 publish 需要花费 30ms 以上,很多情况下需要 200ms 才能收到 ACK,导致发送方消息堆积。这边没法改变 broker,希望能有 OneWay 或其它曲线救国的实现方式减少请求的耗时。感谢!

请问下 Header 里面为啥要截断异常信息,这样设计的目的是啥哦?

/// <summary>创建异常</summary>
/// <returns></returns>
public ResponseException CreateException()
        {
            var err = Remark;
            if (!err.IsNullOrEmpty())
            {
                var p = err.IndexOf("Exception: ");
                if (p >= 0) err = err.Substring(p + "Exception: ".Length);
                p = err.IndexOf(", ");
                if (p > 0) err = err.Substring(0, p);
            }

            return new ResponseException(Code, err);
        }

修改和扩展了Producer.cs中的Publish方法

\NewLife.RocketMQ\NewLife.RocketMQ\Producer.cs

因为要用到keys所以,建议还在多加个Publish方法好点:

        /// <summary>发布消息</summary>
        /// <param name="body"></param>
        /// <param name="tags"></param>
        /// <param name="timeout"></param>
        /// <returns></returns>
        public virtual SendResult Publish(Object body, String tags = null, Int32 timeout = -1) => Publish(body, tags, null, timeout);

        /// <summary>发布消息</summary>
        /// <param name="body"></param>
        /// <param name="tags">传null则为空</param>
	/// <param name="keys">传null则为空</param>
        /// <param name="timeout"></param>
        /// <returns></returns>
        public virtual SendResult Publish(Object body, String tags, String keys, Int32 timeout = -1)
        {
            if (!(body is Byte[] buf))
            {
                if (!(body is String str)) str = body.ToJson();

                buf = str.GetBytes();
            }

            return Publish(new Message { Body = buf, Tags = tags, Keys = keys }, timeout);
        }

消费者 consume 支持异步

看了下源码,消费者是 pull 模型。 拉取消息都做了 await,Consume 就改下异步嘛? 能不能安排上?

var pr = await Pull(mq, offset, BatchSize, SuspendTimeout);
                    if (pr != null)
                    {
                        switch (pr.Status)
                        {
                            case PullStatus.Found:
                                if (pr.Messages != null && pr.Messages.Length > 0)
                                {
                                    // 触发消费
                                    **var rs = Consume(mq, pr);**  这边能改成异步么? await ConsumeAsync

                                    // 更新偏移
                                    if (rs)
                                    {
                                        st.Offset = pr.NextBeginOffset;
                                        // 提交消费进度
                                        await UpdateOffset(mq, st.Offset);
                                    }
                                }

rmq版本4.7.1,运行一段时间后会提示: FATAL | The service threw an unhandled exceptio

2023-02-09 04:14:52,033 | FATAL | The service threw an unhandled exception | 44
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.SocketAsyncEventArgs'.
at System.Net.Sockets.SocketAsyncEventArgs.ThrowForNonFreeStatus(Int32 status)
at System.Net.Sockets.SocketAsyncEventArgs.SetBuffer(Byte[] buffer, Int32 offset, Int32 count)
at NewLife.Net.SessionBase.ReleaseRecv(SocketAsyncEventArgs se, String reason)
at NewLife.Net.SessionBase.ProcessEvent(SocketAsyncEventArgs se, Int32 bytes, Boolean ioThread)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.ThreadPoolWorkQueue.Dispatch()

Is there size limit for a message?

I have some trouble to send large size of message. When i send large size message from Sender Program written in java, NewLife.RocketMQ client can receive it. but message body broken. (small size message is OK)
How long message size can be support? Is there size limit for a message?

Additional Test:

  1. Combination Sender written in NewLife.RocketMQ and Listener written in NewLife.RocketMQ : SUCCESS
  2. Combination Sender written in Java and Listener written in Java : SUCCESS
  3. Combination Sender written in NewLife.RocketMQ and Listener written in Java : SUCCESS
  4. Combination Sender written in Java and Listener written in NewLife.RocketMq : FAIL

在 RocketMq-console-ng 页面无法查询到消息设置的 Keys 与 Tags

在本项目 Message 类的 GetProperties 实现中有以下两行代码

 if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", nameof(Tags), Tags);
 if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", nameof(Keys), Keys);

我参考了 Java 客户端中的实现,Tags 和 Keys 应当为 TAGS KEYS 并区分大小写
我尝试修改上述代码为

 if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "TAGS", Tags);
 if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "KEYS", Keys);

测试后发现,这样在 RocketMq-console-ng 页面就能查询到设置的 Keys 和 Tags 了,并且在 Java 客户端也可以正常使用了。

rocketmq origin msgid is diffent with sdk parse

原始消息属性,其中UNIQ_KEY为:AC17043B001E1E9804B955C1DE160B6C
`

{"REAL_TOPIC":"zyq_mq_contract_tby_topic","sw8-x":"0-1684309168662","sw8":"1-NzgwOWIwNmE1ZDBmNDQ3ZTlhZDFmNGFmNTA5YWIzYWEuNDAwLjE2ODQzMDkxNjg2NjIwMjU1-NzgwOWIwNmE1ZDBmNDQ3ZTlhZDFmNGFmNTA5YWIzYWEuNDAwLjE2ODQzMDkxNjg2NjIwMjU0-0-cWEtZS1zaWduYXR1cmUtc2VydmVy-MTcyLjIzLjQuNTlAOTE5NTQ1NGQwNjFiNGE3Zjk2NDRjMGMxZDc3ZTVlZDg=-Um9ja2V0TVEvenlxX21xX2NvbnRyYWN0X3RieV90b3BpYy9Qcm9kdWNlcg==-cm9ja2V0bXExLXFhLnl6dy5jbjo5ODc2O3JvY2tldG1xMi1xYS55encuY246OTg3Njtyb2NrZXRtcTMtcWEueXp3LmNuOjk4NzY=","id":"5cdfe77e-e743-3156-ded2-010abbb7a31e","UNIQ_KEY":"AC17043B001E1E9804B955C1DE160B6C","WAIT":"false","contentType":"application/json","DELAY":"1","TAGS":"contract_info_tag","timestamp":"1684309168662","REAL_QID":"1"}

`

版本为【2.2.2023.401】的SDK解析解析出来之后,msgid:AC1001740000BCFD000000573DA3275F
能找时间看看是因为协议问题导致解析逻辑变了吗

MQ 解包失败

特别感谢大佬的这个库

在运行的时候发现有解包失败,但是缺少PDB符号文件,不知道具体是哪里炸了

调用堆栈

>	NewLife.Core.dll!System.IOHelper.ToUInt32(byte[] data, int offset, bool isLittleEndian)	C#
 	NewLife.Core.dll!NewLife.Net.Handlers.MessageCodec<System.__Canon>.GetLength(NewLife.Data.Packet pk, int offset, int size)	C#
 	NewLife.Core.dll!NewLife.Messaging.PacketCodec.Parse(NewLife.Data.Packet pk)	C#
 	NewLife.RocketMQ.dll!NewLife.RocketMQ.Protocol.MqCodec.Decode(NewLife.Model.IHandlerContext context, NewLife.Data.Packet pk)	C#
 	NewLife.Core.dll!NewLife.Net.Handlers.MessageCodec<NewLife.RocketMQ.Protocol.Command>.Read(NewLife.Model.IHandlerContext context, object message)	C#
 	NewLife.Core.dll!NewLife.Net.SessionBase.ProcessReceive(NewLife.Data.Packet pk, System.Net.IPEndPoint remote)	C#
 	NewLife.Core.dll!NewLife.Net.SessionBase.ProcessEvent(System.Net.Sockets.SocketAsyncEventArgs se)	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	C#
 	System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	C#
 	[本机到托管的转换]	

信息:

名称 类型
$exception {System.IndexOutOfRangeException: Index was outside the bounds of the array. at System.IOHelper.ToUInt32(Byte[] data, Int32 offset, Boolean isLittleEndian)} System.IndexOutOfRangeException

上面代码的 Offset 被优化掉了,现在能找到的是包的信息

按照 MqCodec 的 Decode 的代码,可以猜这是 0 的值

        protected override IList<Command> Decode(IHandlerContext context, Packet pk)
        {
            var ss = context.Owner as IExtend;
            var pc = ss["Codec"] as PacketCodec;
            if (pc == null) ss["Codec"] = pc = new PacketCodec { GetLength = p => GetLength(p, 0, -4) };
            // 忽略代码
        }

包的模拟数据如下:

            var buffer = new byte[]
{
                    0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70,
                    105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104,
                    66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103,
                    105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115,
                    101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48,
                    34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34,
                    58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 55, 44, 34,
                    114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81,
                    85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117,
                    114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115,
                    105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34,
                    58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103,
                    101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44,
                    34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44,
                    34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97,
                    110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101,
                    34, 58, 50, 56, 52, 57, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83,
                    65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122,
                    101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78,
                    34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225,
                    123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34,
                    58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114,
                    73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115,
                    101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97,
                    103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103,
                    101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 52, 55,
                    44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78,
                    95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101,
                    67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 56, 53, 51, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107,
                    101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58,
                    34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 52, 56, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34,
                    48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105,
                    110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49,
                    44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97,
                    113, 117, 101, 34, 58, 50, 56, 53, 48, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95,
                    77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97,
                    108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74,
                    83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0,
                    0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108,
                    100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111,
                    107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 50, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104,
                    105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116,
                    66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103,
                    101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 52,
                    44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78,
                    95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101,
                    67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 56, 53, 54, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107,
                    101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58,
                    34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34,
                    102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86,
                    65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 49, 44, 34, 114, 101, 109, 97, 114,
                    107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44,
                    34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116,
                    82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50,
                    53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101,
                    120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104,
                    105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116,
                    66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102,
                    115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101,
                    34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 56, 53, 53, 44, 34,
                    114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81,
                    85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117,
                    114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115,
                    105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101, 34,
                    58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103, 103,
                    101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44,
                    34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44,
                    34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102,
                    102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34, 108, 97,
                    110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113, 117, 101, 34, 58, 50, 55, 55, 51, 44,
                    34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95,
                    81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108, 105, 122, 101, 84, 121, 112, 101, 67,
                    117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 55, 55, 50, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83, 79, 78, 34, 44, 34, 118, 101, 114,
                    115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0, 0, 225, 123, 34, 99, 111, 100, 101,
                    34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100, 115, 34, 58, 123, 34, 115, 117, 103,
                    103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34, 48, 34,
                    44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34,
                    44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49, 44, 34,
                    108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97, 113,
                    117, 101, 34, 58, 50, 55, 55, 48, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95, 77,
                    69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97, 108,
                    105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74, 83,
                    79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0, 0,
                    0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108, 100,
                    115, 34, 58, 123, 34, 115, 117,
                    103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111, 107, 101, 114, 73, 100, 34, 58, 34,
                    48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34,
                    48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 105,
                    110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58, 49,
                    44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112, 97,
                    113, 117, 101, 34, 58, 50, 55, 50, 51, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79, 95,
                    77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105, 97,
                    108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34, 74,
                    83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0, 0, 0, 229, 0,
                    0, 0, 225, 123, 34, 99, 111, 100, 101, 34, 58, 49, 57, 44, 34, 101, 120, 116, 70, 105, 101, 108,
                    100, 115, 34, 58, 123, 34, 115, 117, 103, 103, 101, 115, 116, 87, 104, 105, 99, 104, 66, 114, 111,
                    107, 101, 114, 73, 100, 34, 58, 34, 48, 34, 44, 34, 110, 101, 120, 116, 66, 101, 103, 105, 110, 79,
                    102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 44, 34, 109, 97, 120, 79, 102, 102, 115, 101, 116, 34,
                    58, 34, 48, 34, 44, 34, 109, 105, 110, 79, 102, 102, 115, 101, 116, 34, 58, 34, 48, 34, 125, 44, 34, 102, 108, 97, 103, 34, 58,
                    49, 44, 34, 108, 97, 110, 103, 117, 97, 103, 101, 34, 58, 34, 74, 65, 86, 65, 34, 44, 34, 111, 112,
                    97, 113, 117, 101, 34, 58, 50, 55, 50, 50, 44, 34, 114, 101, 109, 97, 114, 107, 34, 58, 34, 78, 79,
                    95, 77, 69, 83, 83, 65, 71, 69, 95, 73, 78, 95, 81, 85, 69, 85, 69, 34, 44, 34, 115, 101, 114, 105,
                    97, 108, 105, 122, 101, 84, 121, 112, 101, 67, 117, 114, 114, 101, 110, 116, 82, 80, 67, 34, 58, 34,
                    74, 83, 79, 78, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 50, 53, 50, 125, 0
};

            var data = new byte[65535];
            Array.Copy(buffer, data, buffer.Length);
            var packet = new Packet(data, 0, 234);

Producer.cs 的 SelectQueue 方法里面获取的 Brokers 属性非线程安全

现象

在大概 30 秒左右的高速调用 Producer.cs 的 Publish 方法发送消息,此时偶尔将会出现 System.ArgumentOutOfRangeException:“Index was out of range. Must be non-negative and less than the size of the collection. ”System.InvalidOperationException:“Collection was modified; enumeration operation may not execute.” 提示,导致消息发送失败

原因

在 Producer.cs 的 SelectQueue方法里面,将会调用 Brokers.Where 方法获取 BrokerInfo 代理

var list = Brokers.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList();

而在 Producer.cs 的 Brokers 属性本质是调用 NameClient.cs 的 Brokers 属性

但是在初始化时,将会在 TimerX 初始化逻辑,代码如下

        public override void Start()
        {
            var cfg = Config;
            var ss = cfg.NameServerAddress.Split(";");

            Servers = ss.Select(e => new NetUri(e)).ToArray();

            base.Start();

            if (_timer == null) _timer = new TimerX(DoWork, null, cfg.PollNameServerInterval, cfg.PollNameServerInterval);
        }

更新时间差不多就是 30 秒左右,但是在更新完成之后,调用的 GetRouteInfo 方法里面,将变更集合,同时没有加上任何锁,导致了集合被更改

public IList<BrokerInfo> GetRouteInfo(String topic)
{
   // 忽略代码
   Brokers.Clear();
   if (Brokers is List<BrokerInfo> bks) bks.AddRange(list);
}

如果在刚好调用 GetRouteInfo 方法时,在进行消息发送,那么此时将会因为 Brokers 集合的变更而导致发送失败

消费者的消费问题

我在使用的时候发现了2个问题,能否指教
1.经过测试,消费端消费的能力似乎等同于java版的有序消费模式(MessageListenerOrderly), 见
https://www.jb51.net/article/215566.htm
我的环境是1个topic 2个队列,共1000条消息。 1个消费者,BatchSize默认32,使用Thread.Sleep(100) 模拟耗时,最后全部消费完,耗时53s,平均每个包耗时53ms, 与上面网址java版的MessageListenerOrderly速度相当。 而非java默认情况下的并发消费模式MessageListenerConcurrently

2.消费完了,返回的也是true。但是为什么从web-console上看到的 [主题]-nx_test-状态 的2个队列的最小位点始终为0。 [主题]-nx_test-状态-consumer管理里面的代理者位点和消费者位点是相同的,差值为0; 而消息的traceType全部是PULL,

下面分别是代码,和图:图中已经生产了3000条组左右的消息

var mq111 = new Producer()
{
    NameServerAddress = "127.0.0.1:9876",
    Log = XTrace.Log,
};
//链接默认的topic
mq111.Start();
mq111.CreateTopic("nx_test", 2);
mq111.Stop();
mq111.Dispose();
var mq = new Producer
{
    Topic = "nx_test",
    NameServerAddress = "127.0.0.1:9876",
    Log = XTrace.Log,
};
mq.Start();
for (var i = 0; i < 1001; i++)
{
    var str = "学新" + i +$" publishtime = {DateTime.Now:HHmmss.fff}";
    var sr = mq.Publish(str, "TagA");
}
Console.WriteLine("正在处理.....");
Console.ReadLine();
var consumer = new Consumer
{
    Topic = "nx_test",
    Group = "nx_g1",
    NameServerAddress = "127.0.0.1:9876",
    //FromLastOffset = false,
    //BatchSize = 3,
    Log = XTrace.Log,
};

consumer.OnConsume = (q, ms) =>
{
    XTrace.WriteLine("[{0}@{1}]收到消息[{2}] threadId={3} ms.Length={4}", q.BrokerName, q.QueueId, ms.Length, Thread.CurrentThread.ManagedThreadId,ms.Length);
    Console.WriteLine(
        $"[{q.BrokerName}@{q.QueueId}]收到消息[{ms.Length}] threadId={Thread.CurrentThread.ManagedThreadId} ms.Length={ms.Length}");
    foreach (var item in ms.ToList())
    {
        XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】queueid={q.QueueId} threadid={Thread.CurrentThread.ManagedThreadId}");
        Console.WriteLine(
            $"消息:主键【{ item.Keys}】,产生时间【{ item.BornTimestamp.ToDateTime()}】,内容【{ item.Body.ToStr()}");
        Thread.Sleep(100);
    }
    return true;
};
consumer.Start();
Console.WriteLine("正在处理.....按任意键停止消费");
Console.ReadLine();
consumer.Stop();
consumer.Dispose();
Console.WriteLine("consumer");

image
image
image

调用出错,似乎版本不兼容 rocketmq v4.9.1

MQ版本 V4_9_1

Producer发送消息会报如下错误:

the custom field <c> is null
org.apache.rocketmq.remoting.exception.RemotingCommandException: the custom field <c> is null, 
org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor.checkNotNull(AbstractSendMessageProcessor.java:376)

字段 c 对应的是 defaultTopic

消费者始终不触发OnConsume回调,不知道原因

  • 环境
  1. rocket部署在win10-wsl2的docker中,用的foxiswho/rocketmq,见 https://github.com/foxiswho/docker-rocketmq
  2. 并修改了其中要求的foxiswho/rocketmq版本为4.8.0, broker.conf文件中的brokerIP1=172.26.64.99(ip为wsl2的ifconfig)
  3. 用docker-compose成功启动,能打开console控制台
  • 新建2个工程test1,test2 分别为生产者和消费者
var mq111 = new Producer()
{
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};
mq111.Start();
mq111.CreateTopic("nx_test33", 2);
mq111.Stop();
mq111.Dispose();

var mq = new Producer
{
    Topic = "nx_test33",
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};

mq.Start();

for (var i = 0; i < 11; i++)
{
    var str = "学无先后达者为师" + i;
    //var str = Rand.NextString(1337);

    var sr = mq.Publish(str, "TagA");
}


Console.WriteLine("正在处理消息s");
Console.ReadLine();
var consumer = new Consumer
{
    Topic = "nx_test33",
    Group = "testg2",
    NameServerAddress = "127.0.0.1:9876",

    FromLastOffset = true,
    SkipOverStoredMsgCount = 0,
    BatchSize = 20,

    Log = XTrace.Log,
};

consumer.OnConsume = (q, ms) =>
{
    XTrace.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

    foreach (var item in ms.ToList())
    {
        XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}");
    }

    return true;
};

consumer.Start();


Console.WriteLine("任意键可以按,然后退出");
Console.ReadLine();
  • 执行上面2个后,发现消息的状态是PULL, 但是消费者的OnConsume回调始终不触发
    image
    image

消费端无法拉取未被消费的信息

var consumer = new Consumer
{
//Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
//AccessKey = "LTAINsp1qKfO61c5",
//SecretKey = "BvX6DpQffUz8xKIQ0u13EMxBW6YJmp",
Topic = "ntest",
Group = "test",
NameServerAddress = "127.0.0.1:9876",
FromLastOffset = false,
BatchSize = 20,
Log = XTrace.Log,
};

在上面的示例代码中,

FromLastOffset 字段设置为true的情况下,消费端仅拉取程序启动后新投递到消息队列中的信息,

FromLastOffset 字段设置为false的情况下,消费端每次启动都将拉取消息队列中所有的信息,

可否新增一个配置项,让消费端启动时仅拉取消息队列中所有未被消费的消息。

用线程创建了3个customer,程序开启一段时间后,就不消费了

for (int i = 0; i < 3; i++)
{
var j = i;
Task.Factory.StartNew(() =>
{
//测试消费消息
var consumer = new NewLife.RocketMQ.Consumer
{
Topic = "bb",
NameServerAddress = "xx",
BatchSize = 3,
Group = "bbgroup"
};
consumer.Start();
consumer.OnConsume = (q, ms) =>
{
string mInfo = $"datetime={DateTime.Now},BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
Console.WriteLine(mInfo);
if (q.QueueId % 3 == j)
{
//string mInfo = $"BrokerName={q.BrokerName},QueueId={q.QueueId},Length={ms.Length}";
//Console.WriteLine(mInfo);
LogHelper.Information("RocketMQConsumer" + j, mInfo);
foreach (var item in ms.ToList())
{
string msg = $"消息:msgId={item.MsgId},key={item.Keys},产生时间【{item.BornTimestamp.ToDateTime()}】,内容>{item.BodyString}";
//Console.WriteLine(msg);
LogHelper.Information("RocketMQConsumer" + j, msg);
//chat.SendMessageMQ(rtMQHelper.getSingnalModel(item.Body.ToString()));
}
// return false;//通知消息队:不消费消息
return true; //通知消息队:消费了消息
}
else
{
return true;
}
};
});
}
请问是不是这种写法有问题?

双主集群

请问支持双主集群吗?双主集群只配置一个consumer可以吗?测试发现起两个消费者,分表连接双主的某一个节点,是消费重复的,如何提升单个消费者的消费能力?谢谢

不支持RocketMQ4.6.0版本中增加的request-reply新特性

RocketMQ4.6.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
咱们这个中间件不支持这个新特性,期待支持新特性的版本上线!

consumer.Start();报错

System.AggregateException
HResult=0x80131500
Message=One or more errors occurred. (A task was canceled.)
Source=System.Private.CoreLib
StackTrace:
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task1.GetResultCore(Boolean waitCompletionNotification) at System.Threading.Tasks.Task1.get_Result()
at NewLife.RocketMQ.ClusterClient.Invoke(RequestCode request, Object body, Object extFields, Boolean ignoreError)
at NewLife.RocketMQ.NameClient.GetRouteInfo(String topic)
at NewLife.RocketMQ.Client.MqBase.Start()
at NewLife.RocketMQ.Consumer.Start()
at MQConsumer.Program.Main(String[] args) in C:\Users\lenovo\Downloads\RocketMQTest-main\MQConsumer\Program.cs:line 39

内部异常 1:
TaskCanceledException: A task was canceled.

@hipeace86 @annuo1111 你好,麻烦请问一下会什么会出现这种问题,十分感谢。

Producer和consumer如何使用阿里云rocketMQ

刚接触rocketMQ不太懂这个配置,看到这个类库,但是没有使用方面的文档说明

            NewLife.RocketMQ.Producer producer = new NewLife.RocketMQ.Producer
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Server = EndPoint,
                //此处为阿里云控制台rocketMQ实例下的接入点,分为TCP(只有内网Endpoint),HTTP(公网和内网为不同Endpoint)
                //是否需要NameServerAddress配置
                Topic = Topic
            };
            producer.Start();
            producer.Publish(new Message
            {
                Keys = "TheKey",
                BodyString = "TheMessage." + DateTime.Now,
                Tags = "TestTag"
            });
            producer.Stop();
NewLife.RocketMQ.Consumer consumer = new NewLife.RocketMQ.Consumer();
            consumer.Configure(new NewLife.RocketMQ.MqSetting()
            {
                AccessKey = AccessKeyId,
                SecretKey = AccessKeySecret,
                Group = Group,
                Server = EndPoint,
                Topic = Topic
            });
            consumer.OnConsume = (mq, ext) =>
            {
                Console.WriteLine(mq.BrokerName);
                return true;
            };
            consumer.Start();
            Task.Delay(TimeSpan.FromMinutes(10));
            consumer.Stop();

支持 RocketMQ 5.0 POP 消费模式

RocketMQ 5.0 中引入了一种新的消费模式:Pop 消费模式。

我们知道 RocketMQ 原来有两种消费模式:Pull 模式消费和 Push 模式消费,其中 Push 模式指的是 Broker 将消息主动“推送”给消费者,它的背后其实是消费者在不断地 Pull 消息来实现类似于 Broker “推”消息给消费者的效果。

新引入的 Pop 消费模式主要是用于 Push 消费时将拉消息的动作替换成 Pop 。Pop 消费的行为和 Pull 消费很像,区别在于 Pop 消费的重平衡是在 Broker 端做的,而之前的 Pull 和 Push 消费都是由客户端完成重平衡。

优点:

  • 负载均衡放在服务端完成,可支持超过 Queue 数量的消费者
  • 解决某个消费者 hang (挂机、僵尸)导致对应的 Queue 队列任务堆积没有被消费

参考:

是否支持集群消费

请问是否支持多节点消费,我们测试的时候发现在3个服务器上消费同一个topic数据,也是同一个消费组,发现三个节点都会消费同一条数据

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.