alm0ra / mockafka-py
Mockafka-py is a Python library designed for in-memory mocking of Kafka. [aiokafka - confluent-kafka-python]
Home Page: https://mockafka-py.readthedocs.io
License: MIT License
The real aiokafka consumer's getone implementation blocks indefinitely rather than returning None when there are no messages. Currently FakeAIOKafkaConsumer.getone differs from that.
Spotted while working on #112.
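One way a fake could mirror that blocking behaviour is to await an asyncio.Queue, so that callers wanting a bounded wait wrap the call in asyncio.wait_for. The sketch below is only an illustration: QueueBackedConsumer and its feed helper are hypothetical names, not part of mockafka or aiokafka.

```python
import asyncio


class QueueBackedConsumer:
    """Hypothetical stand-in whose getone() waits until a message exists."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def feed(self, message: bytes) -> None:
        # Hypothetical test helper: make a message available to getone().
        await self._queue.put(message)

    async def getone(self) -> bytes:
        # Blocks until a message arrives instead of returning None.
        return await self._queue.get()


async def demo() -> tuple:
    consumer = QueueBackedConsumer()
    try:
        # With no messages, the call only ends because of the timeout.
        await asyncio.wait_for(consumer.getone(), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    await consumer.feed(b"hello")
    return timed_out, await consumer.getone()
```

A test that relies on "no message" would then assert on the timeout rather than on a None return value.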
update readme for aiokafka support with examples
Describe the bug
Type hinting in Mockafka uses standard collection types as generics, e.g. list[str], rather than the capitalised type alias from the typing module, List[str]. On Python 3.8 this is only available when using the annotations future statement: from __future__ import annotations. Without the future statement, the code raises a TypeError.
See PEP 585.
To Reproduce
Run the tests under Python 3.8. Running mypy under Python 3.8 will also identify the problem.
Expected behavior
There are two options, one of which is to use the aliases from the typing module, e.g. Dict instead of dict.
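As a rough illustration of the two annotation styles, the sketch below uses made-up function names. The first function quotes its annotations, which keeps them as unevaluated strings; that is the same effect the annotations future statement has on every annotation in a module. The second uses the typing aliases and needs no special handling on Python 3.8.

```python
from typing import Dict, List


def partitions_builtin(topics: "dict[str, list[int]]") -> "list[str]":
    # The quoted annotations are never evaluated at definition time, so
    # Python 3.8 does not try to subscript the built-in dict and list.
    return sorted(topics)


def partitions_typing(topics: Dict[str, List[int]]) -> List[str]:
    # The typing aliases are subscriptable on Python 3.8 as they are.
    return sorted(topics)
```

Both functions behave identically at runtime; only the way the annotations are stored differs.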
I just received a cold email from one of the owners of this repo asking me to contribute. Reaching out to strangers through unsolicited emails, or "cold emailing", to ask for contributions to an open source project is generally not a good idea. It comes across as impersonal and intrusive, and is unlikely to be well-received by the recipients like me. Open source contributors are often passionate about the projects they work on, and they tend to prefer getting involved organically through their own interests and connections, rather than being solicited. Cold emailing also risks being seen as spam, which can damage the reputation of the project and its maintainers. A better approach is to focus on building an active community around your open source project, engaging with potential contributors through forums, social media, and personal connections. This will help you foster a collaborative and welcoming environment that is more likely to attract meaningful contributions.
While using mockafka-py to write tests, I encountered the following error:
AttributeError: 'FakeAIOKafkaProducer' object has no attribute 'stop'.
Here is my code (I referred to this document):
from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()
I think FakeAIOKafkaProducer should have a stop function if I want to use mockafka to write a test for aiokafka.
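The lifecycle the test expects could look roughly like the following. StoppableFakeProducer is a hypothetical in-memory stand-in written for this illustration, not the mockafka class; it only shows start, send_and_wait and stop working together.

```python
import asyncio


class StoppableFakeProducer:
    """Hypothetical in-memory producer exposing start/stop like aiokafka."""

    def __init__(self) -> None:
        self.started = False
        self.sent = []

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        # Nothing to flush in memory; just mark the producer as stopped.
        self.started = False

    async def send_and_wait(self, topic, value=None, key=None) -> None:
        if not self.started:
            raise RuntimeError("producer is not started")
        self.sent.append((topic, key, value))


async def send_one(producer) -> None:
    # Same shape as the snippet in the issue, with the producer injected.
    await producer.start()
    try:
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        await producer.stop()
```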
AIOKafkaConsumer.subscribe has pattern and listener arguments; however, they are not present on FakeAIOKafkaConsumer.subscribe.
FakeAIOKafkaConsumer doesn't (that I've seen) raise any errors about having been closed, yet it probably should if it wants to match the real aiokafka consumer.
Spotted while perusing the aiokafka source for #112.
Describe the bug
The consume function of the FakeConsumer class returns either a Message or None. This doesn't meet the confluent-kafka spec, which lists the return type as list(Message).
Expected behavior
A list of Message objects is returned.
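A minimal sketch of the list-returning shape follows. ListReturningConsumer is a hypothetical stand-in that uses plain bytes instead of Message objects; the num_messages and timeout parameters are modelled on confluent-kafka's Consumer.consume, and the timeout is accepted but not acted on here.

```python
from collections import deque
from typing import Deque, List


class ListReturningConsumer:
    """Hypothetical stand-in; messages are plain bytes for brevity."""

    def __init__(self, pending: List[bytes]) -> None:
        self._pending: Deque[bytes] = deque(pending)

    def consume(self, num_messages: int = 1, timeout: float = -1) -> List[bytes]:
        # Always return a list: up to num_messages items, or [] when empty.
        # The timeout is accepted for signature parity but unused here.
        batch: List[bytes] = []
        while self._pending and len(batch) < num_messages:
            batch.append(self._pending.popleft())
        return batch
```

Callers can then iterate over the result without first checking for None.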
Describe the bug
There is no 0.1.56 release on PyPI, but there is a 0.1.56 tag on GitHub.
Expected behavior
I should see a 0.1.56 release on PyPI.
Additional context
I can work around this by specifying the GitHub repo and tag in my dependencies.
I couldn't find a way to remove the bug label, but I think this is more of an operational thing than a bug.
Describe the bug
Message.timestamp() in confluent_kafka returns a tuple of the timestamp type and the timestamp itself (I infer the type is tuple[int, int]), but the corresponding method in mockafka returns int | None.
To Reproduce
Code designed for parsing the timestamp from a confluent_kafka Message will fail:
consumer = FakeConsumer()
consumer.subscribe(["my_topic"])
msg = consumer.poll()
timestamp_type, timestamp = msg.timestamp() # Results in TypeError: cannot unpack non-iterable int/NoneType object
Expected behavior
I hope that compatibility with confluent_kafka can be maintained!
mockafka-py==0.1.55
confluent-kafka==2.3.0
Python 3.11.8
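A sketch of a message whose timestamp() can be unpacked the way the snippet above expects. TupleTimestampMessage is a hypothetical class; the two constants are defined locally and assumed to mirror the ones exported by confluent_kafka, and the -1 placeholder for a missing timestamp is likewise an assumption.

```python
from typing import Optional, Tuple

# Assumed to mirror the constants exported by confluent_kafka; defined here
# so the sketch runs without the library installed.
TIMESTAMP_NOT_AVAILABLE = 0
TIMESTAMP_CREATE_TIME = 1


class TupleTimestampMessage:
    """Hypothetical message whose timestamp() returns a two-item tuple."""

    def __init__(self, value: bytes, timestamp_ms: Optional[int] = None) -> None:
        self._value = value
        self._timestamp_ms = timestamp_ms

    def value(self) -> bytes:
        return self._value

    def timestamp(self) -> Tuple[int, int]:
        # Return (timestamp_type, timestamp) rather than a bare int or None.
        if self._timestamp_ms is None:
            return TIMESTAMP_NOT_AVAILABLE, -1  # placeholder value (assumption)
        return TIMESTAMP_CREATE_TIME, self._timestamp_ms
```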
The Message class provided by this project and returned from e.g. FakeAIOKafkaConsumer.getone appears to be incompatible with the ConsumerRecord class provided by aiokafka.
Is this expected? Perhaps instead the structs from the mocked libraries could be used directly?
Bug Description:
In this implementation, we require a mechanism to either commit a single message or not.
The issue arises from our consumer strategy, where we begin looping from the first topic and first partition to consume a message. Consequently, if a message is not committed in the first topic and partition, and we attempt to consume messages from other topics, we encounter the same message repeatedly.
To address this, we need a mechanism to recall which topic or partition we've already consumed messages from.
I will address this in #63
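One possible shape for such a mechanism is to keep a read position per topic and partition, separate from the committed offset. The sketch below is an assumption about how that could look, not the fix made in #63; PositionTrackingConsumer and its in-memory log are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

TopicPartition = Tuple[str, int]


class PositionTrackingConsumer:
    """Hypothetical consumer that remembers what it has already handed out."""

    def __init__(self, log: Dict[TopicPartition, List[bytes]]) -> None:
        self._log = log
        self._position: Dict[TopicPartition, int] = {tp: 0 for tp in log}
        self._committed: Dict[TopicPartition, int] = {tp: 0 for tp in log}

    def poll(self) -> Optional[Tuple[TopicPartition, int, bytes]]:
        # Walk partitions in order, but resume from each partition's read
        # position, so an uncommitted message is not returned twice.
        for tp, messages in self._log.items():
            offset = self._position[tp]
            if offset < len(messages):
                self._position[tp] = offset + 1
                return tp, offset, messages[offset]
        return None

    def commit(self, tp: TopicPartition, offset: int) -> None:
        # Committing records progress without affecting the read position.
        self._committed[tp] = offset + 1
```

With this split, polling without committing still moves on to the next partition instead of returning the first message again.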
Description:
We're aiming to improve the reliability and robustness of Mockafka by adding additional tests that cover various scenarios, including edge cases. Testing these scenarios helps ensure that Mockafka behaves predictably and consistently, even in uncommon or extreme situations. By enhancing our test coverage, we can increase confidence in the stability of Mockafka, benefiting both developers and users.
The CustomDict class overrides the __getitem__ method to suppress KeyError exceptions and return None if a key is not found. While this may be intentional, it could potentially mask errors in the code. It's essential to handle errors appropriately and provide meaningful feedback to users.
class CustomDict(dict):
    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            return None
mockafka-py/mockafka/cluster_metadata.py
Line 47 in 2eb4b51
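To illustrate how the suppressed KeyError can hide a mistake, the sketch below repeats the class from the issue and applies it to a made-up topics mapping (the key names are assumptions chosen for the example).

```python
class CustomDict(dict):
    def __getitem__(self, key):
        try:
            return super().__getitem__(key)
        except KeyError:
            return None


topics = CustomDict({"orders": [0, 1]})

# A typo in the key is silently turned into None ...
partitions = topics["ordres"]

# ... and the failure only surfaces later, away from the real mistake.
try:
    len(partitions)
    error = None
except TypeError as exc:
    error = str(exc)

# A plain dict keeps KeyError for lookups and makes defaults opt-in.
plain = dict(topics)
explicit_default = plain.get("ordres")
```

With a plain dict, plain["ordres"] would raise KeyError at the point of the typo, while .get() remains available where a default is really wanted.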
Describe the bug
Thanks for this package.
Unfortunately it looks like one of the test dependencies has leaked into the package itself: parameterized is pulled in when I install this package, yet parameterized doesn't seem to be needed at runtime by this package.
To Reproduce
$ poetry add --group=dev mockafka-py
Using version ^0.1.58 for mockafka-py
Updating dependencies
Resolving dependencies... (12.2s)
Package operations: 2 installs, 0 updates, 0 removals
- Installing parameterized (0.9.0)
- Installing mockafka-py (0.1.58)
Writing lock file
Expected behavior
Should not install packages not needed at runtime.
Describe the bug
I created a mock producer and consumer as shown in the snippet. When a unit test is run, the code uses the mock producer to produce multiple messages to 2 topics. The test case checks whether the produced messages are received by polling the mock consumer.
When multiple messages are produced on more than one topic, the behavior is inconsistent.
Even when only a single topic is used and multiple messages are produced on that topic, previously consumed messages are still shown.
To Reproduce
import json
import logging
import uuid
from random import randint
from mockafka import FakeProducer, setup_kafka, FakeAdminClientImpl
from mockafka.admin_client import NewTopic

logger = logging.getLogger(__name__)

# KAFKA_TOPIC_AUDITS and KAFKA_TOPIC_RAW_DOCS are topic-name constants
# defined elsewhere in the reporter's project.

class MockProducer:
    def __init__(self):
        admin = FakeAdminClientImpl()
        admin.create_topics([
            NewTopic(topic=KAFKA_TOPIC_AUDITS, num_partitions=2),
            NewTopic(topic=KAFKA_TOPIC_RAW_DOCS, num_partitions=2)
        ])
        self._producer = FakeProducer()

    def produce(self, topic: str, key: str, message: dict):
        try:
            logger.info(f'sent msg to {topic}, key: {key}, {json.dumps(message, indent=4)}')
            self._producer.produce(
                key=key + str(uuid.uuid4()),
                value=message,
                topic=topic,
                partition=randint(0, 1)
            )
        except Exception as e:
            logger.error(f"kafka produce error:{e} for topic:{topic}, key:{key}, message:{message}")

from typing import Any
from mockafka import FakeConsumer

class MockConsumer:
    def __init__(self):
        self._consumer = FakeConsumer()

    def consume(self, topics, msg_cnt=1) -> Any:
        self._consumer.subscribe(topics)
        messages = []
        cnt = 1
        while cnt <= msg_cnt:
            message = self._consumer.poll(timeout=1.0)
            if message is not None:
                # Only touch the message once we know poll() returned one.
                print("mock consumer received:", message.value())
                messages.append(message.value())
                cnt = cnt + 1
                self._consumer.commit(message)
        # self._consumer.unsubscribe(topics)
        return messages

    def close(self):
        self._consumer.close()
Expected behavior
The messages should be removed from topic once consumed.
The link to the documentation as included in the package (and thus shown on PyPI) goes to https://github.com/alm0ra/mockafka-py/docs, which results in a 404. From the repo, it appears that this should instead be https://mockafka-py.readthedocs.io/en/latest/.
Describe the bug
aiokafka's version of send_and_wait has 2 positional arguments (see https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaProducer.send_and_wait). However, in FakeAIOKafkaProducer these arguments aren't there, and the *args and **kwargs are not properly passed from send_and_wait to _produce, so you get an exception when using this function:
TypeError: FakeAIOKafkaProducer._produce() missing 1 required positional argument: 'topic'
To Reproduce
Steps to reproduce the behavior:
producer = FakeAIOKafkaProducer()
producer.send_and_wait("topic_name", "sdfjhasdfhjsa", key="datakey")
💥
Expected behavior
It should not throw an exception
Screenshots
/usr/local/lib/python3.12/site-packages/sentry_sdk/tracing_utils_py3.py:64: in func_with_tracing
    return func(*args, **kwargs)
src/kafka.py:90: in process
    asyncio.run(self.send_updated_records(plant_data.plant_key, updated_data))
/usr/local/lib/python3.12/asyncio/runners.py:194: in run
    return runner.run(main)
/usr/local/lib/python3.12/asyncio/runners.py:118: in run
    return self._loop.run_until_complete(task)
/usr/local/lib/python3.12/asyncio/base_events.py:685: in run_until_complete
    return future.result()
src/kafka.py:106: in send_updated_records
    await self.producer.send_and_wait(ct, serialized_record, key=plant_key)
/usr/local/lib/python3.12/site-packages/mockafka/aiokafka/aiokafka_producer.py:40: in send_and_wait
    await self.send()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <mockafka.aiokafka.aiokafka_producer.FakeAIOKafkaProducer object at 0x7f5302d6dd30>, args = ()
kwargs = {}
    async def send(self, *args, **kwargs):
>       await self._produce(**kwargs)
E       TypeError: FakeAIOKafkaProducer._produce() missing 1 required positional argument: 'topic'
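The traceback shows send_and_wait calling send() with no arguments and send dropping *args. A minimal sketch of the forwarding that would avoid this is given below; ForwardingFakeProducer is a hypothetical stand-in, and the signature of _produce is assumed for illustration rather than taken from mockafka.

```python
import asyncio


class ForwardingFakeProducer:
    """Hypothetical producer that forwards arguments down to _produce."""

    def __init__(self) -> None:
        self.produced = []

    async def _produce(self, topic, value=None, *args, **kwargs) -> None:
        # Record what was received so the forwarding can be inspected.
        self.produced.append((topic, value, kwargs.get("key")))

    async def send(self, *args, **kwargs) -> None:
        # Pass positional and keyword arguments through unchanged.
        await self._produce(*args, **kwargs)

    async def send_and_wait(self, *args, **kwargs) -> None:
        await self.send(*args, **kwargs)
```

With this forwarding, the call from the reproduction steps reaches _produce with the topic, the value and the key intact.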
aiokafka user here, trying to write some tests 🛡️