任务失败恢复
当任务失败时,Flink需要重新启动失败的任务和其他受影响的任务,使任务恢复到正常状态。
重新启动策略和故障转移策略用于控制任务重新启动。重新启动策略决定是否以及何时可以重新启动失败/受影响的任务。故障转移策略决定应该重新启动哪些任务以恢复作业。
一、重启策略
集群可以使用默认的重新启动策略启动,当没有定义特定于作业的重新启动策略时,总是使用该策略。如果作业提交时带有重启策略,则该策略将覆盖集群的默认设置。
默认重启策略是通过Flink的配置文件flink-conf.yaml设置的。配置参数restart-strategy.type定义采用哪种策略。如果检查点未启用,则使用“不重启(no restart)”策略。如果检查点已激活且未配置重启策略,则使用固定延迟(fixed-delay)策略,并使用Integer.MAX_VALUE重启尝试。
restart-strategy.type定义在作业失败时使用的重新启动策略,可接收的值为String类型,包括:
- none, off, disable:无重启策略。
- fixeddelay, fixed-delay:固定延迟重启策略。
- failurerate, failure-rate:故障率重启策略。
- exponentialdelay, exponential-delay:指数延迟重启策略。
如果禁用检查点,默认值为none。如果启用了检查点,默认值为fixed-delay,带有Integer.MAX_VALUE重新启动尝试和'1 s'延迟。
除了定义默认的重新启动策略外,还可以为每个Flink作业定义一个特定的重新启动策略。这个重启策略是通过调用StreamExecutionEnvironment上的setRestartStrategy方法以编程方式设置的。
下面的示例展示了如何为作业设置固定延迟重启策略。如果失败,系统将尝试重新启动作业3次,并在两次重新启动之间等待10秒。
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重启尝试次数 Time.of(10, TimeUnit.SECONDS) // 延迟 ));
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重启尝试次数 Time.of(10, TimeUnit.SECONDS) // 延迟 ))
Python代码:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
3, # 重启尝试次数
10000 # 延迟(毫秒)
))
以下部分描述了重启策略特定的配置选项。
1. 固定延迟重启策略
固定延迟重启策略尝试给定次数重新启动作业。如果超过最大尝试次数,作业最终将失败。在两次连续的重新启动尝试之间,重新启动策略等待固定的时间。
通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。
restart-strategy.type: fixed-delay
该策略可配置的其他参数:
| 参数 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|
| restart-strategy.fixed-delay.attempts | 1 | Integer | 在任务被声明为失败之前Flink重试执行的次数。 |
| restart-strategy.fixed-delay.delay | 1 s | Duration | Duration |
配置示例:
restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s
固定延迟重启策略也可以通过编程方式设置:
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重启尝试次数 Time.of(10, TimeUnit.SECONDS) // 延迟 ));
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重启尝试次数 Time.of(10, TimeUnit.SECONDS) // 延迟 ))
Python代码:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
3, # 重启尝试次数
10000 # 延迟 (毫秒)
))
上面的设置表示:如果job失败,重启3次, 每次间隔10秒钟
2. 指数延迟重启策略
指数延迟重启策略试图无限地重新启动作业,不断增加延迟,直到最大延迟。作业从来不会失败。在两次连续的重新启动尝试之间,重新启动策略保持指数级增长,直到达到最大次数。然后,它将延迟保持在最大数量。
当作业正确执行时,指数延迟值在一段时间后重置;该阈值是可配置的。
通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。
restart-strategy.type: exponential-delay
该策略需要以下几个配置:
| 参数 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|
| restart-strategy.exponential-delay.backoff-multiplier | 2.0 | Double | 每次失败后,回退值(backoff value)乘以此值,直到达到最大回退(backoff)。 |
| restart-strategy.exponential-delay.initial-backoff | 1 s | Duration | 两次重启之间的启动持续时间。可以使用"1 min"、"20 s"符号来指定。 |
| restart-strategy.exponential-delay.jitter-factor | 0.1 | Double | 指定作为回退(backoff)的一部分的抖动。它表示将向回退增加或减去多大的随机值。当希望避免同时重新启动多个作业时非常有用。 |
| restart-strategy.exponential-delay.max-backoff | 5 min | Duration | 重启之间的最长持续时间。可以使用"1 min"、"20 s"符号来指定。 |
| restart-strategy.exponential-delay.reset-backoff-threshold | 1 h | Duration | 将回退(backoff)重置为初始值时的阈值。它指定作业必须无故障运行多长时间,才能将指数增长的回退(backoff)重置为初始值。可以使用"1 min"、"20 s"符号来指定。 |
配置示例:
restart-strategy.exponential-delay.initial-backoff: 10 s restart-strategy.exponential-delay.max-backoff: 2 min restart-strategy.exponential-delay.backoff-multiplier: 2.0 restart-strategy.exponential-delay.reset-backoff-threshold: 10 min restart-strategy.exponential-delay.jitter-factor: 0.1
指数延迟重启策略也可以通过编程方式设置:
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.exponentialDelayRestart( Time.milliseconds(1), Time.milliseconds(1000), 1.1, // 指数乘子 Time.milliseconds(2000), // 将延迟重置为初始值的阈值持续时间 0.1 // 抖动 ));
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.exponentialDelayRestart( Time.of(1, TimeUnit.MILLISECONDS), // 两次重启之间的初始延迟时间 Time.of(1000, TimeUnit.MILLISECONDS), // 两次重启之间的最大延迟时间 1.1, // 指数乘子 Time.of(2, TimeUnit.SECONDS), // 将延迟重置为初始值的阈值持续时间 0.1 // 抖动 ))
Python代码:
Python API目前还不支持。
3. 故障率重启策略
故障率重启策略在任务失败后重新启动任务,但当超过故障率(failure rate, 每个时间间隔的失败次数)时,任务最终失败。在两次连续的重新启动尝试之间,重新启动策略等待固定的时间。
通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。
restart-strategy.type: failure-rate
该策略需要以下几个配置:
| 参数 | 默认值 | 数据类型 | 描述 |
|---|---|---|---|
| restart-strategy.failure-rate.delay | 1 s | Duration | 连续两次重启尝试之间的延迟。可以使用"1 min"、"20 s"符号来指定。 |
| restart-strategy.failure-rate.failure-rate-interval | 1 min | Duration | 测量故障率的时间间隔。可以使用"1 min"、"20 s"符号来指定。 |
| restart-strategy.failure-rate.max-failures-per-interval | 1 | Integer | 失败前在给定时间间隔内的最大重启次数。 |
配置示例:
restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s
故障率重启策略也可以通过编程方式设置:
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每个周期最大故障次数 Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔 Time.of(10, TimeUnit.SECONDS) // 延迟 ));
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每单元最大故障数 Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔 Time.of(10, TimeUnit.SECONDS) // 延迟 ))
Python代码:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
3, # 每个周期最大故障次数
500000, # 测量故障率的时间间隔(毫秒)
10000 # 延迟(毫秒)
))
上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失败超过3次,则程序退出)
4. 不重启策略
作业直接失败,不尝试重新启动。
通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。
restart-strategy.type: none
不重启策略也可以通过编程方式设置:
Java代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart());
Scala代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.noRestart())
Python代码:
env = StreamExecutionEnvironment.get_execution_environment() env.set_restart_strategy(RestartStrategies.no_restart())
5. 备用重启策略
使用集群定义的重启策略。这对于启用检查点的流程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。
二、故障转移策略
Flink支持不同的故障转移策略,这些策略可以通过在配置文件flink-conf.yaml中配置参数jobmanager.execution.failover-strategy进行配置。
| jobmanager.execution.failover-strategy的值 | 故障转移策略值 |
|---|---|
| full | 重启全部 |
| region | 重新启动流水线区域 |
1. 重启全部的故障转移策略
此策略重新启动作业中的所有任务以从任务失败中恢复。
2. 重启管道区域故障转移策略
该策略将任务划分为互不关联的区域。当检测到任务失败时,该策略计算必须重新启动以从失败中恢复的最小区域集。对于某些作业,与"重启全部的故障转移策略"相比,这可能导致重新启动的任务更少。
一个区域(region)是一组通过流水线(管道)数据交换进行通信的任务。也就是说,批处理数据交换表示区域的边界。
- DataStream作业或流表/SQL作业中的所有数据交换都是流水线化的。
- 批处理表/SQL作业中的所有数据交换默认情况下都是批处理的。
- DataSet作业中的数据交换类型由ExecutionMode决定,可以通过ExecutionConfig设置ExecutionMode。
重启区域确定如下:
- 1) 失败任务所在的区域将重新启动。
- 2) 如果结果分区不可用,而将要重新启动的区域需要它,则产生结果分区的区域也将重新启动。
- 3) 如果要重新启动一个区域,那么它的所有消费区域也将重新启动。这是为了保证数据的一致性,因为不确定的处理或分区可能导致不同的分区。