doc-exports/docs/dms/dev/Kafka-java-demo.html
Chen, Junjie 82285f686d DMS DEV Initial Version
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
Co-authored-by: Chen, Junjie <chenjunjie@huawei.com>
Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
2022-12-08 00:39:54 +00:00


<a name="Kafka-java-demo"></a>
<h1 class="topictitle1">Configuring Kafka Clients in Java</h1>
<div id="body1552118864471"><p id="Kafka-java-demo__p56098244413">This section describes how to add the Kafka client dependency in Maven and how to use the client to connect to a Kafka instance and to produce and consume messages. To see how the demo project runs in IDEA, see <a href="how-to-connect-kafka.html">Setting Up the Java Development Environment</a>.</p>
<p id="Kafka-java-demo__p194018169519">The Kafka instance connection addresses, topic name, and user information used in the following examples are available in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<div class="section" id="Kafka-java-demo__section510225675816"><h4 class="sectiontitle">Adding Kafka Clients in Maven</h4><pre class="screen" id="Kafka-java-demo__screen176831817194516">&lt;!-- Kafka instances are based on Kafka 1.1.0, 2.3.0, or 2.7. Set the version to the one that matches your instance (2.3.0 is shown as an example). --&gt;
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
    &lt;version&gt;2.3.0&lt;/version&gt;
&lt;/dependency&gt;</pre>
</div>
<div class="section" id="Kafka-java-demo__section235013516019"><h4 class="sectiontitle">Preparing Kafka Configuration Files</h4><p id="Kafka-java-demo__p16171537304">The following are example producer and consumer configuration files. If SASL is not enabled for the Kafka instance, comment out the SASL-related lines. If SASL is enabled, configure the SASL parameters for encrypted access.</p>
<ul id="Kafka-java-demo__ul86714126513"><li id="Kafka-java-demo__li106711612652">Producer configuration file (the <strong id="Kafka-java-demo__b335943411212">dms.sdk.producer.properties</strong> file in the demo project)<p id="Kafka-java-demo__p463910341272">The values in bold are specific to each Kafka instance and must be modified. Other parameters can also be added.</p>
<pre class="screen" id="Kafka-java-demo__screen202453416716">#The topic name is set in the production and consumption code, not in this file.
#######################
#Information about Kafka brokers. <strong id="Kafka-java-demo__b14531444217">ip:port</strong> are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=<strong id="Kafka-java-demo__b202310349719">ip1:port1,ip2:port2,ip3:port3</strong>
#Producer acknowledgement mode. all: a send succeeds only after all in-sync replicas have acknowledged the record.
acks=all
#Method of turning the key into bytes
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#Method of turning the value into bytes
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#Memory available to the producer for buffering
buffer.memory=33554432
#Number of retries after a failed send. 0 disables retries.
retries=0
#######################
#<strong id="Kafka-java-demo__b103591405346">Comment out the following parameters if SASL access is not enabled.</strong>
#######################
#Configure the JAAS username and password. <em id="Kafka-java-demo__i727614592264"><strong id="Kafka-java-demo__b17276145982614">username</strong></em> and <em id="Kafka-java-demo__i22762592261"><strong id="Kafka-java-demo__b20276185942612">password</strong></em> are set when you enable Kafka SASL_SSL during instance creation or when you create a SASL_SSL user.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<strong id="Kafka-java-demo__b132411341574">username</strong>" \
password="<strong id="Kafka-java-demo__b16243341173">password</strong>";
#SASL mechanism
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only SASL_SSL is supported.
security.protocol=SASL_SSL
#Location of ssl.truststore
ssl.truststore.location=<strong id="Kafka-java-demo__b14247349717">E:\\temp\\client.truststore.jks</strong>
#Password of the SSL truststore (JKS) file.
ssl.truststore.password=dms@kafka
#Whether to verify the certificate domain name. <strong id="Kafka-java-demo__b1299112361712">This parameter must be left blank, which indicates disabling domain name verification.</strong>
ssl.endpoint.identification.algorithm=
</pre>
</li><li id="Kafka-java-demo__li156719123520">Consumer configuration file (the <strong id="Kafka-java-demo__b1686581792517">dms.sdk.consumer.properties</strong> file in the demo project)<div class="p" id="Kafka-java-demo__p174615571788">The values in bold are specific to each Kafka instance and must be modified. Other parameters can also be added.<pre class="screen" id="Kafka-java-demo__screen15720561581">#The topic name is set in the production and consumption code, not in this file.
#######################
#Information about Kafka brokers. <strong id="Kafka-java-demo__b1231719371020">ip:port</strong> are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=<strong id="Kafka-java-demo__b177118561487">ip1:port1,ip2:port2,ip3:port3</strong>
#Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same <strong id="Kafka-java-demo__b81843132817">group.id</strong> for different processes indicates that the processes belong to the same consumer group.
group.id=<strong id="Kafka-java-demo__b37217561083">1</strong>
#Method of turning the received bytes back into the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Method of turning the received bytes back into the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Offset reset policy
auto.offset.reset=earliest
#######################
#<strong id="Kafka-java-demo__b165957175347">Comment out the following parameters if SASL access is not enabled.</strong>
#######################
#Configure the JAAS username and password. <em id="Kafka-java-demo__i113034168277"><strong id="Kafka-java-demo__b4303816172716">username</strong></em> and <em id="Kafka-java-demo__i13041316142713"><strong id="Kafka-java-demo__b203041816112713">password</strong></em> are set when you enable Kafka SASL_SSL during instance creation or when you create a SASL_SSL user.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="<strong id="Kafka-java-demo__b157214561180">username</strong>" \
password="<strong id="Kafka-java-demo__b67217567810">password</strong>";
#SASL mechanism
sasl.mechanism=PLAIN
#Encryption protocol. Currently, only SASL_SSL is supported.
security.protocol=SASL_SSL
#Location of ssl.truststore
ssl.truststore.location=<strong id="Kafka-java-demo__b372856482">E:\\temp\\client.truststore.jks</strong>
#Password of the SSL truststore (JKS) file.
ssl.truststore.password=dms@kafka
#Whether to verify the certificate domain name. Leave this parameter blank to disable domain name verification.
ssl.endpoint.identification.algorithm=</pre>
</div>
</li></ul>
</div>
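<p>The configuration files above rely on a few parsing rules of <strong>java.util.Properties</strong>: a trailing backslash continues a value on the next line (as in <strong>sasl.jaas.config</strong>), a doubled backslash in a Windows path is read as a single backslash, and a key followed by nothing after the equals sign is read as an empty string. The following is a minimal sketch of these rules that uses only the JDK. The class <strong>ConfigLoadSketch</strong> and its <strong>withoutSasl</strong> helper are hypothetical names introduced for illustration and are not part of the demo project. The helper shows one possible way to drop the SASL-related keys in code instead of commenting them out by hand.</p>

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration class; not part of the demo project.
class ConfigLoadSketch {
    // Keys that appear in the SASL block of the example configuration files.
    static final List<String> SASL_KEYS = Arrays.asList(
            "sasl.jaas.config", "sasl.mechanism", "security.protocol",
            "ssl.truststore.location", "ssl.truststore.password",
            "ssl.endpoint.identification.algorithm");

    // Parses properties text with the same rules Properties.load applies to a file.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Returns a copy of the source without the SASL-related keys.
    // The source object itself is left unchanged.
    static Properties withoutSasl(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        for (String key : SASL_KEYS) {
            copy.remove(key);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values copied from the producer example; replace them for a real instance.
        String text = String.join("\n",
                "bootstrap.servers=ip1:port1,ip2:port2,ip3:port3",
                "acks=all",
                "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\",
                "username=\"username\" \\",
                "password=\"password\";",
                "sasl.mechanism=PLAIN",
                "security.protocol=SASL_SSL",
                "ssl.truststore.location=E:\\\\temp\\\\client.truststore.jks",
                "ssl.truststore.password=dms@kafka",
                "ssl.endpoint.identification.algorithm=");
        Properties props = parse(text);
        // The three JAAS lines are joined into one value.
        System.out.println(props.getProperty("sasl.jaas.config"));
        // The doubled backslashes are read as single backslashes.
        System.out.println(props.getProperty("ssl.truststore.location"));
        // Keys that remain when SASL is not enabled.
        System.out.println(withoutSasl(props).stringPropertyNames());
    }
}
```

<p>Whether filtering keys in code is preferable to maintaining two properties files depends on the project. The sketch only illustrates how the file content is interpreted when it is loaded.</p>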
<div class="section" id="Kafka-java-demo__section5204122943315"><h4 class="sectiontitle">Producing Messages</h4><ul id="Kafka-java-demo__ul426711369375"><li id="Kafka-java-demo__li1026893617376">Test code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen18699101917393"><div class="highlight"><table class="highlighttable"><tr><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.producer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Callback</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.RecordMetadata</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.junit.Test</span><span class="p">;</span><span class="w"></span>
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsProducerTest</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="nd">@Test</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">testProducer</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsProducer</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">DmsProducer</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">&gt;</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">partition</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="mi">10</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">key</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">data</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">&quot;The msg is &quot;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">i</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="c1">//Replace topic-0 with the name of the topic you created. DmsProducer provides several produce() overloads. For details, see the message production code below or the Kafka official website.</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">produce</span><span class="p">(</span><span class="s">&quot;topic-0&quot;</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Callback</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">onCompletion</span><span class="p">(</span><span class="n">RecordMetadata</span><span class="w"> </span><span class="n">metadata</span><span class="p">,</span><span class="w"></span>
<span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="n">exception</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">exception</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">exception</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">&quot;produce msg completed&quot;</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">});</span><span class="w"></span>
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">&quot;produce msg:&quot;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">data</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">Exception</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="c1">// TODO: Exception handling</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">finally</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="p">}</span><span class="w"></span>
</pre></div></td></tr></table></div>
</div>
</li><li id="Kafka-java-demo__li6268103683712">Message production code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen7914125317374"><div class="highlight"><table class="highlighttable"><tr><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.producer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.BufferedInputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.FileInputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.IOException</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.InputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.net.URL</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.ArrayList</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Enumeration</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.List</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Properties</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Callback</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.KafkaProducer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Producer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.ProducerRecord</span><span class="p">;</span><span class="w"></span>
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsProducer</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="c1">//Name of the producer configuration file prepared earlier. It is loaded from the classpath by the no-argument constructor.</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">CONFIG_PRODUCER_FILE_NAME</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">&quot;dms.sdk.producer.properties&quot;</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Producer</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">producer</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsProducer</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">path</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="c1">//try-with-resources closes the stream after the properties are loaded.</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">(</span><span class="n">InputStream</span><span class="w"> </span><span class="n">in</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">BufferedInputStream</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FileInputStream</span><span class="p">(</span><span class="n">path</span><span class="p">)))</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">in</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaProducer</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="n">V</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsProducer</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">loadFromClasspath</span><span class="p">(</span><span class="n">CONFIG_PRODUCER_FILE_NAME</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaProducer</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="n">V</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Produces a message to the given topic partition, without a timestamp or callback.</span>
<span class="cm"> *</span>
<span class="cm"> * @param topic Topic name</span>
<span class="cm"> * @param partition Partition number</span>
<span class="cm"> * @param key Message key</span>
<span class="cm"> * @param data Message data</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Produces a message with an explicit timestamp.</span>
<span class="cm"> *</span>
<span class="cm"> * @param topic Topic name</span>
<span class="cm"> * @param partition Partition number; if null, the partitioner chooses the partition</span>
<span class="cm"> * @param key Message key</span>
<span class="cm"> * @param data Message data</span>
<span class="cm"> * @param timestamp Record timestamp in milliseconds; if null, the producer uses the current time</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Produces a message and reports the result through a callback.</span>
<span class="cm"> *</span>
<span class="cm"> * @param topic Topic name</span>
<span class="cm"> * @param partition Partition number; if null, the partitioner chooses the partition</span>
<span class="cm"> * @param key Message key</span>
<span class="cm"> * @param data Message data</span>
<span class="cm"> * @param callback Callback invoked when the send completes; may be null</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
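<span class="cm"> * Produces a message with an explicit timestamp and a callback.</span>
<span class="cm"> *</span>
<span class="cm"> * @param topic Topic name</span>
<span class="cm"> * @param partition Partition number; if null, the partitioner chooses the partition</span>
<span class="cm"> * @param key Message key</span>
<span class="cm"> * @param data Message data</span>
<span class="cm"> * @param timestamp Record timestamp in milliseconds; if null, the producer uses the current time</span>
<span class="cm"> * @param callback Callback invoked when the send completes; may be null</span>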
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">ProducerRecord</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kafkaRecord</span><span class="w"> </span><span class="o">=</span><span class="w"></span>
<span class="w"> </span><span class="n">timestamp</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="w"> </span><span class="o">?</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ProducerRecord</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ProducerRecord</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">ProducerRecord</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kafkaRecord</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">ProducerRecord</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">send</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">close</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Gets the class loader from the thread context. If the thread context has</span>
<span class="cm"> * no class loader, returns the class loader that loaded this class.</span>
<span class="cm"> *</span>
<span class="cm"> * @return classloader</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="nf">getCurrentClassLoader</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">currentThread</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">.</span><span class="na">getContextClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">classLoader</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DmsProducer</span><span class="p">.</span><span class="na">class</span><span class="p">.</span><span class="na">getClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">classLoader</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Loads configuration information from the classpath. If several files match, later values override earlier ones.</span>
<span class="cm"> *</span>
<span class="cm"> * @param configFileName Configuration file name</span>
<span class="cm"> * @return Configuration information</span>
<span class="cm"> * @throws IOException If a configuration file cannot be read</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="nf">loadFromClasspath</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">configFileName</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">IOException</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">getCurrentClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">properties</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">Enumeration</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">propertyResources</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">classLoader</span><span class="w"></span>
<span class="w"> </span><span class="p">.</span><span class="na">getResources</span><span class="p">(</span><span class="n">configFileName</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">hasMoreElements</span><span class="p">())</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">properties</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">nextElement</span><span class="p">());</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">URL</span><span class="w"> </span><span class="n">url</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">properties</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">url</span><span class="p">.</span><span class="na">openStream</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">config</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">is</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">finally</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">is</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">config</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="p">}</span><span class="w"></span>
</pre></div></td></tr></table></div>
</div>
</li></ul>
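<p>The classpath loading used by <strong>loadFromClasspath</strong> above can be tried without a Kafka instance. The following is a minimal sketch of the same technique using only the JDK. The class name <strong>ClasspathConfigSketch</strong>, the method <strong>currentClassLoader</strong>, and the choice to pass the class loader as a parameter are illustrative assumptions and are not part of the demo project.</p>

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.Properties;

// Hypothetical stand-alone class for illustration; not part of the demo project.
public class ClasspathConfigSketch {

    /** Returns the thread context class loader, or this class's loader if none is set. */
    public static ClassLoader currentClassLoader() {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        return loader != null ? loader : ClasspathConfigSketch.class.getClassLoader();
    }

    /**
     * Merges every classpath resource named configFileName into one Properties object.
     * Keys loaded later overwrite keys loaded earlier.
     */
    public static Properties loadFromClasspath(ClassLoader loader, String configFileName)
            throws IOException {
        Properties config = new Properties();
        for (URL url : Collections.list(loader.getResources(configFileName))) {
            // try-with-resources closes the stream even if load() throws
            try (InputStream in = url.openStream()) {
                config.load(in);
            }
        }
        return config;
    }

    public static void main(String[] args) throws IOException {
        // The result is empty if the file is not on the classpath.
        Properties config = loadFromClasspath(currentClassLoader(), "dms.sdk.producer.properties");
        System.out.println("Loaded " + config.size() + " properties");
    }
}
```

<p>To try the sketch, run <strong>main</strong> with the directory that contains <strong>dms.sdk.producer.properties</strong> on the classpath. A missing file does not raise an error here. It yields an empty <strong>Properties</strong> object, so a real client may want to check for required keys before creating the producer.</p>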
</div>
<div class="section" id="Kafka-java-demo__section19410163413542"><h4 class="sectiontitle">Consuming Messages</h4><ul id="Kafka-java-demo__ul164361859155416"><li id="Kafka-java-demo__li12436759165413">Test code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen171556519552"><div class="highlight"><table class="highlighttable"><tr><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.consumer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecords</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.junit.Test</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Arrays</span><span class="p">;</span><span class="w"></span>
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsConsumerTest</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="nd">@Test</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">testConsumer</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsConsumer</span><span class="w"> </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">DmsConsumer</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">consume</span><span class="p">(</span><span class="n">Arrays</span><span class="p">.</span><span class="na">asList</span><span class="p">(</span><span class="s">&quot;topic-0&quot;</span><span class="p">));</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="mi">10</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">){</span><span class="w"></span>
<span class="w"> </span><span class="n">ConsumerRecords</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="n">records</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">poll</span><span class="p">(</span><span class="mi">1000</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">&quot;Number of records polled: &quot;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">records</span><span class="p">.</span><span class="na">count</span><span class="p">());</span><span class="w"></span>
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">ConsumerRecord</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="n">record</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">records</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="n">record</span><span class="p">.</span><span class="na">toString</span><span class="p">());</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">Exception</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="c1">// TODO: Replace with application-specific exception handling</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">finally</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="p">}</span><span class="w"></span>
</pre></div></td></tr></table></div>
</div>
</li><li id="Kafka-java-demo__li843695925415">Message consumption code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen97141458568"><div class="highlight"><table class="highlighttable"><tr><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.consumer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecords</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.KafkaConsumer</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.BufferedInputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.FileInputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.IOException</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.InputStream</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.net.URL</span><span class="p">;</span><span class="w"></span>
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.*</span><span class="p">;</span><span class="w"></span>
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsConsumer</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">CONFIG_CONSUMER_FILE_NAME</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">&quot;dms.sdk.consumer.properties&quot;</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="n">consumer</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsConsumer</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">path</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="c1">// try-with-resources closes the stream even if load() fails.</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">(</span><span class="n">InputStream</span><span class="w"> </span><span class="n">in</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">BufferedInputStream</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FileInputStream</span><span class="p">(</span><span class="n">path</span><span class="p">)))</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">in</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">DmsConsumer</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">loadFromClasspath</span><span class="p">(</span><span class="n">CONFIG_CONSUMER_FILE_NAME</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">consume</span><span class="p">(</span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">topics</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">subscribe</span><span class="p">(</span><span class="n">topics</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">ConsumerRecords</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">poll</span><span class="p">(</span><span class="kt">long</span><span class="w"> </span><span class="n">timeout</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
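<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">poll</span><span class="p">(</span><span class="n">timeout</span><span class="p">);</span><span class="w"> </span><span class="c1">// timeout in milliseconds; poll(long) is deprecated in 2.x clients in favor of poll(Duration)</span><span class="w"></span>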
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">close</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Get the class loader from the thread context. If the thread context has</span>
<span class="cm"> * none, return the class loader that loaded this class.</span>
<span class="cm"> *</span>
<span class="cm"> * @return classloader</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="nf">getCurrentClassLoader</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">currentThread</span><span class="p">()</span><span class="w"></span>
<span class="w"> </span><span class="p">.</span><span class="na">getContextClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">classLoader</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DmsConsumer</span><span class="p">.</span><span class="na">class</span><span class="p">.</span><span class="na">getClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">classLoader</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="cm">/**</span>
<span class="cm"> * Load configuration information from classpath.</span>
<span class="cm"> *</span>
<span class="cm"> * @param configFileName Configuration file name</span>
<span class="cm"> * @return Configuration information</span>
<span class="cm"> * @throws IOException if the configuration file cannot be located or read</span>
<span class="cm"> */</span><span class="w"></span>
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="nf">loadFromClasspath</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">configFileName</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">IOException</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">getCurrentClassLoader</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">properties</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">Enumeration</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">propertyResources</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">classLoader</span><span class="w"></span>
<span class="w"> </span><span class="p">.</span><span class="na">getResources</span><span class="p">(</span><span class="n">configFileName</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">hasMoreElements</span><span class="p">())</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">properties</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">nextElement</span><span class="p">());</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">URL</span><span class="w"> </span><span class="n">url</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">properties</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="k">try</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">url</span><span class="p">.</span><span class="na">openStream</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">config</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">is</span><span class="p">);</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">finally</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">is</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"></span>
<span class="w"> </span><span class="p">{</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w"></span>
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">config</span><span class="p">;</span><span class="w"></span>
<span class="w"> </span><span class="p">}</span><span class="w"></span>
<span class="p">}</span><span class="w"></span>
</pre></div></td></tr></table></div>
</div>
</li></ul>
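<p>Both <strong>DmsConsumer</strong> constructors return without creating a consumer if the configuration cannot be loaded, so it can help to verify the loaded properties first. The following is a minimal sketch, not part of the demo project: the class name <strong>ConsumerConfigCheck</strong>, its methods, and the placeholder broker address are assumptions made for illustration. It uses only the Java standard library and checks for configuration keys that a Kafka consumer typically needs when it is created from a properties file and subscribes to topics.</p>

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the demo project. */
final class ConsumerConfigCheck {
    // Keys a consumer typically needs when built from properties and used with subscribe().
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers", "group.id", "key.deserializer", "value.deserializer");

    /** Loads properties and closes the reader even if parsing fails. */
    static Properties load(Reader source) throws IOException {
        Properties props = new Properties();
        try (Reader in = source) {
            props.load(in);
        }
        return props;
    }

    /** Returns the required keys that are absent or blank, in REQUIRED_KEYS order. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values; real ones come from the instance's connection information.
        String text = "bootstrap.servers=broker1:9092\ngroup.id=demo-group\n";
        Properties props = load(new StringReader(text));
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

<p>In this sketch, an empty list from <strong>missingKeys</strong> means the checked keys are present. It does not validate SASL settings or confirm that the broker addresses are reachable.</p>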
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="kafka-java.html">Java</a></div>
</div>
</div>