Task Failure Recovery
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.


When a task failure happens, Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.

Restart strategies and failover strategies are used to control how tasks are restarted. Restart strategies decide whether and when the failed/affected tasks can be restarted. Failover strategies decide which tasks should be restarted to recover the job.

Restart Strategies #

The cluster can be started with a default restart strategy, which is always used when no job-specific restart strategy has been defined. If the job is submitted with its own restart strategy, that strategy overrides the cluster's default setting.

The default restart strategy is set via Flink’s configuration file flink-conf.yaml. The configuration parameter restart-strategy.type defines which strategy is used. If checkpointing is not enabled, the “no restart” strategy is used. If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts. See the list of available restart strategies below to learn which values are supported.

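Each restart strategy comes with its own set of parameters which control its behaviour. These values are also set in the configuration file. The description of each restart strategy contains more information about the respective configuration values.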

Key | Default | Type | Description
restart-strategy.type | (none) | String | Defines the restart strategy to use in case of job failures. Accepted values are: none, off, disable (no restart strategy); fixeddelay, fixed-delay (fixed delay restart strategy, see Fixed Delay Restart Strategy below); failurerate, failure-rate (failure rate restart strategy, see Failure Rate Restart Strategy below); exponentialdelay, exponential-delay (exponential delay restart strategy, see Exponential Delay Restart Strategy below). If checkpointing is disabled, the default value is none. If checkpointing is enabled, the default value is fixed-delay with Integer.MAX_VALUE restart attempts and '1 s' delay.
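The precedence rules above (job-level strategy first, then restart-strategy.type, then a default that depends on checkpointing) can be summarized as a small Python function. This is an illustrative model only: strategies are plain strings, and the name effective_restart_strategy and the alias table are hypothetical, not part of any Flink API.

```python
from typing import Optional

# Accepted values of restart-strategy.type, each mapped to one canonical name.
_ALIASES = {
    "none": "none", "off": "none", "disable": "none",
    "fixeddelay": "fixed-delay", "fixed-delay": "fixed-delay",
    "failurerate": "failure-rate", "failure-rate": "failure-rate",
    "exponentialdelay": "exponential-delay",
    "exponential-delay": "exponential-delay",
}


def effective_restart_strategy(job_strategy: Optional[str],
                               cluster_type: Optional[str],
                               checkpointing_enabled: bool) -> str:
    """Return the canonical strategy name that applies to a job (model only)."""
    if job_strategy is not None:
        # A strategy set on the job overrides the cluster default.
        return _ALIASES[job_strategy]
    if cluster_type is not None:
        # Otherwise restart-strategy.type from flink-conf.yaml is used.
        return _ALIASES[cluster_type]
    # Nothing configured: fixed-delay (Integer.MAX_VALUE attempts, 1 s delay)
    # when checkpointing is enabled, no restarts otherwise.
    return "fixed-delay" if checkpointing_enabled else "none"
```

For example, a cluster configured with failurerate and a job that sets no strategy of its own resolves to "failure-rate", while a job that explicitly sets off resolves to "none" whatever the cluster says.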

Apart from defining a default restart strategy, it is possible to define a specific restart strategy for each Flink job. This restart strategy is set programmatically by calling the setRestartStrategy method on the StreamExecutionEnvironment.

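The following example shows how to set a fixed delay restart strategy for a job. In case of a failure, the system tries to restart the job 3 times and waits 10 seconds between consecutive restart attempts.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));
```

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))
```

```python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  # number of restart attempts
    10000  # delay (milliseconds)
))
```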


Fixed Delay Restart Strategy #

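The fixed delay restart strategy attempts to restart the job a given number of times. If the maximum number of attempts is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.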

This strategy is enabled as the cluster default by setting the following configuration parameter in flink-conf.yaml.

```yaml
restart-strategy.type: fixed-delay
```
Key | Default | Type | Description
restart-strategy.fixed-delay.attempts | 1 | Integer | The number of times that Flink retries the execution before the job is declared as failed if restart-strategy.type has been set to fixed-delay.
restart-strategy.fixed-delay.delay | 1 s | Duration | Delay between two consecutive restart attempts if restart-strategy.type has been set to fixed-delay. Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted. It can be specified using notation: "1 min", "20 s".


For example:

```yaml
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
```
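Duration options such as restart-strategy.fixed-delay.delay are written as a number followed by a unit, for example "10 s", "20 s" or "1 min". The hypothetical helper below parses a few such strings into Python timedelta values to show how the notation reads. It supports only the units ms, s, min and h, and it assumes that a bare number means milliseconds. Flink's own parser accepts more spellings, so treat this as a sketch rather than a specification.

```python
import re
from datetime import timedelta

# Hypothetical subset of unit labels; Flink accepts more forms than these.
_UNITS = {
    "ms": timedelta(milliseconds=1),
    "s": timedelta(seconds=1),
    "min": timedelta(minutes=1),
    "h": timedelta(hours=1),
}


def parse_duration(text: str) -> timedelta:
    """Parse strings such as "10 s" or "1 min" (illustrative helper)."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-z]*)\s*", text)
    if match is None:
        raise ValueError(f"cannot parse duration: {text!r}")
    amount = int(match.group(1))
    unit = match.group(2) or "ms"  # assumption: a bare number is milliseconds
    if unit not in _UNITS:
        raise ValueError(f"unknown unit: {unit!r}")
    return amount * _UNITS[unit]
```

With this helper, "10 s" from the example above becomes a 10-second timedelta, and "1 min" becomes 60 seconds.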


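The fixed delay restart strategy can also be set programmatically:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
));
```

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // number of restart attempts
  Time.of(10, TimeUnit.SECONDS) // delay
))
```

```python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  # number of restart attempts
    10000  # delay (milliseconds)
))
```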
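The counting behind restart-strategy.fixed-delay.attempts can be modeled in a few lines of Python. This sketch is illustrative only: the class FixedDelayModel and its on_failure method are hypothetical and do not correspond to a Flink API.

```python
from typing import Optional


class FixedDelayModel:
    """Toy model of the fixed delay restart strategy (not Flink code)."""

    def __init__(self, attempts: int, delay_s: float) -> None:
        self.attempts = attempts  # restart-strategy.fixed-delay.attempts
        self.delay_s = delay_s    # restart-strategy.fixed-delay.delay
        self.restarts_so_far = 0

    def on_failure(self) -> Optional[float]:
        """Return the wait before the next restart, or None if the job fails."""
        if self.restarts_so_far >= self.attempts:
            return None  # attempts exhausted: the job is declared failed
        self.restarts_so_far += 1
        return self.delay_s  # always the same fixed delay
```

With the values from the example above (3 attempts, 10 s delay), the first three failures each lead to a restart after 10 s, and the fourth failure fails the job.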

Exponential Delay Restart Strategy #

The exponential delay restart strategy attempts to restart the job indefinitely, with a delay that increases up to the maximum delay, so the job never fails. Between two consecutive restart attempts, the delay grows exponentially until the maximum delay is reached; from then on, the delay stays at the maximum.

When the job has run without failure for long enough, the delay is reset to its initial value; this threshold is configurable.

```yaml
restart-strategy.type: exponential-delay
```
Key | Default | Type | Description
restart-strategy.exponential-delay.backoff-multiplier | 2.0 | Double | The backoff value is multiplied by this value after every failure, until the max backoff is reached, if restart-strategy.type has been set to exponential-delay.
restart-strategy.exponential-delay.initial-backoff | 1 s | Duration | Starting duration between restarts if restart-strategy.type has been set to exponential-delay. It can be specified using notation: "1 min", "20 s".
restart-strategy.exponential-delay.jitter-factor | 0.1 | Double | Jitter specified as a portion of the backoff if restart-strategy.type has been set to exponential-delay. It determines how large a random value is added to or subtracted from the backoff. Useful when you want to avoid restarting multiple jobs at the same time.
restart-strategy.exponential-delay.max-backoff | 5 min | Duration | The highest possible duration between restarts if restart-strategy.type has been set to exponential-delay. It can be specified using notation: "1 min", "20 s".
restart-strategy.exponential-delay.reset-backoff-threshold | 1 h | Duration | Threshold when the backoff is reset to its initial value if restart-strategy.type has been set to exponential-delay. It specifies how long the job must be running without failure to reset the exponentially increasing backoff to its initial value. It can be specified using notation: "1 min", "20 s".


For example:

```yaml
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
```

The exponential delay restart strategy can also be set programmatically:

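```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.milliseconds(1), // initial delay between restarts
  Time.milliseconds(1000), // maximum delay between restarts
  1.1, // exponential multiplier
  Time.milliseconds(2000), // threshold duration to reset delay to its initial value
  0.1 // jitter
));
```

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
  Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
  1.1, // exponential multiplier
  Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
  0.1 // jitter
))
```

This strategy is not yet supported in the Python API.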
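To make the interplay of initial-backoff, backoff-multiplier, max-backoff, reset-backoff-threshold and jitter-factor concrete, here is a small Python model of the behaviour described in this section. It is a sketch under the simplifying assumptions stated in the docstring, not Flink's implementation; ExponentialDelayModel and on_failure are hypothetical names.

```python
import random
from typing import Optional


class ExponentialDelayModel:
    """Toy model of the exponential delay restart strategy (not Flink code).

    Assumptions: the first failure, and any failure at least
    reset_threshold_s after the previous one, uses the initial backoff;
    every other failure multiplies the previous backoff, capped at max_s.
    Jitter is drawn uniformly from +/- jitter_factor * backoff.
    """

    def __init__(self, initial_s: float, max_s: float, multiplier: float,
                 reset_threshold_s: float, jitter_factor: float,
                 rng: Optional[random.Random] = None) -> None:
        self.initial_s = initial_s
        self.max_s = max_s
        self.multiplier = multiplier
        self.reset_threshold_s = reset_threshold_s
        self.jitter_factor = jitter_factor
        self.rng = rng if rng is not None else random.Random()
        self.backoff_s: Optional[float] = None  # no failure seen yet
        self.last_failure_s: Optional[float] = None

    def on_failure(self, now_s: float) -> float:
        """Record a failure at time now_s and return the delay before restarting."""
        ran_long_enough = (self.last_failure_s is not None and
                           now_s - self.last_failure_s >= self.reset_threshold_s)
        if self.backoff_s is None or ran_long_enough:
            self.backoff_s = self.initial_s  # start (or restart) the sequence
        else:
            self.backoff_s = min(self.backoff_s * self.multiplier, self.max_s)
        self.last_failure_s = now_s
        jitter = self.rng.uniform(-1.0, 1.0) * self.jitter_factor * self.backoff_s
        return self.backoff_s + jitter  # never None: the job never fails
```

With the example configuration above (10 s initial, 2 min max, multiplier 2.0, 10 min reset threshold) and jitter set to 0, failures one second apart yield delays of 10, 20, 40, 80, 120 and 120 seconds; a failure 10 minutes or more after the previous one starts again at 10 s.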

Failure Rate Restart Strategy #

The failure rate restart strategy restarts the job after a failure, but when the failure rate (failures per time interval) is exceeded, the job eventually fails. Between two consecutive restart attempts, the restart strategy waits a fixed amount of time.

This strategy is enabled as the cluster default by setting the following configuration parameter in flink-conf.yaml.

```yaml
restart-strategy.type: failure-rate
```
Key | Default | Type | Description
restart-strategy.failure-rate.delay | 1 s | Duration | Delay between two consecutive restart attempts if restart-strategy.type has been set to failure-rate. It can be specified using notation: "1 min", "20 s".
restart-strategy.failure-rate.failure-rate-interval | 1 min | Duration | Time interval for measuring the failure rate if restart-strategy.type has been set to failure-rate. It can be specified using notation: "1 min", "20 s".
restart-strategy.failure-rate.max-failures-per-interval | 1 | Integer | Maximum number of restarts in the given time interval before failing a job if restart-strategy.type has been set to failure-rate.

For example:

```yaml
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
```


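The failure rate restart strategy can also be set programmatically:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
));
```

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay
))
```

```python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
    3,  # max failures per interval
    300000,  # interval for measuring failure rate (milliseconds)
    10000  # delay (milliseconds)
))
```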
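The failure rate check can be pictured as a sliding window over failure timestamps. In the hypothetical Python sketch below (not Flink's implementation), a failure leads to a restart as long as the number of failures inside the current interval, including the new one, does not exceed max-failures-per-interval. The exact boundary handling is an assumption made for illustration.

```python
from collections import deque
from typing import Deque, Optional


class FailureRateModel:
    """Toy model of the failure rate restart strategy (not Flink code)."""

    def __init__(self, max_failures_per_interval: int, interval_s: float,
                 delay_s: float) -> None:
        self.max_failures = max_failures_per_interval
        self.interval_s = interval_s  # failure-rate-interval
        self.delay_s = delay_s        # fixed wait before each restart
        self.failure_times: Deque[float] = deque()

    def on_failure(self, now_s: float) -> Optional[float]:
        """Return the wait before restarting, or None if the job fails."""
        self.failure_times.append(now_s)
        # Forget failures that have left the measuring window.
        while now_s - self.failure_times[0] > self.interval_s:
            self.failure_times.popleft()
        if len(self.failure_times) > self.max_failures:
            return None  # failure rate exceeded: the job fails
        return self.delay_s
```

With the example values (3 failures per 5 min, 10 s delay), failures at 0, 60 and 120 seconds are each followed by a restart, and a fourth failure at 180 seconds fails the job. Failures spaced 200 seconds apart never exceed the rate.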

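No Restart Strategy #

The job fails directly and no restart is attempted.

```yaml
restart-strategy.type: none
```

The no restart strategy can also be set programmatically:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
```

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())
```

```python
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.no_restart())
```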

Fallback Restart Strategy #

The restart strategy defined for the cluster is used. This is helpful for streaming programs which enable checkpointing. By default, a fixed delay restart strategy is chosen if there is no other restart strategy defined.

Failover Strategies #

Flink supports different failover strategies which can be configured via the configuration parameter jobmanager.execution.failover-strategy in Flink’s configuration file flink-conf.yaml.

Failover Strategy | Value for jobmanager.execution.failover-strategy
Restart all | full
Restart pipelined region | region
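For example, to select the pipelined region strategy explicitly, the option can be set in flink-conf.yaml as in this minimal config fragment, using the value from the table above:

```yaml
jobmanager.execution.failover-strategy: region
```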

Restart All Failover Strategy #

This strategy restarts all tasks in the job to recover from a task failure.

Restart Pipelined Region Failover Strategy #

This strategy groups tasks into disjoint regions. When a task failure is detected, this strategy computes the smallest set of regions that must be restarted to recover from the failure. For some jobs this can result in fewer tasks being restarted than with the Restart All Failover Strategy.

A region is a set of tasks that communicate via pipelined data exchanges. That is, batch data exchanges denote the boundaries of a region.
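Because batch exchanges bound regions, a simplified reading of this definition is that regions are the connected components of the task graph when only pipelined edges are kept. The Python sketch below computes them with a union-find. The task/edge representation and the name pipelined_regions are assumptions made for illustration, not Flink's internal data structures.

```python
from typing import Dict, Iterable, List, Set, Tuple


def pipelined_regions(tasks: Iterable[str],
                      edges: Iterable[Tuple[str, str, str]]) -> List[Set[str]]:
    """Group tasks into regions connected by pipelined exchanges.

    edges are (producer, consumer, exchange) with exchange "pipelined" or "batch".
    """
    parent: Dict[str, str] = {t: t for t in tasks}

    def find(t: str) -> str:
        # Follow parent links to the root, halving the path on the way.
        while parent[t] != t:
            parent[t] = parent[parent[t]]
            t = parent[t]
        return t

    for producer, consumer, exchange in edges:
        if exchange == "pipelined":  # batch exchanges are region boundaries
            parent[find(producer)] = find(consumer)

    regions: Dict[str, Set[str]] = {}
    for t in parent:
        regions.setdefault(find(t), set()).add(t)
    return list(regions.values())
```

For tasks A, B, C and D with A to B pipelined, B to C batch and C to D pipelined, this yields the two regions {A, B} and {C, D}.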

Data exchanges in DataStream/Table/SQL jobs are determined by the ExecutionMode, which can be set through ExecutionConfig: they are pipelined in streaming mode and batched by default in batch mode.

The regions to restart are decided as below:

  1. The region containing the failed task will be restarted.
  2. If a result partition is not available while it is required by a region that will be restarted, the region producing the result partition will be restarted as well.
  3. If a region is to be restarted, all of its consumer regions will also be restarted. This guarantees data consistency, because nondeterministic processing or partitioning can produce different partitions when a region is re-executed.
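The three rules can be applied as a fixed-point computation over regions. The Python sketch below is a hypothetical model, not Flink's implementation. For each region, it records the partitions the region reads (as the producing region plus a flag for whether the partition is still available) and the regions that consume its output.

```python
from collections import deque
from typing import Dict, List, Set, Tuple


def regions_to_restart(failed: str,
                       inputs: Dict[str, List[Tuple[str, bool]]],
                       consumers: Dict[str, List[str]]) -> Set[str]:
    """Apply the three restart rules until no more regions are added.

    inputs[r]    -> (producer_region, partition_available) per partition r reads
    consumers[r] -> regions that consume r's output
    """
    to_restart: Set[str] = {failed}  # rule 1: the failed region itself
    queue = deque([failed])
    while queue:
        region = queue.popleft()
        # Rule 2: producers of required but unavailable partitions.
        candidates = [p for p, available in inputs.get(region, []) if not available]
        # Rule 3: every consumer of a restarted region.
        candidates += consumers.get(region, [])
        for r in candidates:
            if r not in to_restart:
                to_restart.add(r)
                queue.append(r)
    return to_restart
```

For a chain R1 to R2 to R3 where R2 fails, only R2 and R3 are restarted if R1's output is still available. If that partition has been lost, R1 is restarted as well.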
