confluent-kafka-python's Introduction

Confluent's Python Client for Apache Kafka™

confluent-kafka-python provides a high-level Producer, Consumer and AdminClient compatible with all Apache Kafka™ brokers >= v0.8, Confluent Cloud and Confluent Platform. The client is:

  • Reliable - It's a wrapper around librdkafka (provided automatically via binary wheels) which is widely deployed in a diverse set of production scenarios. It's tested using the same set of system tests as the Java client and more. It's supported by Confluent.

  • Performant - Performance is a key design consideration. Maximum throughput is on par with the Java client for larger message sizes (where the overhead of the Python interpreter has less impact). Latency is on par with the Java client.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

Usage

For a step-by-step guide on using the client see Getting Started with Apache Kafka and Python.

Additional examples can be found in the examples directory or the confluentinc/examples GitHub repo, which include demonstrations of:

  • Exactly once data processing using the transactional API.
  • Integration with asyncio.
  • (De)serializing Protobuf, JSON, and Avro data with Confluent Schema Registry integration.
  • Confluent Cloud configuration.

Also refer to the API documentation.

Finally, the tests are useful as a reference for example usage.

Basic Producer Example

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# some_data_source stands for any iterable of str, e.g. a list or an open text file.
for data in some_data_source:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message. The delivery report callback will
    # be triggered from the call to poll() above, or flush() below, when the
    # message has been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

For a discussion on the poll based producer API, refer to the Integrating Apache Kafka With Python Asyncio Web Applications blog post.

Basic Consumer Example

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

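try:
    while True:
        msg = c.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Without the try/finally the endless loop above would never reach close().
    c.close()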

Basic AdminClient Example

Create topics:

from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'mybroker'})

new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = a.create_topics(new_topics)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))

Thread Safety

The Producer, Consumer and AdminClient are all thread safe.

Install

Install self-contained binary wheels

$ pip install confluent-kafka

NOTE: The pre-built Linux wheels do NOT contain SASL Kerberos/GSSAPI support. If you need SASL Kerberos/GSSAPI support you must install librdkafka and its dependencies using the repositories below and then build confluent-kafka using the instructions in the "Install from source" section below.

Install from source

For source install, see the Install from source section in INSTALL.md.

Broker Compatibility

The Python client (as well as the underlying C library librdkafka) supports all broker versions >= 0.8. But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it is not safe for a client to assume what protocol version is actually supported by the broker, thus you will need to hint the Python client what protocol version it may use. This is done through two configuration settings:

  • broker.version.fallback=YOUR_BROKER_VERSION (default 0.9.0.1)
  • api.version.request=true|false (default true)

When using a Kafka 0.10 broker or later you don't need to do anything (api.version.request=true is the default). If you use Kafka broker 0.9 or 0.8 you must set api.version.request=false and set broker.version.fallback to your broker version, e.g. broker.version.fallback=0.9.0.1.

More info here: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
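
The rule above can be captured in a small helper. This is a minimal sketch, not part of the client: the function name broker_compat_config is hypothetical, and only the two configuration properties named above are used.

```python
def broker_compat_config(broker_version):
    """Return the settings described above for a dotted broker version string.

    Example inputs: '0.8.2.2', '0.9.0.1', '0.10.1.0'.
    """
    version = tuple(int(part) for part in broker_version.split('.')[:2])
    if version >= (0, 10):
        # Brokers 0.10 and later can report their supported API versions,
        # so the default api.version.request=true is sufficient.
        return {'api.version.request': True}
    # Brokers 0.8 and 0.9 cannot; give the client a version hint instead.
    return {
        'api.version.request': False,
        'broker.version.fallback': broker_version,
    }


conf = {'bootstrap.servers': 'mybroker'}
conf.update(broker_compat_config('0.9.0.1'))
print(conf)
```

The resulting dict can be passed to Producer, Consumer or AdminClient unchanged.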

SSL certificates

If you're connecting to a Kafka cluster through SSL you will need to configure the client with 'security.protocol': 'SSL' (or 'SASL_SSL' if SASL authentication is used).

The client will use CA certificates to verify the broker's certificate. The embedded OpenSSL library will look for CA certificates in /usr/lib/ssl/certs/ or /usr/lib/ssl/cacert.pem. CA certificates are typically provided by the Linux distribution's ca-certificates package, which needs to be installed through apt, yum, etc.

If your system stores CA certificates in another location you will need to configure the client with 'ssl.ca.location': '/path/to/cacert.pem'.

Alternatively, the CA certificates can be provided by the certifi Python package. To use certifi, add an import certifi line and configure the client's CA location with 'ssl.ca.location': certifi.where().
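
The options in this section combine as sketched below. The helper name ssl_config and its parameters are hypothetical; the property names and values are the ones quoted above.

```python
def ssl_config(bootstrap_servers, ca_location=None, use_sasl=False):
    """Build a client configuration dict for an SSL-protected cluster."""
    conf = {
        'bootstrap.servers': bootstrap_servers,
        # 'SASL_SSL' when SASL authentication is used, plain 'SSL' otherwise.
        'security.protocol': 'SASL_SSL' if use_sasl else 'SSL',
    }
    if ca_location is not None:
        # Only needed when the CA bundle is not in OpenSSL's default location.
        conf['ssl.ca.location'] = ca_location
    return conf


default_conf = ssl_config('mybroker')
custom_conf = ssl_config('mybroker', ca_location='/path/to/cacert.pem')
print(default_conf)
print(custom_conf)
```

To use certifi, pass ca_location=certifi.where() after importing certifi, then hand the dict to the client constructor.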

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-python. confluent-kafka-python has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

Instructions on building and testing confluent-kafka-python can be found here.

Confluent Cloud

For a step-by-step guide on using the Python client with Confluent Cloud see Getting Started with Apache Kafka and Python on Confluent Developer.

confluent-kafka-python's People

Contributors

alexlod, anchitj, cchristous, choogeboom, confluentjenkins, confluenttools, ctrochalakis, edenhill, elismaga, emasab, ewencp, ffissore, hqin, hrchu, jliunyu, joel-hamill, johnistan, kwilcox, lowercase24, mahajanadhitya, maxzheng, mhowlett, milindl, norwood, pranavrth, qix, rayokota, rnpridgeon, slominskir, xli1996

confluent-kafka-python's Issues

Improper usage of Consumer constructor crashes python interpreter

If you call the Consumer constructor with no parameters, Python segfaults.

Python 3.5.2 (default, Aug 16 2016, 08:16:10)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import confluent_kafka
>>> c = confluent_kafka.Consumer()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: expected configuration dict
Fatal Python error: take_gil: NULL tstate
[1]    68631 abort (core dumped)  python

Note that this only happens if there are strictly no parameters. Simply changing the constructor call to Consumer(**{}) avoids the crash.

Python 3.5.2 (default, Aug 16 2016, 08:16:10)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import confluent_kafka
>>> c = confluent_kafka.Consumer(**{})
>>>

consumer.committed([TopicPartition('test', 0)]) blocks and hangs

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, kafka 0.10.1.0
I have a topic "test" (p = TopicPartition('test', 0)) to which I've pushed some messages but haven't committed an offset yet (on purpose). c.position([p]) gives -1001 (it gave me a "wtf" moment, but after quite some googling it turns out to be OK and means that nothing is committed yet). But c.committed([p]) just hangs (and can't be interrupted by Ctrl-C).
I've turned on 'debug': 'all' and this is the tail of the log:

%7|1478876711.684|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 99)
%7|1478876711.784|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 99, rtt 100.80ms)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.784|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9455ms
%7|1478876711.785|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 100)
%7|1478876711.886|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 100, rtt 101.10ms)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.886|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9556ms
%7|1478876711.886|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 101)
%7|1478876711.993|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 101, rtt 106.74ms)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478876711.993|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-9663ms
%7|1478876711.993|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 102)

The messages look quite similar and are printed very fast. Also note the negative backoff; I guess it's not normal.
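
For readers who hit the same -1001 surprise: the value is one of librdkafka's logical offsets, which the Python client exposes as confluent_kafka.OFFSET_INVALID, and it means that no offset is known yet. A minimal sketch that decodes these markers follows; the numeric values are copied here so the snippet runs without the client installed, and the helper name describe_offset is hypothetical.

```python
# Logical offsets as defined by librdkafka (same values as the
# confluent_kafka.OFFSET_* constants).
OFFSET_BEGINNING = -2
OFFSET_END = -1
OFFSET_STORED = -1000
OFFSET_INVALID = -1001

_LOGICAL_OFFSETS = {
    OFFSET_BEGINNING: 'OFFSET_BEGINNING (start of the partition)',
    OFFSET_END: 'OFFSET_END (end of the partition)',
    OFFSET_STORED: 'OFFSET_STORED (use the stored/committed offset)',
    OFFSET_INVALID: 'OFFSET_INVALID (no offset known yet)',
}


def describe_offset(offset):
    """Translate a TopicPartition.offset value into a readable description."""
    return _LOGICAL_OFFSETS.get(offset, 'absolute offset {}'.format(offset))


print(describe_offset(-1001))
print(describe_offset(27))
```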

"ALL_BROKERS_DOWN 2/2 brokers are down" with a single broker

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, kafka 0.10.1.0
I'm testing on a single broker, the code is like this:

from confluent_kafka import Consumer, KafkaError, TopicPartition

def on_commit (err, partitions):
	print err, partitions

def error_cb (err):
	# Not shown in the original report; assumed from the '***error_cb' lines in the log below.
	print '***error_cb', err.name(), err.str()
print 'consuming'
c = Consumer(**{
	'client.id': 'test-client',
	'bootstrap.servers': 'localhost',
	'debug': 'all',
	'group.id': 'test-group',

	'enable.auto.commit': False,
	'enable.auto.offset.store': True,
	'offset.store.method': 'broker',
	'enable.partition.eof': True,

	
	'default.topic.config': {'auto.offset.reset': 'smallest'},
	'on_commit': on_commit,
	'error_cb': error_cb,
})

print 'assigning'
p = TopicPartition('test', 0)
c.assign([p])
print 'getting position'
pos = c.position([p])
print pos, pos[0].offset

print 'polling'
try:
	while True:
		msg = c.poll()
		if msg is None:
			continue

		if not msg.error():
		    print 'Received message: %s' % msg.value().decode('utf-8')
		elif msg.error().code() != KafkaError._PARTITION_EOF:
		    print '***error', msg.error()
		    break
		elif msg.error().code() == KafkaError._PARTITION_EOF:
			print '_PARTITION_EOF'
			# break
finally:
	print 'closing'
	c.close()
	print 'closed'

If the broker is down before I run the script, I get _ALL_BROKERS_DOWN 1/1 brokers are down (expected). But if the broker is up and I first run the script for some time (it fetches some messages and gets _PARTITION_EOF) and then stop the broker, I get _ALL_BROKERS_DOWN 2/2 brokers are down, which is suspicious as there is only a single broker. Some debug logging around the message:

%7|1478883055.398|RECV|test-client#consumer-1| 127.0.0.1:9092/0: Received FetchResponse (v1, 36 bytes, CorrId 158, rtt 100.79ms)
%7|1478883055.398|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Topic test [0] MessageSet size 0, error "Success", MaxOffset 27, Ver 2/2
%7|1478883055.398|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch reply: Success
%7|1478883055.398|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch topic test [0] at offset 27 (v2)
%7|1478883055.398|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch 1/1/1 toppar(s)
%7|1478883055.398|FETCH|test-client#consumer-1| 127.0.0.1:9092/0: Fetch for 1 toppars, fetching=1, backoff=-15660ms
%7|1478883055.398|SEND|test-client#consumer-1| 127.0.0.1:9092/0: Sent FetchRequest (v1, 67 bytes @ 0, CorrId 159)
%7|1478883055.400|BROKERFAIL|test-client#consumer-1| localhost:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478883055.400|FAIL|test-client#consumer-1| localhost:9092/bootstrap: Receive failed: Disconnected
%7|1478883055.400|STATE|test-client#consumer-1| localhost:9092/bootstrap: Broker changed state UP -> DOWN
%7|1478883055.400|BROADCAST|test-client#consumer-1| Broadcasting state change
%7|1478883055.400|BUFQ|test-client#consumer-1| localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1478883055.400|BUFQ|test-client#consumer-1| localhost:9092/bootstrap: Updating 0 buffers on connection reset
%7|1478883055.400|METADATA|test-client#consumer-1| 127.0.0.1:9092/0: Request metadata for locally known topics: leader query: scheduled: not in broker thread
***error_cb _TRANSPORT localhost:9092/bootstrap: Receive failed: Disconnected
%7|1478883055.401|BROKERFAIL|test-client#consumer-1| 127.0.0.1:9092/0: failed: err: Local: Broker transport failure: (errno: Resource temporarily unavailable)
%3|1478883055.401|FAIL|test-client#consumer-1| 127.0.0.1:9092/0: Receive failed: Disconnected
%7|1478883055.401|STATE|test-client#consumer-1| 127.0.0.1:9092/0: Broker changed state UP -> DOWN
%7|1478883055.401|BROADCAST|test-client#consumer-1| Broadcasting state change
%7|1478883055.401|BUFQ|test-client#consumer-1| 127.0.0.1:9092/0: Purging bufq with 1 buffers
%7|1478883055.401|RETRY|test-client#consumer-1| 127.0.0.1:9092/0: Retrying FetchRequest (v1, 67 bytes, retry 1/2)
%7|1478883055.401|BUFQ|test-client#consumer-1| 127.0.0.1:9092/0: Updating 0 buffers on connection reset
***error_cb _TRANSPORT 127.0.0.1:9092/0: Receive failed: Disconnected
***error_cb _ALL_BROKERS_DOWN 2/2 brokers are down
%7|1478883055.475|CGRPSTATE|test-client#consumer-1| Group "test-group" changed state up -> query-coord (v5)
%7|1478883055.475|BROADCAST|test-client#consumer-1| Broadcasting state change
%7|1478883055.476|CGRPQUERY|test-client#consumer-1| Group "test-group": no broker available for coordinator query: intervaled in state query-coord
%7|1478883055.500|CONNECT|test-client#consumer-1| localhost:9092/bootstrap: broker in state DOWN connecting
%7|1478883055.501|CONNECT|test-client#consumer-1| localhost:9092/bootstrap: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 3
%7|1478883055.501|STATE|test-client#consumer-1| localhost:9092/bootstrap: Broker changed state DOWN -> CONNECT
%7|1478883055.501|BROADCAST|test-client#consumer-1| Broadcasting state change
%7|1478883055.502|CONNECT|test-client#consumer-1| 127.0.0.1:9092/0: broker in state DOWN connecting
%7|1478883055.502|CONNECT|test-client#consumer-1| 127.0.0.1:9092/0: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 4

My wild guess is that the library treats the bootstrap broker and the advertised broker as separate brokers (intentionally, or because of the formally different hostnames "localhost" vs. advertised "127.0.0.1"). It may be OK, but it may lead to some bugs, so please review it.
Anyway, the message about 2/2 is confusing.

close consumer without committing

Documentation says Consumer.close() commits all offsets.
But what if something failed in my processing/batch? Say my disk was full, and I was able to read the messages but not write my processing of them? I'd like to be able to exit properly, then maybe clear my disk, relaunch, and be able to re-process, rather than pretend that the unprocessed messages were processed, which is the current undesirable behavior.

Is there some way to close() without committing?

--thanks
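
One possible approach, sketched under the assumption that the installed version behaves as the current API documentation describes (close() commits offsets unless 'enable.auto.commit' is set to False): disable auto commit and commit explicitly only after a batch has been processed. The helper names manual_commit_config and process_batch are hypothetical; consumer is expected to be a confluent_kafka.Consumer, or any object with a compatible commit() method.

```python
def manual_commit_config(base_conf):
    """Copy base_conf and turn off automatic offset commits."""
    conf = dict(base_conf)
    conf['enable.auto.commit'] = False
    return conf


def process_batch(consumer, messages, handler):
    """Run handler on every message and commit only if all of them succeed.

    If handler raises (for example OSError on a full disk), the exception
    propagates and nothing is committed, so the batch is re-read after a
    restart.
    """
    for msg in messages:
        handler(msg)
    # Synchronous commit so that a failure surfaces here as an exception.
    consumer.commit(asynchronous=False)


conf = manual_commit_config({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup'})
print(conf)
```

With this configuration the caller can wrap the loop in try/finally and call consumer.close() on any error without acknowledging unprocessed messages.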

producer config property 'acks' is not passed through

Using Confluent 3.0.1 on Debian:

Traceback (most recent call last):
  File "./main.py", line 22, in <module>
    producer = Producer({'bootstrap.servers': bootstrap_servers, 'api.version.request': 'true', 'request.required.acks': -1})
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "request.required.acks""}

All combinations of the above settings, the short version acks, and '-1' have the same effect.
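
A likely cause, offered as an assumption rather than a confirmed diagnosis: in librdkafka releases of this era request.required.acks is a topic-level property, and topic-level properties are passed through the 'default.topic.config' sub-dictionary (the same mechanism the consumer reports on this page use for auto.offset.reset). A minimal sketch of such a configuration, with a hypothetical helper name:

```python
def producer_config(bootstrap_servers, acks=-1):
    """Build a producer config with the acks setting at topic level."""
    return {
        'bootstrap.servers': bootstrap_servers,
        'api.version.request': True,
        # Topic-level properties go into the nested dict.
        'default.topic.config': {'request.required.acks': acks},
    }


conf = producer_config('mybroker')
print(conf)
```

The dict would then be passed to Producer(conf) in place of the flat configuration from the traceback.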

cffi vs. Python C Extension

I am curious about the choice to use a Python C extension vs. using cffi? cffi should be easier to maintain, be much more performant on alternative runtimes (PyPy [1]), and core developers advocate for it [2]. There are high profile projects using it (pyzmq for example [3]). The documentation is pretty good [4], and instructions for packaging have gotten much simpler in recent months [5].

  1. http://doc.pypy.org/en/latest/faq.html#see-below
  2. http://www.snarky.ca/try-to-not-use-the-c-api-directly
  3. https://github.com/zeromq/pyzmq
  4. http://cffi.readthedocs.io/en/latest/overview.html
  5. https://caremad.io/posts/2015/06/distributing-a-cffi-project-redux/

KafkaError thrown when setting error_cb: Property "error_cb" must be set through dedicated .._set_..() function

After updating confluent_kafka to master, I'm getting an error when setting error_cb:

cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "error_cb" must be set through dedicated .._set_..() function"}

I'm using a basic config:

config = {
        "bootstrap.servers": self.kafka_hosts,
        "error_cb": self.dump_data_on_failure,
        "socket.keepalive.enable": True
    }
    self.producer = Producer(**config)

I installed confluent_kafka directly from the error_cb branch a week or two ago, and it worked as expected, but after upgrading this morning, I'm getting this error. I still have the previous version installed in a virtualenv and it works, and from looking at the tests, I thought I was doing everything right. Can anyone see what I'm doing wrong?

Integrate client with Confluent schema registry and Avro

We are looking at integrating this client with the Confluent schema registry and Avro. The proposal is to add a new submodule in this repo that has:

  • Caching schema registry client
  • Message serializer
  • Wrapper around base producer/consumer

The wrapper would override send/receive methods on base producer/consumer, and pass the objects/bytes through the message serializer. The message serializer would, in turn, use the caching schema client to fetch the schema, and encode/decode the message. Wrapper would be pluggable so other serde implementations could be used as well (e.g. Thrift/Protobuf), though we're not planning to implement any others.

Does this sound OK?

/cc @roopahc
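
To make the three proposed pieces concrete, here is a rough sketch of how they could fit together. Everything in it is an assumption for illustration: the class names are hypothetical, JSON stands in for Avro encoding so the snippet needs only the standard library, the registry lookup is an injected function, and the framing (a zero magic byte followed by a 4-byte big-endian schema id) follows the Confluent wire format rather than anything stated in the proposal.

```python
import json
import struct

MAGIC_BYTE = 0


class CachingSchemaClient:
    """Looks schemas up through fetch_schema(schema_id) and caches them."""

    def __init__(self, fetch_schema):
        self._fetch_schema = fetch_schema
        self._cache = {}

    def get_schema(self, schema_id):
        if schema_id not in self._cache:
            self._cache[schema_id] = self._fetch_schema(schema_id)
        return self._cache[schema_id]


class MessageSerializer:
    """Frames records with a schema id; JSON is a stand-in for Avro."""

    def __init__(self, schema_client):
        self._schemas = schema_client

    def encode(self, schema_id, record):
        self._schemas.get_schema(schema_id)  # fails early if the id is unknown
        header = struct.pack('>bI', MAGIC_BYTE, schema_id)
        return header + json.dumps(record).encode('utf-8')

    def decode(self, payload):
        magic, schema_id = struct.unpack('>bI', payload[:5])
        if magic != MAGIC_BYTE:
            raise ValueError('unexpected magic byte {}'.format(magic))
        schema = self._schemas.get_schema(schema_id)
        return schema, json.loads(payload[5:].decode('utf-8'))


class WrappedProducer:
    """Wraps any object with produce(topic, value, **kwargs)."""

    def __init__(self, producer, serializer):
        self._producer = producer
        self._serializer = serializer

    def produce(self, topic, schema_id, record, **kwargs):
        value = self._serializer.encode(schema_id, record)
        self._producer.produce(topic, value, **kwargs)
```

Because the wrapper only depends on the serializer's encode/decode methods, another serde (Thrift, Protobuf) could be plugged in by swapping the MessageSerializer.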

Client segfaults when threads are used

As part of #40, we started noticing that our tests were failing with a segfault in the Travis CI container, but only for Python 3.4.

We were unable to reproduce this locally on our laptops, but with Docker, and using a Travis CI container, we can. Here are the steps to trigger a segfault:

# Start Docker
docker run -it quay.io/travisci/travis-python /bin/bash

Once you're in your Docker container, run these commands:

sudo add-apt-repository ppa:fkrull/deadsnakes
sudo apt-get update
sudo apt-get install python3.4
pip install pip==7.1.0
pip install virtualenv==13.1.2
virtualenv -p /usr/bin/python3.4 /test
source /test/bin/activate
cd ~
git clone --depth=50 https://github.com/confluentinc/confluent-kafka-python.git
cd confluent-kafka-python/
git fetch origin pull/40/head:40
git checkout 40
export LD_LIBRARY_PATH=$PWD/tmp-build/lib
bash tools/bootstrap-librdkafka.sh v0.9.2 tmp-build
pip install pytest-timeout
pip install -v --global-option=build_ext --global-option="-Itmp-build/include/:/opt/python/3.4.2/include/python3.4m" --global-option="-Ltmp-build/lib" . .[avro]
git checkout 8148a79026d7324c34a24a03ba9043ebe2f931fd
py.test -v --timeout 20 --ignore=tmp-build --import-mode append

This will show a segfault before the tests finish.

Furthermore, I'm able to trigger this on master without any of our changes. After the above commands, run:

git checkout master

Then apply this diff:

diff --git a/tests/test_threads.py b/tests/test_threads.py
index 09f6467..e4f76e8 100644
--- a/tests/test_threads.py
+++ b/tests/test_threads.py
@@ -64,6 +64,35 @@ def test_thread_safety():
 
     print('Done')
 
+import time
+class myThread (threading.Thread):
+    def __init__(self, threadID, name, counter):
+        threading.Thread.__init__(self)
+        self.threadID = threadID
+        self.name = name
+        self.counter = counter
+    def run(self):
+        print("Starting " + self.name)
+        print_time(self.name, self.counter, 5)
+        print("Exiting " + self.name)
+
+def print_time(threadName, delay, counter):
+    while counter:
+        if exitFlag:
+            threadName.exit()
+        time.sleep(delay)
+        print("%s: %s" % (threadName, time.ctime(time.time())))
+        counter -= 1
+
+def test_foo():
+    # Create new threads
+    thread1 = myThread(1, "Thread-1", 1)
+    thread2 = myThread(2, "Thread-2", 2)
+
+    # Start new Threads
+    thread1.start()
+    thread2.start()
+

It's just some example of Python threading that I found on Stack Overflow. After the diff is applied, run the same py.test command again. It will again segfault.

Is produce() fully non-blocking?

I've read the documentation for confluent-kafka-python and librdkafka, but it is not very clear (after related experience with the kafka-python package) whether produce() is guaranteed to be non-blocking (except when configured for bounding the queue). I'm working on a latency-sensitive web app and I need to send some data into Kafka without any blocking, regardless of broker downtime, re-election or whatever. So will this work with confluent-kafka-python, or do I need to add another level of queuing?
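
As far as the API documentation goes, produce() only places the message on a local queue and raises BufferError when that queue is full, so a latency-sensitive caller can decide what to do instead of waiting. A minimal sketch of that pattern follows; the helper name try_produce is hypothetical, and producer is expected to be a confluent_kafka.Producer or any object with compatible produce() and poll() methods.

```python
def try_produce(producer, topic, value, on_delivery=None):
    """Enqueue value without waiting; return False if the local queue is full."""
    kwargs = {} if on_delivery is None else {'callback': on_delivery}
    try:
        producer.produce(topic, value, **kwargs)
        enqueued = True
    except BufferError:
        # Local queue is full (for example during a long broker outage).
        # The caller decides whether to drop, spill to disk, or retry later.
        enqueued = False
    # Serve pending delivery callbacks without blocking.
    producer.poll(0)
    return enqueued
```

Whether dropping or spilling is acceptable when False is returned depends on the application; the sketch only shows where that decision can be made.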

Consumer disconnects after polling for about an hour

Although I expect data to flow pretty constantly, I tested consumer polling after the topic reached EOF, and in about an hour it disconnected. I did not set the timeout parameter on the poll() method, so I expect it to keep polling indefinitely. Is there some ENV setting for librdkafka or something to prevent premature disconnects?

2016-07-03 18:34:26,177 - DEBUG - stream_consumer.pyc - rets.nnrmls.Property [1] reached end at offset 67
%3|1467592759.828|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467592759.828|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467592949.425|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467592949.425|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593066.174|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467593066.174|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467593549.569|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593549.569|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467593666.255|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467593666.255|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467593849.440|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467593849.440|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594149.695|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594149.695|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594266.302|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467594266.302|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467594749.356|FAIL|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467594749.356|ERROR|app2#producer-2| queue4:9092/4: Receive failed: Disconnected
%3|1467594749.742|FAIL|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594749.742|ERROR|app2#producer-2| queue3:9092/3: Receive failed: Disconnected
%3|1467594866.341|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467594866.341|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467595349.819|FAIL|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467595349.819|ERROR|app2#producer-2| queue1:9092/1: Receive failed: Disconnected
%3|1467595466.452|FAIL|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
%3|1467595466.452|ERROR|app2#producer-2| queue2:9092/2: Receive failed: Disconnected
2016-07-03 19:25:06,541 - ERROR - stream_consumer.pyc - Error polling messages.

Implementation code:

class StreamConsumer:
...

    def consume(self, topics, auto_offset='latest', processor=_printer, *args):
        """Connects to topic and listens for messages, handing them off to processor"""

        logger = self.__get_logger()

        try: 
            running = True

            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'group.id': self.group_id,
                'default.topic.config': {'auto.offset.reset': auto_offset}
            }

            try:
                c = kafka.Consumer(**conf)
            except:
                logger.error( "Error creating Consumer with config [{}]".format(conf) )

            try:
                if auto_offset == 'earliest' or auto_offset == 'smallest':
                    c.subscribe(topics, on_assign=_on_assign)
                else:
                    c.subscribe(topics)
            except:
                logger.error( "Error subscribing to topics [{}]".format(topics) )
                c = None

            if c:
                logger.info( "Starting to poll topics [{}]...".format(topics) )
                while running:
                    try:
                        msg = c.poll()
                        if not msg.error():
                            processor(msg)
                        elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                            # End of partition event
                            logger.debug( "{} [{}] reached end at offset {}".format(
                                            msg.topic(), msg.partition(), msg.offset()) )
                        else:
                            logger.debug( "Unknown error [{}]. Quitting...".format(msg.error()) )
                            raise kafka.KafkaException(msg.error())
                    except:
                        running = False
                        logger.error( "Error polling messages." )
                # end while running loop
            else:
                logger.error( "Consumer object missing. Nothing to do!" )

            try:
                c.close() # close connection
            except:
                logger.warn( "Could not close connection (c). Nothing to do!" )

        except Exception, e:
            logger.error( "Error consuming topics [{}]".format(topics) )
            logger.debug( traceback.format_exc() )

        return None

I used the _on_assign syntax suggested in #11 (thanks again) for the case where I request --from-beginning behavior upon startup.

def _on_assign (c, ps):
    """Resets the consumer offset"""
    for p in ps:
        p.offset=-2
    c.assign(ps)

Any ideas why it disconnected after an hour of inactivity after EOF? Thanks!

Auto Offset Reset for existing Consumer Group

Is there an equivalent to the --from-beginning flag in console consumer?

I'm migrating from kafka-python client to this one and in some initial tests, it's not behaving as expected. I start a consumer with group testgroup1 consuming topic test which I've populated with a dozen messages or so. I have auto.offset.reset set to smallest and expect it to replay the topic from beginning. The first time it plays all. I stop and restart it and it does not.

I assume that it starts back again at the latest offset, but I am confused about the purpose of the auto.offset.reset values. I expect earliest | smallest to consume from the beginning. I expect largest | latest to pick up from the last offset.

import confluent_kafka as kafka

conf_consumer = {'bootstrap.servers': "localhost:9092",
        'group.id': "testgroup1",
        'default.topic.config': {'auto.offset.reset': 'smallest',
        'offset.store.sync.interval.ms': 5000}
       }

def consume(topics=['test'], group='testgroup2', conf=None):
    """Connects to topic and listens for incoming messages and prints them"""

    running = True

    if conf is not None and isinstance(conf, dict):
        if group:
            print "Setting consumer group to [{}]".format(group)
            conf['group.id'] = group

        try:
            c = kafka.Consumer(**conf)
        except:
            print "Error creating Consumer with config [{}]".format(conf)

        try:
            c.subscribe(topics)
        except:
            print "Error subscribing to topics [{}]".format(topics)
            c = None

        if c:
            print "Starting to poll topics [{}]...".format(topics)
            while running:
                try:
                    msg = c.poll()
                    if not msg.error():
                        print "Received message [{}]".format(msg.value().decode('utf-8'))
                    elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write( "{} [{}] reached end at offset {}".format(
                                        msg.topic(), msg.partition(), msg.offset()))
                    else:
                        print "Unknown error [{}]. Quitting...".format(msg.error())
                        raise kafka.KafkaException(msg.error())
                except:
                    running = False
                    print "Error polling messages."
            # end while running loop
        else:
            print "Consumer object missing. Nothing to do!"
    else:
        print "Missing configuration or not dictionary. Nothing to do!"

    try:
        c.close() # close connection
    except:
        print "Could not close connection (c). Nothing to do!"


def main():
    print "Consuming content..."
    consume(conf=conf_consumer)


if __name__ == '__main__':
    main()

If I run kafka-console-consumer in a shell with the --from-beginning flag, it replays all messages every time. I checked the librdkafka config documentation to see if there's an extra setting I'm missing, like offset.store.sync.interval, thinking something has to be reset.

What am I missing? Is it possible to restart the script and replay the topic without changing the consumer group.id?
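
For context on the question above: in Kafka, auto.offset.reset is only consulted when the group has no valid committed offset for a partition. Once testgroup1 has committed offsets, a restart resumes from them regardless of the setting. A minimal sketch of that rule in plain Python, with no broker involved; the function name resolve_start_offset and its parameters are hypothetical and only model the decision:

```python
def resolve_start_offset(committed, policy, low, high):
    """Model where a consumer starts reading one partition.

    committed: last committed offset for the group, or None if there is none.
    policy:    the auto.offset.reset value.
    low, high: oldest available offset and next offset to be written.
    """
    if committed is not None and low <= committed <= high:
        # A valid committed offset always wins; auto.offset.reset is ignored.
        return committed
    if policy in ('smallest', 'earliest'):
        return low
    if policy in ('largest', 'latest'):
        return high
    raise ValueError('unsupported auto.offset.reset value: %r' % (policy,))


# First run of a new group: nothing committed, so the policy applies.
first_run = resolve_start_offset(None, 'smallest', low=0, high=12)
# Restart of the same group after it committed offset 12: the policy is skipped.
restart = resolve_start_offset(12, 'smallest', low=0, high=12)
```

Under this model, replaying on every start without changing group.id needs an explicit reset, such as an on_assign callback that sets each partition's offset to the beginning before calling assign().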

Pick a better name :0

PSHHH hosting this under confluentinc/ makes total sense, but c'mon, this is a python librdkafka client! call this python-librdkafka!

:)

build librdkafka when installing confluent-kafka

currently when i do a pip install confluent-kafka, it fails because i haven't built librdkafka yet. it'd be nice if your setup.py could also build librdkafka to make the installation process easier. there are multiple internal services using our client that's a wrapper around confluent-kafka, and we'd like it if they could just do a simple pip install our-client. this is a little more difficult for us since we then have to automate the build process on our end.

basically i'm hoping you can do something similar to what librabbitmq (https://pypi.python.org/pypi/librabbitmq) is doing.

Install librdkafka dependency on Mac OSX

I was unable to install this via pip on my Mac because the latest Homebrew formula for librdkafka only went up to 0.9, and I saw errors during pip install. To solve this, I edited the Homebrew formula for librdkafka to download the latest version. After that, the pip install worked without errors.

You can type:

  • brew edit librdkafka and then paste in content above
  • you might have to brew unlink librdkafka before installing new version
  • afterwards, add the confluent-kafka dependency to your setup.py file and then run
    • python setup.py install (or with sudo)

Good luck!

undefined symbol: rd_kafka_msg_partitioner_consistent

hi all, on my machine I tried to use confluent kafka python like this

first, I installed confluent kafka python
sudo pip install confluent-kafka

then I tried to use it. somehow I always get this error message
>>> from confluent_kafka import Producer
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/confluent_kafka/__init__.py", line 2, in <module>
    from .cimpl import *
ImportError: /usr/local/lib/python2.7/dist-packages/confluent_kafka/cimpl.so: undefined symbol: rd_kafka_msg_partitioner_consistent

I guessed it was because librdkafka was not there, so I did a git clone, make and install, but the error still appears

I also tried to git clone confluent-kafka-python instead of installing from apt-get, but it still fails

how can I solve this problem?

Support regex topic pattern in consumer group topic subscribe?

I'm not seeing anything in the docs about how to pass a regex topic pattern for a consumer group subscription?

My coworker checked a few months back and didn't see it, but in this morning's blog post @jkreps said the Python client has feature parity (I assume with Java) so did we just miss it?

We're redoing our internal Kafka python wrapper and trying to decide whether to bet on the Confluent python library or the Kafka-python library.

Re-subscribe to a topic after disconnection/reconnection

Here is my test scenario. It's limited to the bare basics to isolate my problem

I have a single Kafka broker and a single topic with a single partition.

Producer creates random messages to topic patricktest
Consumer reads the messages from topic patricktest

Producer

from confluent_kafka import Producer
from time import gmtime

p = Producer({'bootstrap.servers':'localhost'})
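# Wrap gmtime() in a one-element tuple: struct_time is itself a tuple,
# so passing it bare to % would be unpacked as nine format arguments.
p.produce('patricktest', 'Kwak kwak I got data on %s' % (gmtime(),))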
p.flush()

Consumer

from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import time

c = Consumer({
  'bootstrap.servers':'localhost',
  'group.id':'patricklaptop',
  'default.topic.config': {'auto.offset.reset':'smallest'}
})

c.subscribe(['patricktest'])


while True:

  msg = c.poll(timeout=1.0)

  if msg is None:
    print "msg is None"
    #time.sleep(1)
    continue


  if msg.error():
    if msg.error().code() == KafkaError._PARTITION_EOF:
      # Not an error really
      sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
    else:
      print "raising exception"
      raise KafkaException(msg.error())
  else:
    # Good stuff here.
    print('Received message: %s' % msg.value().decode('utf-8'))

  print "end of while cycle"

c.close()

So as you can see, it's pretty basic. In my test scenario, I run the consumer, and it will read the messages, then print "msg is None" quite a bit, until I produce more messages by running the producer code.

Now here is the problem: if I stop kafka while the consumer is running, then restart kafka, the consumer will reconnect BUT it won't re-subscribe to the topic.
I have not found ANY WAY to know that I was disconnected. All I know is that poll returns None. That's it.

For now the workaround that I have found is to add this at the beginning of the loop.

last_resubscribe = 0
while True:
  # resub every 10sec just in case I got disconnected
  if time.time() - last_resubscribe > 10:
    print "resubscribing - just in case"
    c.subscribe(['patricktest'])
    last_resubscribe = time.time()

[...]

But sadly I also get this kind of thing every 10 seconds on kafka server:

kafka/server.out

[2016-11-04 22:26:27,851] INFO [GroupCoordinator 1]: Preparing to restabilize group patricklaptop with old generation 7 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:27,852] INFO [GroupCoordinator 1]: Stabilized group patricklaptop generation 8 (kafka.coordinator.GroupCoordinator)
[2016-11-04 22:26:28,114] INFO [GroupCoordinator 1]: Assignment received from leader for group patricklaptop for generation 8 (kafka.coordinator.GroupCoordinator)

Is there any way to catch the reconnect so that I can re-subscribe to a topic?
Or maybe have a way to query the consumer object to know my current status?
Or maybe have the client automatically resubscribe to the topics itself?

While my workaround is functional, it's pretty awful AND produces extra load/logs on the broker.
I suspect it would even trigger a full rebalance and other things if I had several consumers / partitions rather than this minimal one consumer / partition / topic scenario...

Thanks!
--Patrick

subscribe doesn't seem to accept wildcard topic names

Is there any way to subscribe to topics using a wildcard/regex with confluent-kafka-python? I know there are other clients (based on librdkafka) that support this functionality (e.g. kafka-python), but this one doesn't seem to.

Building on Linux

I am having some problems running pip install confluent-kafka or python setup.py build. I have already installed the common build dependencies (like librdkafka-dev librdkafka1 build-essential autoconf libtool python-dev).

Here is the log:

▶ python setup.py build
running build
running build_py
creating build
creating build/lib.linux-x86_64-2.7
creating build/lib.linux-x86_64-2.7/confluent_kafka
copying confluent_kafka/__init__.py -> build/lib.linux-x86_64-2.7/confluent_kafka
creating build/lib.linux-x86_64-2.7/confluent_kafka/kafkatest
copying confluent_kafka/kafkatest/verifiable_producer.py -> build/lib.linux-x86_64-2.7/confluent_kafka/kafkatest
copying confluent_kafka/kafkatest/__init__.py -> build/lib.linux-x86_64-2.7/confluent_kafka/kafkatest
copying confluent_kafka/kafkatest/verifiable_consumer.py -> build/lib.linux-x86_64-2.7/confluent_kafka/kafkatest
copying confluent_kafka/kafkatest/verifiable_client.py -> build/lib.linux-x86_64-2.7/confluent_kafka/kafkatest
running build_ext
building 'confluent_kafka.cimpl' extension
creating build/temp.linux-x86_64-2.7
creating build/temp.linux-x86_64-2.7/confluent_kafka
creating build/temp.linux-x86_64-2.7/confluent_kafka/src
x86_64-linux-gnu-gcc -pthread -fno-strict-aliasing -DNDEBUG -g -fwrapv -O2 -Wall -Wstrict-prototypes -fPIC -I/usr/include/python2.7 -c confluent_kafka/src/confluent_kafka.c -o build/temp.linux-x86_64-2.7/confluent_kafka/src/confluent_kafka.o
In file included from confluent_kafka/src/confluent_kafka.c:17:0:
confluent_kafka/src/confluent_kafka.h:114:1: error: unknown type name ‘rd_kafka_topic_partition_list_t’
 PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts);
 ^
confluent_kafka/src/confluent_kafka.h:115:1: error: unknown type name ‘rd_kafka_topic_partition_list_t’
 rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
 ^
confluent_kafka/src/confluent_kafka.c: In function ‘KafkaError_name’:
confluent_kafka/src/confluent_kafka.c:74:2: warning: implicit declaration of function ‘rd_kafka_err2name’ [-Wimplicit-function-declaration]
  return cfl_PyUnistr(_FromString(rd_kafka_err2name(self->code)));
  ^
confluent_kafka/src/confluent_kafka.c:74:2: warning: passing argument 1 of ‘PyUnicodeUCS4_FromString’ makes pointer from integer without a cast [enabled by default]
In file included from /usr/include/python2.7/Python.h:85:0,
                 from confluent_kafka/src/confluent_kafka.h:17,
                 from confluent_kafka/src/confluent_kafka.c:17:
/usr/include/python2.7/unicodeobject.h:281:31: note: expected ‘const char *’ but argument is of type ‘int’
 # define PyUnicode_FromString PyUnicodeUCS4_FromString
                               ^
/usr/include/python2.7/unicodeobject.h:477:23: note: in expansion of macro ‘PyUnicode_FromString’
 PyAPI_FUNC(PyObject*) PyUnicode_FromString(
                       ^
confluent_kafka/src/confluent_kafka.c: In function ‘TopicPartition_new’:
confluent_kafka/src/confluent_kafka.c:551:21: error: ‘RD_KAFKA_OFFSET_INVALID’ undeclared (first use in this function)
  long long offset = RD_KAFKA_OFFSET_INVALID;
                     ^
confluent_kafka/src/confluent_kafka.c:551:21: note: each undeclared identifier is reported only once for each function it appears in
confluent_kafka/src/confluent_kafka.c: At top level:
confluent_kafka/src/confluent_kafka.c:746:1: error: unknown type name ‘rd_kafka_topic_partition_list_t’
 PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
 ^
confluent_kafka/src/confluent_kafka.c: In function ‘c_parts_to_py’:
confluent_kafka/src/confluent_kafka.c:750:28: error: request for member ‘cnt’ in something not a structure or union
  parts = PyList_New(c_parts->cnt);
                            ^
confluent_kafka/src/confluent_kafka.c:752:26: error: request for member ‘cnt’ in something not a structure or union
  for (i = 0 ; i < c_parts->cnt ; i++) {
                          ^
confluent_kafka/src/confluent_kafka.c:753:3: error: unknown type name ‘rd_kafka_topic_partition_t’
   const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
   ^
confluent_kafka/src/confluent_kafka.c:753:54: error: request for member ‘elems’ in something not a structure or union
   const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i];
                                                      ^
In file included from /usr/include/python2.7/Python.h:100:0,
                 from confluent_kafka/src/confluent_kafka.h:17,
                 from confluent_kafka/src/confluent_kafka.c:17:
confluent_kafka/src/confluent_kafka.c:756:12: error: request for member ‘topic’ in something not a structure or union
      rktpar->topic, rktpar->partition,
            ^
/usr/include/python2.7/listobject.h:62:74: note: in definition of macro ‘PyList_SET_ITEM’
 #define PyList_SET_ITEM(op, i, v) (((PyListObject *)(op))->ob_item[i] = (v))
                                                                          ^
confluent_kafka/src/confluent_kafka.c:756:27: error: request for member ‘partition’ in something not a structure or union
      rktpar->topic, rktpar->partition,
                           ^
/usr/include/python2.7/listobject.h:62:74: note: in definition of macro ‘PyList_SET_ITEM’
 #define PyList_SET_ITEM(op, i, v) (((PyListObject *)(op))->ob_item[i] = (v))
                                                                          ^
confluent_kafka/src/confluent_kafka.c:757:12: error: request for member ‘offset’ in something not a structure or union
      rktpar->offset, rktpar->err));
            ^
/usr/include/python2.7/listobject.h:62:74: note: in definition of macro ‘PyList_SET_ITEM’
 #define PyList_SET_ITEM(op, i, v) (((PyListObject *)(op))->ob_item[i] = (v))
                                                                          ^
confluent_kafka/src/confluent_kafka.c:757:28: error: request for member ‘err’ in something not a structure or union
      rktpar->offset, rktpar->err));
                            ^
/usr/include/python2.7/listobject.h:62:74: note: in definition of macro ‘PyList_SET_ITEM’
 #define PyList_SET_ITEM(op, i, v) (((PyListObject *)(op))->ob_item[i] = (v))
                                                                          ^
confluent_kafka/src/confluent_kafka.c: At top level:
confluent_kafka/src/confluent_kafka.c:769:1: error: unknown type name ‘rd_kafka_topic_partition_list_t’
 rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
 ^
confluent_kafka/src/confluent_kafka.c: In function ‘py_to_c_parts’:
confluent_kafka/src/confluent_kafka.c:770:2: error: unknown type name ‘rd_kafka_topic_partition_list_t’
  rd_kafka_topic_partition_list_t *c_parts;
  ^
confluent_kafka/src/confluent_kafka.c:779:2: warning: implicit declaration of function ‘rd_kafka_topic_partition_list_new’ [-Wimplicit-function-declaration]
  c_parts = rd_kafka_topic_partition_list_new(PyList_Size(plist));
  ^
confluent_kafka/src/confluent_kafka.c:779:10: warning: assignment makes pointer from integer without a cast [enabled by default]
  c_parts = rd_kafka_topic_partition_list_new(PyList_Size(plist));
          ^
confluent_kafka/src/confluent_kafka.c:790:4: warning: implicit declaration of function ‘rd_kafka_topic_partition_list_destroy’ [-Wimplicit-function-declaration]
    rd_kafka_topic_partition_list_destroy(c_parts);
    ^
confluent_kafka/src/confluent_kafka.c:794:3: warning: implicit declaration of function ‘rd_kafka_topic_partition_list_add’ [-Wimplicit-function-declaration]
   rd_kafka_topic_partition_list_add(c_parts,
   ^
confluent_kafka/src/confluent_kafka.c:796:23: error: invalid type argument of ‘->’ (have ‘int’)
         tp->partition)->offset =
                       ^
confluent_kafka/src/confluent_kafka.c: In function ‘producer_conf_set_special’:
confluent_kafka/src/confluent_kafka.c:915:13: error: ‘rd_kafka_msg_partitioner_consistent’ undeclared (first use in this function)
      tconf, rd_kafka_msg_partitioner_consistent);
             ^
confluent_kafka/src/confluent_kafka.c:918:13: error: ‘rd_kafka_msg_partitioner_consistent_random’ undeclared (first use in this function)
      tconf, rd_kafka_msg_partitioner_consistent_random);
             ^
In file included from confluent_kafka/src/confluent_kafka.c:17:0:
confluent_kafka/src/confluent_kafka.c:945:6: error: ‘RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED’ undeclared (first use in this function)
      RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED,
      ^
confluent_kafka/src/confluent_kafka.h:97:35: note: in definition of macro ‘cfl_PyErr_Format’
   PyObject *_eo = KafkaError_new0(err, __VA_ARGS__); \
                                   ^
confluent_kafka/src/confluent_kafka.c: In function ‘common_conf_setup’:
confluent_kafka/src/confluent_kafka.c:1108:2: warning: implicit declaration of function ‘rd_kafka_conf_set_default_topic_conf’ [-Wimplicit-function-declaration]
  rd_kafka_conf_set_default_topic_conf(conf, tconf);
  ^
confluent_kafka/src/confluent_kafka.c: In function ‘KafkaError_add_errs’:
confluent_kafka/src/confluent_kafka.c:1174:2: warning: implicit declaration of function ‘rd_kafka_get_err_descs’ [-Wimplicit-function-declaration]
  rd_kafka_get_err_descs(&descs, &cnt);
  ^
confluent_kafka/src/confluent_kafka.c:1210:3: error: invalid use of undefined type ‘struct rd_kafka_err_desc’
   if (!descs[i].desc)
   ^
confluent_kafka/src/confluent_kafka.c:1210:13: error: dereferencing pointer to incomplete type
   if (!descs[i].desc)
             ^
confluent_kafka/src/confluent_kafka.c:1213:3: error: invalid use of undefined type ‘struct rd_kafka_err_desc’
   code = PyLong_FromLong(descs[i].code);
   ^
confluent_kafka/src/confluent_kafka.c:1213:31: error: dereferencing pointer to incomplete type
   code = PyLong_FromLong(descs[i].code);
                               ^
confluent_kafka/src/confluent_kafka.c:1215:3: error: invalid use of undefined type ‘struct rd_kafka_err_desc’
   PyDict_SetItemString(dict, descs[i].name, code);
   ^
confluent_kafka/src/confluent_kafka.c:1215:35: error: dereferencing pointer to incomplete type
   PyDict_SetItemString(dict, descs[i].name, code);
                                   ^
confluent_kafka/src/confluent_kafka.c:1219:3: error: invalid use of undefined type ‘struct rd_kafka_err_desc’
   _PRINT("| %-*.*s | %-*.*s |\n"
   ^
confluent_kafka/src/confluent_kafka.c:1221:33: error: dereferencing pointer to incomplete type
          _COL1_W, _COL1_W, descs[i].name,
                                 ^
confluent_kafka/src/confluent_kafka.c:1188:43: note: in definition of macro ‘_PRINT’
   _len = snprintf(tmpdoc, sizeof(tmpdoc), __VA_ARGS__); \
                                           ^
confluent_kafka/src/confluent_kafka.c:1219:3: error: invalid use of undefined type ‘struct rd_kafka_err_desc’
   _PRINT("| %-*.*s | %-*.*s |\n"
   ^
confluent_kafka/src/confluent_kafka.c:1222:33: error: dereferencing pointer to incomplete type
          _COL2_W, _COL2_W, descs[i].desc,
                                 ^
confluent_kafka/src/confluent_kafka.c:1188:43: note: in definition of macro ‘_PRINT’
   _len = snprintf(tmpdoc, sizeof(tmpdoc), __VA_ARGS__); \
                                           ^
error: command 'x86_64-linux-gnu-gcc' failed with exit status 1

Timestamp is not generated in the key for 0.10 compatibility

The producer doesn't seem to add a timestamp to the producer record when working with the 0.10.0.0 protocol. I also couldn't find a way to specify which protocol version to use.

Part of Kafka Config:

inter.broker.protocol.version=0.10.0.0
log.message.format.version=0.10.0.0
log.message.timestamp.type=CreateTime

Error from KafkaStreams:

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)

This means when I have a Kafka Streams application running, I need to use a WallClockTimestampExtractor which is not ideal.
Not sure if the problem is on python client side or librdkafka side.
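
One setting that may be relevant here, offered as an assumption rather than a confirmed diagnosis: librdkafka has an api.version.request property that controls whether the client asks the broker which protocol features it supports. A minimal configuration sketch; the broker address is a placeholder, and whether this resolves the -1 timestamp in this setup is untested:

```python
# Assumption: with librdkafka 0.9.x, 0.10 protocol features such as message
# timestamps are only used when API version negotiation is switched on.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',  # placeholder broker address
    'api.version.request': 'true',          # ask the broker for its supported API versions
}
```

The dictionary would be passed to Producer(producer_conf) in the same way as the configurations shown elsewhere on this page.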

Expose error_cb to Python

Proposed Enhancement:

For both the Consumer and Producer, it seems necessary to expose the error_cb in the Python API so that Consumer and Producer code can deal with errors, especially w.r.t. connections with Kafka.

For example, consider the Consumer pseudo code:

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        logger.info('No messages found in polling window')
        continue
    if msg.error():
        logger.error('Received error: %s', msg.error())
        raise confluent_kafka.KafkaException(msg.error())

This code will run forever, even if the consumer loses connection to the brokers, which isn't always what you want. In my case, I'd like to fail if the error has persisted for the past 5 minutes.
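
The "fail if the error has persisted for the past 5 minutes" policy could live in a small helper that an error callback and the poll loop both update. A minimal sketch in plain Python; ErrorWindow and its method names are hypothetical and not part of confluent-kafka-python, and the fake clock exists only so the example runs instantly:

```python
import time


class ErrorWindow(object):
    """Tracks how long errors have been occurring without a success in between."""

    def __init__(self, max_seconds, clock=time.time):
        self.max_seconds = max_seconds
        self.clock = clock
        self.first_error_at = None

    def record_error(self):
        # Remember only the start of the current run of errors.
        if self.first_error_at is None:
            self.first_error_at = self.clock()

    def record_success(self):
        # Any successfully received message ends the run of errors.
        self.first_error_at = None

    def expired(self):
        if self.first_error_at is None:
            return False
        return self.clock() - self.first_error_at >= self.max_seconds


# Demonstration with a fake clock (a one-element list used as a mutable cell).
now = [0.0]
window = ErrorWindow(max_seconds=300, clock=lambda: now[0])
window.record_error()
now[0] = 120.0
still_ok = not window.expired()   # two minutes of errors: keep going
now[0] = 301.0
should_fail = window.expired()    # more than five minutes: give up
```

In a real consumer, record_error() would be called from whatever error hook is available, record_success() after each good message, and the loop would raise once expired() returns True.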

It seems like it wouldn't be too much work to do something similar to what is done for the on_revoke and on_assign callbacks, but I'm not sure about the intricacies of librdkafka's error_cb.

This is blocking us from adopting confluent-kafka-python more broadly (or at all).

Thanks for your time.

confluent-kafka 0.9.2 does not provide the extra 'avro'

When installing as shown below:

C_INCLUDE_PATH=/PATH/TO/librdkafka/0.9.1/include LIBRARY_PATH=/PATH/librdkafka/0.9.1/lib pip install --user --upgrade confluent-kafka[avro]

I got following error:

You are using pip version 7.0.3, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Collecting confluent-kafka[avro]
Using cached confluent-kafka-0.9.2.tar.gz
confluent-kafka 0.9.2 does not provide the extra 'avro'

Can you help?

what's the behavior of the client when one of the brokers is down?

I have a question regarding a zero-downtime scenario.
Let's say I have 8 Kafka nodes in a cluster and a topic with 100 partitions. The partition leaders are on different nodes.
When I take one node down for an OS patch, what will happen to the publisher and consumer?
The publisher and consumer both have a list of the 8 brokers.
Will they encounter errors when they try to publish and consume?
I can see that the publisher and consumer can talk to any node in the cluster to find out which node has the partition leader for the data they are trying to write or read. But if the node that goes down happens to be the partition leader, will there be any downtime (as perceived by the publisher/subscriber) due to rebalancing (leader re-election, or preferred replica election and partition rebalancing)?

AttributeError: 'cimpl.Message' object has no attribute 'timestamp'

We encounter issues when running integration_test.py on macOS. They could be related to an incompatibility between librdkafka (the C implementation of the Kafka client) and the Python wrapper.

System components:
OS: Sierra 10.12.1
Python 2.7.12 (v2.7.12:d33e0cf91556, Jun 26 2016, 12:10:39)
librdkafka: stable 0.9.2 (bottled)
confluent-kafka (0.9.2)

Issue 1:
('==============================', 'Verifying Consumer', '==============================')
Traceback (most recent call last):
  File "/Users/drazen/Documents/audi/workspace/confluent-kafka-python-master/examples/integration_test.py", line 465, in <module>
    verify_consumer()
  File "/Users/drazen/Documents/audi/workspace/confluent-kafka-python-master/examples/integration_test.py", line 259, in verify_consumer
    tstype, timestamp = msg.timestamp()
AttributeError: 'cimpl.Message' object has no attribute 'timestamp'

in code snippet ...

tstype, timestamp = msg.timestamp()

Issue 2:
('==============================', 'Verifying stats_cb', '==============================')
Traceback (most recent call last):
  File "/Users/drazen/Documents/audi/workspace/confluent-kafka-python-master/examples/integration_test.py", line 477, in <module>
    verify_stats_cb()
  File "/Users/drazen/Documents/audi/workspace/confluent-kafka-python-master/examples/integration_test.py", line 393, in verify_stats_cb
    c = confluent_kafka.Consumer(**conf)
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "stats_cb" must be set through dedicated .._set_..() function"}

in code snippet ...

conf = {'bootstrap.servers': bootstrap_servers,
        'group.id': uuid.uuid1(),
        'session.timeout.ms': 6000,
        'error_cb': error_cb,
        'stats_cb': stats_cb,
        'statistics.interval.ms': 200,
        'default.topic.config': {
            'auto.offset.reset': 'earliest'
        }}
c = confluent_kafka.Consumer(**conf)

Once the conflicting code is commented out, the rest of the example runs through.

Finalization kills flush

When calling flush() on a producer and then immediately dereferencing the object, the buffer does not actually get flushed.

# send 100 messages with producer

producer.flush()
producer = None

Acknowledgement is set to none. Placing a sleep after producer.flush() allows messages to be flushed. Placing a sleep after producer = None does not allow messages to be flushed.

Note: This may only occur on fast hardware.

Is there any way to gather producer metrics?

I don't see stats_cb being exposed. Are there any plans or work in progress to expose stats_cb so that we can consume metrics? Would it also be possible to see a sample (the types of metrics) of what would be exposed?

.cimpl problems

I'm new to this package.

I just did:

sudo apt-get install librdkafka1
sudo apt-get install librdkafka-dev
sudo apt-get install libssl-dev
sudo apt-get install liblz4-dev
sudo apt-get install libsasl2-dev
git clone https://github.com/confluentinc/confluent-kafka-python.git
cd confluent-kafka-python; ./configure ; make ; sudo make install

This seemed to install everything into /usr/local/

sudo pip install confluent-kafka

ubuntu$ python
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> from confluent_kafka import Consumer, KafkaError
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "confluent_kafka/__init__.py", line 2, in <module>
    from .cimpl import *
ImportError: No module named cimpl

ImportError: librdkafka.so.1: cannot open shared object file: No such file or directory

Ubuntu 14 here, pip==7.1.2, setuptools==18.3.2, virtualenv==13.1.2.
First, I want to build latest stable (seems it's 0.9.2) librdkafka into /opt/librdkafka.

curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.9.2 | tar xzf - -C /tmp/
cd /tmp/librdkafka-0.9.2/
./configure --prefix=/opt/librdkafka
make
sudo make install
cd -
rm -r /tmp/librdkafka-0.9.2

I have a virtualenv and want to install latest stable confluent-kafka.

source venv/bin/activate
$ C_INCLUDE_PATH=/opt/librdkafka/include LIBRARY_PATH=/opt/librdkafka/lib pip install confluent-kafka
Collecting confluent-kafka
  Using cached confluent-kafka-0.9.2.tar.gz
Building wheels for collected packages: confluent-kafka
  Running setup.py bdist_wheel for confluent-kafka
  Stored in directory: /home/f/.cache/pip/wheels/blabla
Successfully built confluent-kafka
Installing collected packages: confluent-kafka
Successfully installed confluent-kafka-0.9.2

But then a test script fails:

$ python ktest.py 
Traceback (most recent call last):
  File "ktest.py", line 1, in <module>
    from confluent_kafka import Producer
  File "/home/bla/proj/bla/venv/local/lib/python2.7/site-packages/confluent_kafka/__init__.py", line 2, in <module>
    from .cimpl import *
ImportError: librdkafka.so.1: cannot open shared object file: No such file or directory

Am I doing something wrong?

Segmentation fault while running integration test.

I have installed kafka (kafka_2.11-0.10.0.0), confluent-kafka-python and librdkafka properly.
If I run the integration test without testing consumer it works fine. However with "verify_consumer()" it fails with error Segmentation fault (core dumped)

Consumer fails for a Kafka Cluster with large number of topics.

%4|1479929770.581|PROTOERR|rdkafka#consumer-1| 10.64.64.59:31000/bootstrap: Protocol parse failure at rd_kafka_parse_Metadata:1582
%4|1479929770.581|PROTOERR|rdkafka#consumer-1| 10.64.64.59:31000/bootstrap: TopicMetadata_cnt 17202 > TOPICS_MAX 10000
%4|1479929771.021|PROTOERR|rdkafka#consumer-1| 10.64.64.59:31000/1: Protocol parse failure at rd_kafka_parse_Metadata:1582
%4|1479929771.021|PROTOERR|rdkafka#consumer-1| 10.64.64.59:31000/1: TopicMetadata_cnt 17202 > TOPICS_MAX 10000

Producer: different callback per message

Version: librdkafka 0.9.1 and last pip confluent lib

from confluent_kafka import Producer

p = Producer({'bootstrap.servers':'kafkahost'})

def callback(err,msg,k):
  print "callback received k: %s and v: %s" % (k,msg.value())

data = {
  'k1':'v1',
  'k2':'v2',
  'k3':'v3',
}

for k,v in data.iteritems():
  print "sending k: %s" % k
  p.produce('patricktest', v, callback=lambda err,msg: callback(err,msg,k))

p.flush()

The result is:

Mac:~$ python producertest.py
sending k: k3
sending k: k2
sending k: k1
callback received k: k1 and v: v3
callback received k: k1 and v: v2
callback received k: k1 and v: v1

I would expect the last three lines to show k3, k2, k1 rather than k1 three times.

After looking at the source code, it seems that each time a callback is defined, it's defined as a global parameter for the whole producer, rather than per message.

This breaks the general expectation that if you give a callback to a function/method, it's THIS function which will be called. I didn't find a workaround either to give arbitrary extra data to the callback - which was what I attempted via this intermediate lambda.
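
One thing worth ruling out, independent of the client: Python closures look up free variables when they are called, not when they are defined. Every lambda created in the loop therefore sees the final value of k by the time flush() triggers the callbacks, which would give exactly this pattern of varying values with one repeated key. A minimal sketch without Kafka; the pending_late and pending_bound lists are illustrative stand-ins for the producer's queue:

```python
from functools import partial


def callback(err, msg, k):
    return "callback received k: %s and v: %s" % (k, msg)


data = {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'}

pending_late = []   # callbacks that read k when they are finally called
pending_bound = []  # callbacks that captured k when they were created
for k, v in sorted(data.items()):
    pending_late.append((v, lambda err, msg: callback(err, msg, k)))
    pending_bound.append((v, partial(callback, k=k)))

# Simulate delivery reports arriving only after the loop has finished.
late_results = [cb(None, v) for v, cb in pending_late]
bound_results = [cb(None, v) for v, cb in pending_bound]
```

Here late_results repeats the last key for every message, while bound_results pairs each value with its own key. Binding the value at creation time with functools.partial (or a default argument such as lambda err, msg, k=k: ...) is a common way to pass extra data to a callback.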

--Thanks

Consumer poll() does not always return

I'm running into an odd scenario with the Consumer class where poll() sometimes returns null or empty. I would expect it to return either an object with a message or an object with an error (sometimes just EOF for a partition), but never empty or null.

Here is my code. Out of 25,000 messages in the topic, there are just under 2,000 warnings in the log that poll() returned no msg object. Under what conditions would this occur, or did I implement something incorrectly below?

                while running:
                    try:
                        msg = c.poll()
                        if msg:
                            if not msg.error():
                                processor(msg)
                            elif msg.error().code() == kafka.KafkaError._PARTITION_EOF:
                                # End of partition event
                                logger.debug( "{} [{}] reached end at offset {}".format(
                                                msg.topic(), msg.partition(), msg.offset()) )
                                logger.debug( "Consumed [{}] records so far (reached end of offset)".format(self.consumed) )
                            else:
                                logger.debug( "Unknown error [{}] during polling.".format(msg.error()) )
                                raise kafka.KafkaException(msg.error())

                            self.consumed += 1
                            if (self.consumed % 100) == 0:
                                logger.debug( "Consumed [{}] records since startup.".format(self.consumed) )
                        else:
                            logger.warn( "No message object returned on poll() [{}]".format(msg) )

                    except Exception as e:
                        logger.warn( "Error polling messages [{}]".format(str(e)) )
                        logger.debug( traceback.format_exc() )
                        # we do not stop loop because some polling errors anticipated
                        # https://github.com/confluentinc/confluent-kafka-python/issues/12
                # end while running loop

In logs:

2016-07-12 15:09:25,331 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,332 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,342 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,343 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,344 - DEBUG - stream_consumer.pyc - Consumed [24800] records since startup.
2016-07-12 15:09:25,344 - WARNING - stream_consumer.pyc - No message object returned on poll()
2016-07-12 15:09:25,345 - WARNING - stream_consumer.pyc - No message object returned on poll()
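
For illustration, here is a minimal sketch of a loop that treats an empty poll() result as "nothing arrived yet" rather than as a warning. It assumes poll() returns None when its timeout expires without a message or event, which is how current releases of the client document it. The names `drain`, `handle`, and `max_idle_polls` are hypothetical, and the consumer is duck-typed so the loop can be tried without a broker. The explicit `is None` comparison avoids relying on the truthiness of the message object.

```python
def drain(consumer, handle, timeout=1.0, max_idle_polls=3):
    """Poll until `max_idle_polls` consecutive polls return nothing.

    Returns (handled, idle, errors): the number of messages passed to
    `handle`, the number of empty polls seen, and the error objects
    collected from error events (partition EOF included).
    """
    handled = 0
    idle = 0
    consecutive_idle = 0
    errors = []
    while consecutive_idle < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            # Assumption: None means the timeout expired with nothing to report.
            idle += 1
            consecutive_idle += 1
            continue
        consecutive_idle = 0
        if msg.error():
            # Error events carry no payload to process; keep them for the caller.
            errors.append(msg.error())
            continue
        handle(msg)
        handled += 1
    return handled, idle, errors
```

A caller could log `idle` at debug level and decide per error code whether an entry in `errors` is fatal.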

Access to watermark offsets API in python

I just ran across confluentinc/librdkafka#503 after spending some time hacking in a solution using the available Python APIs that I'm too embarrassed to discuss publicly. It works fine for initial offset queries, but is not sufficient for keeping track of the current highwater mark during consumption. It would be great to get access to these APIs in Python. Is this on the roadmap anywhere?

I would prefer not to hack this in myself because I'm not quite sure how to correctly manage the memory for those int64_t * output parameters (low and high) between C and Python, as I'm not proficient with the Python C-API. I would probably adapt the interface to return a tuple of low and high and raise an exception in the case of error, but I don't know if that is the best way to proceed.
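
To make the proposed interface concrete, here is a rough Python-level sketch of "return a tuple of low and high, raise on error". Everything in it is hypothetical: `WatermarkError`, `watermark_offsets`, and the `query` callable, which stands in for the underlying C call and is assumed to return `(err, low, high)` with `err` set to None on success. It says nothing about how the C extension would manage the output parameters.

```python
class WatermarkError(Exception):
    """Hypothetical exception raised when a watermark query fails."""


def watermark_offsets(query, topic, partition):
    """Return (low, high) for one partition, or raise WatermarkError.

    `query` is a hypothetical stand-in for the C-level call. It takes
    (topic, partition) and returns (err, low, high), where err is None
    on success.
    """
    err, low, high = query(topic, partition)
    if err is not None:
        raise WatermarkError("{} [{}]: {}".format(topic, partition, err))
    return low, high
```

With such a function, tracking the distance to the high watermark during consumption would reduce to `high - current_offset` after each query.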

New tag please!

a2fbbf6 fixed a bug that kept confluent-kafka-python from working for me. I'd like to deploy this in production, but would appreciate a tagged release that includes this change, from which to build .deb packages.

Producer not thread-safe

I am receiving these errors frequently:

Fatal Python error: PyEval_RestoreThread: NULL tstate
Aborted (core dumped)

As well as

Fatal Python error: ceval: tstate mix-up
Aborted (core dumped)

Both cause core dumps.

I see one potential solution is to include PyEval_InitThreads(); before using threads, which I will try.

After calling close, consumers issue periodic metadata requests

If you create a consumer, connect, read some data, and then close but don't destroy every reference to the consumer, it will continue issuing metadata requests every five minutes.

I'm assuming this is because you are calling rd_kafka_consumer_close in the python close() method, but not calling rd_kafka_destroy until the consumer object is deallocated. It seems like once you call close(), the Consumer instance cannot be used for anything - is there a reason not to call rd_kafka_destroy in close()?

BufferError [Local] Queue full for producer even after changing librdkafka config

Ubuntu 14.04.4 LTS / Python 2.7.x / Kafka 0.10 (Confluent Platform 3) / Python client (latest)

  • app servers: 4 core, 32GB RAM, SATA (running python scripts)
  • db servers: 8 core, 64GB RAM, SSD (5-node Kafka/Cassandra cluster + 3-node ES cluster)
  • 10Gb/s private NIC and bare metal stack

I'm seeing a large number of BufferError [Local] Queue full errors in the logs for the Producer client. I searched for the error yesterday and found a librdkafka issue from 2014 that was resolved by changing a few configuration parameters. I posted in that issue and changed my config, and the initial errors went away, but as the program ran overnight a flood of errors filled the logs. Out of 500,000 messages consumed from the topics, I'm missing over 100,000 in the subsequent topic.

I have a Python stream processor that instantiates both the Consumer and Producer classes and consumes from 6 topics. It performs a diff operation/upsert against the matching record in the Cassandra cluster, if one exists, and then publishes the diffed object to another topic (...ListingEditEvent). When it tries to publish to that subsequent topic, messages are getting lost. A transformer program picks up from the ListingEditEvent topic, converts to our schema, and publishes to the ListingEditTransformed topic for Logstash consumption into Elasticsearch. I'm seeing differences between the records in ES and the Kafka topics and am trying to resolve them. I'd appreciate any tips on how to solve this, or better configuration values.

I edited the config for Producer client to the following:

            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'queue.buffering.max.messages': 500000, # is this too small?
                'queue.buffering.max.ms': 60000, # is this too long?
                'batch.num.messages': 100, # is this too small?
                'log.connection.close': False,
                'client.id': socket.gethostname(),
                'default.topic.config': {'acks': 'all'}
            }

I'm thinking of reducing the max time and increasing max messages: perhaps 5000 ms, a batch size of 250, and a maximum of 1 million messages?

The errors are not constant, so the producer must exceed the buffer while processing, recover, and then exceed it again:

2016-07-07 09:58:42,952 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160002361]
2016-07-07 10:02:55,094 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:02:55,106 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:02:55,189 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160010014]
2016-07-07 10:02:55,199 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160010014]
2016-07-07 10:02:57,466 - DEBUG - diff_processor.py - Error with lat [None], lon [None] for listing [160009744]
2016-07-07 10:02:57,475 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009744]
2016-07-07 10:08:03,292 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:121]
2016-07-07 10:08:03,311 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:9]
2016-07-07 10:08:04,807 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:1549]
2016-07-07 10:08:04,822 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:8199]
2016-07-07 10:08:08,017 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009089]
2016-07-07 10:08:09,728 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:140009614]
2016-07-07 10:13:17,459 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009935]
2016-07-07 10:13:17,468 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009935]
2016-07-07 10:13:17,541 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160009962]
2016-07-07 10:13:17,550 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160009962]
2016-07-07 10:13:17,565 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160010015]
2016-07-07 10:18:25,977 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160004679]
2016-07-07 10:18:25,985 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160004679]
2016-07-07 10:18:26,012 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160007175]
2016-07-07 10:18:26,021 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160007175]
2016-07-07 10:18:26,044 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.ListingEditEvent]:[None]-[nnrmls:160008663]
2016-07-07 10:18:26,053 - ERROR - stream_producer.pyc - BufferError publishing topic [rets.nnrmls.PhotoEditEvent]:[None]-[nnrmls:160008663]

My producer class doesn't call flush() as your example client does, since the calling module connects and keeps publishing. I also don't call poll(0) as the example does, but I'm unsure whether that matters.
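
As a point of comparison, here is a minimal sketch of the produce-then-poll pattern with a retry when the local queue is full. It relies on two things stated elsewhere on this page: produce() raises BufferError when the queue is full, and delivery callbacks are triggered by poll() or flush(). The helper name `produce_with_backpressure` and the `max_retries` and `wait` parameters are hypothetical, and the producer is duck-typed. This is not a statement about which configuration values are right for the workload above.

```python
def produce_with_backpressure(producer, topic, value, max_retries=5, wait=1.0):
    """Enqueue one message, polling and retrying while the queue is full.

    Returns the number of retries that were needed. Re-raises BufferError
    if the queue is still full after `max_retries` retries.
    """
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic, value)
        except BufferError:
            if attempt == max_retries:
                raise
            # Queue full: block for up to `wait` seconds so queued messages
            # can be sent and their delivery reports served, then retry.
            producer.poll(wait)
        else:
            # Serve any pending delivery callbacks without blocking.
            producer.poll(0)
            return attempt
```

The design choice here is to apply backpressure to the caller rather than to drop the message and log an error, which is where the missing records in the logs above would come from.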

segfault after KeyboardInterrupt during consumer.poll(n)

confluent-kafka==0.9.2, librdkafka 0.9.2

from confluent_kafka import Consumer, KafkaError

print 'consuming'
c = Consumer(**{
	'client.id': 'test-client',
	'bootstrap.servers': 'localhost',
	'group.id': 'test-group',
	'enable.auto.commit': False,
	'enable.auto.offset.store': True,
	'offset.store.method': 'broker',
	'enable.partition.eof': True,
	'default.topic.config': {'auto.offset.reset': 'earliest'},
})
c.subscribe(['test'])
while True:
    msg = c.poll(1)

I start the script, press ctrl-c, and get

$ python ktest.py
consuming
^CTraceback (most recent call last):
  File "ktest.py", line 84, in <module>
    msg = c.poll(1)
KeyboardInterrupt
Segmentation fault (core dumped)

Without a timeout the script doesn't interrupt on ctrl-c at all (this probably needs a separate issue, not sure yet)
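
For reference, a shutdown pattern often used with polling loops can be sketched as follows. It keeps a finite poll timeout, so control returns to the interpreter regularly, and calls close() in a `finally` block. This only illustrates the pattern and is not a fix for the segfault reported here. `run_until_interrupted` and `handle` are hypothetical names, and the consumer is duck-typed.

```python
def run_until_interrupted(consumer, handle, timeout=1.0):
    """Poll in a loop until Ctrl-C, then close the consumer.

    Returns the number of messages passed to `handle`.
    """
    handled = 0
    try:
        while True:
            msg = consumer.poll(timeout)
            if msg is None or msg.error():
                # Nothing to process: timeout expired or an error event.
                continue
            handle(msg)
            handled += 1
    except KeyboardInterrupt:
        # Fall through to the cleanup below.
        pass
    finally:
        consumer.close()
    return handled
```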

Producer memory leak with delivery.report.only.error

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, Ubuntu 14.04

I run a web app in 16 gunicorn workers (processes). Each has a producer, which works continually until the process is restarted (normally by the deploy system). This instance serves ~100-250 req/s (some of which produce to Kafka) and has ~3.8 GB of memory. After switching to confluent-kafka and running it for some time in production, I'm observing worker RSS memory growing steadily (this was not the case before with kafka-python). Here is a screenshot of the memory monitoring graph:
[Image: memory monitoring graph]
The code is roughly like this (so I poll(0) after each produce):

def error_cb (err):
    logger.error('kafka producer error_cb: {}: %s'.format(err.name()), err.str())
def on_delivery (err, msg):
    assert err
    logger.error('kafka producer on_delivery {}'.format(err.name()))
_producer = confluent_kafka.Producer(**{
    'client.id': 'qwe_producer',
    'bootstrap.servers': bootstrap_servers, #3 brokers
    'log.connection.close': False,
    'api.version.request': True,

    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 4000000,
    'queue.buffering.max.ms': 1000,
    'message.send.max.retries': 9000,
    'batch.num.messages': 10000,
    'delivery.report.only.error': True,

    'default.topic.config': {
        'request.required.acks': -1,
    },
    'error_cb': error_cb,
    'on_delivery': on_delivery,
})
atexit.register(_producer.flush)

def kafka_async_send (topic, value):
    _producer.produce(topic, cPickle.dumps(value, 2))
    _producer.poll(0)

def some_request_handler (request):
    #...
    kafka_async_send('foo', ('some', 'stuff'))
    #...

What should I do now? Do you have a suggestion for how to debug it? (Preferably right in production: it is a kind of testing production, so any overhead is currently affordable.)

"Commit failed: Local: No offset stored" + messages committed anyway

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, kafka 0.10.1.0
It's me again..
I had a clean new topic 'test', to which I produced some messages.
I have a consumer, which is configured with 'enable.auto.commit': False, 'enable.auto.offset.store': True, 'default.topic.config': {'auto.offset.reset': 'smallest'}. I'm manually assigning a partition via assign. I consume messages until _PARTITION_EOF, then call .commit(async = False) and finally .close().
On the first run I see the received messages printed, followed by:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
Received message: sup
...
_PARTITION_EOF
commiting
closing
**on_commit None [TopicPartition{topic=test,partition=0,offset=27,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 118, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

Moreover, I run the script again:

consuming
assigning
getting position
[TopicPartition{topic=test,partition=0,offset=-1001,error=None}] -1001
polling
_PARTITION_EOF
commiting
closing
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
**on_commit KafkaError{code=_NO_OFFSET,val=-168,str="(null)"} [TopicPartition{topic=test,partition=0,offset=-1001,error=None}]
closed
Traceback (most recent call last):
  File "ktest.py", line 118, in <module>
    c.commit(async = False)
cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}

There were no messages, which means the offset was actually committed. Also, why was on_commit called twice?

TopicPartition_str0 broken on Mac OS X

This one is minor but annoying...

On Mac OS X 10.10.5, Python 2.7.9:

>>> from confluent_kafka import TopicPartition
>>> print TopicPartition('topic')
TopicPartition{topic=topic,partition=-1,offset=%lld,error=%s}

On Ubuntu Python 2.7.6

>>> from confluent_kafka import TopicPartition
>>> print TopicPartition('topic')
TopicPartition{topic=topic,partition=-1,offset=-1001,error=None}

I tried to debug this a little and the issue seems to be that mac generates "%lld" and ubuntu generates "%ld" from "PRId64". PyUnicode_FromFormat (v2) does not support "%lld" (was added in v3.2)

mac$ /usr/bin/clang -E -fno-strict-aliasing -fno-common -dynamic -arch x86_64 -g -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -I/Library/Frameworks/Python.framework/Versions/2.7/include/python2.7 -c confluent_kafka/src/confluent_kafka.c |grep "TopicPartition{"
 ret = PyUnicodeUCS2_FromFormat("TopicPartition{topic=%s,partition=%""d" ",offset=%""ll" "d"",error=%s}", self->topic, self->partition, self->offset, errstr ? PyString_AsString(PyUnicodeUCS2_AsUTF8String(errstr)) : "None");

ubuntu$ x86_64-linux-gnu-gcc -E -pthread -fno-strict-aliasing -DNDEBUG -g -fwrapv -O2 -Wall -Wstrict-prototypes -fPIC -I/usr/include/python2.7 -c confluent_kafka/src/confluent_kafka.c|grep "TopicPartition{"
 ret = PyUnicodeUCS4_FromFormat("TopicPartition{topic=%s,partition=%""d" ",offset=%""l" "d"",error=%s}", self->topic, self->partition, self->offset, errstr ? PyString_AsString(PyUnicodeUCS4_AsUTF8String(errstr)) : "None")
