发布日期:2023-04-04 VIP内容

任务失败恢复

当任务失败时,Flink需要重新启动失败的任务和其他受影响的任务,使任务恢复到正常状态。

重新启动策略和故障转移策略用于控制任务重新启动。重新启动策略决定是否以及何时可以重新启动失败/受影响的任务。故障转移策略决定应该重新启动哪些任务以恢复作业。

一、重启策略

集群可以使用默认的重新启动策略启动,当没有定义特定于作业的重新启动策略时,总是使用该策略。如果作业提交时带有重启策略,则该策略将覆盖集群的默认设置。

默认重启策略是通过Flink的配置文件flink-conf.yaml设置的。配置参数restart-strategy.type定义采用哪种策略。如果检查点未启用,则使用“不重启(no restart)”策略。如果检查点已激活且未配置重启策略,则使用固定延迟(fixed-delay)策略,并使用Integer.MAX_VALUE重启尝试。

restart-strategy.type定义在作业失败时使用的重新启动策略,可接收的值为String类型,包括:

  • none, off, disable:无重启策略。
  • fixeddelay, fixed-delay:固定延迟重启策略。
  • failurerate, failure-rate:故障率重启策略。
  • exponentialdelay, exponential-delay:指数延迟重启策略。

如果禁用检查点,默认值为none。如果启用了检查点,默认值为fixed-delay,带有Integer.MAX_VALUE重新启动尝试和'1 s'延迟。

除了定义默认的重新启动策略外,还可以为每个Flink作业定义一个特定的重新启动策略。这个重启策略是通过调用StreamExecutionEnvironment上的setRestartStrategy方法以编程方式设置的。

下面的示例展示了如何为作业设置固定延迟重启策略。如果失败,系统将尝试重新启动作业3次,并在两次重新启动之间等待10秒。

Java代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重启尝试次数
  Time.of(10, TimeUnit.SECONDS) // 延迟
));

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重启尝试次数
  Time.of(10, TimeUnit.SECONDS) // 延迟
))

Python代码:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  # 重启尝试次数
    10000  # 延迟(毫秒)
))

以下部分描述了重启策略特定的配置选项。

1. 固定延迟重启策略

固定延迟重启策略尝试给定次数重新启动作业。如果超过最大尝试次数,作业最终将失败。在两次连续的重新启动尝试之间,重新启动策略等待固定的时间。

通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。

restart-strategy.type: fixed-delay

该策略可配置的其他参数:

参数 默认值 数据类型 描述
restart-strategy.fixed-delay.attempts 1 Integer 在任务被声明为失败之前Flink重试执行的次数。
restart-strategy.fixed-delay.delay 1 s Duration Duration

配置示例:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延迟重启策略也可以通过编程方式设置:

Java代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重启尝试次数
  Time.of(10, TimeUnit.SECONDS) // 延迟 
));

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3, // 重启尝试次数
  Time.of(10, TimeUnit.SECONDS) // 延迟 
))

Python代码:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(
    3,  # 重启尝试次数
    10000  # 延迟 (毫秒)
))

上面的设置表示:如果job失败,重启3次, 每次间隔10秒钟

2. 指数延迟重启策略

指数延迟重启策略试图无限地重新启动作业,不断增加延迟,直到最大延迟。作业从来不会失败。在两次连续的重新启动尝试之间,重新启动策略保持指数级增长,直到达到最大次数。然后,它将延迟保持在最大数量。

当作业正确执行时,指数延迟值在一段时间后重置;该阈值是可配置的。

通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。

restart-strategy.type: exponential-delay

该策略需要以下几个配置:

参数 默认值 数据类型 描述
restart-strategy.exponential-delay.backoff-multiplier 2.0 Double 每次失败后,回退值(backoff value)乘以此值,直到达到最大回退(backoff)。
restart-strategy.exponential-delay.initial-backoff 1 s Duration 两次重启之间的启动持续时间。可以使用"1 min"、"20 s"符号来指定。
restart-strategy.exponential-delay.jitter-factor 0.1 Double 指定作为回退(backoff)的一部分的抖动。它表示将向回退增加或减去多大的随机值。当希望避免同时重新启动多个作业时非常有用。
restart-strategy.exponential-delay.max-backoff 5 min Duration 重启之间的最长持续时间。可以使用"1 min"、"20 s"符号来指定。
restart-strategy.exponential-delay.reset-backoff-threshold 1 h Duration 将回退(backoff)重置为初始值时的阈值。它指定作业必须无故障运行多长时间,才能将指数增长的回退(backoff)重置为初始值。可以使用"1 min"、"20 s"符号来指定。

