forked from docs/doc-exports
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-authored-by: Chen, Junjie <chenjunjie@huawei.com> Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
117 lines
7.0 KiB
HTML
117 lines
7.0 KiB
HTML
<a name="kafka-python"></a><a name="kafka-python"></a>
|
|
|
|
<h1 class="topictitle1">Python</h1>
|
|
<div id="body1557124185655"><p id="kafka-python__p8060118">This section describes how to access a Kafka premium instance using a Kafka client in Python on the Linux CentOS, including how to install the client, and produce and consume messages.</p>
|
|
<p id="kafka-python__p465419346576">Before getting started, ensure that you have collected the information listed in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<div class="section" id="kafka-python__section134111182153"><h4 class="sectiontitle">Preparing the Environment</h4><ul id="kafka-python__ul1778112019164"><li id="kafka-python__li1078620151610">Python<p id="kafka-python__p7501123131611"><a name="kafka-python__li1078620151610"></a><a name="li1078620151610"></a>Generally, Python is pre-installed in the system. Enter <strong id="kafka-python__b24491343123118">python</strong> in a CLI. If the following information is displayed, Python has already been installed.</p>
|
|
<pre class="screen" id="kafka-python__screen18357172916175">[root@ecs-test python-kafka]# python3
|
|
Python 3.7.1 (default, Jul 5 2020, 14:37:24)
|
|
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
|
|
Type "help", "copyright", "credits" or "license" for more information.
|
|
>>></pre>
|
|
<p id="kafka-python__p1674175419467">If Python is not installed, run the following command:</p>
|
|
<p id="kafka-python__p1139872261712"><strong id="kafka-python__b193940597467">yum install python</strong></p>
|
|
</li><li id="kafka-python__li87882017163">Kafka clients in Python<p id="kafka-python__p7700213612"><a name="kafka-python__li87882017163"></a><a name="li87882017163"></a>Run the following command to install a Python client of the recommended version:</p>
|
|
<p id="kafka-python__p127353116543"><strong id="kafka-python__b7294122914547">pip install kafka-python==2.0.1</strong></p>
|
|
</li></ul>
|
|
</div>
|
|
<div class="section" id="kafka-python__section175541107415"><h4 class="sectiontitle">Producing Messages</h4><div class="note" id="kafka-python__note1681753756"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="kafka-python__p18426113152515">Replace the following information in bold with the actual values.</p>
|
|
</div></div>
|
|
<ul id="kafka-python__ul1650483517910"><li id="kafka-python__li1850418358910">With SASL<pre class="screen" id="kafka-python__screen1273974420157">from kafka import KafkaProducer
|
|
import ssl
|
|
##Connection information
|
|
conf = {
|
|
'bootstrap_servers': ["<strong id="kafka-python__b18626113512615">ip1:port1","ip2:port2","ip3:port3</strong>"],
|
|
'topic_name': '<strong id="kafka-python__b1063392431614">topic_name</strong>',
|
|
'sasl_plain_username': '<strong id="kafka-python__b3120193791617">username</strong>',
|
|
'sasl_plain_password': '<strong id="kafka-python__b143551640181611">password</strong>'
|
|
}
|
|
|
|
context = ssl.create_default_context()
|
|
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
## Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information."
|
|
context.load_verify_locations("<strong id="kafka-python__b5602124191716">phy_ca.crt</strong>")
|
|
|
|
print('start producer')
|
|
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
|
|
sasl_mechanism="PLAIN",
|
|
ssl_context=context,
|
|
security_protocol='SASL_SSL',
|
|
sasl_plain_username=conf['sasl_plain_username'],
|
|
sasl_plain_password=conf['sasl_plain_password'])
|
|
|
|
data = bytes("hello kafka!", encoding="utf-8")
|
|
producer.send(conf['topic_name'], data)
|
|
producer.close()
|
|
print('end producer')</pre>
|
|
</li><li id="kafka-python__li185041335193">Without SASL<pre class="screen" id="kafka-python__screen15854105662114">from kafka import KafkaProducer
|
|
|
|
conf = {
|
|
'bootstrap_servers': ["<strong id="kafka-python__b319324744718">ip1:port1","ip2:port2","ip3:port3</strong>"],
|
|
'topic_name': '<strong id="kafka-python__b10131521162210">topic-name</strong>',
|
|
}
|
|
|
|
print('start producer')
|
|
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
|
|
|
|
data = bytes("hello kafka!", encoding="utf-8")
|
|
producer.send(conf['topic_name'], data)
|
|
producer.close()
|
|
print('end producer')</pre>
|
|
</li></ul>
|
|
</div>
|
|
<div class="section" id="kafka-python__section965662719339"><h4 class="sectiontitle">Consuming Messages</h4><ul id="kafka-python__ul11656102733319"><li id="kafka-python__li3656627113319">With SASL<pre class="screen" id="kafka-python__screen10656142716334">from kafka import KafkaConsumer
|
|
import ssl
|
|
##Connection information
|
|
conf = {
|
|
'bootstrap_servers': ["<strong id="kafka-python__b548515934715">ip1:port1","ip2:port2","ip3:port3</strong>"],
|
|
'topic_name': '<strong id="kafka-python__b12657162703310">topic_name</strong>',
|
|
'sasl_plain_username': '<strong id="kafka-python__b17657122714335">username</strong>',
|
|
'sasl_plain_password': '<strong id="kafka-python__b765719271337">password</strong>',
|
|
'consumer_id': '<strong id="kafka-python__b265792733316">consumer_id</strong>'
|
|
}
|
|
|
|
context = ssl.create_default_context()
|
|
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
|
context.verify_mode = ssl.CERT_REQUIRED
|
|
## Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information."
|
|
context.load_verify_locations("<strong id="kafka-python__b1765782763312">phy_ca.crt</strong>")
|
|
|
|
print('start consumer')
|
|
consumer = KafkaConsumer(conf['topic_name'],
|
|
bootstrap_servers=conf['bootstrap_servers'],
|
|
group_id=conf['consumer_id'],
|
|
sasl_mechanism="PLAIN",
|
|
ssl_context=context,
|
|
security_protocol='SASL_SSL',
|
|
sasl_plain_username=conf['sasl_plain_username'],
|
|
sasl_plain_password=conf['sasl_plain_password'])
|
|
|
|
for message in consumer:
|
|
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
|
|
|
|
print('end consumer')</pre>
|
|
</li><li id="kafka-python__li2657112753312">Without SASL<p id="kafka-python__p31571915183013"><a name="kafka-python__li2657112753312"></a><a name="li2657112753312"></a>Replace the information in bold with the actual values.</p>
|
|
<pre class="screen" id="kafka-python__screen1876614210300">from kafka import KafkaConsumer
|
|
|
|
conf = {
|
|
'bootstrap_servers': ["<strong id="kafka-python__b98773117489">ip1:port1","ip2:port2","ip3:port3</strong>"],
|
|
'topic_name': '<strong id="kafka-python__b195221955163019">topic-name</strong>',
|
|
'consumer_id': '<strong id="kafka-python__b5627977314">consumer-id</strong>'
|
|
}
|
|
|
|
print('start consumer')
|
|
consumer = KafkaConsumer(conf['topic_name'],
|
|
bootstrap_servers=conf['bootstrap_servers'],
|
|
group_id=conf['consumer_id'])
|
|
|
|
for message in consumer:
|
|
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
|
|
|
|
print('end consumer')</pre>
|
|
</li></ul>
|
|
</div>
|
|
</div>
|
|
|