forked from docs/doc-exports
Reviewed-by: Antonova, Ekaterina <ekantono@noreply.gitea.eco.tsi-dev.otc-service.com> Co-authored-by: Chen, Junjie <chenjunjie@huawei.com> Co-committed-by: Chen, Junjie <chenjunjie@huawei.com>
845 lines
77 KiB
HTML
845 lines
77 KiB
HTML
<a name="Kafka-java-demo"></a>
|
|
|
|
<h1 class="topictitle1">Configuring Kafka Clients in Java</h1>
|
|
<div id="body1552118864471"><p id="Kafka-java-demo__p56098244413">This section describes how to add Kafka clients in Maven, and use the clients to access Kafka instances and produce and consume messages. To check how the demo project runs in IDEA, see <a href="how-to-connect-kafka.html">Setting Up the Java Development Environment</a>.</p>
|
|
<p id="Kafka-java-demo__p194018169519">The Kafka instance connection addresses, topic name, and user information used in the following examples are available in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<div class="section" id="Kafka-java-demo__section510225675816"><h4 class="sectiontitle">Adding Kafka Clients in Maven</h4><pre class="screen" id="Kafka-java-demo__screen2066655918534">&lt;!-- Kafka instances are based on Kafka 1.1.0/2.3.0/2.7/3.x. Use the client version that matches your instance version, and keep only that one version in the version element below. --&gt;
|
|
&lt;dependency&gt;
|
|
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
|
|
&lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
|
|
&lt;version&gt;1.1.0/2.3.0/2.7.2/3.4.0&lt;/version&gt;
|
|
&lt;/dependency&gt;</pre>
|
|
</div>
|
|
<div class="section" id="Kafka-java-demo__section235013516019"><h4 class="sectiontitle">Preparing Kafka Configuration Files</h4><p id="Kafka-java-demo__p16171537304">The following describes example producer and consumer configuration files. If ciphertext access is not enabled for the Kafka instance, comment out lines regarding the encryption. Otherwise, set configurations for encrypted access.</p>
|
|
<ul id="Kafka-java-demo__ul86714126513"><li id="Kafka-java-demo__li106711612652"><a name="Kafka-java-demo__li106711612652"></a><a name="li106711612652"></a>Producer configuration file (the <strong id="Kafka-java-demo__b335943411212">dms.sdk.producer.properties</strong> file in the <a href="#Kafka-java-demo__li6268103683712">message production code</a>)<pre class="screen" id="Kafka-java-demo__screen202453416716">#The topic name is in the specific production and consumption code.
|
|
#######################
|
|
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
|
|
bootstrap.servers=<strong id="Kafka-java-demo__b202310349719">ip1:port1,ip2:port2,ip3:port3</strong>
|
|
#Producer acknowledgement
|
|
acks=all
|
|
#Method of turning the key into bytes
|
|
key.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
#Method of turning the value into bytes
|
|
value.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
#Memory available to the producer for buffering
|
|
buffer.memory=33554432
|
|
#Number of retries
|
|
retries=0
|
|
#######################
|
|
#<strong id="Kafka-java-demo__b16369754131319">Comment out the following parameters if ciphertext access is not enabled.</strong>
|
|
#######################
|
|
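# Set the SASL authentication mechanism, username, and password. Keep only the one configuration block below that matches the SASL mechanism enabled for your instance, and comment out the other, because a properties file can hold only one value per key.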
|
|
#<strong id="Kafka-java-demo__b669862022810">sasl.mechanism</strong> is the SASL mechanism. <strong id="Kafka-java-demo__b2293204992815">username</strong> and <strong id="Kafka-java-demo__b14343112142919">password</strong> are the SASL username and password. Obtain them by referring to section "Collecting Connection Information". For security purposes, you are advised to encrypt the username and password.
|
|
# <strong id="Kafka-java-demo__b4322171817516">If the SASL mechanism is PLAIN, the configuration is as follows:</strong>
|
|
sasl.mechanism=PLAIN
|
|
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
|
|
username="<strong id="Kafka-java-demo__b1567782662213">username</strong>" \
|
|
password="<strong id="Kafka-java-demo__b12677726142220">password</strong>";
|
|
# <strong id="Kafka-java-demo__b199481638155419">If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:</strong>
|
|
sasl.mechanism=SCRAM-SHA-512
|
|
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
|
username="<strong id="Kafka-java-demo__b421192842513">username</strong>" \
|
|
password="<strong id="Kafka-java-demo__b62118288256">password</strong>";
|
|
|
|
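# Set security.protocol. Keep only the one configuration block below that matches the security protocol of your instance, and comment out the other, because a properties file can hold only one value per key.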
|
|
# <strong id="Kafka-java-demo__b249141212521">If the security protocol is SASL_SSL, the configuration is as follows:</strong>
|
|
security.protocol=SASL_SSL
|
|
# ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
|
|
ssl.truststore.location=<strong id="Kafka-java-demo__b14247349717">E:\\temp\\client.jks</strong>
|
|
# ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
|
|
ssl.truststore.password=dms@kafka
|
|
# ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. <strong id="Kafka-java-demo__b1175436155310">This parameter must be left blank, which indicates disabling domain name verification.</strong>
|
|
ssl.endpoint.identification.algorithm=
|
|
# <strong id="Kafka-java-demo__b152805494544">If the security protocol is SASL_PLAINTEXT, the configuration is as follows:</strong>
|
|
security.protocol=SASL_PLAINTEXT</pre>
|
|
</li><li id="Kafka-java-demo__li156719123520">Consumer configuration file (the <strong id="Kafka-java-demo__b1686581792517">dms.sdk.consumer.properties</strong> file in the <a href="#Kafka-java-demo__li843695925415">message consumption code</a>)<pre class="screen" id="Kafka-java-demo__screen15720561581">#The topic name is in the specific production and consumption code.
|
|
#######################
|
|
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
|
|
bootstrap.servers=<strong id="Kafka-java-demo__b177118561487">ip1:port1,ip2:port2,ip3:port3</strong>
|
|
#Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same <strong id="Kafka-java-demo__b81843132817">group.id</strong> for different processes indicates that the processes belong to the same consumer group.
|
|
group.id=<strong id="Kafka-java-demo__b37217561083">1</strong>
|
|
#Method of turning bytes into the key
|
|
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
|
#Method of turning bytes into the value
|
|
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
|
#Offset reset policy
|
|
auto.offset.reset=earliest
|
|
#######################
|
|
#<strong id="Kafka-java-demo__b165957175347">Comment out the following parameters if ciphertext access is not enabled.</strong>
|
|
#######################
|
|
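# Set the SASL authentication mechanism, username, and password. Keep only the one configuration block below that matches the SASL mechanism enabled for your instance, and comment out the other, because a properties file can hold only one value per key.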
|
|
#<strong id="Kafka-java-demo__b4279173119461">sasl.mechanism</strong> is the SASL mechanism. <strong id="Kafka-java-demo__b22792319466">username</strong> and <strong id="Kafka-java-demo__b1627919310463">password</strong> are the SASL username and password. Obtain them by referring to section "Collecting Connection Information". For security purposes, you are advised to encrypt the username and password.
|
|
# <strong id="Kafka-java-demo__b337691955411">If the SASL mechanism is PLAIN, the configuration is as follows:</strong>
|
|
sasl.mechanism=PLAIN
|
|
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
|
|
username="<strong id="Kafka-java-demo__b19509201114154">username</strong>" \
|
|
password="<strong id="Kafka-java-demo__b750911119155">password</strong>";
|
|
# <strong id="Kafka-java-demo__b1058092115543">If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:</strong>
|
|
sasl.mechanism=SCRAM-SHA-512
|
|
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
|
|
username="<strong id="Kafka-java-demo__b750910119154">username</strong>" \
|
|
password="<strong id="Kafka-java-demo__b185094118152">password</strong>";
|
|
|
|
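# Set security.protocol. Keep only the one configuration block below that matches the security protocol of your instance, and comment out the other, because a properties file can hold only one value per key.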
|
|
# <strong id="Kafka-java-demo__b13911025135419">If the security protocol is SASL_SSL, the configuration is as follows:</strong>
|
|
security.protocol=SASL_SSL
|
|
# ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
|
|
ssl.truststore.location=<strong id="Kafka-java-demo__b3353133217209">E:\\temp\\client.jks</strong>
|
|
# ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
|
|
ssl.truststore.password=dms@kafka
|
|
# ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. <strong id="Kafka-java-demo__b1337143412544">This parameter must be left blank, which indicates disabling domain name verification.</strong>
|
|
ssl.endpoint.identification.algorithm=
|
|
# <strong id="Kafka-java-demo__b596109105515">If the security protocol is SASL_PLAINTEXT, the configuration is as follows:</strong>
|
|
security.protocol=SASL_PLAINTEXT</pre>
|
|
</li></ul>
|
|
</div>
|
|
<div class="section" id="Kafka-java-demo__section5204122943315"><h4 class="sectiontitle">Producing Messages</h4><ul id="Kafka-java-demo__ul426711369375"><li id="Kafka-java-demo__li1026893617376">Test code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen18699101917393"><div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span class="normal"> 1</span>
|
|
<span class="normal"> 2</span>
|
|
<span class="normal"> 3</span>
|
|
<span class="normal"> 4</span>
|
|
<span class="normal"> 5</span>
|
|
<span class="normal"> 6</span>
|
|
<span class="normal"> 7</span>
|
|
<span class="normal"> 8</span>
|
|
<span class="normal"> 9</span>
|
|
<span class="normal">10</span>
|
|
<span class="normal">11</span>
|
|
<span class="normal">12</span>
|
|
<span class="normal">13</span>
|
|
<span class="normal">14</span>
|
|
<span class="normal">15</span>
|
|
<span class="normal">16</span>
|
|
<span class="normal">17</span>
|
|
<span class="normal">18</span>
|
|
<span class="normal">19</span>
|
|
<span class="normal">20</span>
|
|
<span class="normal">21</span>
|
|
<span class="normal">22</span>
|
|
<span class="normal">23</span>
|
|
<span class="normal">24</span>
|
|
<span class="normal">25</span>
|
|
<span class="normal">26</span>
|
|
<span class="normal">27</span>
|
|
<span class="normal">28</span>
|
|
<span class="normal">29</span>
|
|
<span class="normal">30</span>
|
|
<span class="normal">31</span>
|
|
<span class="normal">32</span>
|
|
<span class="normal">33</span>
|
|
<span class="normal">34</span>
|
|
<span class="normal">35</span>
|
|
<span class="normal">36</span></pre></div></td><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.producer</span><span class="p">;</span>
|
|
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Callback</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.RecordMetadata</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.junit.Test</span><span class="p">;</span>
|
|
|
|
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsProducerTest</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="nd">@Test</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">testProducer</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">DmsProducer</span><span class="o"><</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">DmsProducer</span><span class="o"><</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">></span><span class="p">();</span>
|
|
<span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">partition</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o"><</span><span class="w"> </span><span class="mi">10</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">key</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">data</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">"The msg is "</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">i</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="c1">//Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following code.</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">produce</span><span class="p">(</span><span class="s">"topic-0"</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Callback</span><span class="p">()</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">onCompletion</span><span class="p">(</span><span class="n">RecordMetadata</span><span class="w"> </span><span class="n">metadata</span><span class="p">,</span>
|
|
<span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="n">exception</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">exception</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">exception</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">"produce msg completed"</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">});</span>
|
|
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">"produce msg:"</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">data</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">Exception</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="c1">// TODO: Exception handling</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">finally</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="p">}</span>
|
|
</pre></div></td></tr></table></div>
|
|
</div>
|
|
</li><li id="Kafka-java-demo__li6268103683712"><a name="Kafka-java-demo__li6268103683712"></a><a name="li6268103683712"></a>Message production code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen7914125317374"><div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span class="normal"> 1</span>
|
|
<span class="normal"> 2</span>
|
|
<span class="normal"> 3</span>
|
|
<span class="normal"> 4</span>
|
|
<span class="normal"> 5</span>
|
|
<span class="normal"> 6</span>
|
|
<span class="normal"> 7</span>
|
|
<span class="normal"> 8</span>
|
|
<span class="normal"> 9</span>
|
|
<span class="normal"> 10</span>
|
|
<span class="normal"> 11</span>
|
|
<span class="normal"> 12</span>
|
|
<span class="normal"> 13</span>
|
|
<span class="normal"> 14</span>
|
|
<span class="normal"> 15</span>
|
|
<span class="normal"> 16</span>
|
|
<span class="normal"> 17</span>
|
|
<span class="normal"> 18</span>
|
|
<span class="normal"> 19</span>
|
|
<span class="normal"> 20</span>
|
|
<span class="normal"> 21</span>
|
|
<span class="normal"> 22</span>
|
|
<span class="normal"> 23</span>
|
|
<span class="normal"> 24</span>
|
|
<span class="normal"> 25</span>
|
|
<span class="normal"> 26</span>
|
|
<span class="normal"> 27</span>
|
|
<span class="normal"> 28</span>
|
|
<span class="normal"> 29</span>
|
|
<span class="normal"> 30</span>
|
|
<span class="normal"> 31</span>
|
|
<span class="normal"> 32</span>
|
|
<span class="normal"> 33</span>
|
|
<span class="normal"> 34</span>
|
|
<span class="normal"> 35</span>
|
|
<span class="normal"> 36</span>
|
|
<span class="normal"> 37</span>
|
|
<span class="normal"> 38</span>
|
|
<span class="normal"> 39</span>
|
|
<span class="normal"> 40</span>
|
|
<span class="normal"> 41</span>
|
|
<span class="normal"> 42</span>
|
|
<span class="normal"> 43</span>
|
|
<span class="normal"> 44</span>
|
|
<span class="normal"> 45</span>
|
|
<span class="normal"> 46</span>
|
|
<span class="normal"> 47</span>
|
|
<span class="normal"> 48</span>
|
|
<span class="normal"> 49</span>
|
|
<span class="normal"> 50</span>
|
|
<span class="normal"> 51</span>
|
|
<span class="normal"> 52</span>
|
|
<span class="normal"> 53</span>
|
|
<span class="normal"> 54</span>
|
|
<span class="normal"> 55</span>
|
|
<span class="normal"> 56</span>
|
|
<span class="normal"> 57</span>
|
|
<span class="normal"> 58</span>
|
|
<span class="normal"> 59</span>
|
|
<span class="normal"> 60</span>
|
|
<span class="normal"> 61</span>
|
|
<span class="normal"> 62</span>
|
|
<span class="normal"> 63</span>
|
|
<span class="normal"> 64</span>
|
|
<span class="normal"> 65</span>
|
|
<span class="normal"> 66</span>
|
|
<span class="normal"> 67</span>
|
|
<span class="normal"> 68</span>
|
|
<span class="normal"> 69</span>
|
|
<span class="normal"> 70</span>
|
|
<span class="normal"> 71</span>
|
|
<span class="normal"> 72</span>
|
|
<span class="normal"> 73</span>
|
|
<span class="normal"> 74</span>
|
|
<span class="normal"> 75</span>
|
|
<span class="normal"> 76</span>
|
|
<span class="normal"> 77</span>
|
|
<span class="normal"> 78</span>
|
|
<span class="normal"> 79</span>
|
|
<span class="normal"> 80</span>
|
|
<span class="normal"> 81</span>
|
|
<span class="normal"> 82</span>
|
|
<span class="normal"> 83</span>
|
|
<span class="normal"> 84</span>
|
|
<span class="normal"> 85</span>
|
|
<span class="normal"> 86</span>
|
|
<span class="normal"> 87</span>
|
|
<span class="normal"> 88</span>
|
|
<span class="normal"> 89</span>
|
|
<span class="normal"> 90</span>
|
|
<span class="normal"> 91</span>
|
|
<span class="normal"> 92</span>
|
|
<span class="normal"> 93</span>
|
|
<span class="normal"> 94</span>
|
|
<span class="normal"> 95</span>
|
|
<span class="normal"> 96</span>
|
|
<span class="normal"> 97</span>
|
|
<span class="normal"> 98</span>
|
|
<span class="normal"> 99</span>
|
|
<span class="normal">100</span>
|
|
<span class="normal">101</span>
|
|
<span class="normal">102</span>
|
|
<span class="normal">103</span>
|
|
<span class="normal">104</span>
|
|
<span class="normal">105</span>
|
|
<span class="normal">106</span>
|
|
<span class="normal">107</span>
|
|
<span class="normal">108</span>
|
|
<span class="normal">109</span>
|
|
<span class="normal">110</span>
|
|
<span class="normal">111</span>
|
|
<span class="normal">112</span>
|
|
<span class="normal">113</span>
|
|
<span class="normal">114</span>
|
|
<span class="normal">115</span>
|
|
<span class="normal">116</span>
|
|
<span class="normal">117</span>
|
|
<span class="normal">118</span>
|
|
<span class="normal">119</span>
|
|
<span class="normal">120</span>
|
|
<span class="normal">121</span>
|
|
<span class="normal">122</span>
|
|
<span class="normal">123</span>
|
|
<span class="normal">124</span>
|
|
<span class="normal">125</span>
|
|
<span class="normal">126</span>
|
|
<span class="normal">127</span>
|
|
<span class="normal">128</span>
|
|
<span class="normal">129</span>
|
|
<span class="normal">130</span>
|
|
<span class="normal">131</span>
|
|
<span class="normal">132</span>
|
|
<span class="normal">133</span>
|
|
<span class="normal">134</span>
|
|
<span class="normal">135</span>
|
|
<span class="normal">136</span>
|
|
<span class="normal">137</span>
|
|
<span class="normal">138</span>
|
|
<span class="normal">139</span>
|
|
<span class="normal">140</span>
|
|
<span class="normal">141</span>
|
|
<span class="normal">142</span>
|
|
<span class="normal">143</span>
|
|
<span class="normal">144</span>
|
|
<span class="normal">145</span>
|
|
<span class="normal">146</span>
|
|
<span class="normal">147</span>
|
|
<span class="normal">148</span>
|
|
<span class="normal">149</span>
|
|
<span class="normal">150</span>
|
|
<span class="normal">151</span>
|
|
<span class="normal">152</span>
|
|
<span class="normal">153</span>
|
|
<span class="normal">154</span>
|
|
<span class="normal">155</span>
|
|
<span class="normal">156</span>
|
|
<span class="normal">157</span>
|
|
<span class="normal">158</span>
|
|
<span class="normal">159</span>
|
|
<span class="normal">160</span>
|
|
<span class="normal">161</span>
|
|
<span class="normal">162</span>
|
|
<span class="normal">163</span>
|
|
<span class="normal">164</span>
|
|
<span class="normal">165</span>
|
|
<span class="normal">166</span>
|
|
<span class="normal">167</span>
|
|
<span class="normal">168</span>
|
|
<span class="normal">169</span>
|
|
<span class="normal">170</span>
|
|
<span class="normal">171</span>
|
|
<span class="normal">172</span>
|
|
<span class="normal">173</span>
|
|
<span class="normal">174</span>
|
|
<span class="normal">175</span>
|
|
<span class="normal">176</span>
|
|
<span class="normal">177</span>
|
|
<span class="normal">178</span>
|
|
<span class="normal">179</span>
|
|
<span class="normal">180</span>
|
|
<span class="normal">181</span>
|
|
<span class="normal">182</span>
|
|
<span class="normal">183</span>
|
|
<span class="normal">184</span>
|
|
<span class="normal">185</span></pre></div></td><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.producer</span><span class="p">;</span>
|
|
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.BufferedInputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.FileInputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.IOException</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.InputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.net.URL</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.ArrayList</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Enumeration</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.List</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Properties</span><span class="p">;</span>
|
|
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Callback</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.KafkaProducer</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.Producer</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.producer.ProducerRecord</span><span class="p">;</span>
|
|
|
|
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsProducer</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="c1">//Add the producer configurations that have been specified earlier.</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">CONFIG_PRODUCER_FILE_NAME</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">"dms.sdk.producer.properties"</span><span class="p">;</span>
|
|
|
|
<span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Producer</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="w"> </span><span class="n">producer</span><span class="p">;</span>
|
|
|
|
<span class="w"> </span><span class="n">DmsProducer</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">path</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">in</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">BufferedInputStream</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FileInputStream</span><span class="p">(</span><span class="n">path</span><span class="p">));</span>
|
|
<span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">in</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaProducer</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="n">V</span><span class="o">></span><span class="p">(</span><span class="n">props</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="n">DmsProducer</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">loadFromClasspath</span><span class="p">(</span><span class="n">CONFIG_PRODUCER_FILE_NAME</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaProducer</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="n">V</span><span class="o">></span><span class="p">(</span><span class="n">props</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Producing messages</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param topic Topic</span>
|
|
<span class="cm">     * @param partition Target partition number, or null if no partition is specified</span>
|
|
<span class="cm"> * @param key Message key</span>
|
|
<span class="cm"> * @param data Message data</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Producing messages</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param topic Topic</span>
|
|
<span class="cm">     * @param partition Target partition number, or null if no partition is specified</span>
|
|
<span class="cm"> * @param key Message key</span>
|
|
<span class="cm"> * @param data Message data</span>
|
|
<span class="cm">     * @param timestamp Message timestamp, or null to create the record without one</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Producing messages</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param topic Topic</span>
|
|
<span class="cm">     * @param partition Target partition number, or null if no partition is specified</span>
|
|
<span class="cm"> * @param key Message key</span>
|
|
<span class="cm"> * @param data Message data</span>
|
|
<span class="cm">     * @param callback Callback passed to the producer when the message is sent, or null if none</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="kc">null</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Producing messages</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param topic Topic</span>
|
|
<span class="cm">     * @param partition Target partition number, or null if no partition is specified</span>
|
|
<span class="cm"> * @param key Message key</span>
|
|
<span class="cm"> * @param data Message data</span>
|
|
<span class="cm">     * @param timestamp Message timestamp, or null to create the record without one</span>
|
|
<span class="cm">     * @param callback Callback passed to the producer when the message is sent, or null if none</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">data</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">ProducerRecord</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="w"> </span><span class="n">kafkaRecord</span><span class="w"> </span><span class="o">=</span>
|
|
<span class="w"> </span><span class="n">timestamp</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="w"> </span><span class="o">?</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ProducerRecord</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ProducerRecord</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="p">(</span><span class="n">topic</span><span class="p">,</span><span class="w"> </span><span class="n">partition</span><span class="p">,</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">data</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">ProducerRecord</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="w"> </span><span class="n">kafkaRecord</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">produce</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="p">(</span><span class="n">Callback</span><span class="p">)</span><span class="kc">null</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">produce</span><span class="p">(</span><span class="n">ProducerRecord</span><span class="o"><</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">></span><span class="w"> </span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">Callback</span><span class="w"> </span><span class="n">callback</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">send</span><span class="p">(</span><span class="n">kafkaRecord</span><span class="p">,</span><span class="w"> </span><span class="n">callback</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">close</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">producer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm">     * Gets the class loader from the current thread context. If the thread context</span>
|
|
<span class="cm">     * has no class loader, returns the class loader that loaded this class.</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @return classloader</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="nf">getCurrentClassLoader</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">currentThread</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">.</span><span class="na">getContextClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">classLoader</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DmsProducer</span><span class="p">.</span><span class="na">class</span><span class="p">.</span><span class="na">getClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">classLoader</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Load configuration information from classpath.</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param configFileName Configuration file name</span>
|
|
<span class="cm"> * @return Configuration information</span>
|
|
<span class="cm">     * @throws IOException If an I/O error occurs while locating or reading the configuration file</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="nf">loadFromClasspath</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">configFileName</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">IOException</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">getCurrentClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
|
|
<span class="w"> </span><span class="n">List</span><span class="o"><</span><span class="n">URL</span><span class="o">></span><span class="w"> </span><span class="n">properties</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o"><</span><span class="n">URL</span><span class="o">></span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">Enumeration</span><span class="o"><</span><span class="n">URL</span><span class="o">></span><span class="w"> </span><span class="n">propertyResources</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">classLoader</span>
|
|
<span class="w"> </span><span class="p">.</span><span class="na">getResources</span><span class="p">(</span><span class="n">configFileName</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">hasMoreElements</span><span class="p">())</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">properties</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">nextElement</span><span class="p">());</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">URL</span><span class="w"> </span><span class="n">url</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">properties</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="k">try</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">url</span><span class="p">.</span><span class="na">openStream</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">config</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">is</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="k">finally</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">is</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">config</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="p">}</span>
|
|
</pre></div></td></tr></table></div>
|
|
</div>
|
|
</li></ul>
|
|
</div>
|
|
<div class="section" id="Kafka-java-demo__section19410163413542"><h4 class="sectiontitle">Consuming Messages</h4><ul id="Kafka-java-demo__ul164361859155416"><li id="Kafka-java-demo__li12436759165413">Test code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen171556519552"><div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span class="normal"> 1</span>
|
|
<span class="normal"> 2</span>
|
|
<span class="normal"> 3</span>
|
|
<span class="normal"> 4</span>
|
|
<span class="normal"> 5</span>
|
|
<span class="normal"> 6</span>
|
|
<span class="normal"> 7</span>
|
|
<span class="normal"> 8</span>
|
|
<span class="normal"> 9</span>
|
|
<span class="normal">10</span>
|
|
<span class="normal">11</span>
|
|
<span class="normal">12</span>
|
|
<span class="normal">13</span>
|
|
<span class="normal">14</span>
|
|
<span class="normal">15</span>
|
|
<span class="normal">16</span>
|
|
<span class="normal">17</span>
|
|
<span class="normal">18</span>
|
|
<span class="normal">19</span>
|
|
<span class="normal">20</span>
|
|
<span class="normal">21</span>
|
|
<span class="normal">22</span>
|
|
<span class="normal">23</span>
|
|
<span class="normal">24</span>
|
|
<span class="normal">25</span>
|
|
<span class="normal">26</span>
|
|
<span class="normal">27</span>
|
|
<span class="normal">28</span>
|
|
<span class="normal">29</span>
|
|
<span class="normal">30</span></pre></div></td><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.consumer</span><span class="p">;</span>
|
|
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecords</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.junit.Test</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Arrays</span><span class="p">;</span>
|
|
|
|
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsConsumerTest</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="nd">@Test</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">testConsumer</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">DmsConsumer</span><span class="w"> </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">DmsConsumer</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">consume</span><span class="p">(</span><span class="n">Arrays</span><span class="p">.</span><span class="na">asList</span><span class="p">(</span><span class="s">"topic-0"</span><span class="p">));</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="mi">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o"><</span><span class="w"> </span><span class="mi">10</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">){</span>
|
|
<span class="w"> </span><span class="n">ConsumerRecords</span><span class="o"><</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">></span><span class="w"> </span><span class="n">records</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">poll</span><span class="p">(</span><span class="mi">1000</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="s">"the numbers of topic:"</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">records</span><span class="p">.</span><span class="na">count</span><span class="p">());</span>
|
|
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">ConsumerRecord</span><span class="o"><</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">></span><span class="w"> </span><span class="n">record</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">records</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">out</span><span class="p">.</span><span class="na">println</span><span class="p">(</span><span class="n">record</span><span class="p">.</span><span class="na">toString</span><span class="p">());</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">Exception</span><span class="w"> </span><span class="n">e</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="c1">// TODO: Exception handling</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">finally</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="p">}</span>
|
|
</pre></div></td></tr></table></div>
|
|
</div>
|
|
</li><li id="Kafka-java-demo__li843695925415"><a name="Kafka-java-demo__li843695925415"></a><a name="li843695925415"></a>Message consumption code<div class="codecoloring" codetype="Java" id="Kafka-java-demo__screen97141458568"><div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span class="normal"> 1</span>
|
|
<span class="normal"> 2</span>
|
|
<span class="normal"> 3</span>
|
|
<span class="normal"> 4</span>
|
|
<span class="normal"> 5</span>
|
|
<span class="normal"> 6</span>
|
|
<span class="normal"> 7</span>
|
|
<span class="normal"> 8</span>
|
|
<span class="normal"> 9</span>
|
|
<span class="normal"> 10</span>
|
|
<span class="normal"> 11</span>
|
|
<span class="normal"> 12</span>
|
|
<span class="normal"> 13</span>
|
|
<span class="normal"> 14</span>
|
|
<span class="normal"> 15</span>
|
|
<span class="normal"> 16</span>
|
|
<span class="normal"> 17</span>
|
|
<span class="normal"> 18</span>
|
|
<span class="normal"> 19</span>
|
|
<span class="normal"> 20</span>
|
|
<span class="normal"> 21</span>
|
|
<span class="normal"> 22</span>
|
|
<span class="normal"> 23</span>
|
|
<span class="normal"> 24</span>
|
|
<span class="normal"> 25</span>
|
|
<span class="normal"> 26</span>
|
|
<span class="normal"> 27</span>
|
|
<span class="normal"> 28</span>
|
|
<span class="normal"> 29</span>
|
|
<span class="normal"> 30</span>
|
|
<span class="normal"> 31</span>
|
|
<span class="normal"> 32</span>
|
|
<span class="normal"> 33</span>
|
|
<span class="normal"> 34</span>
|
|
<span class="normal"> 35</span>
|
|
<span class="normal"> 36</span>
|
|
<span class="normal"> 37</span>
|
|
<span class="normal"> 38</span>
|
|
<span class="normal"> 39</span>
|
|
<span class="normal"> 40</span>
|
|
<span class="normal"> 41</span>
|
|
<span class="normal"> 42</span>
|
|
<span class="normal"> 43</span>
|
|
<span class="normal"> 44</span>
|
|
<span class="normal"> 45</span>
|
|
<span class="normal"> 46</span>
|
|
<span class="normal"> 47</span>
|
|
<span class="normal"> 48</span>
|
|
<span class="normal"> 49</span>
|
|
<span class="normal"> 50</span>
|
|
<span class="normal"> 51</span>
|
|
<span class="normal"> 52</span>
|
|
<span class="normal"> 53</span>
|
|
<span class="normal"> 54</span>
|
|
<span class="normal"> 55</span>
|
|
<span class="normal"> 56</span>
|
|
<span class="normal"> 57</span>
|
|
<span class="normal"> 58</span>
|
|
<span class="normal"> 59</span>
|
|
<span class="normal"> 60</span>
|
|
<span class="normal"> 61</span>
|
|
<span class="normal"> 62</span>
|
|
<span class="normal"> 63</span>
|
|
<span class="normal"> 64</span>
|
|
<span class="normal"> 65</span>
|
|
<span class="normal"> 66</span>
|
|
<span class="normal"> 67</span>
|
|
<span class="normal"> 68</span>
|
|
<span class="normal"> 69</span>
|
|
<span class="normal"> 70</span>
|
|
<span class="normal"> 71</span>
|
|
<span class="normal"> 72</span>
|
|
<span class="normal"> 73</span>
|
|
<span class="normal"> 74</span>
|
|
<span class="normal"> 75</span>
|
|
<span class="normal"> 76</span>
|
|
<span class="normal"> 77</span>
|
|
<span class="normal"> 78</span>
|
|
<span class="normal"> 79</span>
|
|
<span class="normal"> 80</span>
|
|
<span class="normal"> 81</span>
|
|
<span class="normal"> 82</span>
|
|
<span class="normal"> 83</span>
|
|
<span class="normal"> 84</span>
|
|
<span class="normal"> 85</span>
|
|
<span class="normal"> 86</span>
|
|
<span class="normal"> 87</span>
|
|
<span class="normal"> 88</span>
|
|
<span class="normal"> 89</span>
|
|
<span class="normal"> 90</span>
|
|
<span class="normal"> 91</span>
|
|
<span class="normal"> 92</span>
|
|
<span class="normal"> 93</span>
|
|
<span class="normal"> 94</span>
|
|
<span class="normal"> 95</span>
|
|
<span class="normal"> 96</span>
|
|
<span class="normal"> 97</span>
|
|
<span class="normal"> 98</span>
|
|
<span class="normal"> 99</span>
|
|
<span class="normal">100</span>
|
|
<span class="normal">101</span>
|
|
<span class="normal">102</span>
|
|
<span class="normal">103</span>
|
|
<span class="normal">104</span>
|
|
<span class="normal">105</span>
|
|
<span class="normal">106</span>
|
|
<span class="normal">107</span>
|
|
<span class="normal">108</span>
|
|
<span class="normal">109</span>
|
|
<span class="normal">110</span>
|
|
<span class="normal">111</span>
|
|
<span class="normal">112</span>
|
|
<span class="normal">113</span>
|
|
<span class="normal">114</span>
|
|
<span class="normal">115</span>
|
|
<span class="normal">116</span></pre></div></td><td class="code"><div><pre><span></span><span class="kn">package</span><span class="w"> </span><span class="nn">com.dms.consumer</span><span class="p">;</span>
|
|
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.ConsumerRecords</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.kafka.clients.consumer.KafkaConsumer</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.BufferedInputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.FileInputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.IOException</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.io.InputStream</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.net.URL</span><span class="p">;</span>
|
|
<span class="kn">import</span><span class="w"> </span><span class="nn">java.util.*</span><span class="p">;</span>
|
|
|
|
<span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DmsConsumer</span><span class="w"> </span><span class="p">{</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">CONFIG_CONSUMER_FILE_NAME</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">"dms.sdk.consumer.properties"</span><span class="p">;</span>
|
|
|
|
<span class="w">    </span><span class="kd">private</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="n">consumer</span><span class="p">;</span>
|
|
|
|
<span class="w"> </span><span class="n">DmsConsumer</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">path</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">in</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">BufferedInputStream</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FileInputStream</span><span class="p">(</span><span class="n">path</span><span class="p">));</span>
|
|
<span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">in</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w">        </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="n">DmsConsumer</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">loadFromClasspath</span><span class="p">(</span><span class="n">CONFIG_CONSUMER_FILE_NAME</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">IOException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">printStackTrace</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w">        </span><span class="n">consumer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">KafkaConsumer</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="p">(</span><span class="n">props</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">consume</span><span class="p">(</span><span class="n">List</span><span class="w"> </span><span class="n">topics</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">subscribe</span><span class="p">(</span><span class="n">topics</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w">    </span><span class="kd">public</span><span class="w"> </span><span class="n">ConsumerRecords</span><span class="o">&lt;</span><span class="n">Object</span><span class="p">,</span><span class="w"> </span><span class="n">Object</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">poll</span><span class="p">(</span><span class="kt">long</span><span class="w"> </span><span class="n">timeout</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">poll</span><span class="p">(</span><span class="n">timeout</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">close</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">consumer</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm">     * Get the class loader from the thread context. If the thread context has</span>
|
|
<span class="cm">     * none, return the class loader that loaded this class.</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @return classloader</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="nf">getCurrentClassLoader</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">currentThread</span><span class="p">()</span>
|
|
<span class="w"> </span><span class="p">.</span><span class="na">getContextClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">classLoader</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DmsConsumer</span><span class="p">.</span><span class="na">class</span><span class="p">.</span><span class="na">getClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">classLoader</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="cm">/**</span>
|
|
<span class="cm"> * Load configuration information from classpath.</span>
|
|
<span class="cm"> *</span>
|
|
<span class="cm"> * @param configFileName Configuration file name</span>
|
|
<span class="cm"> * @return Configuration information</span>
|
|
<span class="cm">     * @throws IOException if a matching resource cannot be enumerated or read</span>
|
|
<span class="cm"> */</span>
|
|
<span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="nf">loadFromClasspath</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">configFileName</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">IOException</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">ClassLoader</span><span class="w"> </span><span class="n">classLoader</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">getCurrentClassLoader</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span>
|
|
|
|
<span class="w">        </span><span class="n">List</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">properties</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="p">();</span>
|
|
<span class="w">        </span><span class="n">Enumeration</span><span class="o">&lt;</span><span class="n">URL</span><span class="o">&gt;</span><span class="w"> </span><span class="n">propertyResources</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">classLoader</span>
|
|
<span class="w"> </span><span class="p">.</span><span class="na">getResources</span><span class="p">(</span><span class="n">configFileName</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">hasMoreElements</span><span class="p">())</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">properties</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">propertyResources</span><span class="p">.</span><span class="na">nextElement</span><span class="p">());</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">URL</span><span class="w"> </span><span class="n">url</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">properties</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">InputStream</span><span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="k">try</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">url</span><span class="p">.</span><span class="na">openStream</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">config</span><span class="p">.</span><span class="na">load</span><span class="p">(</span><span class="n">is</span><span class="p">);</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="k">finally</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">is</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span>
|
|
<span class="w"> </span><span class="p">{</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="p">.</span><span class="na">close</span><span class="p">();</span>
|
|
<span class="w"> </span><span class="n">is</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">null</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
|
|
<span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">config</span><span class="p">;</span>
|
|
<span class="w"> </span><span class="p">}</span>
|
|
<span class="p">}</span>
|
|
</pre></div></td></tr></table></div>
|
|
</div>
|
|
</li></ul>
|
|
</div>
|
|
</div>
|
|
<div>
|
|
<div class="familylinks">
|
|
<div class="parentlink"><strong>Parent topic:</strong> <a href="kafka-java.html">Java</a></div>
|
|
</div>
|
|
</div>
|
|
|