forked from docs/doc-exports
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-authored-by: Chen, Junjie <chenjunjie@huawei.com> Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
7.0 KiB
7.0 KiB
Python
This section describes how to access a Kafka premium instance using a Kafka client in Python on the Linux CentOS, including how to install the client, and produce and consume messages.
Before getting started, ensure that you have collected the information listed in Collecting Connection Information.
Preparing the Environment
- Python
Generally, Python is pre-installed in the system. Enter python in a CLI. If the following information is displayed, Python has already been installed.
[root@ecs-test python-kafka]# python3 Python 3.7.1 (default, Jul 5 2020, 14:37:24) [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
If Python is not installed, run the following command:
yum install python
- Kafka clients in Python
Run the following command to install a Python client of the recommended version:
pip install kafka-python==2.0.1
Producing Messages
- With SASL
from kafka import KafkaProducer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ## Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information." context.load_verify_locations("phy_ca.crt") print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
- Without SASL
from kafka import KafkaProducer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', } print('start producer') producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers']) data = bytes("hello kafka!", encoding="utf-8") producer.send(conf['topic_name'], data) producer.close() print('end producer')
Consuming Messages
- With SASL
from kafka import KafkaConsumer import ssl ##Connection information conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic_name', 'sasl_plain_username': 'username', 'sasl_plain_password': 'password', 'consumer_id': 'consumer_id' } context = ssl.create_default_context() context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) context.verify_mode = ssl.CERT_REQUIRED ## Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information." context.load_verify_locations("phy_ca.crt") print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id'], sasl_mechanism="PLAIN", ssl_context=context, security_protocol='SASL_SSL', sasl_plain_username=conf['sasl_plain_username'], sasl_plain_password=conf['sasl_plain_password']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')
- Without SASL
Replace the information in bold with the actual values.
from kafka import KafkaConsumer conf = { 'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"], 'topic_name': 'topic-name', 'consumer_id': 'consumer-id' } print('start consumer') consumer = KafkaConsumer(conf['topic_name'], bootstrap_servers=conf['bootstrap_servers'], group_id=conf['consumer_id']) for message in consumer: print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value)) print('end consumer')