Comments (10)

tylertreat commented on September 13, 2024

Good catch. This isn't the first time this behavior tripped someone up. Apologies for the poor documentation!

from liftbridge.

tylertreat commented on September 13, 2024

Check asyncError on the PublishResponse.
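A minimal sketch of that check. It uses stand-in dataclasses instead of the betterproto-generated classes, and the field names `ack`, `async_error`, `code`, and `message` are assumptions (snake_case versions of the proto fields), so verify them against your generated module. Because an unset message field may surface either as `None` or as an all-default instance depending on the generator version, the sketch treats both as "no error".

```python
from dataclasses import dataclass, field
from typing import Optional


# Hypothetical stand-ins for the generated messages (field names assumed).
@dataclass
class PublishAsyncError:
    code: int = 0
    message: str = ""


@dataclass
class Ack:
    ack_policy: int = 0
    reception_timestamp: int = 0
    commit_timestamp: int = 0


@dataclass
class PublishResponse:
    ack: Ack = field(default_factory=Ack)
    async_error: Optional[PublishAsyncError] = None


def check_publish_response(resp: PublishResponse) -> Ack:
    """Return the ack, or raise if the server reported an async publish error."""
    err = resp.async_error
    # A missing or all-default error message is treated as "no error".
    if err is not None and (err.code or err.message):
        raise RuntimeError(f"publish failed (code={err.code}): {err.message}")
    return resp.ack
```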

FZambia commented on September 13, 2024

I think the reason is this code in the server: it kicks in when publishing without a deadline (even if the ack policy is ALL).

LaPetiteSouris commented on September 13, 2024

As I understand it (detailed in #296), within a single partition the partition leader subscribes to the NATS subject, and only the partition leader processes incoming messages from NATS.

Thus, given that:

  • NATS preserves the order of incoming messages on a subject.

You can presume that the order of messages within a single partition is FIFO (First In, First Out). This is very similar to Kafka (refer to Topics and Logs).

Caveat: this guarantee does not necessarily hold when the network fails. As in any distributed system, a network failure can happen at any time. For example:

  • (1) You send message M0 at time t0 to partition 0 of topic A
  • (2) You send message M1 at time t1 to partition 0 of topic A, where t1 > t0.

If there is no network failure, the order of messages on partition 0 of topic A is guaranteed to be M0 | M1.

However, if a network failure delays (1), M0 may arrive at the NATS subject at some time t2 > t1. From the system's point of view, M0 was received after M1, so the order would be M1 | M0.
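The scenario above can be modeled in a few lines, under the simplifying assumption that the single partition leader appends messages strictly in the order they arrive at the subject. The arrival times are made up for illustration; only their relative order matters.

```python
from typing import Dict, List


def partition_order(arrival_times: Dict[str, float]) -> List[str]:
    """Order in which a single leader would append messages, assuming it
    appends strictly in arrival order at the subject."""
    return sorted(arrival_times, key=lambda msg: arrival_times[msg])


# No failure: M0 (sent at t0=0) arrives before M1 (sent at t1=1).
normal = {"M0": 0.1, "M1": 1.1}
# A network failure delays M0 until t2=2.5, which is later than t1.
delayed = {"M0": 2.5, "M1": 1.1}

print(" | ".join(partition_order(normal)))   # M0 | M1
print(" | ".join(partition_order(delayed)))  # M1 | M0
```

Send order never enters the calculation; only arrival order does, which is why a delayed M0 ends up behind M1.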

If you wait until you actually receive the ACK for M0 before sending M1, the ordering described above is preserved.
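A sketch of why waiting for the ACK helps, using a hypothetical `fake_publish` coroutine in place of a real client call: the message is appended to an in-memory log after a simulated network delay, and the coroutine returning stands in for receiving its ACK. With both sends in flight at once, the slower M0 lands second; awaiting each send before the next keeps send order.

```python
import asyncio
from typing import List


async def fake_publish(log: List[str], msg: str, delay: float) -> None:
    """Hypothetical publish: the message reaches the partition after `delay`
    seconds and is appended; returning models receiving its ACK."""
    await asyncio.sleep(delay)
    log.append(msg)


async def send_concurrently(msgs: List[str], delays: List[float]) -> List[str]:
    log: List[str] = []
    # All sends are in flight at once, so arrival order decides the log order.
    await asyncio.gather(*(fake_publish(log, m, d) for m, d in zip(msgs, delays)))
    return log


async def send_sequentially(msgs: List[str], delays: List[float]) -> List[str]:
    log: List[str] = []
    for m, d in zip(msgs, delays):
        # Wait for the previous message's ACK before sending the next one.
        await fake_publish(log, m, d)
    return log


msgs = ["M0", "M1"]
delays = [0.05, 0.01]  # the network delays M0 more than M1
print(asyncio.run(send_concurrently(msgs, delays)))  # ['M1', 'M0']
print(asyncio.run(send_sequentially(msgs, delays)))  # ['M0', 'M1']
```

The trade-off is throughput: sequential sends pay one round trip per message.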

This is my deduction from reading the codebase.

Moreover, if you want to enforce stronger ordering, consider the docs on concurrency control. In short, you explicitly check the HighestOffset of the partition before publishing new messages and set the expected offset on each publish. In that case, the message order on the partition is guaranteed to be FIFO.
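A toy, in-memory illustration of the optimistic concurrency idea: every publish states which offset it expects to get, and the partition rejects it if that does not match. `ToyPartition` and `OffsetMismatch` are hypothetical names, and treating the expected offset as "highest offset + 1" is an assumption made for this sketch; check the Liftbridge concurrency control docs for the exact semantics.

```python
from typing import List


class OffsetMismatch(Exception):
    """Raised when the expected offset does not match the next offset."""


class ToyPartition:
    """In-memory stand-in for a partition with optimistic concurrency control.
    Not Liftbridge's implementation."""

    def __init__(self) -> None:
        self.log: List[str] = []

    @property
    def highest_offset(self) -> int:
        # -1 when the partition is empty (a convention chosen for this toy).
        return len(self.log) - 1

    def publish(self, value: str, expected_offset: int) -> int:
        next_offset = len(self.log)
        if expected_offset != next_offset:
            raise OffsetMismatch(f"expected {expected_offset}, next is {next_offset}")
        self.log.append(value)
        return next_offset


p = ToyPartition()
p.publish("M0", expected_offset=p.highest_offset + 1)  # offset 0
p.publish("M1", expected_offset=p.highest_offset + 1)  # offset 1
# A stale writer that still believes offset 0 is next gets rejected.
try:
    p.publish("late M0", expected_offset=0)
except OffsetMismatch as e:
    print("rejected:", e)  # rejected: expected 0, next is 2
```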

tylertreat commented on September 13, 2024

@LaPetiteSouris' explanation is correct. Ordering is assured at the partition level.

It sounds to me like messages may be getting dropped for some reason in your example. Out of curiosity, what happens if you enable acking for the messages?

tylertreat commented on September 13, 2024

I would expect a PublishAsyncError since you have enabled concurrency control on the stream but are not setting ExpectedOffset on the publish request.

It's strange that everything is empty. If you check the partition data on the server, can you see if messages are being persisted?

gc-ss commented on September 13, 2024

Appreciate the input!

"You can presume that the order of the message on a single partition is FIFO (First In First Out)."

That was my assumption. I am running liftbridge/standalone-dev.

However, when I send between 1,000 and 5,000 messages over a few seconds, where each message is a counter, I notice a difference between the actual and expected counts during the counter rollup/accumulation phase. Below this message volume, the system works as expected.

I'm using the default settings (no ACKs) and a Python gRPC client generated directly from the .proto file in the repo.

I can see two potential explanations for the difference between the actual and expected counts (if you can think of anything else, please let me know):

  1. A few messages are getting dropped
  2. A few messages are arriving out of order
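One way to tell these two explanations apart, assuming you can add a sequence number to each published counter message (a hypothetical field, not part of the setup described here): record the sequence numbers in the order they are consumed, then count the gaps and the backward steps separately.

```python
from typing import Iterable, List, Tuple


def diagnose(received: Iterable[int], expected_count: int) -> Tuple[List[int], int]:
    """Given sequence numbers in consumption order, return
    (missing sequence numbers, number of out-of-order arrivals)."""
    seen = set()
    out_of_order = 0
    highest = -1
    for seq in received:
        # Anything lower than a number already seen arrived out of order.
        if seq < highest:
            out_of_order += 1
        highest = max(highest, seq)
        seen.add(seq)
    missing = [s for s in range(expected_count) if s not in seen]
    return missing, out_of_order


# Example: 0..5 were sent; 3 never arrives and 2 arrives after 4.
print(diagnose([0, 1, 4, 2, 5], expected_count=6))  # ([3], 1)
```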

I can create a repo with Python 3.8 code, running in a Docker container, that reproduces this issue. Would you be interested in checking it out if I did?

gc-ss commented on September 13, 2024

Hmm, with this liftbridge.yaml:

host: 0.0.0.0
clustering.raft.bootstrap.seed: true
nats.embedded.config: nats-server.conf
activity.stream.enabled: true
cursors.stream.partitions: 2
logging.level: debug
logging.recovery: false
logging.raft: false
logging.nats: false
batch.max.messages: 2048
streams.concurrency.control: true
streams.auto.pause.time: 30s
streams.auto.pause.disable.if.subscribers: true

When I do:

            publish_result = await self.client.publish(stream="arbitrary.counter.announce", value=serialized_data, ack_policy=AckPolicy.ALL)
            self.log.debug(f"publish_result = {publish_result, publish_result.ack.ack_policy, publish_result.ack.reception_timestamp, publish_result.ack.commit_timestamp}")

I see:

publish_result = (PublishResponse(ack=Ack(ack_policy=0, reception_timestamp=0, commit_timestamp=0)), 0, 0, 0)

and the output of docker container logs liftbridge-main shows:

time="2021-05-27 04:10:39" level=info msg="Liftbridge Version:        v1.5.1"
time="2021-05-27 04:10:39" level=info msg="Server ID:                 uLt7BkmIWS6etBF1xmCXjX"
time="2021-05-27 04:10:39" level=info msg="Namespace:                 liftbridge-default"
time="2021-05-27 04:10:39" level=info msg="NATS Servers:              [nats://127.0.0.1:4222]"
time="2021-05-27 04:10:39" level=info msg="Default Retention Policy:  [Age: 1 week, Compact: false]"
time="2021-05-27 04:10:39" level=info msg="Default Partition Pausing: 30 seconds"
time="2021-05-27 04:10:39" level=info msg="Starting embedded NATS server on 0.0.0.0:4222"
time="2021-05-27 04:10:39" level=info msg="Starting Liftbridge server on 0.0.0.0:9292..."
time="2021-05-27 04:10:40" level=debug msg="Raft node initialized"
time="2021-05-27 04:10:40" level=debug msg="Bootstrapping metadata Raft group as seed node"
time="2021-05-27 04:10:40" level=debug msg="Successfully bootstrapped metadata Raft group"
time="2021-05-27 04:10:41" level=info msg="Server became metadata leader, performing leader promotion actions
...
time="2021-05-27 04:11:14" level=debug msg="fsm: Created stream [name=arbitrary.counter.announce, subject=arbitrary.counter.announce, partitions=1]"
time="2021-05-27 04:11:14" level=debug msg="api: Publish [stream=__activity, partition=0]"
time="2021-05-27 04:11:14" level=debug msg="Published CREATE_STREAM event to activity stream"
...
time="2021-05-27 04:11:31" level=debug msg="api: Publish [stream=arbitrary.counter.announce, partition=0]"

No matter what AckPolicy I choose, the publish_result is always the same.

Am I using the right options when starting up liftbridge/standalone-dev?

I thought the default AckPolicy would be AckPolicy.LEADER, which is 0, and that all of the publishes above would therefore use acks:

class AckPolicy(betterproto.Enum):
    """AckPolicy controls the behavior of message acknowledgements."""

    LEADER = 0
    ALL = 1
    NONE = 2
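One detail that makes this confusing: in proto3, scalar and enum fields default to zero, and LEADER is the zero value. So the response above is exactly what an Ack with no fields set looks like, and an ack_policy of 0 on its own does not show that a LEADER ack happened. A small check, using a hypothetical stand-in dataclass whose field names are taken from the publish_result output above:

```python
from dataclasses import dataclass, fields
from enum import IntEnum


class AckPolicy(IntEnum):
    LEADER = 0
    ALL = 1
    NONE = 2


# Hypothetical stand-in for the generated Ack message.
@dataclass
class Ack:
    ack_policy: int = 0
    reception_timestamp: int = 0
    commit_timestamp: int = 0


def looks_unset(ack: Ack) -> bool:
    """True if every field still holds its proto3 default (zero)."""
    return all(getattr(ack, f.name) == 0 for f in fields(ack))


observed = Ack(ack_policy=0, reception_timestamp=0, commit_timestamp=0)
print(looks_unset(observed))                 # True
print(AckPolicy(observed.ack_policy).name)  # LEADER
```

Zero timestamps are the more telling signal: a real ack would carry non-zero reception and commit times.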

gc-ss commented on September 13, 2024

It's all empty
ack

gc-ss commented on September 13, 2024

@FZambia, thank you! Internally, @gc-pv also located this exact code, and it turns out this is indeed why messages were getting dropped.

What we did was explicitly add a deadline kwarg to the publish call.

Now, when Liftbridge drops a message because the deadline is exceeded, an exception is raised on the client/publisher.
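A sketch of the client-side pattern this enables: every publish gets a bounded wait, and anything that misses it is collected instead of disappearing silently. `publish` is a hypothetical coroutine standing in for the real gRPC call. The actual fix described above passes a deadline kwarg to the generated publish call; `asyncio.wait_for` here only approximates that on the client side.

```python
import asyncio
from typing import Awaitable, Callable, List


async def publish_all(
    publish: Callable[[bytes], Awaitable[None]],
    payloads: List[bytes],
    timeout: float,
) -> List[bytes]:
    """Publish each payload, waiting at most `timeout` seconds for each one.

    Returns the payloads that did not complete in time so they can be
    logged or retried.
    """
    failed: List[bytes] = []
    for payload in payloads:
        try:
            await asyncio.wait_for(publish(payload), timeout=timeout)
        except asyncio.TimeoutError:
            failed.append(payload)
    return failed


async def flaky_publish(payload: bytes) -> None:
    # Hypothetical: payloads starting with b"slow" never complete in time.
    await asyncio.sleep(1.0 if payload.startswith(b"slow") else 0.0)


print(asyncio.run(publish_all(flaky_publish, [b"a", b"slow-b", b"c"], timeout=0.05)))
# [b'slow-b']
```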

I plan to document this in this ticket and to open a PR against the Liftbridge documentation clarifying the effect of the deadline.
