doc-exports/docs/dms/dev/kafka-go.html
Chen, Junjie 82285f686d DMS DEV Initial Version
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
Co-authored-by: Chen, Junjie <chenjunjie@huawei.com>
Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
2022-12-08 00:39:54 +00:00

306 lines
9.8 KiB
HTML

<a name="kafka-go"></a><a name="kafka-go"></a>
<h1 class="topictitle1">Go</h1>
<div id="body0000001081563264"><p id="kafka-go__p189211411174010">This section describes how to access a Kafka instance using Go 1.16.5 on the Linux CentOS, how to obtain the demo code, and how to produce and consume messages.</p>
<p id="kafka-go__p465419346576">Before getting started, ensure that you have collected the information listed in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<div class="section" id="kafka-go__section4574152863919"><h4 class="sectiontitle">Preparing the Environment</h4><ul id="kafka-go__ul1205424194112"><li id="kafka-go__li162059247412">Run the following command to check whether Go has been installed:<pre class="screen" id="kafka-go__screen549817508437">go version</pre>
<p id="kafka-go__p0879124464319">If the following information is displayed, Go has been installed.</p>
<pre class="screen" id="kafka-go__screen98677221458">[root@ecs-test sarama]# go version
go version go1.16.5 linux/amd64</pre>
<p id="kafka-go__p4425159184510">If Go is not installed, <a href="https://golang.org/doc/install?download=go1.16.5.linux-amd64.tar.gz" target="_blank" rel="noopener noreferrer">download</a> and install it.</p>
</li><li id="kafka-go__li791118451193">Run the following command to obtain the code used in the demo:<pre class="screen" id="kafka-go__screen118625185206">go get github.com/confluentinc/confluent-kafka-go/kafka</pre>
</li></ul>
</div>
<div class="section" id="kafka-go__section186152224220"><h4 class="sectiontitle">Producing Messages</h4><div class="note" id="kafka-go__note1681753756"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="kafka-go__p18426113152515">Replace the following information in bold with the actual values.</p>
</div></div>
<ul id="kafka-go__ul72740212348"><li id="kafka-go__li1274821143420">With SASL<pre class="screen" id="kafka-go__screen35531122111218">package main
import (
"bufio"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"os/signal"
"syscall"
)
var (
brokers = "<strong id="kafka-go__b4187917133412">ip1:port1</strong>,<strong id="kafka-go__b1485722110341">ip2:port2</strong>,<strong id="kafka-go__b1662172517346">ip3:port3</strong>"
topics = "<strong id="kafka-go__b202881446203420">topic_name</strong>"
user = "<strong id="kafka-go__b1483718500344">username</strong>"
password = "<strong id="kafka-go__b1537065583417">password</strong>"
caFile = "<strong id="kafka-go__b1726142423614">phy_ca.crt</strong>" //Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information."
)
func main() {
log.Println("Starting a new kafka producer")
config := &amp;kafka.ConfigMap{
"bootstrap.servers": brokers,
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": user,
"sasl.password": password,
"ssl.ca.location": caFile,
}
producer, err := kafka.NewProducer(config)
if err != nil {
log.Panicf("producer error, err: %v", err)
return
}
go func() {
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
log.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
fmt.Println("please enter message:")
go func() {
for {
err := producer.Produce(&amp;kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &amp;topics, Partition: kafka.PartitionAny},
Value: GetInput(),
}, nil)
if err != nil {
log.Panicf("send message fail, err: %v", err)
return
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case &lt;-sigterm:
log.Println("terminating: via signal")
}
// Wait for message deliveries before shutting down
producer.Flush(15 * 1000)
producer.Close()
}
func GetInput() []byte {
reader := bufio.NewReader(os.Stdin)
data, _, _ := reader.ReadLine()
return data
}</pre>
</li><li id="kafka-go__li3815417348">Without SASL<pre class="screen" id="kafka-go__screen53201715243">package main
import (
"bufio"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"os/signal"
"syscall"
)
var (
brokers = "<strong id="kafka-go__b21051353173710">ip1:port1</strong>,<strong id="kafka-go__b454315383814">ip2:port2</strong>,<strong id="kafka-go__b953314816383">ip3:port3</strong>"
topics = "<strong id="kafka-go__b151061558193710">topic_name</strong>"
)
func main() {
log.Println("Starting a new kafka producer")
config := &amp;kafka.ConfigMap{
"bootstrap.servers": brokers,
}
producer, err := kafka.NewProducer(config)
if err != nil {
log.Panicf("producer error, err: %v", err)
return
}
go func() {
for e := range producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
log.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
fmt.Println("please enter message:")
go func() {
for {
err := producer.Produce(&amp;kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &amp;topics, Partition: kafka.PartitionAny},
Value: GetInput(),
}, nil)
if err != nil {
log.Panicf("send message fail, err: %v", err)
return
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case &lt;-sigterm:
log.Println("terminating: via signal")
}
// Wait for message deliveries before shutting down
producer.Flush(15 * 1000)
producer.Close()
}
func GetInput() []byte {
reader := bufio.NewReader(os.Stdin)
data, _, _ := reader.ReadLine()
return data
}</pre>
</li></ul>
<p id="kafka-go__p1455535103320"></p>
</div>
<div class="section" id="kafka-go__section12564133114116"><h4 class="sectiontitle">Consuming Messages</h4><div class="note" id="kafka-go__note12134175215445"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="kafka-go__p1913517527445">Replace the following information in bold with the actual values.</p>
</div></div>
<ul id="kafka-go__ul79383504417"><li id="kafka-go__li109381250174112">With SASL<pre class="screen" id="kafka-go__screen1590418017254">package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"os/signal"
"syscall"
)
var (
brokers = "<strong id="kafka-go__b10914185218399">ip1:port1</strong>,<strong id="kafka-go__b99947577398">ip2:port2</strong>,<strong id="kafka-go__b173711025400">ip3:port3</strong>"
group = "<strong id="kafka-go__b1364013754013">group-id</strong>"
topics = "<strong id="kafka-go__b1746141316402">topic_name</strong>"
user = "<strong id="kafka-go__b1479313179407">username</strong>"
password = "<strong id="kafka-go__b1270622194015">password</strong>"
caFile = "<strong id="kafka-go__b15790155121016">phy_ca.crt</strong>" //Certificate file. For details about how to obtain an SSL certificate, see section "Collecting Connection Information."
)
func main() {
log.Println("Starting a new kafka consumer")
config := &amp;kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": group,
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": user,
"sasl.password": password,
"ssl.ca.location": caFile,
}
consumer, err := kafka.NewConsumer(config)
if err != nil {
log.Panicf("Error creating consumer: %v", err)
return
}
err = consumer.SubscribeTopics([]string{topics}, nil)
if err != nil {
log.Panicf("Error subscribe consumer: %v", err)
return
}
go func() {
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v (%v)", err, msg)
} else {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case &lt;-sigterm:
log.Println("terminating: via signal")
}
if err = consumer.Close(); err != nil {
log.Panicf("Error closing consumer: %v", err)
}
}</pre>
</li><li id="kafka-go__li1529265364117">Without SASL<pre class="screen" id="kafka-go__screen115581152182712">package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"log"
"os"
"os/signal"
"syscall"
)
var (
brokers = "<strong id="kafka-go__b998019010441">ip1:port1</strong>,<strong id="kafka-go__b1995025204420">ip2:port2</strong>,<strong id="kafka-go__b1696199184416">ip3:port3</strong>"
group = "<strong id="kafka-go__b8911181417445">group-id</strong>"
topics = "<strong id="kafka-go__b172951519134418">topic_name</strong>"
)
func main() {
log.Println("Starting a new kafka consumer")
config := &amp;kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": group,
"auto.offset.reset": "earliest",
}
consumer, err := kafka.NewConsumer(config)
if err != nil {
log.Panicf("Error creating consumer: %v", err)
return
}
err = consumer.SubscribeTopics([]string{topics}, nil)
if err != nil {
log.Panicf("Error subscribe consumer: %v", err)
return
}
go func() {
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
log.Printf("Consumer error: %v (%v)", err, msg)
} else {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
}
}
}()
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case &lt;-sigterm:
log.Println("terminating: via signal")
}
if err = consumer.Close(); err != nil {
log.Panicf("Error closing consumer: %v", err)
}
}</pre>
</li></ul>
</div>
</div>
<div></div>