Reviewed-by: Antonova, Ekaterina <ekantono@noreply.gitea.eco.tsi-dev.otc-service.com> Co-authored-by: Chen, Junjie <chenjunjie@huawei.com> Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
<a name="kafka-python"></a><a name="kafka-python"></a>
<h1 class="topictitle1">Python</h1>
<div id="body1557124185655"><p id="kafka-python__p8060118">This section uses CentOS Linux as an example to describe how to access a Kafka instance with a Python Kafka client, including how to install the client and how to produce and consume messages.</p>
<p id="kafka-python__p465419346576">Before getting started, ensure that you have collected the information listed in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<div class="section" id="kafka-python__section134111182153"><h4 class="sectiontitle">Preparing the Environment</h4><ul id="kafka-python__ul1778112019164"><li id="kafka-python__li1078620151610">Python<p id="kafka-python__p7501123131611"><a name="kafka-python__li1078620151610"></a><a name="li1078620151610"></a>Python is usually pre-installed on the system. Enter <strong id="kafka-python__b24491343123118">python3</strong> in a CLI. If information similar to the following is displayed, Python is already installed.</p>
<pre class="screen" id="kafka-python__screen18357172916175">[root@ecs-test python-kafka]# python3
Python 3.7.1 (default, Jul 5 2020, 14:37:24)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>></pre>
<p id="kafka-python__p1674175419467">If Python is not installed, run the following command:</p>
<p id="kafka-python__p1139872261712"><strong id="kafka-python__b193940597467">yum install python3</strong></p>
</li><li id="kafka-python__li87882017163">Kafka clients in Python<p id="kafka-python__p7700213612"><a name="kafka-python__li87882017163"></a><a name="li87882017163"></a>Run the following command to install a Python client of the recommended version:</p>
<p id="kafka-python__p127353116543"><strong id="kafka-python__b7294122914547">pip install kafka-python==2.0.1</strong></p>
</li></ul>
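<p>To confirm that the client installed correctly, a short check such as the following may help. This is a minimal sketch: the helper name <strong>installed_version</strong> is hypothetical, and it assumes the package exposes a <strong>__version__</strong> attribute under the import name <strong>kafka</strong>.</p>

```python
import importlib


def installed_version(package="kafka"):
    """Return the package's __version__ string, or None if it cannot be imported."""
    try:
        module = importlib.import_module(package)
    except ImportError:
        return None
    # Fall back to "unknown" for packages that do not expose __version__.
    return getattr(module, "__version__", "unknown")


if __name__ == "__main__":
    version = installed_version()
    if version is None:
        print("kafka-python is not available; run: pip install kafka-python==2.0.1")
    else:
        print("kafka-python version:", version)
```

<p>If the reported version differs from the recommended one, reinstall the client with the <strong>pip install</strong> command above.</p>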
</div>
<div class="section" id="kafka-python__section175541107415"><h4 class="sectiontitle">Producing Messages</h4><ul id="kafka-python__ul1650483517910"><li id="kafka-python__li1850418358910">With SASL<pre class="screen" id="kafka-python__screen1273974420157">from kafka import KafkaProducer
import ssl

## Connection information
conf = {
    'bootstrap_servers': ["<strong id="kafka-python__b18626113512615">ip1:port1","ip2:port2","ip3:port3</strong>"],
    'topic_name': '<strong id="kafka-python__b1063392431614">topic_name</strong>',
    'sasl_username': '<strong id="kafka-python__b3120193791617">username</strong>',
    'sasl_password': '<strong id="kafka-python__b143551640181611">password</strong>'
}

## Create the SSL context. PROTOCOL_SSLv23 is an alias of PROTOCOL_TLS, which negotiates the highest protocol version that both sides support.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
## If <strong id="kafka-python__b55016351413">Security Protocol</strong> is set to <strong id="kafka-python__b15147193814116">SASL_PLAINTEXT</strong>, comment out the following line:
context.verify_mode = ssl.CERT_REQUIRED
## The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information". If <strong id="kafka-python__b74070594314">Security Protocol</strong> is set to <strong id="kafka-python__b34078514433">SASL_PLAINTEXT</strong>, comment out the following line:
context.load_verify_locations("<strong id="kafka-python__b5602124191716">phy_ca.crt</strong>")

print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                         sasl_mechanism="<strong id="kafka-python__b1771614013315">PLAIN</strong>",
                         ssl_context=context,
                         security_protocol='<strong id="kafka-python__b35618811244">SASL_SSL</strong>',
                         sasl_plain_username=conf['sasl_username'],
                         sasl_plain_password=conf['sasl_password'])

## Message values are sent as bytes.
data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer')</pre>
<p id="kafka-python__p1123354315615">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-python__ul14961114618578"><li id="kafka-python__li29611746185714"><strong id="kafka-python__b619595819518">bootstrap_servers</strong>: instance connection address and port</li>
<li id="kafka-python__li92790567577"><strong id="kafka-python__b1334741465216">topic_name</strong>: topic name</li>
<li id="kafka-python__li370312335810"><strong id="kafka-python__b114314379389">sasl_plain_username/sasl_plain_password</strong>: The username and password set when ciphertext access was first enabled, or those set when the user was created. For security, you are advised to encrypt the username and password.</li>
<li id="kafka-python__li1471831925819"><strong id="kafka-python__b83675528554">context.load_verify_locations</strong>: path to the certificate file. This call is required when <strong id="kafka-python__b13709557439">Security Protocol</strong> is set to <strong id="kafka-python__b18720592431">SASL_SSL</strong>. Python clients use CRT certificates to access instances.</li>
<li id="kafka-python__li04101535163315"><strong id="kafka-python__b165381813153613">sasl_mechanism</strong>: SASL authentication mechanism. View it on the <strong id="kafka-python__b86602156365">Basic Information</strong> page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, use either of them in the connection configuration. For older instances, if <strong id="kafka-python__b174073145010">SASL Mechanism</strong> is not displayed on the instance details page, PLAIN is used by default.</li>
<li id="kafka-python__li144358116212"><strong id="kafka-python__b9631195920550">security_protocol</strong>: Kafka security protocol. Obtain it from the <strong id="kafka-python__b1987002373913">Basic Information</strong> page on the Kafka console. For older Kafka instances, if <strong id="kafka-python__b1760534614520">Security Protocol</strong> is not displayed on the instance details page, SASL_SSL is used by default.<ul id="kafka-python__ul14433141182112"><li id="kafka-python__li61051119172115">When <strong id="kafka-python__b117417275469">Security Protocol</strong> is set to <strong id="kafka-python__b2013133013468">SASL_SSL</strong>, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. You need to configure the instance username, password, and certificate file.</li>
<li id="kafka-python__li1343315111218">When <strong id="kafka-python__b128881719144911">Security Protocol</strong> is set to <strong id="kafka-python__b1088861934919">SASL_PLAINTEXT</strong>, SASL is used for authentication and data is transmitted in plaintext for higher performance. You need to configure the instance username and password.</li></ul>
</li></ul>
</li><li id="kafka-python__li185041335193">Without SASL<pre class="screen" id="kafka-python__screen15854105662114">from kafka import KafkaProducer

conf = {
    'bootstrap_servers': ["<strong id="kafka-python__b319324744718">ip1:port1","ip2:port2","ip3:port3</strong>"],
    'topic_name': '<strong id="kafka-python__b10131521162210">topic-name</strong>',
}

print('start producer')
producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])

data = bytes("hello kafka!", encoding="utf-8")
producer.send(conf['topic_name'], data)
producer.close()
print('end producer')</pre>
<p id="kafka-python__p065985051919">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-python__ul1049316482195"><li id="kafka-python__li065913507197"><strong id="kafka-python__b0774145105217">bootstrap_servers</strong>: instance connection address and port</li><li id="kafka-python__li19659350181910"><strong id="kafka-python__b91251718135214">topic_name</strong>: topic name</li></ul>
</li></ul>
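<p>The examples above differ only in the security-related arguments. As a rough sketch, those arguments could be assembled in one place instead of commenting lines in and out. The helper <strong>build_client_kwargs</strong> and its <strong>ca_file</strong> parameter are hypothetical names introduced here; the keyword names it returns are the ones used in the examples above.</p>

```python
import ssl


def build_client_kwargs(conf, security_protocol=None, sasl_mechanism="PLAIN", ca_file=None):
    """Build keyword arguments for KafkaProducer or KafkaConsumer.

    security_protocol: None (no SASL), "SASL_PLAINTEXT", or "SASL_SSL".
    ca_file: path to the CRT certificate, needed only for "SASL_SSL".
    """
    kwargs = {"bootstrap_servers": conf["bootstrap_servers"]}
    if security_protocol is None:
        return kwargs
    if security_protocol not in ("SASL_PLAINTEXT", "SASL_SSL"):
        raise ValueError("unsupported security protocol: %s" % security_protocol)

    kwargs.update(
        security_protocol=security_protocol,
        sasl_mechanism=sasl_mechanism,
        sasl_plain_username=conf["sasl_username"],
        sasl_plain_password=conf["sasl_password"],
    )
    if security_protocol == "SASL_SSL":
        # Only SASL_SSL needs an SSL context and the certificate file.
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(ca_file)
        kwargs["ssl_context"] = context
    return kwargs
```

<p>A producer could then be created with, for example, <strong>KafkaProducer(**build_client_kwargs(conf, "SASL_SSL", ca_file="phy_ca.crt"))</strong>.</p>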
</div>
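<p>The producer examples send a message without checking the result. If delivery needs to be confirmed, one option is to wait on the object returned by <strong>send</strong>. The following is a minimal sketch, assuming a kafka-python producer whose <strong>send</strong> returns a future with a blocking <strong>get</strong>; the helper name <strong>send_and_confirm</strong> is hypothetical.</p>

```python
def send_and_confirm(producer, topic, text, timeout=10):
    """Send one UTF-8 encoded message and wait for the broker to acknowledge it.

    Returns (topic, partition, offset) of the stored record.
    """
    future = producer.send(topic, text.encode("utf-8"))
    # get() blocks until the send completes and raises on failure or timeout.
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset
```

<p>Waiting on every message lowers throughput, so this pattern suits low-volume or critical messages better than bulk sends.</p>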
<div class="section" id="kafka-python__section965662719339"><h4 class="sectiontitle">Consuming Messages</h4><ul id="kafka-python__ul11656102733319"><li id="kafka-python__li3656627113319">With SASL<pre class="screen" id="kafka-python__screen10656142716334">from kafka import KafkaConsumer
import ssl

## Connection information
conf = {
    'bootstrap_servers': ["<strong id="kafka-python__b548515934715">ip1:port1","ip2:port2","ip3:port3</strong>"],
    'topic_name': '<strong id="kafka-python__b12657162703310">topic_name</strong>',
    'sasl_username': '<strong id="kafka-python__b17657122714335">username</strong>',
    'sasl_password': '<strong id="kafka-python__b765719271337">password</strong>',
    'consumer_id': '<strong id="kafka-python__b265792733316">consumer_id</strong>'
}

## Create the SSL context. PROTOCOL_SSLv23 is an alias of PROTOCOL_TLS, which negotiates the highest protocol version that both sides support.
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
## If <strong id="kafka-python__b161119407411">Security Protocol</strong> is set to <strong id="kafka-python__b1711134019418">SASL_PLAINTEXT</strong>, comment out the following line:
context.verify_mode = ssl.CERT_REQUIRED
## The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information". If <strong id="kafka-python__b16872134315">Security Protocol</strong> is set to <strong id="kafka-python__b4681621184312">SASL_PLAINTEXT</strong>, comment out the following line:
context.load_verify_locations("<strong id="kafka-python__b1765782763312">phy_ca.crt</strong>")

print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
                         bootstrap_servers=conf['bootstrap_servers'],
                         group_id=conf['consumer_id'],
                         sasl_mechanism="<strong id="kafka-python__b1995815413437">PLAIN</strong>",
                         ssl_context=context,
                         security_protocol='<strong id="kafka-python__b436717570255">SASL_SSL</strong>',
                         sasl_plain_username=conf['sasl_username'],
                         sasl_plain_password=conf['sasl_password'])

## Iterate over incoming messages and print each one.
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

print('end consumer')</pre>
<p id="kafka-python__p18797111818207">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-python__ul157971187206"><li id="kafka-python__li187987180209"><strong id="kafka-python__b977725205215">bootstrap_servers</strong>: instance connection address and port</li>
<li id="kafka-python__li12798181892017"><strong id="kafka-python__b512981810527">topic_name</strong>: topic name</li>
<li id="kafka-python__li416183672010"><strong id="kafka-python__b26451351164112">sasl_plain_username/sasl_plain_password</strong>: The username and password set when ciphertext access was first enabled, or those set when the user was created. For security, you are advised to encrypt the username and password.</li>
<li id="kafka-python__li10905164302016"><strong id="kafka-python__b12624721105514">consumer_id</strong>: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.</li>
<li id="kafka-python__li8353115302015"><strong id="kafka-python__b176711255185412">context.load_verify_locations</strong>: path to the certificate file. This call is required when <strong id="kafka-python__b59297369457">Security Protocol</strong> is set to <strong id="kafka-python__b149291036174516">SASL_SSL</strong>. Python clients use CRT certificates to access instances.</li>
<li id="kafka-python__li1217995814357"><strong id="kafka-python__b1945243183518">sasl_mechanism</strong>: SASL authentication mechanism. View it on the <strong id="kafka-python__b2133157134411">Basic Information</strong> page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, use either of them in the connection configuration. For older instances, if <strong id="kafka-python__b1672505375216">SASL Mechanism</strong> is not displayed on the instance details page, PLAIN is used by default.</li>
<li id="kafka-python__li243634118250"><strong id="kafka-python__b249610195611">security_protocol</strong>: Kafka security protocol. Obtain it from the <strong id="kafka-python__b18185182713397">Basic Information</strong> page on the Kafka console. For older Kafka instances, if <strong id="kafka-python__b1574659135217">Security Protocol</strong> is not displayed on the instance details page, SASL_SSL is used by default.<ul id="kafka-python__ul8529620142619"><li id="kafka-python__li20550152152618">When <strong id="kafka-python__b146410442477">Security Protocol</strong> is set to <strong id="kafka-python__b064115440476">SASL_SSL</strong>, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. You need to configure the instance username, password, and certificate file.</li>
<li id="kafka-python__li1955015213264">When <strong id="kafka-python__b24011028155016">Security Protocol</strong> is set to <strong id="kafka-python__b54011728165019">SASL_PLAINTEXT</strong>, SASL is used for authentication and data is transmitted in plaintext for higher performance. You need to configure the instance username and password.</li></ul>
</li></ul>
</li><li id="kafka-python__li2657112753312">Without SASL<pre class="screen" id="kafka-python__screen1876614210300">from kafka import KafkaConsumer

conf = {
    'bootstrap_servers': ["<strong id="kafka-python__b98773117489">ip1:port1","ip2:port2","ip3:port3</strong>"],
    'topic_name': '<strong id="kafka-python__b195221955163019">topic-name</strong>',
    'consumer_id': '<strong id="kafka-python__b5627977314">consumer-id</strong>'
}

print('start consumer')
consumer = KafkaConsumer(conf['topic_name'],
                         bootstrap_servers=conf['bootstrap_servers'],
                         group_id=conf['consumer_id'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))

print('end consumer')</pre>
<p id="kafka-python__p1450716409484">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-python__ul95077400487"><li id="kafka-python__li16507184074817"><strong id="kafka-python__b1563646567">bootstrap_servers</strong>: instance connection address and port</li><li id="kafka-python__li9507440174814"><strong id="kafka-python__b113318182528">topic_name</strong>: topic name</li><li id="kafka-python__li485825414489"><strong id="kafka-python__b24410271222">consumer_id</strong>: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.</li></ul>
</li></ul>
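<p>The consumer examples print keys and values as raw bytes and loop until interrupted. The sketch below shows one way to decode records and stop after a fixed number of messages. The helper names <strong>format_record</strong> and <strong>consume</strong> are hypothetical, and the code only assumes that each record has the <strong>topic</strong>, <strong>partition</strong>, <strong>offset</strong>, <strong>key</strong>, and <strong>value</strong> attributes used in the examples above. To the best of our knowledge, kafka-python also accepts a <strong>consumer_timeout_ms</strong> argument on <strong>KafkaConsumer</strong> that ends iteration when no message arrives in time; verify this against the client documentation for your version.</p>

```python
def format_record(message, encoding="utf-8"):
    """Render one consumed record; key and value arrive as bytes or None."""
    key = message.key.decode(encoding) if message.key is not None else None
    value = message.value.decode(encoding) if message.value is not None else None
    return "%s:%d:%d: key=%s value=%s" % (
        message.topic, message.partition, message.offset, key, value)


def consume(consumer, limit=None):
    """Collect formatted records until the iterator ends or `limit` is reached."""
    lines = []
    for message in consumer:
        lines.append(format_record(message))
        if limit is not None and len(lines) >= limit:
            break
    return lines
```

<p>For example, <strong>consume(consumer, limit=10)</strong> returns after ten messages, so the final <strong>print('end consumer')</strong> statement is actually reached.</p>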
</div>
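<p>The SASL examples place the username and password directly in the script. One common way to keep them out of the source file is to read them from environment variables. This is only a sketch and is not a substitute for encrypting the credentials: the variable names <strong>KAFKA_SASL_USERNAME</strong> and <strong>KAFKA_SASL_PASSWORD</strong> and the helper <strong>load_credentials</strong> are hypothetical.</p>

```python
import os

# Hypothetical environment variable names; choose names that fit your deployment.
USERNAME_VAR = "KAFKA_SASL_USERNAME"
PASSWORD_VAR = "KAFKA_SASL_PASSWORD"


def load_credentials(env=None):
    """Return the SASL entries for the conf dict, read from the environment."""
    env = os.environ if env is None else env
    missing = [name for name in (USERNAME_VAR, PASSWORD_VAR) if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "sasl_username": env[USERNAME_VAR],
        "sasl_password": env[PASSWORD_VAR],
    }
```

<p>The returned dictionary can be merged into the connection information with <strong>conf.update(load_credentials())</strong> before the producer or consumer is created.</p>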
</div>