Giter Club home page Giter Club logo

Comments (11)

Technoboy- avatar Technoboy- commented on August 15, 2024

Hi, Could you give more description about your issue?
I have tried to set the above interceptor, it's working in my local.

from mop.

fanyouyong0526 avatar fanyouyong0526 commented on August 15, 2024

KoP needs to be added after version 2.80

brokerentrymetadatainterceptors = org. Apache. Pulsar. Common. Intercept. Appendindexmetadatainterceptor

KoP works normally, but when I add MoP, the mqtt client can publish messages normally, but cannot subscribe to messages
ERROR:
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toMqttMessages(PulsarMessageConverter.java:71) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:85) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:41) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:538) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:469) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

from mop.

liangyuanpeng avatar liangyuanpeng commented on August 15, 2024

I think you can provide more info for this, some like your topic, qos, etc..

And you are use mqtt consumer and mqtt producer, right?

mqtt producer --> mqtt connsumer

from mop.

fanyouyong0526 avatar fanyouyong0526 commented on August 15, 2024

image
When I use the above configuration, remove it

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

MQTT clients can publish and subscribe to messages
image
image
but, Kafka cannot publish and subscribe messages .
image
If I modify the configuration as above add it
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
Kafka client can publish and subscribe messages.
but the mqtt client can publish messages normally, cannot subscribe to messages
image
image
View the broker log with the following error message:

java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:416) ~[org.apache.pulsar-pulsar-common-2.8.0.jar:2.8.0]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarMessageConverter.toMqttMessages(PulsarMessageConverter.java:71) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:85) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.MQTTConsumer.sendMessages(MQTTConsumer.java:41) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:538) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:469) ~[org.apache.pulsar-pulsar-broker-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.8.0.jar:2.8.0]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.14.1.jar:4.14.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.63.Final.jar:4.1.63.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]

from mop.

fanyouyong0526 avatar fanyouyong0526 commented on August 15, 2024

The above is the detailed configuration and information.
I'm using a local build nar.
commit id is f6602d8
@Technoboy-

from mop.

fanyouyong0526 avatar fanyouyong0526 commented on August 15, 2024

I tried to update the latest version of the mop code and found that the MQTT client could not subscribe to the messages with or without the addition of brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
to the configuration file

from mop.

codelipenghui avatar codelipenghui commented on August 15, 2024

@fanyouyong0526 Which Pulsar version are you using, please make sure the Pulsar version is the same as the mop plugin version.

from mop.

codelipenghui avatar codelipenghui commented on August 15, 2024

@Technoboy- Looks we need to add an integration test in the streamnative-tests to make sure MoP can work with KoP together.

from mop.

fanyouyong0526 avatar fanyouyong0526 commented on August 15, 2024

pulsar version:2.8.0
mop:A locally built version,commitid is f6602d8

from mop.

BewareMyPower avatar BewareMyPower commented on August 15, 2024

Could you try 2.8.1? It's because Commands#parseMessageMetadata didn't skip the BrokerEntryMetadata in 2.8.0, but apache/pulsar#10968 added the skipBrokerEntryMetadataIfExist call in parseMessageMetadata. After that, parseMessageMetadata should work.

from mop.

Technoboy- avatar Technoboy- commented on August 15, 2024

Please upgrade pulsar to 2.8.1.

from mop.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.