mockafka-py's Issues

Type hints cause TypeError under Python 3.8

Describe the bug
Type hinting in Mockafka uses the standard collection types as generics, e.g. list[str], rather than the capitalised aliases from the typing module, e.g. List[str]. On Python 3.8 this is only possible in annotations when the annotations future statement (from __future__ import annotations) is in effect; without it, the code raises TypeError: 'type' object is not subscriptable at import time.

See PEP 585.

To Reproduce
Run the tests under Python 3.8. Running mypy under Python 3.8 will also identify the problem.

Expected behavior
There are two options:

  1. Revert to the types from the typing module, e.g. Dict instead of dict.
  2. Add the future statement to the affected files (see the sketch below).
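
A minimal sketch of option 2 (the function is hypothetical and only illustrates that PEP 585 generics become legal on Python 3.8 once annotations are no longer evaluated at runtime):

from __future__ import annotations  # must appear before any other code in the module


def partition_counts(topics: list[str]) -> dict[str, int]:
    # With the future import, annotations are stored as strings and never evaluated,
    # so list[str] and dict[str, int] no longer raise TypeError on Python 3.8.
    return {topic: 0 for topic in topics}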

Desktop (please complete the following information):

  • OS: Fedora 38
  • Python 3.8.18

Generic and unsolicited emails are not a good way to bring attention to your project

I just received a cold email from one of the owners of this repo asking me to contribute. Reaching out to strangers through unsolicited emails, or "cold emailing", to ask for contributions to an open source project is generally not a good idea. It comes across as impersonal and intrusive, and is unlikely to be well received by recipients like me. Open source contributors are often passionate about the projects they work on, and they tend to prefer getting involved organically through their own interests and connections rather than being solicited.

Cold emailing also risks being seen as spam, which can damage the reputation of the project and its maintainers. A better approach is to focus on building an active community around your open source project, engaging with potential contributors through forums, social media, and personal connections. This will help you foster a collaborative and welcoming environment that is more likely to attract meaningful contributions.

`FakeAIOKafkaProducer` needs a `stop` function

When I use mockafka-py to write tests, I encountered the following error:
AttributeError: 'FakeAIOKafkaProducer' object has no attribute 'stop'.

Here is my code (I referred to this document):

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

I think FakeAIOKafkaProducer should have a stop function if mockafka is to be used to write tests for aiokafka code like this.
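
In the meantime, a minimal workaround sketch (the import path is taken from the package layout; the subclass name is mine): add a no-op async stop so the try/finally pattern above works against the fake.

from mockafka.aiokafka.aiokafka_producer import FakeAIOKafkaProducer


class FakeAIOKafkaProducerWithStop(FakeAIOKafkaProducer):
    async def stop(self) -> None:
        # The in-memory fake holds no network connections, so stopping is a no-op.
        pass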

Missing 0.1.56 pypi release

Describe the bug
There is no 0.1.56 release on PyPI, but there is a 0.1.56 tag on GitHub.

Expected behavior
I should see a 0.1.56 release on PyPI.

Additional context
I can work around this by specifying the GitHub repo and tag in my dependencies.

I couldn't find a way to remove the bug label, but I think this is more of an operational thing than a bug.

Message timestamp interface doesn't match confluent_kafka

Describe the bug
The Message class in confluent_kafka returns a tuple of the timestamp type and the timestamp itself from timestamp() (I infer the type is tuple[int, int]), but the corresponding class in mockafka returns int | None.

To Reproduce
Code designed for parsing the timestamp from a confluent_kafka Message will fail:

from mockafka import FakeConsumer

consumer = FakeConsumer()
consumer.subscribe(["my_topic"])
msg = consumer.poll()
timestamp_type, timestamp = msg.timestamp()  # Results in TypeError: cannot unpack non-iterable int/NoneType object

Expected behavior
I hope that compatibility with confluent_kafka can be maintained!
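
For reference, a minimal sketch of the caller-side contract that confluent_kafka code relies on (TIMESTAMP_NOT_AVAILABLE is a real confluent_kafka constant; the millisecond conversion is just illustrative):

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

timestamp_type, timestamp = msg.timestamp()
if timestamp_type == TIMESTAMP_NOT_AVAILABLE:
    event_time = None
else:
    event_time = timestamp / 1000.0  # broker timestamps are expressed in milliseconds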

mockafka-py==0.1.55
confluent-kafka==2.3.0
Python 3.11.8

Problem in commit single message

Bug Description:

In the current implementation, we need a mechanism for committing a single message (or deliberately leaving it uncommitted).

The issue arises from our consumer strategy, where we begin looping from the first topic and first partition to consume a message. Consequently, if a message is not committed in the first topic and partition, and we attempt to consume messages from other topics, we encounter the same message repeatedly.

To address this, we need a mechanism to remember which topics and partitions we have already consumed messages from; a sketch of one option follows.
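
A minimal sketch of one option (the class and names are hypothetical, not mockafka's internals): keep a per-(topic, partition) read cursor so an uncommitted message is not handed out again on every poll.

from collections import defaultdict


class ReadCursor:
    """Tracks the next offset to hand out for each (topic, partition)."""

    def __init__(self):
        self._next_offset = defaultdict(int)

    def next_offset(self, topic: str, partition: int) -> int:
        return self._next_offset[(topic, partition)]

    def advance(self, topic: str, partition: int) -> None:
        # Called once a message from this partition has been delivered to the caller,
        # regardless of whether it has been committed yet.
        self._next_offset[(topic, partition)] += 1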

I will address this in #63

Add more test coverage and edge cases

Description:
We're aiming to improve the reliability and robustness of Mockafka by adding additional tests that cover various scenarios, including edge cases. Testing these scenarios helps ensure that Mockafka behaves predictably and consistently, even in uncommon or extreme situations. By enhancing our test coverage, we can increase confidence in the stability of Mockafka, benefiting both developers and users.
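
One example of the kind of edge case worth covering (a sketch that reuses only calls already shown in these issues):

from mockafka import FakeConsumer, FakeAdminClientImpl
from mockafka.admin_client import NewTopic


def test_poll_on_empty_topic_returns_none():
    # An existing topic with no messages should yield None rather than raising.
    FakeAdminClientImpl().create_topics([NewTopic(topic="empty_topic", num_partitions=1)])
    consumer = FakeConsumer()
    consumer.subscribe(["empty_topic"])
    assert consumer.poll(timeout=0.1) is None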

Error handling in CustomDict

The CustomDict class overrides the __getitem__ method to suppress KeyError exceptions and return None if a key is not found. While this may be intentional, it could potentially mask errors in the code. It's essential to handle errors appropriately and provide meaningful feedback to users.

class CustomDict(dict):
    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            return None  # missing keys are silently mapped to None
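
One possible direction (a sketch only, not an agreed change): keep normal dict semantics and make the failure informative, so callers who want the lenient behaviour can use .get() explicitly.

class StrictCustomDict(dict):
    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            # Re-raise with context instead of silently returning None.
            raise KeyError(
                f"{key!r} not found; available keys: {list(self.keys())!r}"
            ) from None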

Remove unused `parameterized` dependency from package

Describe the bug

Thanks for this package.
Unfortunately, it looks like one of the test dependencies has leaked into the package itself: parameterized is pulled in when I install this package, yet it doesn't seem to be needed at runtime.

To Reproduce

$ poetry add --group=dev mockafka-py
Using version ^0.1.58 for mockafka-py

Updating dependencies
Resolving dependencies... (12.2s)

Package operations: 2 installs, 0 updates, 0 removals

  - Installing parameterized (0.9.0)
  - Installing mockafka-py (0.1.58)

Writing lock file

Expected behavior

Installing mockafka-py should not pull in packages that are not needed at runtime.

The messages are not cleared from topic once consumed.

Describe the bug
I created a mock producer and consumer as shown in the snippets below. When the unit test runs, it uses the mock producer to produce multiple messages to two topics, and the test case checks that the produced messages are received by polling the mock consumer.
When multiple messages are produced on more than one topic, the behavior is inconsistent:

  • The messages are not cleared from the topic once consumed.
  • Not all messages are received on the expected topic.

Even when a single topic is used and multiple messages are produced only on that topic:

  • The messages are not cleared from the topic once consumed.

To Reproduce

  • Replace the constants in the files below, then run the test case that produces multiple messages on the same topic and check the consumed messages.
  • Replace the constants in the files below, then run the test case that produces multiple messages on different topics and check the consumed messages.

In both cases, previously consumed messages are still returned.

import json
import logging
import uuid
from random import randint

from mockafka import FakeProducer, setup_kafka, FakeAdminClientImpl
from mockafka.admin_client import NewTopic

logger = logging.getLogger(__name__)

# Placeholder topic names; replace these with your own project constants.
KAFKA_TOPIC_AUDITS = "audits"
KAFKA_TOPIC_RAW_DOCS = "raw_docs"

class MockProducer:

    def __init__(self):
        admin = FakeAdminClientImpl()
        admin.create_topics([
            NewTopic(topic=KAFKA_TOPIC_AUDITS, num_partitions=2),
            NewTopic(topic=KAFKA_TOPIC_RAW_DOCS, num_partitions=2)
        ])
        self._producer = FakeProducer()

    def produce(self, topic: str, key: str, message: dict):
        try:
            logger.info(f'sent msg to {topic}, key: {key}, {json.dumps(message, indent=4)}')
            self._producer.produce(
                key=key + str(uuid.uuid4()),
                value=message,
                topic=topic,
                partition=randint(0, 1)
            )
        except Exception as e:
            logger.error(f"kafka produce error:{e} for topic:{topic}, key:{key}, message:{message}")
from typing import Any
from mockafka import FakeConsumer

class MockConsumer:
    def __init__(self):
        self._consumer = FakeConsumer()

    def consume(self, topics, msg_cnt=1) -> Any:
        self._consumer.subscribe(topics)
        messages = []
        cnt = 1
        while cnt <= msg_cnt:
            message = self._consumer.poll(timeout=1.0)
            if message is not None:
                print("mock consumer received:", message.value())
                messages.append(message.value())
                cnt = cnt + 1
                self._consumer.commit(message)

        # self._consumer.unsubscribe(topics)
        return messages

    def close(self):
        self._consumer.close()

Expected behavior
The messages should be removed from the topic once consumed.
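
A minimal sketch of the expected behaviour, reusing only the calls that already appear in the snippets above:

from mockafka import FakeProducer, FakeConsumer, FakeAdminClientImpl
from mockafka.admin_client import NewTopic


def test_message_is_delivered_only_once():
    FakeAdminClientImpl().create_topics([NewTopic(topic="my_topic", num_partitions=1)])

    producer = FakeProducer()
    producer.produce(topic="my_topic", key="k", value="v", partition=0)

    consumer = FakeConsumer()
    consumer.subscribe(["my_topic"])

    first = consumer.poll(timeout=1.0)
    assert first is not None and first.value() == "v"
    consumer.commit(first)

    # After the only message has been consumed and committed, further polls
    # should return None instead of handing out the same message again.
    assert consumer.poll(timeout=1.0) is None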

Desktop (please complete the following information):

  • OS: Mac
  • Python version: 3.11
  • mockafka-py version: 0.1.53

`send_and_wait` doesn't properly mock `aiokafka.send_and_wait`

Describe the bug

aiokafka's send_and_wait takes two positional arguments, topic and value (see https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaProducer.send_and_wait). In FakeAIOKafkaProducer these arguments aren't there, and the *args and **kwargs are not properly passed from send_and_wait to _produce, so you get an exception when using this function:

TypeError: FakeAIOKafkaProducer._produce() missing 1 required positional argument: 'topic'

To Reproduce
Steps to reproduce the behavior:

from mockafka.aiokafka.aiokafka_producer import FakeAIOKafkaProducer

producer = FakeAIOKafkaProducer()
await producer.send_and_wait("topic_name", "sdfjhasdfhjsa", key="datakey")  # raises inside an async context

💥

Expected behavior
It should not throw an exception.
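
A sketch of the direction of a fix (the parameter list mirrors aiokafka's documented send_and_wait; how it forwards to mockafka's internal _produce is an assumption, not the project's actual code):

class FakeAIOKafkaProducer:  # illustrative excerpt only
    async def send_and_wait(self, topic, value=None, key=None, partition=None,
                            timestamp_ms=None, headers=None):
        # Pass the positional topic through explicitly instead of relying on **kwargs,
        # so _produce always receives its required 'topic' argument.
        return await self._produce(topic, value=value, key=key, partition=partition,
                                   timestamp_ms=timestamp_ms, headers=headers)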

Screenshots

/usr/local/lib/python3.12/site-packages/sentry_sdk/tracing_utils_py3.py:64: in func_with_tracing
    return func(*args, **kwargs)
src/kafka.py:90: in process
    asyncio.run(self.send_updated_records(plant_data.plant_key, updated_data))
/usr/local/lib/python3.12/asyncio/runners.py:194: in run
    return runner.run(main)
/usr/local/lib/python3.12/asyncio/runners.py:118: in run
    return self._loop.run_until_complete(task)
/usr/local/lib/python3.12/asyncio/base_events.py:685: in run_until_complete
    return future.result()
src/kafka.py:106: in send_updated_records
    await self.producer.send_and_wait(ct, serialized_record, key=plant_key)
/usr/local/lib/python3.12/site-packages/mockafka/aiokafka/aiokafka_producer.py:40: in send_and_wait
    await self.send()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <mockafka.aiokafka.aiokafka_producer.FakeAIOKafkaProducer object at 0x7f5302d6dd30>, args = ()
kwargs = {}

    async def send(self, *args, **kwargs):
>       await self._produce(**kwargs)
E       TypeError: FakeAIOKafkaProducer._produce() missing 1 required positional argument: 'topic'

Desktop (please complete the following information):

  • OS: linux
  • python version: 3.12
  • mockafka-py version: 0.1.52

Async support?

aiokafka user here, trying to write some tests 🗡️
