This section describes how to convert and run a complete Storm topology developed using Storm API.
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-storm_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.4.0</version> <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
If the project is not a non-Maven project, manually collect the preceding JAR packages and add them to the classpath environment variable of the project.
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config(); conf.setNumWorkers(3); StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
Perform the following operations:
Config conf = new Config(); conf.setNumWorkers(3); //converts Storm Config to StormConfig of Flink. StormConfig stormConfig = new StormConfig(conf); //Construct FlinkTopology using TopologBuilder of Storm. FlinkTopology topology = FlinkTopology.createTopology(builder); //Obtain the Stream execution environment. StreamExecutionEnvironment env = topology.getExecutionEnvironment(); //Set StormConfig to the environment variable of Job to construct Bolt and Spout. //If StormConfig is not required during the initialization of Bolt and Spout, you do not need to set this parameter. env.getConfig().setGlobalJobParameters(stormConfig); //Submit the topology. topology.execute();