forked from docs/doc-exports
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-authored-by: Chen, Junjie <chenjunjie@huawei.com> Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
306 lines
9.8 KiB
HTML
306 lines
9.8 KiB
HTML
<a name="kafka-go"></a>
|
|
|
|
<h1 class="topictitle1">Go</h1>
|
|
<div id="body0000001081563264"><p id="kafka-go__p189211411174010">This section describes how to access a Kafka instance using Go 1.16.5 on Linux (CentOS), how to obtain the demo code, and how to produce and consume messages.</p>
|
|
<p id="kafka-go__p465419346576">Before getting started, ensure that you have collected the information listed in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<div class="section" id="kafka-go__section4574152863919"><h4 class="sectiontitle">Preparing the Environment</h4><ul id="kafka-go__ul1205424194112"><li id="kafka-go__li162059247412">Run the following command to check whether Go has been installed:<pre class="screen" id="kafka-go__screen549817508437">go version</pre>
|
|
<p id="kafka-go__p0879124464319">If the following information is displayed, Go has been installed.</p>
|
|
<pre class="screen" id="kafka-go__screen98677221458">[root@ecs-test sarama]# go version
|
|
go version go1.16.5 linux/amd64</pre>
|
|
<p id="kafka-go__p4425159184510">If Go is not installed, <a href="https://golang.org/doc/install?download=go1.16.5.linux-amd64.tar.gz" target="_blank" rel="noopener noreferrer">download</a> and install it.</p>
|
|
</li><li id="kafka-go__li791118451193">Run the following command to obtain the code used in the demo:<pre class="screen" id="kafka-go__screen118625185206">go get github.com/confluentinc/confluent-kafka-go/kafka</pre>
|
|
</li></ul>
|
|
</div>
|
|
<div class="section" id="kafka-go__section186152224220"><h4 class="sectiontitle">Producing Messages</h4><div class="note" id="kafka-go__note1681753756"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="kafka-go__p18426113152515">Replace the following information in bold with the actual values.</p>
|
|
</div></div>
|
|
<ul id="kafka-go__ul72740212348"><li id="kafka-go__li1274821143420">With SASL<pre class="screen" id="kafka-go__screen35531122111218">package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b4187917133412">ip1:port1</strong>,<strong id="kafka-go__b1485722110341">ip2:port2</strong>,<strong id="kafka-go__b1662172517346">ip3:port3</strong>"
|
|
topics = "<strong id="kafka-go__b202881446203420">topic_name</strong>"
|
|
user = "<strong id="kafka-go__b1483718500344">username</strong>"
|
|
password = "<strong id="kafka-go__b1537065583417">password</strong>"
|
|
caFile = "<strong id="kafka-go__b1726142423614">phy_ca.crt</strong>" // Certificate file. For details about how to obtain the SSL certificate, see "Collecting Connection Information".
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka producer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"security.protocol": "SASL_SSL",
|
|
"sasl.mechanism": "PLAIN",
|
|
"sasl.username": user,
|
|
"sasl.password": password,
|
|
"ssl.ca.location": caFile,
|
|
}
|
|
producer, err := kafka.NewProducer(config)
|
|
if err != nil {
|
|
log.Panicf("producer error, err: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e := range producer.Events() {
|
|
switch ev := e.(type) {
|
|
case *kafka.Message:
|
|
if ev.TopicPartition.Error != nil {
|
|
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
|
|
} else {
|
|
log.Printf("Delivered message to %v\n", ev.TopicPartition)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Produce messages to topic (asynchronously)
|
|
fmt.Println("please enter message:")
|
|
go func() {
|
|
for {
|
|
err := producer.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
|
|
Value: GetInput(),
|
|
}, nil)
|
|
if err != nil {
|
|
log.Panicf("send message fail, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
// Wait for message deliveries before shutting down
|
|
producer.Flush(15 * 1000)
|
|
producer.Close()
|
|
}
|
|
|
|
func GetInput() []byte {
|
|
reader := bufio.NewReader(os.Stdin)
|
|
data, _, _ := reader.ReadLine()
|
|
return data
|
|
}</pre>
|
|
</li><li id="kafka-go__li3815417348">Without SASL<pre class="screen" id="kafka-go__screen53201715243">package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b21051353173710">ip1:port1</strong>,<strong id="kafka-go__b454315383814">ip2:port2</strong>,<strong id="kafka-go__b953314816383">ip3:port3</strong>"
|
|
topics = "<strong id="kafka-go__b151061558193710">topic_name</strong>"
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka producer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
}
|
|
producer, err := kafka.NewProducer(config)
|
|
if err != nil {
|
|
log.Panicf("producer error, err: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e := range producer.Events() {
|
|
switch ev := e.(type) {
|
|
case *kafka.Message:
|
|
if ev.TopicPartition.Error != nil {
|
|
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
|
|
} else {
|
|
log.Printf("Delivered message to %v\n", ev.TopicPartition)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Produce messages to topic (asynchronously)
|
|
fmt.Println("please enter message:")
|
|
go func() {
|
|
for {
|
|
err := producer.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
|
|
Value: GetInput(),
|
|
}, nil)
|
|
if err != nil {
|
|
log.Panicf("send message fail, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
// Wait for message deliveries before shutting down
|
|
producer.Flush(15 * 1000)
|
|
producer.Close()
|
|
}
|
|
|
|
func GetInput() []byte {
|
|
reader := bufio.NewReader(os.Stdin)
|
|
data, _, _ := reader.ReadLine()
|
|
return data
|
|
}</pre>
|
|
</li></ul>
|
|
<p id="kafka-go__p1455535103320"></p>
|
|
</div>
|
|
<div class="section" id="kafka-go__section12564133114116"><h4 class="sectiontitle">Consuming Messages</h4><div class="note" id="kafka-go__note12134175215445"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="kafka-go__p1913517527445">Replace the following information in bold with the actual values.</p>
|
|
</div></div>
|
|
<ul id="kafka-go__ul79383504417"><li id="kafka-go__li109381250174112">With SASL<pre class="screen" id="kafka-go__screen1590418017254">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b10914185218399">ip1:port1</strong>,<strong id="kafka-go__b99947577398">ip2:port2</strong>,<strong id="kafka-go__b173711025400">ip3:port3</strong>"
|
|
group = "<strong id="kafka-go__b1364013754013">group-id</strong>"
|
|
topics = "<strong id="kafka-go__b1746141316402">topic_name</strong>"
|
|
user = "<strong id="kafka-go__b1479313179407">username</strong>"
|
|
password = "<strong id="kafka-go__b1270622194015">password</strong>"
|
|
caFile = "<strong id="kafka-go__b15790155121016">phy_ca.crt</strong>" // Certificate file. For details about how to obtain the SSL certificate, see "Collecting Connection Information".
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka consumer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"group.id": group,
|
|
"auto.offset.reset": "earliest",
|
|
"security.protocol": "SASL_SSL",
|
|
"sasl.mechanism": "PLAIN",
|
|
"sasl.username": user,
|
|
"sasl.password": password,
|
|
"ssl.ca.location": caFile,
|
|
}
|
|
|
|
consumer, err := kafka.NewConsumer(config)
|
|
if err != nil {
|
|
log.Panicf("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
err = consumer.SubscribeTopics([]string{topics}, nil)
|
|
if err != nil {
|
|
log.Panicf("Error subscribe consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := consumer.ReadMessage(-1)
|
|
if err != nil {
|
|
log.Printf("Consumer error: %v (%v)", err, msg)
|
|
} else {
|
|
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
if err = consumer.Close(); err != nil {
|
|
log.Panicf("Error closing consumer: %v", err)
|
|
}
|
|
}</pre>
|
|
</li><li id="kafka-go__li1529265364117">Without SASL<pre class="screen" id="kafka-go__screen115581152182712">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b998019010441">ip1:port1</strong>,<strong id="kafka-go__b1995025204420">ip2:port2</strong>,<strong id="kafka-go__b1696199184416">ip3:port3</strong>"
|
|
group = "<strong id="kafka-go__b8911181417445">group-id</strong>"
|
|
topics = "<strong id="kafka-go__b172951519134418">topic_name</strong>"
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka consumer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"group.id": group,
|
|
"auto.offset.reset": "earliest",
|
|
}
|
|
|
|
consumer, err := kafka.NewConsumer(config)
|
|
if err != nil {
|
|
log.Panicf("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
err = consumer.SubscribeTopics([]string{topics}, nil)
|
|
if err != nil {
|
|
log.Panicf("Error subscribe consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := consumer.ReadMessage(-1)
|
|
if err != nil {
|
|
log.Printf("Consumer error: %v (%v)", err, msg)
|
|
} else {
|
|
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
if err = consumer.Close(); err != nil {
|
|
log.Panicf("Error closing consumer: %v", err)
|
|
}
|
|
}</pre>
|
|
</li></ul>
|
|
</div>
|
|
</div>
|
|
<div></div>
|
|
|