配置示例:

restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1

指数延迟重启策略也可以通过编程方式设置:

Java代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.milliseconds(1),
  Time.milliseconds(1000),
  1.1, // 指数乘子
  Time.milliseconds(2000), // 将延迟重置为初始值的阈值持续时间
  0.1 // 抖动
));

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.of(1, TimeUnit.MILLISECONDS),	// 两次重启之间的初始延迟时间
  Time.of(1000, TimeUnit.MILLISECONDS), // 两次重启之间的最大延迟时间
  1.1,	// 指数乘子
  Time.of(2, TimeUnit.SECONDS),		// 将延迟重置为初始值的阈值持续时间
  0.1	// 抖动
))

Python代码:

Python API目前还不支持。

3. 故障率重启策略

故障率重启策略在任务失败后重新启动任务,但当超过故障率(failure rate, 每个时间间隔的失败次数)时,任务最终失败。在两次连续的重新启动尝试之间,重新启动策略等待固定的时间。

通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。

restart-strategy.type: failure-rate

该策略需要以下几个配置:

参数 默认值 数据类型 描述
restart-strategy.failure-rate.delay 1 s Duration 连续两次重启尝试之间的延迟。可以使用"1 min"、"20 s"符号来指定。
restart-strategy.failure-rate.failure-rate-interval 1 min Duration 测量故障率的时间间隔。可以使用"1 min"、"20 s"符号来指定。
restart-strategy.failure-rate.max-failures-per-interval 1 Integer 失败前在给定时间间隔内的最大重启次数。

配置示例:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

故障率重启策略也可以通过编程方式设置:

Java代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每个周期最大故障次数
  Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 延迟
));

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // 每单元最大故障数
  Time.of(5, TimeUnit.MINUTES), // 测量故障率的时间间隔
  Time.of(10, TimeUnit.SECONDS) // 延迟
))

Python代码:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.failure_rate_restart(
    3,  # 每个周期最大故障次数
    500000,  # 测量故障率的时间间隔(毫秒)
    10000  # 延迟(毫秒)
))

上面的设置表示:如果5分钟内job失败不超过三次,自动重启, 每次间隔10s (如果5分钟内程序失败超过3次,则程序退出)

4. 不重启策略

作业直接失败,不尝试重新启动。

通过在flink-conf.yaml中设置以下配置参数,默认启用该策略。

restart-strategy.type: none

不重启策略也可以通过编程方式设置:

Java代码:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

Scala代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

Python代码:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_restart_strategy(RestartStrategies.no_restart())

5. 备用重启策略

使用集群定义的重启策略。这对于启用检查点的流程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

二、故障转移策略

Flink支持不同的故障转移策略,这些策略可以通过在配置文件flink-conf.yaml中配置参数jobmanager.execution.failover-strategy进行配置。

jobmanager.execution.failover-strategy的值 故障转移策略值
full 重启全部
region 重新启动流水线区域

1. 重启全部的故障转移策略

此策略重新启动作业中的所有任务以从任务失败中恢复。

2. 重启管道区域故障转移策略

该策略将任务划分为互不关联的区域。当检测到任务失败时,该策略计算必须重新启动以从失败中恢复的最小区域集。对于某些作业,与"重启全部的故障转移策略"相比,这可能导致重新启动的任务更少。

一个区域(region)是一组通过流水线(管道)数据交换进行通信的任务。也就是说,批处理数据交换表示区域的边界。

  • DataStream作业或流表/SQL作业中的所有数据交换都是流水线化的。
  • 批处理表/SQL作业中的所有数据交换默认情况下都是批处理的。
  • DataSet作业中的数据交换类型由ExecutionMode决定,可以通过ExecutionConfig设置ExecutionMode。

重启区域确定如下:

  • 1) 失败任务所在的区域将重新启动。
  • 2) 如果结果分区不可用,而将要重新启动的区域需要它,则产生结果分区的区域也将重新启动。
  • 3) 如果要重新启动一个区域,那么它的所有消费区域也将重新启动。这是为了保证数据的一致性,因为不确定的处理或分区可能导致不同的分区。