
aiokafka's Introduction

aiokafka


asyncio client for Kafka

AIOKafkaProducer

AIOKafkaProducer is a high-level, asynchronous message producer.

Example of AIOKafkaProducer usage:

from aiokafka import AIOKafkaProducer
import asyncio

async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        # Produce message
        await producer.send_and_wait("my_topic", b"Super message")
    finally:
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()

asyncio.run(send_one())

AIOKafkaConsumer

AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume():
    consumer = AIOKafkaConsumer(
        'my_topic', 'my_other_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages
        async for msg in consumer:
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()

asyncio.run(consume())

Running tests

Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the lz4 compression libraries for Python require the python-dev package (the Python source header files) for compilation on Linux. NOTE: You will also need a valid Java installation. It's required for the keytool utility, used to generate SSL keys for some tests.

Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libkrb5-dev krb5-user
make setup

Running tests with coverage:

make cov

To run tests with a specific version of Kafka (the default is 2.8.1) use the KAFKA_VERSION variable:

make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1

Test running cheatsheet:

  • make test FLAGS="-l -x --ff" - run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor.
  • make test FLAGS="-k consumer" - run only the consumer tests.
  • make test FLAGS="-m 'not ssl'" - run tests excluding ssl.
  • make test FLAGS="--no-pull" - do not try to pull new docker image before test run.

aiokafka's People

Contributors

ask, asvetlov, dependabot[bot], dmitry-moroz, dmrz, fabregas, fediralifirenko, iamsinghrajat, jameshilliard, jeffwidman, jettify, jsurloppe, michalmazurek, multani, ods, originalgremlin, pawelrubin, pyup-bot, romantolkachyov, rooterkyberian, selevit, shargan, sobolevn, sylwia-budzynska, taraslevelup, tvainika, tvoinarovskyi, ultrabug, wbarnha, webknjaz


aiokafka's Issues

Add type schema for mypy.

As mypy is becoming more of a topic, it would be great to have a defined type schema for the entire public API in aiokafka. We might as well look at whether we can benefit from using it internally.
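To make the idea concrete, here is a rough sketch of what annotated public API could look like, based only on the signatures visible in the examples above (the exact types, especially return types, are still to be decided, so this is an illustrative stub rather than the real class):

from typing import Optional

class AIOKafkaProducer:
    # Illustrative stub only - not the actual class definition.
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def send_and_wait(
        self,
        topic: str,
        value: Optional[bytes] = None,
        key: Optional[bytes] = None,
        partition: Optional[int] = None,
    ):  # return type (record metadata) still to be decided
        ...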

kafka-python and Kafka 0.10.1 compatibility

The next release (0.2.0) will feature full compatibility with the latest Kafka and kafka-python. This is just a simple TODO for the compatibility review (mostly built from the kafka-python changelog):

  • Fetch/Produce new versions
    KAFKA-2136: support Fetch and Produce v1 throttle_time_ms
    KAFKA-3025: Message v1 - add timestamp and relative offsets
    Verify that message format on broker is v0.10.0, and that broker does not
    convert those on consumption phase. See https://kafka.apache.org/documentation#upgrade_10_performance_impact
  • Fix consumer iteration on compacted topics (dpkp PR 752)
  • Use explicit subscription state flag to handle seek() during message iteration
  • Fix log messages to be the same as new logs in kafka-python.
    KAFKA-3318: clean up consumer logging and error messages
  • KAFKA-3013: Include topic-partition in exception for expired batches
  • Randomize order of topics/partitions processed by fetcher to improve balance (dpkp PR 732)
    Note that https://kafka.apache.org/documentation#upgrade_1010_new_protocols
    introduces limiting the response size, so topic order is significant.
  • Allow client.check_version timeout to be set in Producer and Consumer constructors (eastlondoner PR 647)
  • Fix documentation to state 0.10.0 support

Add IPv6 support

kafka-python added it in 1.1.1. After migrating we should add it too.
Update tests and docs.

Fix docker-ip obtaining on OSX

I'm running into an issue while running tests on OSX; they always fail with:

    @pytest.fixture(scope='session')
    def docker_ip_address(docker):
        """Returns IP address of the docker daemon service."""
        # Fallback docker daemon bridge name
        ifname = 'docker0'
        try:
            for network in docker.networks():
                _ifname = network['Options'].get(
                    'com.docker.network.bridge.name')
                if _ifname is not None:
                    ifname = _ifname
                    break
        except libdocker.errors.InvalidVersion:
            pass
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
E       OSError: [Errno 6] Device not configured

The code that causes this is:

fcntl.ioctl(
        s.fileno(),
        0x8915,  # SIOCGIFADDR
        struct.pack('256s', ifname[:15].encode('utf-8')))

from what I can tell the struct in my case is:

b'docker0\x00\x00\x00...'  (the string 'docker0' padded with \x00 bytes to a total length of 256; full escape dump trimmed for readability)

and fileno is 11. (Not sure if this helps to debug this)

I am able to run the tests when changing the method to just return 127.0.0.1 (using native Docker for OSX).
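For reference, a minimal sketch of that fallback, keeping the ioctl path from the fixture above and treating any OSError as "no docker0 bridge available" (whether that is the right condition to check on OSX is an assumption):

import fcntl
import socket
import struct

def docker_ip_address(ifname='docker0'):
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Linux: ask the kernel for the bridge interface address.
        packed = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15].encode('utf-8')))
        return socket.inet_ntoa(packed[20:24])
    except OSError:
        # OSX with native Docker has no docker0 bridge, so fall back to
        # loopback as described above.
        return '127.0.0.1'
    finally:
        s.close()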

Add metrics

  • We have some research to do first. Here's the list of metrics: http://kafka.apache.org/documentation.html#monitoring (correct me if wrong). Select the ones of interest; some don't have any meaning in an asyncio context (like the number of select calls).
  • kafka-python already has an implementation for metrics. It would be best to reuse it so we don't maintain too much code. Some measurements are needed first, though.

Strange error in aiokafka consumer introduced in 0.2.2

Hi,

I'm running the latest PyPI aiokafka 0.2.2 with kafka-python 1.3.1, and when running the following code I get the mysterious error below, which disappears when I downgrade to 0.2.1 - can you please take a look?

from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import asyncio, random

topic = 'test_topic_' + str(random.randint(1, 100000))

async def produce():
    # Just adds message to sending queue
    future = await producer.send(topic, b'some_message_bytes')
    resp = await future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))

async def consume_task(consumer):
    try:
        msg = await consumer.getone()
        print("consumed: ", msg.topic, msg.partition, msg.offset,
              msg.key, msg.value, msg.timestamp)
    except KafkaError as err:
        print("error while consuming message: ", err)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce())
loop.run_until_complete(producer.stop())

consumer = AIOKafkaConsumer(topic, loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
loop.run_until_complete(consume_task(consumer))
# Will gracefully leave consumer group; perform autocommit if enabled
loop.run_until_complete(consumer.stop())

Error:

Task exception was never retrieved
future: <Task finished coro=<AIOKafkaConsumer._update_fetch_positions() done, defined at C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py:576> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 10, in _pack
    return pack(f, value)
struct.error: required argument is not an integer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
    result = coro.send(None)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py", line 595, in _update_fetch_positions
    yield from self._fetcher.update_fetch_positions(partitions)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 514, in update_fetch_positions
    x.result()
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py", line 274, in result
    raise self._exception
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
    result = coro.send(None)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 535, in _reset_offset
    offset = yield from self._offset(partition, timestamp)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 560, in _offset
    partition, timestamp)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 598, in _proc_offset_request
    response = yield from self._client.send(node_id, request)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\client.py", line 375, in send
    request, expect_response=expect_response)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\conn.py", line 141, in send
    message = header.encode() + request.encode()
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\struct.py", line 34, in _encode_self
    [self.__dict__[name] for name in self.SCHEMA.names]
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 56, in encode
    return _pack('>q', value)
  File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 12, in _pack
    raise ValueError(error)
ValueError: <class 'struct.error'>
Message produced: partition 0; offset 0

Can't run the test suite

I'm trying to get the test suite running but have had no luck. I'm on OSX and use Python 3.5.

When I run make cov as suggested, I get this error:

$ make cov
extra=$(python -c "import sys;sys.stdout.write('--exclude tests/test_pep492.py') if sys.version_info[:3] < (3, 5, 0) else sys.stdout.write('')"); \
    flake8 aiokafka tests $extra
tests/test_pep492.py:23:17: F999 'break' outside loop
tests/test_pep492.py:46:21: F999 'break' outside loop

If I fix those by adding # noqa behind them, flake8 passes, but the tests can't seem to communicate with Docker:

$ KAFKA_VERSION=0.10.0.0 SCALA_VERSION=2.11 make cov
extra=$(python -c "import sys;sys.stdout.write('--exclude tests/test_pep492.py') if sys.version_info[:3] < (3, 5, 0) else sys.stdout.write('')"); \
    flake8 aiokafka tests $extra
=========================================================================================================== test session starts ============================================================================================================
platform darwin -- Python 3.5.0, pytest-2.9.2, py-1.4.31, pluggy-0.3.1
rootdir: /Users/fkochem/workspace/code/aiokafka, inifile:
plugins: catchlog-1.2.2, cov-2.3.0
collected 55 items

tests/test_client.py EEEEEEEEE
tests/test_conn.py EEEEEEEE
tests/test_consumer.py EEEEEEEEEEEEEE
tests/test_coordinator.py EEEEEEEEE
tests/test_fetcher.py ..
tests/test_message_accumulator.py ..
tests/test_pep492.py EEE
tests/test_producer.py EEEEEEEE

---------- coverage: platform darwin, python 3.5.0-final-0 -----------
Coverage HTML written to dir htmlcov


================================================================================================================== ERRORS ==================================================================================================================
_________________________________________________________________________________________ ERROR at setup of TestAIOKafkaClient.test_init_with_csv __________________________________________________________________________________________

docker = <docker.client.Client object at 0x10a6decf8>

    @pytest.fixture(scope='session')
    def docker_ip_address(docker):
        """Returns IP address of the docker daemon service."""
        # Fallback docker daemon bridge name
        ifname = 'docker0'
        try:
            for network in docker.networks():
                _ifname = docker.networks()[0]['Options'].get(
                    'com.docker.network.bridge.name')
                if _ifname is not None:
                    ifname = _ifname
                    break
        except libdocker.errors.InvalidVersion:
            pass
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        return socket.inet_ntoa(fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
>           struct.pack('256s', ifname[:15].encode('utf-8')))[20:24])
E       OSError: [Errno 6] Device not configured

tests/conftest.py:42: OSError

All tests error out with the same exception.

Topic leaders change/broker crash causes aiokafka to stop producing messages

Version information:

Kafka cluster: 0.10.2.1
aiokafka: 0.2.2 (kafka-python 1.3.1)
Python: 3.5.2

Producer script:

import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    while True:
        resp = yield from producer.send_and_wait('my-topic', value=b'some_message_bytes')
        print("Message produced: partition {}; offset {}".format(resp.partition, resp.offset))
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='bootstrap-server1:9092,bootstrap-server2:9092,bootstrap-server3:9092')
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())
loop.close()

Bug reproduce flow:

  1. Start the producer script; it will send a message every second.
  2. Stop one of the Kafka brokers (partition leaders will change).
  3. The script stops producing messages - no logs, nothing; it just hangs.

If you send a kill command to the script, you will see this exception:

Task exception was never retrieved
future: <Task finished coro=<AIOKafkaProducer._send_produce_req() done, defined at lib/python3.5/site-packages/aiokafka/producer.py:331> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 17, in _unpack
    (value,) = unpack(f, data)
struct.error: unpack requires a bytes object of length 4

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "lib/python3.5/site-packages/aiokafka/producer.py", line 365, in _send_produce_req
    response = yield from self.client.send(node_id, request)
  File "lib/python3.5/site-packages/aiokafka/client.py", line 375, in send
    request, expect_response=expect_response)
  File "lib/python3.5/site-packages/aiokafka/conn.py", line 141, in send
    message = header.encode() + request.encode()
  File "lib/python3.5/site-packages/kafka/protocol/struct.py", line 34, in _encode_self
    [self.__dict__[name] for name in self.SCHEMA.names]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in encode
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 162, in <listcomp>
    [self.array_of.encode(item) for item in items]
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in encode
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 124, in <listcomp>
    for i, field in enumerate(self.fields)
  File "lib/python3.5/site-packages/kafka/protocol/message.py", line 154, in encode
    size = Int32.decode(items)
File "lib/python3.5/site-packages/kafka/protocol/types.py", line 50, in decode
    return _unpack('>i', data.read(4))
  File "lib/python3.5/site-packages/kafka/protocol/types.py", line 20, in _unpack
    raise ValueError(error)
ValueError: <class 'struct.error'>

We are looking into this, but feel free to comment.

Race condition in heartbeat

While testing our code we tweaked some parameters of AIOKafkaConsumer. One of them is heartbeat_interval_ms, which we set to 100 ms. We then ran a test that waited 100 ms between subscribing to more topics. Sometimes (at first in 1 of 20 cases, then in 1 of 200) a strange thing occurred: when I subscribed to and unsubscribed from a topic too quickly, kafka/aiokafka raised this exception:

    raise self._exception
/usr/lib/python3.5/asyncio/tasks.py:239: in _step
    result = coro.send(None)
messaging_client/mqclient/kafka_consumer.py:227: in stop
    await self._consumer.stop()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/consumer.py:279: in stop
    yield from self._coordinator.close()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:132: in close
    yield from self.heartbeat_task
/usr/lib/python3.5/asyncio/futures.py:363: in __iter__
    return self.result()  # May raise too.
/usr/lib/python3.5/asyncio/futures.py:274: in result
    raise self._exception
/usr/lib/python3.5/asyncio/tasks.py:239: in _step
    result = coro.send(None)
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:789: in _heartbeat_task_routine
    yield from self.ensure_active_group()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:593: in ensure_active_group
    yield from self._perform_group_join()
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:677: in _perform_group_join
    self.protocol, member_assignment_bytes)
../../.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py:266: in _on_join_complete
    self._subscription.assign_from_subscribed(assignment.partitions())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <kafka.consumer.subscription_state.SubscriptionState object at 0x7f62fcb32240>, assignments = [TopicPartition(topic='TestSchema.c6d8cc1a2fb242b38bee95a4a189e1fa.1', partition=0)]

    def assign_from_subscribed(self, assignments):
        """Update the assignment to the specified partitions
    
            This method is called by the coordinator to dynamically assign
            partitions based on the consumer's topic subscription. This is different
            from assign_from_user() which directly sets the assignment from a
            user-supplied TopicPartition list.
    
            Arguments:
                assignments (list of TopicPartition): partitions to assign to this
                    consumer instance.
            """
        if self.subscription is None:
            raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
    
        for tp in assignments:
            if tp.topic not in self.subscription:
>               raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
E               ValueError: Assigned partition TopicPartition(topic='TestSchema.c6d8cc1a2fb242b38bee95a4a189e1fa.1', partition=0) for non-subscribed topic.

I tried to capture some logs and will show important part of it:

[1480672858.4598098][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Received successful heartbeat response.
[1480672858.5570168][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.consumer.subscription_state][INFO] Updating subscribed topics to: {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.557299][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.consumer][DEBUG] Subscribed to topic(s): {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.558063][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka][DEBUG] Sending metadata request MetadataRequest(topics=['TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3']) to 0
[1480672858.559933][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Revoking previously assigned partitions set()
[1480672858.560181][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] (Re-)joining group 794401da87a540daba226dee38a81b1c
[1480672858.5605147][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Issuing request (JoinGroupRequest(group='794401da87a540daba226dee38a81b1c', session_timeout=30000, member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', protocol_type='consumer', group_protocols=[(protocol_name='roundrobin', protocol_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])) to coordinator 0
[1480672858.669306][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.consumer.subscription_state][INFO] Updating subscribed topics to: {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.6696045][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.consumer][DEBUG] Subscribed to topic(s): {'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'}
[1480672858.760019][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 7: MetadataResponse(brokers=[(node_id=0, host='10.107.20.205', port=9092)], topics=[(error_code=0, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=5, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', partitions=[]), (error_code=5, topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.3', partitions=[])])
[1480672858.7602575][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][WARNING] Topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.2 is not available during auto-create initialization
[1480672858.7603266][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][WARNING] Topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.3 is not available during auto-create initialization
[1480672858.773429][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.cluster][DEBUG] Updated cluster metadata to Cluster(brokers: 1, topics: 1, groups: 0)
[1480672858.7737765][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 8: JoinGroupResponse(error_code=0, generation_id=2, group_protocol='roundrobin', leader_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', members=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])
[1480672858.7740512][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Join group response JoinGroupResponse(error_code=0, generation_id=2, group_protocol='roundrobin', leader_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', members=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x03\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.2\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.3\x00\x00\x00\x00')])
[1480672858.7741585][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][INFO] Joined group '794401da87a540daba226dee38a81b1c' (generation 2) with member_id aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa
[1480672858.774216][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][INFO] Elected group leader -- performing partition assignments using roundrobin
[1480672858.7743132][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Performing roundrobin assignment for subscriptions {'aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa': ConsumerProtocolMemberMetadata(version=0, subscription=['TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.2', 'TestSchema.aaa34fbd914a4d90ad20335be9061c77.3'], user_data=b'')}
[1480672858.7743835][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.coordinator.assignors.roundrobin][WARNING] No partition metadata for topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.2
[1480672858.774436][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][kafka.coordinator.assignors.roundrobin][WARNING] No partition metadata for topic TestSchema.aaa34fbd914a4d90ad20335be9061c77.3
[1480672858.774536][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Finished assignment: {'aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic='TestSchema.aaa34fbd914a4d90ad20335be9061c77.1', partitions=[0])], user_data=b'')}
[1480672858.774634][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Issuing leader SyncGroup (SyncGroupRequest(group='794401da87a540daba226dee38a81b1c', generation_id=2, member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', group_assignment=[(member_id='aiokafka-0.1.2-030f6183-dbbb-4733-895a-8f8fe4527dfa', member_metadata=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])) to coordinator 0
[1480672858.7753685][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.conn][DEBUG] <AIOKafkaConnection host=10.107.20.205 port=9092> Response 9: SyncGroupResponse(error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')
[1480672858.77553][ubuntu][/home/artimi/.virtualenvs/messaging/bin/pytest][aiokafka.group_coordinator][DEBUG] Received successful sync group response for group 794401da87a540daba226dee38a81b1c: SyncGroupResponse(error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00-TestSchema.aaa34fbd914a4d90ad20335be9061c77.1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')

You can see on line 2 that we are subscribing to topics ending with 1, 2 and 3. Then on line 4 a MetadataRequest is sent. Then on line 8 the subscribed topics are updated to leave out 1 and only keep 2 and 3. But on line 10 the MetadataResponse finally arrives with topic 1 as successful and topics 2 and 3 with error 5 (which does not bother us right now). So we are now in a situation where subscriptions == {2, 3} and assignments == {1}. Then, when the kafka-python method SubscriptionState.assign_from_subscribed() is called, it discovers that the topic partition for 1 is not in the subscription list and raises the previously described error.

Basically, the assigned partitions end up in a state that does not correspond to the current subscriptions, because metadata is not updated fast (often?) enough. We hopefully found a workaround for this by calling force_metadata_update after every unsubscription (it is quite uncommon in our workflow, so we can afford it).
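A minimal sketch of that workaround; note it relies on the consumer's private _client attribute and on force_metadata_update() being awaitable, which are implementation details rather than public API:

async def change_subscription(consumer, topics):
    # Shrink/replace the subscription, then force a metadata refresh so the
    # next assignment is not computed from a stale metadata snapshot.
    consumer.subscribe(topics=topics)
    await consumer._client.force_metadata_update()  # private API, assumption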

This happens rarely and unfortunately I could not create a reliable minimal working example of this bug. I hope this helps somebody, because I spent some time tracking this one down.

This happened with aiokafka == 0.1.2 and kafka in docker container quantlane/kafka:0.9.

AIOKafkaProducer.send_and_wait() throws error when writing to a non-existent topic

When I try to write to a topic that doesn't yet exist, both vanilla kafka-python and AIOKafkaProducer.send() silently create the topic before writing to it, which is how it should be.
However, when I try to do the same in a call to send_and_wait, I get an error; see below:

async def produce(loop):
    # Just adds message to sending queue
    future = await producer.send(topic+'1', b'some_message_bytes')
    # waiting for message to be delivered
    resp = await future
    print("Message produced: partition {}; offset {}".format(
          resp.partition, resp.offset))
    # Also can use a helper to send and wait in 1 call
    resp = await producer.send_and_wait(
         topic + '2', key=b'foo', value=b'bar')
    #resp = yield from producer.send_and_wait(
    #   'foobar', b'message for partition 1', partition=1)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers=kafka_host, api_version="0.9")
 # Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
 # Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
 #loop.close()

Running that produces

Message produced: partition 0; offset 3
---------------------------------------------------------------------------
UnknownTopicOrPartitionError              Traceback (most recent call last)
<ipython-input-7-d00f93f5f3ce> in <module>()
     16 # Bootstrap client, will get initial cluster metadata
     17 loop.run_until_complete(producer.start())
---> 18 loop.run_until_complete(produce(loop))
     19 # Wait for all pending messages to be delivered or expire
     20 loop.run_until_complete(producer.stop())

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\base_events.py in run_until_complete(self, future)
    385             raise RuntimeError('Event loop stopped before Future completed.')
    386 
--> 387         return future.result()
    388 
    389     def stop(self):

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py in result(self)
    272             self._tb_logger = None
    273         if self._exception is not None:
--> 274             raise self._exception
    275         return self._result
    276 

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py in _step(***failed resolving arguments***)
    237                 # We use the `send` method directly, because coroutines
    238                 # don't have `__iter__` and `__next__` methods.
--> 239                 result = coro.send(None)
    240             else:
    241                 result = coro.throw(exc)

<ipython-input-7-d00f93f5f3ce> in produce(loop)
      8     # Also can use a helper to send and wait in 1 call
      9     resp = await producer.send_and_wait(
---> 10          topic + '2', key=b'foo', value=b'bar')
     11     #resp = yield from producer.send_and_wait(
     12     #   'foobar', b'message for partition 1', partition=1)

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send_and_wait(self, topic, value, key, partition)
    302     def send_and_wait(self, topic, value=None, key=None, partition=None):
    303         """Publish a message to a topic and wait the result"""
--> 304         future = yield from self.send(topic, value, key, partition)
    305         return (yield from future)
    306 

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send(self, topic, value, key, partition)
    286 
    287         # first make sure the metadata for the topic is available
--> 288         yield from self._wait_on_metadata(topic)
    289 
    290         key_bytes, value_bytes = self._serialize(topic, key, value)

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in _wait_on_metadata(self, topic)
    238         yield from self.client.force_metadata_update()
    239         if topic not in self.client.cluster.topics():
--> 240             raise UnknownTopicOrPartitionError()
    241 
    242         return self._metadata.partitions_for_topic(topic)

UnknownTopicOrPartitionError: [Error 3] UnknownTopicOrPartitionError: 

Recursion in Fetcher.next_record() causes RecursionError

There is recursion in Fetcher.next_record() that, left unhandled, may eventually lead to a RecursionError. In our environment we are using a single AIOKafkaConsumer for consuming multiple topics. But we want to get messages only from desired topics, which is why we've used AIOKafkaConsumer.getone(*partitions) with partitions consisting of all TopicPartitions that correspond to the topic we want to consume in a given coroutine. Using AIOKafkaConsumer.getone() calls Fetcher.next_record(), where I encountered said problem. We may consume multiple partitions in the Fetcher, but it is synchronized by only one Fetcher._wait_empty_future (which serves for synchronization on new data). Here is the next_record() method:

@asyncio.coroutine
def next_record(self, partitions):
	for tp in list(self._records.keys()):
		if partitions and tp not in partitions:
			continue
		res_or_error = self._records[tp]
		if type(res_or_error) == FetchResult:
			message = res_or_error.getone()
			if message is None:
				# We already processed all messages, request new ones
				del self._records[tp]
				self._notify(self._wait_consume_future)
			else:
				return message
		else:
			# Remove error, so we can fetch on partition again
			del self._records[tp]
			self._notify(self._wait_consume_future)
			res_or_error.check_raise()
	# No messages ready. Wait for some to arrive
	if self._wait_empty_future is None or self._wait_empty_future.done():
		self._wait_empty_future = asyncio.Future(loop=self._loop)
	yield from asyncio.shield(self._wait_empty_future, loop=self._loop)
	return (yield from self.next_record(partitions))   

Imagine that this coroutine is awaited multiple times with partitions corresponding to each topic we have. Each of these coroutines awaits one line before the end, on asyncio.shield. When one topic, e.g. the first, receives data, it sets self._wait_empty_future, and all of the waiting coroutines recurse into next_record and again await on the same line until the first topic receives data again. When this happens 1000 times (the standard recursion limit) and no message arrives for the other topics, all those coroutines fail with RecursionError, because they go down one recursion level each time the first topic receives data.

One thing that could solve this bug is removing the recursion from Fetcher.next_record(). Maybe it would be even better to synchronize the coroutines per partition rather than all through one _wait_empty_future, but that would perhaps be too complex and expensive. A sketch of the first option follows.
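The sketch below simply wraps the body of the method quoted above in a loop instead of recursing; it assumes the same Fetcher attributes and FetchResult class as that snippet, so it is an illustration of the idea rather than a tested patch:

@asyncio.coroutine
def next_record(self, partitions):
    while True:
        for tp in list(self._records.keys()):
            if partitions and tp not in partitions:
                continue
            res_or_error = self._records[tp]
            if type(res_or_error) == FetchResult:
                message = res_or_error.getone()
                if message is None:
                    # All messages in this result are processed, request more
                    del self._records[tp]
                    self._notify(self._wait_consume_future)
                else:
                    return message
            else:
                # Remove the error so we can fetch on this partition again
                del self._records[tp]
                self._notify(self._wait_consume_future)
                res_or_error.check_raise()
        # No messages ready: wait for new data without adding a stack frame
        if self._wait_empty_future is None or self._wait_empty_future.done():
            self._wait_empty_future = asyncio.Future(loop=self._loop)
        yield from asyncio.shield(self._wait_empty_future, loop=self._loop)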

Adjust Heartbeats to follow KIP-62

As mentioned in KIP-62, Kafka will (as of 0.10.1.0) have the ability to send heartbeats from a background thread.
aiokafka has done this from the start by design, as long as the event loop itself is spinning, but this KIP also proposes to limit the processing time with a separate parameter, max.poll.interval.ms. I think it is good to adopt the same idea for aiokafka, so we ensure message consumption progress on the client itself.
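If we mirror the Java client, usage could eventually look something like the sketch below; the max_poll_interval_ms name and the 300000 ms value are assumptions carried over from the Java max.poll.interval.ms config, not an existing aiokafka option:

from aiokafka import AIOKafkaConsumer

consumer = AIOKafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    # Hypothetical parameter: leave the group if the application does not
    # call getone()/getmany() within this interval, even though heartbeats
    # keep being sent as long as the event loop is spinning.
    max_poll_interval_ms=300000,
)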

Change commit API to allow passing message object for commit

Currently committing is a bit complicated because we need to increment the offset. Instead of writing something like consumer.commit({tp: msg.offset + 1}) it would make sense to add another, more human-friendly API. For example:

consumer.commit(msg1, msg2)  # messages are from different partitions.

We can see that it's a message and can properly handle the offset increment.
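Until such an API exists, the translation the consumer would have to do internally can be sketched from application code roughly like this (the TopicPartition import path is assumed from kafka-python, and message objects are assumed to carry topic, partition and offset attributes as in the consumer examples above):

from kafka.structs import TopicPartition

def offsets_from_messages(*messages):
    # Build the {TopicPartition: next_offset} dict that commit() expects.
    offsets = {}
    for msg in messages:
        tp = TopicPartition(msg.topic, msg.partition)
        # +1 because the committed offset is the next offset to consume.
        offsets[tp] = max(offsets.get(tp, 0), msg.offset + 1)
    return offsets

# consumer.commit(offsets_from_messages(msg1, msg2)) would then behave like
# the proposed consumer.commit(msg1, msg2).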

Fetcher concurrency issue

There is a problem with the Fetcher; I don't know if it's bad design or simply a "ported as is" issue. Anyway: the fetched_records and next_record methods use _wait_empty_future to halt until results are read, but this only works if you have a single consumer task running. If you need to run several asyncio tasks for consuming and processing messages, you'll definitely end up with _wait_empty_future being overridden by another task, leaving the first task waiting for the future's result until the loop is closed.

https://github.com/aio-libs/aiokafka/blob/master/aiokafka/fetcher.py#L604
Basically there is no check whether _wait_empty_future is already set (not None).

Reconnect on every exception from conn

Hi,
we've again encountered the problem of a stale connection that does not reconnect. First I get an error in AIOKafkaClient.send(), coming from the AIOKafkaConnection.send() method:

Got error produce response: ConnectionError: Connection at kafka.example.com closed

This may happen and we should be able to recover from it. However, we do not reconnect, and some time after this we start to get exceptions from the MessageAccumulator, which is full and not being drained:

Traceback (most recent call last):
  File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run
    await self._producer.send(topic, payload)\
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send
    tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message
    raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

I was thinking that we could do the same thing that was done in #149: when there is any error from AIOKafkaConnection.send(), close the connection, which is then reconnected. It would wrap the whole AIOKafkaConnection.send() method, and if any exception was raised from it we would close the connection. This approach would cover every usage of AIOKafkaConnection.send() in the code (I found 2 in Fetcher, 1 in GroupCoordinator and 1 in AIOKafkaClient). However, I'm not sure whether there could be a case where a recurrent error keeps reopening the connection in an infinite loop.
What do you think about that? I'm willing to implement this on my own if you think it is a good idea.
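A minimal sketch of the idea from the caller's side, assuming AIOKafkaConnection exposes send() and close() as in the tracebacks above (the real fix would of course live inside conn.py itself):

async def send_closing_on_error(conn, request, expect_response=True):
    try:
        return await conn.send(request, expect_response=expect_response)
    except Exception:
        # Any failure leaves the connection in an unknown state, so drop it
        # and let the client reconnect on the next request.
        conn.close()
        raise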

aiokafka version: 0.2.2
kafka version: 0.9.0

consumer_timeout ignored?

Hi,

when I run the aiokafka consumer with consumer_timeout=100, it should terminate 100 ms after reaching the end of the topic; instead (as demonstrated in the attached notebook) it hangs, presumably waiting for the next record to show up, until terminated from the keyboard.

Is it a bug or am I doing something wrong?

Thanks a lot!
consumer_timeout ignored.zip

Cannot Build RPM

RPM build fails as it cannot find CHANGES.rst

python3 setup.py bdist_rpm

+ STATUS=0
+ '[' 0 -ne 0 ']'
+ cd aiokafka-0.0.1
+ /usr/bin/chmod -Rf a+rX,u+w,g-w,o-w .
+ exit 0
Executing(%build): /bin/sh -e /var/tmp/rpm-tmp.iKQpUb
+ umask 022
+ cd /home/salehi/projects/need-seek/backend/aiokafka-master/build/bdist.linux-x86_64/rpm/BUILD
+ cd aiokafka-0.0.1
+ python3 setup.py build
Traceback (most recent call last):
  File "setup.py", line 53, in <module>
    long_description='\n\n'.join((read('README.rst'), read('CHANGES.rst'))),
  File "setup.py", line 20, in read
    return open(os.path.join(os.path.dirname(__file__), f)).read().strip()
FileNotFoundError: [Errno 2] No such file or directory: 'CHANGES.rst'
error: Bad exit status from /var/tmp/rpm-tmp.iKQpUb (%build)

RPM build errors:
Bad exit status from /var/tmp/rpm-tmp.iKQpUb (%build)
error: command 'rpmbuild' failed with exit status 1

Reconnect to broker

Hi,
we have been having a few network interruptions lately, so I'm digging into reconnecting. I was not able to find a reconnect feature in aiokafka, neither in conn.py nor in client.py. Is it implemented in aiokafka elsewhere, or is it missing?
I was looking at the Kafka documentation and there is an option reconnect.backoff.ms for the consumer and producer (https://kafka.apache.org/0102/documentation/#newconsumerconfigs) that prevents reconnecting too quickly in case of failure. In kafka-python it is implemented in KafkaClient._maybe_refresh_metadata, with another part in KafkaClient._bootstrap.

I was trying to drop the connection using tcpkill and I discovered that it blocks connections from a rising sequence of ports:

10.107.20.205:39614 > 172.17.0.2:9092: R 2713530037:2713530037(0) win 0
...
10.107.20.205:39616 > 172.17.0.2:9092: R 3723147408:3723147408(0) win 0
...
10.107.20.205:39618 > 172.17.0.2:9092: R 830983828:830983828(0) win 0

Do you have an idea how this can happen? Is there some reconnect after all? However, when I stopped tcpkill no new messages went through, although in kafka-python it reconnected and started accepting messages again.

I'm willing to implement this feature, but I would welcome any advice on where it would be best to put it (I was thinking about conn.py, in case of some error while sending a message).

Heartbeat fails if there are no consumed topics

Hi,
I've encountered strange behavior. When I create an AIOKafkaConsumer and do not subscribe to any topic, it fails during the next heartbeat with the following error:

Traceback (most recent call last):
  File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 32, in <module>
    main()
  File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 18, in main
    loop.run_until_complete(subscribe_later(consumer))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/artimi/.PyCharmCE2016.3/config/scratches/scratch_4.py", line 28, in subscribe_later
    await consumer.stop()
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/consumer.py", line 272, in stop
    yield from self._coordinator.close()
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 134, in close
    yield from self.heartbeat_task
  File "/usr/lib/python3.5/asyncio/futures.py", line 363, in __iter__
    return self.result()  # May raise too.
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 806, in _heartbeat_task_routine
    yield from self._send_req(self.coordinator_id, request)
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/group_coordinator.py", line 159, in _send_req
    resp = yield from self._client.send(node_id, request)
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 309, in send
    if not (yield from self.ready(node_id)):
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 287, in ready
    conn = yield from self._get_conn(node_id)
  File "/home/artimi/.virtualenvs/messaging/lib/python3.5/site-packages/aiokafka/client.py", line 268, in _get_conn
    assert broker, 'Broker id %s not in current metadata' % node_id
AssertionError: Broker id None not in current metadata

You can try it with following piece of code:

import asyncio
import aiokafka
import uuid

HEARTBEAT_INTERVAL = 1.0

def main():
	loop = asyncio.get_event_loop()
	consumer = aiokafka.AIOKafkaConsumer(
		loop = loop,
		bootstrap_servers = 'localhost:9092',
		group_id = uuid.uuid4().hex,
		enable_auto_commit = False,
		auto_offset_reset = 'earliest',
		heartbeat_interval_ms = HEARTBEAT_INTERVAL * 1000,
		metadata_max_age_ms = 30000,
	)
	loop.run_until_complete(subscribe_later(consumer))


async def subscribe_later(consumer: aiokafka.AIOKafkaConsumer):
	await consumer.start()
	await asyncio.sleep(1.1 * HEARTBEAT_INTERVAL)
	await consumer.stop()


if __name__ == '__main__':
	main()

This happens even if I subscribe to and then unsubscribe from some topic before the heartbeat is performed.

I tried this on aiokafka 0.1.2 and 0.1.4 with kafka docker container quantlane/kafka:0.9.

kafka-python 1.3.3 breaks aiokafka producer?

Hi,

when I run the following code using kafka-python 1.3.1, it runs fine.

from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import asyncio
topic = 'test_topic'
def produce(loop):
    # Just adds message to sending queue
    future = yield from producer.send(topic, b'some_message_bytes')
    resp = yield from future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
loop.run_until_complete(producer.stop())

Yet when I update to kafka-python 1.3.3, it produces the error below.
Could you please take a look?
Thanks a lot!

AttributeError                            Traceback (most recent call last)
<ipython-input-1-81a83ca85581> in <module>()
     14 # Bootstrap client, will get initial cluster metadata
     15 loop.run_until_complete(producer.start())
---> 16 loop.run_until_complete(produce(loop))
     17 loop.run_until_complete(producer.stop())

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\base_events.py in run_until_complete(self, future)
    385             raise RuntimeError('Event loop stopped before Future completed.')
    386 
--> 387         return future.result()
    388 
    389     def stop(self):

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py in result(self)
    272             self._tb_logger = None
    273         if self._exception is not None:
--> 274             raise self._exception
    275         return self._result
    276 

C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py in _step(***failed resolving arguments***)
    237                 # We use the `send` method directly, because coroutines
    238                 # don't have `__iter__` and `__next__` methods.
--> 239                 result = coro.send(None)
    240             else:
    241                 result = coro.throw(exc)

<ipython-input-1-81a83ca85581> in produce(loop)
      5 def produce(loop):
      6     # Just adds message to sending queue
----> 7     future = yield from producer.send(topic, b'some_message_bytes')
      8     resp = yield from future
      9     print("Message produced: partition {}; offset {}".format(

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\producer.py in send(self, topic, value, key, partition)
    277 
    278         fut = yield from self._message_accumulator.add_message(
--> 279             tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
    280         return fut
    281 

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\message_accumulator.py in add_message(self, tp, key, value, timeout)
    197                 self._wait_data_future.set_result(None)
    198 
--> 199         future = batch.append(key, value)
    200         if future is None:
    201             # Batch is full, can't append data atm,

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\message_accumulator.py in append(self, key, value)
     83             return None
     84 
---> 85         encoded = Message(value, key=key, magic=self._version_id).encode()
     86         msg = Int64.encode(self._relative_offset) + Int32.encode(len(encoded))
     87         msg += encoded

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\util.py in __call__(self, *args, **kwargs)
    151         Calls the method on target with args and kwargs.
    152         """
--> 153         return self.method()(self.target(), *args, **kwargs)
    154 
    155     def __hash__(self):

C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\message.py in _encode_self(self, recalc_crc)
     71 
     72     def _encode_self(self, recalc_crc=True):
---> 73         version = self.magic
     74         if version == 1:
     75             fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)

AttributeError: 'NoneType' object has no attribute 'magic'

Do we even need to support old Kafka versions?

In the process of porting kafka-python code to asyncio I can see many places that require version-dependent management of the Kafka cluster. The author of kafka-python fixed those along the way, but I have no idea why or what the issues were. It's hard to understand some of the concepts, and blindly porting them will, I think, result in quite a bit of support work.
For now I think we should focus on the latest Kafka release - 0.9. I can understand the need to support old clusters, as most of the existing ones are older; I doubt there are a lot of 0.9 production environments yet (if any). But the driver was never released, and 0.9 has very big changes in the consumer's cluster management.
We can always add backward compatibility after we have reviewed the 0.9 code well enough.
Any objections?

Slow commit

Hi,
I was digging into our messaging system and I noticed that consumer.commit() takes quite a lot of time. In my informal benchmarking I saw that it takes about 100 ms, but I have also seen 500 ms delays. I tried to compare it with kafka-python on the same topic at the same time, and it took about 1 ms. Do you have any idea why it can take so long? Does commit in aiokafka do more than commit in kafka-python? We want to call commit after every message, and this is really hurting performance. I would like to confirm every message and not just use auto commit.

While researching this I discovered that the Java client and also kafka-python have a so-called ConsumerCoordinator.commit_offsets_async() https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/consumer.py#L334 . As I understand it, this is a fire-and-forget commit that does not wait for the response from the Kafka server. This might help our performance. Why is it not present in aiokafka?
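As a stop-gap in application code, a fire-and-forget commit can be approximated by scheduling consumer.commit() as a task instead of awaiting it; a sketch, with deliberately simple error handling:

def commit_in_background(consumer, loop):
    # Schedule the commit without waiting for the broker's response.
    task = loop.create_task(consumer.commit())

    def _log_result(t):
        # Avoid "Task exception was never retrieved" warnings.
        if not t.cancelled() and t.exception() is not None:
            print("commit failed:", t.exception())

    task.add_done_callback(_log_result)
    return task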

`Consumer.getmany` can return 0 results before timeout.

Nothing critical, but annoying during tests. If you execute consumer.getmany(timeout_ms=500) it can return {} before the timeout expires in case of an assignment change, since it only waits for data once and that data is then discarded as not assigned.
It should never be a bad thing for an actual application, as it will be running in a while True: loop and will only cause an unneeded loop iteration.
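For context, the loop in question typically looks like this; an empty dict from getmany() simply costs one extra iteration:

async def consume_batches(consumer):
    while True:
        # May return {} before timeout_ms expires, e.g. on assignment change.
        batches = await consumer.getmany(timeout_ms=500)
        for tp, messages in batches.items():
            for msg in messages:
                print("consumed:", tp.topic, tp.partition, msg.offset, msg.value)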

Both examples on main page give error: Correlation ids do not match: sent 1, recv 2

Hi,

when I try to run both examples from the main page of this project, I get the following error on both the producer and the consumer:

future: <Task finished coro=<wait_for() done, defined at /home/egor/anaconda3/lib/python3.5/asyncio/tasks.py:355> exception=CorrelationIdError('Correlation ids do not match: sent 1, recv 2',)>
Traceback (most recent call last):
  File "/home/egor/anaconda3/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/egor/anaconda3/lib/python3.5/asyncio/tasks.py", line 392, in wait_for
    return fut.result()
  File "/home/egor/anaconda3/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
kafka.errors.CorrelationIdError: CorrelationIdError: Correlation ids do not match: sent 1, recv 2


Any idea how to fix it? A notebook is attached; I am using Kafka 0.9.0.1 and Python 3.5 on Ubuntu 16.04.

Thanks a lot!
E.

aiokafka_error_demo.ipynb.zip

Disconnected from group

Hello, I've just gotten to the point where events are processed with no problem until I assign my consumer to a group.

When I assign my consumer to a group, events are processed for only about 2 minutes, and then it constantly returns an ERROR and does not process any messages:

storage_in_1 | 2016-05-27 12:58:06,471 - aiokafka.group_coordinator - ERROR - Skipping heartbeat: no active group: GroupCoordinatorNotAvailableError - 15 - The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active.
storage_in_1 | 2016-05-27 12:58:06,573 - aiokafka.group_coordinator - ERROR - Skipping heartbeat: no active group: GroupCoordinatorNotAvailableError - 15 - The broker returns this error code for group coordinator requests, offset commits, and most group management requests if the offsets topic has not yet been created, or if the group coordinator is not active.

Just before this happens, group_coordinator outputs:

storage_in_1 | 2016-05-27 12:58:05,778 - aiokafka.group_coordinator - INFO - Heartbeat failed: local member_id was not recognized; resetting and re-joining group
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - ERROR - Heartbeat session expired - marking coordinator dead
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - INFO - Marking the coordinator dead (node 0): None.
storage_in_1 | 2016-05-27 12:58:05,779 - aiokafka.group_coordinator - ERROR - OffsetCommit failed for group test-consumer-group due to group error (IllegalGenerationError - 22 - Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation.), will rejoin
storage_in_1 | 2016-05-27 12:58:05,780 - aiokafka.group_coordinator - WARNING - Auto offset commit failed: IllegalGenerationError - 22 - Returned from group membership requests (such as heartbeats) when the generation id provided in the request is not the current generation.

The consumer is configured as:

AIOKafkaConsumer(
    'some-topic',
    loop=loop,
    bootstrap_servers='my-local-kafka-url',
    group_id='some-group-name'
)

Kafka version: flozano/kafka:0.9.0.1
aiokafka version: aiokafka (0.1.2)

Any idea how we can resolve this issue?
Thanks

edit:

I just tried versions 0.1.0, 0.1.1, 0.1.2 and the latest master, and I'm having the same issue on all of them.

Use loop.create_future instead of asyncio.Future

We should use AbstractEventLoop.create_future() in order to create futures bound to the event loop, since a third-party event loop may provide a custom asyncio.Future implementation. uvloop, for example, has its own implementation in C.
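
A minimal illustration of the proposed change (plain asyncio, nothing aiokafka-specific here):

import asyncio

def make_future(loop: asyncio.AbstractEventLoop) -> asyncio.Future:
    # Preferred: lets a third-party loop such as uvloop supply its own
    # (C-level) Future implementation.
    return loop.create_future()
    # instead of the generic: asyncio.Future(loop=loop)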

Metadata change can trigger 2 rebalances in a rare case

While working on #88 I noticed a strange bug. We monitor metadata changes in the Coordinator and, if metadata changes, we request a rejoin. What we don't do, though, is clear the metadata snapshot (_partitions_per_topic) on rebalance. This can lead to the following situation:

  • We set the subscription to topic1. _partitions_per_topic = {'topic1': {0, 1}}. 1st rebalance.
  • We change the subscription to topic2. Metadata is updated before the rebalance is completed, so _partitions_per_topic = {'topic1': {0, 1}, 'topic2': {0, 1}}. This is caused by self._group_subscription in Subscription updating the set rather than replacing it.
  • After reassignment self._group_subscription = {'topic2'}, so the next metadata update will yield _partitions_per_topic = {'topic2': {0, 1}}, triggering an unneeded rebalance.

Another race condition in Coordinator

This one's not so tricky, but can be a pain. If we start a rebalance right after the subscription changes and the MetadataResponse doesn't arrive before the leader performs partition assignment, we can end up with an assignment of 0 partitions, as the leader simply does not know about the new topic yet.

Stop consumer iterator on `consumer.stop` call

Right now the async iterator will continue waiting for data even after a consumer.stop() call. I think this is not correct behaviour, as it makes cancellation the only way to stop our consumers.
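
For context, a sketch of the cancellation-based workaround this forces on users today (the task and function names are illustrative):

import asyncio
from aiokafka import AIOKafkaConsumer

async def consume(consumer: AIOKafkaConsumer):
    async for msg in consumer:          # does not exit on consumer.stop()
        print("consumed:", msg.offset)

async def shutdown(consumer: AIOKafkaConsumer, consume_task: asyncio.Task):
    consume_task.cancel()               # the only way to break the iterator
    try:
        await consume_task
    except asyncio.CancelledError:
        pass
    await consumer.stop()               # then leave the group cleanly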

Fix the subscribe `listener` API

One of the ideas of the listener API is to allow setUp and tearDown of topic or partition resources: invalidating or populating caches, committing the last batch of messages, etc.
In the current implementation we only call the on_partitions_revoked and on_partitions_assigned functions. This does not stall group management (new messages are fetched right away, Join Group happens right away), which defeats the purpose.

TODO:

  • Instead of just calling the callbacks, check whether on_partitions_assigned and on_partitions_revoked are coroutine functions (asyncio.iscoroutinefunction) and await them if they are (a sketch of such a listener follows this list).
  • Add an example of usage to consumer.rst docs. Fix subscribe docstring.
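
A sketch of how an async-aware listener could look from the user's side, assuming the ConsumerRebalanceListener base class from aiokafka.abc; awaiting the coroutine callbacks before continuing group management is the behaviour requested here, not a description of the current implementation:

import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.abc import ConsumerRebalanceListener

class CacheListener(ConsumerRebalanceListener):
    async def on_partitions_revoked(self, revoked):
        # e.g. commit the last processed batch before giving partitions away
        print("revoked:", revoked)

    async def on_partitions_assigned(self, assigned):
        # e.g. populate caches for the newly assigned partitions
        print("assigned:", assigned)

async def main():
    consumer = AIOKafkaConsumer(
        bootstrap_servers='localhost:9092', group_id='my-group')
    consumer.subscribe(['my_topic'], listener=CacheListener())
    await consumer.start()
    try:
        async for msg in consumer:
            print("consumed:", msg.offset)
    finally:
        await consumer.stop()

asyncio.run(main())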

Implement max_poll_records

We already have the iterator interface, so it's not that critical, but Kafka's consumers introduced it and we could do it too. Details are in KIP-41.
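
A hedged sketch of what the consumer-side cap could look like, modelled on getmany(); the max_records keyword below expresses what KIP-41 asks for and should be treated as an assumption rather than a description of any particular release:

from aiokafka import AIOKafkaConsumer
import asyncio

async def bounded_poll():
    consumer = AIOKafkaConsumer(
        'my_topic', bootstrap_servers='localhost:9092')
    await consumer.start()
    try:
        # Return at most 500 records per call, in the spirit of
        # KIP-41's max.poll.records.
        batches = await consumer.getmany(timeout_ms=1000, max_records=500)
        total = sum(len(msgs) for msgs in batches.values())
        print("got", total, "records (at most 500)")
    finally:
        await consumer.stop()

asyncio.run(bounded_poll())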

New topics don't appear when the consumer uses a pattern.

AIOKafkaClient stores a list of topics and passes it into MetadataRequest. So if a new topic appears in Kafka, the consumer doesn't know anything about it. Do we really need to store the topic list inside the client? Or could we always do a MetadataRequest with an empty topic list to get all the information about the current state of Kafka?
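
For illustration, a sketch of the pattern-subscription case, assuming the subscribe(pattern=...) API and the metadata_max_age_ms option that controls how often cluster metadata is refreshed; newly created topics matching the pattern can only be picked up when a metadata refresh actually reports them, which is exactly what this issue is about:

from aiokafka import AIOKafkaConsumer
import asyncio

async def consume_by_pattern():
    consumer = AIOKafkaConsumer(
        bootstrap_servers='localhost:9092',
        group_id='my-group',
        metadata_max_age_ms=30000)          # refresh metadata every 30 s
    consumer.subscribe(pattern='^my-topic-.*')
    await consumer.start()
    try:
        async for msg in consumer:
            # Topics created after startup only show up once a metadata
            # refresh includes them.
            print("consumed:", msg.topic, msg.offset)
    finally:
        await consumer.stop()

asyncio.run(consume_by_pattern())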

Bump python-kafka to 1.3.1

There are a few broken tests on this version:

    @classmethod
    def encode(cls, items):
        # RecordAccumulator encodes messagesets internally
        if isinstance(items, io.BytesIO):
            size = Int32.decode(items)
            # rewind and return all the bytes
            items.seek(-4, 1)
            return items.read(size + 4)

        encoded_values = []
>       for (offset, message) in items:
E       ValueError: too many values to unpack (expected 2)

>       self.assertEqual(sorted(brokers), sorted(list(c_brokers)))
E       AssertionError: Lists differ: [(0, 'broker_1', 4567), (1, 'broker_2', 5678)] != [BrokerMetadata(nodeId=0, host='broker_1',[83 chars]one)]
E       
E       First differing element 0:
E       (0, 'broker_1', 4567)
E       BrokerMetadata(nodeId=0, host='broker_1', port=4567, rack=None)
E       
E       - [(0, 'broker_1', 4567), (1, 'broker_2', 5678)]
E       + [BrokerMetadata(nodeId=0, host='broker_1', port=4567, rack=None),
E       +  BrokerMetadata(nodeId=1, host='broker_2', port=5678, rack=None)]

Missing requirement for tests in README.rst

README.rst says that all you need to do is:

  1. Install docker
  2. Install libsnappy-dev
  3. run make setup
  4. run make test

However, after a few hours I discovered that you also need the keytool utility ( https://docs.oracle.com/javase/6/docs/technotes/tools/solaris/keytool.html ), which is part of Java (and was not installed on my computer). It is used in the gen-ssl-certs.sh script to generate a few files, one of which is br_server.keystore.jks. At test start gen-ssl-certs.sh silently failed and didn't create those files. Kafka in Docker then failed every time it tried to start, with the exception:

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: /ssl_cert/br_server.keystore.jks (No such file or directory)

I recommend updating the README with the keytool requirement and also raising an exception in the ssl_folder.py fixture if gen-ssl-certs.sh does not return 0.
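
A hedged sketch of the suggested fixture-side check; the fixture name and script path are illustrative, not the actual test-suite layout:

import subprocess
import pytest

@pytest.fixture(scope='session')
def ssl_folder(tmp_path_factory):
    out_dir = tmp_path_factory.mktemp('ssl_cert')
    # check=True raises CalledProcessError if gen-ssl-certs.sh (or keytool
    # inside it) fails, so the missing br_server.keystore.jks problem
    # surfaces here instead of when Kafka tries to start.
    subprocess.run(['bash', 'gen-ssl-certs.sh', str(out_dir)], check=True)
    return out_dir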

Endless "Skipping heartbeat: no active group:" error messages

If I let aiokafka connect to Kafka 0.10 right after the broker has started, it keeps spamming these error messages and won't stop:

[2016-07-21 08:00:04,646] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,747] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,849] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:
[2016-07-21 08:00:04,950] ERROR [aiokafka.group_coordinator]: Skipping heartbeat: no active group:

If I stop my app, keep Kafka running and start my app again, it connects just fine.

This forces me to introduce sleep() commands to my code and hope that after an arbitrary delay everything works.
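
As a stop-gap, a hedged sketch of retrying consumer.start() with a backoff instead of an arbitrary sleep(); the aiokafka.errors import path, the exception type, and the idea that start() fails cleanly while the coordinator is unavailable are all assumptions that may need adjusting to your aiokafka version:

import asyncio
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

async def start_consumer_with_retry(attempts: int = 10) -> AIOKafkaConsumer:
    # Retry with exponential backoff instead of a single fixed sleep().
    for attempt in range(attempts):
        consumer = AIOKafkaConsumer(
            'my_topic',
            bootstrap_servers='localhost:9092',
            group_id='my-group')
        try:
            await consumer.start()
            return consumer
        except KafkaError:
            await consumer.stop()
            await asyncio.sleep(min(2 ** attempt, 30))
    raise RuntimeError("Kafka did not become available in time")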
