Sample Kafka Python Producer and Consumer

This topic provides kafka-python examples with SASL and SSL client configurations for Apache Kafka Wire Protocol Service.

  1. If you have not done so already, install kafka-python:
    pip3 install kafka-python
            $ pip3 install kafka-python
            $ pip3 list | grep kafka-python
  2. Save the Sample Kafka Python Producer and Consumer code in a file with the following name:
    • SASL: saslPlaintextKafkaPythonClient.py
    • SSL: sslPlaintextKafkaPythonClient.py
  3. Run the sample code with the following command:
    NOTE: When running the SASL sample code, you must fill in the username and password. When running the SSL sample code, you must fill in ssl_password with the ssl.server.keystore.password value from the file /opt/mapr/conf/store-passwords.txt.
    python3 saslPlaintextKafkaPythonClient.py <broker> <topicName>
    For example:
    python3 saslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SASL
    python3 sslPlaintextKafkaPythonClient.py <broker> <topicName>
    For example:
    python3 sslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SSL

Sample Kafka Python Producer and Consumer Code (the SASL sample is listed first, followed by the SSL sample)

from kafka import KafkaProducer
from kafka import KafkaConsumer
import sys
import time

# Read the 'broker' and 'topic' command-line arguments.
# Example: 'python3 saslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SASL'
if len(sys.argv) < 3:
    # Exit with a usage hint rather than an IndexError traceback when an
    # argument is missing; extra arguments are still ignored, as before.
    sys.exit("Usage: python3 " + sys.argv[0] + " <broker> <topicName>")
server = sys.argv[1]
print("BROKER: " + server)
topic = sys.argv[2]
print("TOPIC: " + topic)

# PRODUCER
# Connects to the broker with SASL_PLAINTEXT / PLAIN authentication, sends 100
# copies of a fixed payload to the topic, and prints how many sends were issued.
print("\n**Starting Producer**")
# '<username>' and '<user-password>' are placeholders; fill in real
# credentials before running.
producer = KafkaProducer(bootstrap_servers=[server],
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='<username>',
                         sasl_plain_password='<user-password>')

numMsgProduced = 0
try:
    for _ in range(100):
        producer.send(topic, b'msg')
        # Counts send() calls issued; delivery is awaited by flush() below.
        numMsgProduced += 1
    producer.flush()
finally:
    # Always release the producer's connections, even if a send or the
    # flush raised.
    producer.close()
print("Messages produced: " + str(numMsgProduced))
# Presumably gives the broker a moment before the consumer starts reading
# -- TODO confirm whether this pause is required.
time.sleep(2)

# CONSUMER
# Connects with the same SASL_PLAINTEXT / PLAIN settings, reads the topic from
# the earliest offset, polls 10 times, and prints how many records were received.
print("\n**Starting Consumer**")
# '<username>' and '<user-password>' are placeholders; fill in real
# credentials before running.
consumer = KafkaConsumer(bootstrap_servers=[server],
                         auto_offset_reset='earliest',
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='<username>',
                         sasl_plain_password='<user-password>')

numMsgConsumed = 0
try:
    # subscribe() is documented to take a list of topic names.
    consumer.subscribe([topic])
    for _ in range(10):
        records = consumer.poll(timeout_ms=500)
        # poll() returns a mapping whose values are lists of records; only
        # the record lists are needed for counting.
        for consumer_records in records.values():
            # To inspect payloads, loop over consumer_records and print
            # consumer_record.value.decode('utf-8') for each record.
            numMsgConsumed += len(consumer_records)
finally:
    # Always release the consumer's connections, even if polling raised.
    consumer.close()
print("Messages consumed: " + str(numMsgConsumed))
from kafka import KafkaProducer
from kafka import KafkaConsumer
import sys
import time

# Read the 'broker' and 'topic' command-line arguments.
# Example: 'python3 sslPlaintextKafkaPythonClient.py localhost:9092 topicTestKafkaPythonClient_SSL'
if len(sys.argv) < 3:
    # Exit with a usage hint rather than an IndexError traceback when an
    # argument is missing; extra arguments are still ignored, as before.
    sys.exit("Usage: python3 " + sys.argv[0] + " <broker> <topicName>")
server = sys.argv[1]
print("BROKER: " + server)
topic = sys.argv[2]
print("TOPIC: " + topic)

# PRODUCER
# Connects to the broker over SSL using the PEM files under /opt/mapr/conf,
# sends 100 copies of a fixed payload to the topic, and prints how many sends
# were issued.
print("\n**Starting Producer**")
producer = KafkaProducer(bootstrap_servers=[server],
                         security_protocol='SSL',
                         # Hostname verification of the broker certificate is turned off here.
                         ssl_check_hostname=False,
                         # Placeholder: use ssl.server.keystore.password from /opt/mapr/conf/store-passwords.txt
                         ssl_password='<>',
                         ssl_cafile='/opt/mapr/conf/ssl_truststore.pem',
                         ssl_certfile='/opt/mapr/conf/ssl_keystore-signed.pem',
                         ssl_keyfile='/opt/mapr/conf/ssl_keystore.pem')

numMsgProduced = 0
try:
    for _ in range(100):
        producer.send(topic, b'msg')
        # Counts send() calls issued; delivery is awaited by flush() below.
        numMsgProduced += 1
    producer.flush()
finally:
    # Always release the producer's connections, even if a send or the
    # flush raised.
    producer.close()
print("Messages produced: " + str(numMsgProduced))
# Presumably gives the broker a moment before the consumer starts reading
# -- TODO confirm whether this pause is required.
time.sleep(2)

# CONSUMER
# Connects over SSL with the same PEM files, reads the topic from the earliest
# offset, polls 10 times, and prints how many records were received.
print("\n**Starting Consumer**")
consumer = KafkaConsumer(bootstrap_servers=[server],
                         auto_offset_reset='earliest',
                         security_protocol='SSL',
                         # Hostname verification of the broker certificate is turned off here.
                         ssl_check_hostname=False,
                         # Placeholder: use ssl.server.keystore.password from /opt/mapr/conf/store-passwords.txt
                         ssl_password='<>',
                         ssl_cafile='/opt/mapr/conf/ssl_truststore.pem',
                         ssl_certfile='/opt/mapr/conf/ssl_keystore-signed.pem',
                         ssl_keyfile='/opt/mapr/conf/ssl_keystore.pem')

numMsgConsumed = 0
try:
    # subscribe() is documented to take a list of topic names.
    consumer.subscribe([topic])
    for _ in range(10):
        records = consumer.poll(timeout_ms=500)
        # poll() returns a mapping whose values are lists of records; only
        # the record lists are needed for counting.
        for consumer_records in records.values():
            # To inspect payloads, loop over consumer_records and print
            # consumer_record.value.decode('utf-8') for each record.
            numMsgConsumed += len(consumer_records)
finally:
    # Always release the consumer's connections, even if polling raised.
    consumer.close()
print("Messages consumed: " + str(numMsgConsumed))