The degree of parallelism (DOP) is the number of tasks that execute concurrently. It also determines how many data blocks an operation produces. Tuning the DOP balances the number of tasks, the data volume each task handles, and the processing capacity of each host.
Check the CPU and memory usage on each node. If data and tasks are not evenly distributed among nodes, increase the DOP to spread the load more evenly.
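The effect of the DOP on per-task data volume can be illustrated with a minimal sketch in plain Java. It is not Flink code: the class and method names are hypothetical, and the hash-modulo assignment is a simplified stand-in for Flink's own key partitioning, assumed here only for illustration.

```java
import java.util.Arrays;

public class DopDistributionSketch {

    // Assigns each key to one of `dop` parallel tasks by hashing and returns
    // the number of records each task receives. This is a simplified stand-in
    // for a keyed partitioner, not Flink's actual key-group assignment.
    static int[] recordsPerTask(String[] keys, int dop) {
        int[] counts = new int[dop];
        for (String key : keys) {
            int task = Math.floorMod(key.hashCode(), dop);
            counts[task]++;
        }
        return counts;
    }

    // Ratio of the busiest task's load to the average load.
    // A value of 1.0 means the records are spread perfectly evenly.
    static double skew(int[] counts) {
        int max = Arrays.stream(counts).max().orElse(0);
        double avg = Arrays.stream(counts).average().orElse(0.0);
        return avg == 0.0 ? 0.0 : max / avg;
    }

    public static void main(String[] args) {
        // Hypothetical input: 1000 records, each with a distinct key.
        String[] keys = new String[1000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "user-" + i;
        }
        // Compare how the same data is split at different DOP values.
        for (int dop : new int[] {1, 3, 5}) {
            int[] counts = recordsPerTask(keys, dop);
            System.out.println("DOP=" + dop
                    + " records per task=" + Arrays.toString(counts)
                    + " skew=" + skew(counts));
        }
    }
}
```

A higher DOP yields more, smaller data blocks. Whether the load actually evens out still depends on how the keys are distributed, which is why the CPU and memory usage of each node should be checked again after the change.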
Configure the DOP at one of the following layers, listed in descending order of priority, based on the available memory and CPU, the data volume, and the application logic:
Call the setParallelism() method on an individual operator, data source, or data sink to specify its DOP. Example:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        // Set the DOP of the sum operator to 5.
        .sum(1).setParallelism(5);

    wordCounts.print();

    env.execute("Word Count Example");
Flink programs run in an execution environment, which defines a default DOP for all operators, data sources, and data sinks.
Call the setParallelism() method to specify the default DOP of the execution environment. Example:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(3);

    DataStream<String> text = [...]
    DataStream<Tuple2<String, Integer>> wordCounts = [...]
    wordCounts.print();

    env.execute("Word Count Example");
On the Flink client, modify the parallelism.default parameter in the flink-conf.yaml file in the conf directory to specify the default DOP for all execution environments.
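The priority order among these settings can be sketched in plain Java as follows. This is an illustration only, not Flink's internal logic. It models just the three settings described in this section, and the class name, method name, and the system default value of 1 are hypothetical.

```java
import java.util.OptionalInt;

public class EffectiveDopSketch {

    // Returns the first DOP that is set, checking from highest to lowest
    // priority: operator, execution environment, then the system-wide
    // parallelism.default value.
    static int effectiveDop(OptionalInt operator, OptionalInt environment, int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();
        }
        if (environment.isPresent()) {
            return environment.getAsInt();
        }
        return systemDefault;
    }

    public static void main(String[] args) {
        int systemDefault = 1; // hypothetical parallelism.default value

        // Operator calls setParallelism(5); environment calls setParallelism(3).
        System.out.println(effectiveDop(OptionalInt.of(5), OptionalInt.of(3), systemDefault));

        // Operator sets nothing, so the environment default applies.
        System.out.println(effectiveDop(OptionalInt.empty(), OptionalInt.of(3), systemDefault));

        // Nothing is set in code, so the system-wide default applies.
        System.out.println(effectiveDop(OptionalInt.empty(), OptionalInt.empty(), systemDefault));
    }
}
```

A setting at a higher-priority layer overrides only the scope it applies to. An operator with its own DOP ignores the environment default, while the other operators in the same job still use it.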