Flink Restart Strategies


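Why configure a restart strategy?

When a task fails, Flink needs to restart the failed task and any other affected tasks in order to bring the job back to a normal state.

Restart strategies and failover strategies together control how tasks are restarted. The restart strategy decides whether and when the failed or affected tasks may be restarted. The failover strategy decides which tasks must be restarted to recover the job.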

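NOTE: a restart strategy is normally used together with checkpointing, because a restarted job can only restore Flink's internal state from a completed checkpoint.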

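Using a RestartStrategy

Configuring via the configuration file

Settings in the configuration file apply to both DataSet and DataStream jobs.

If enableCheckpointing() has not been called, restart-strategy defaults to: none

If enableCheckpointing() has been called, restart-strategy defaults to: fixed-delay with delay=1s (and Integer.MAX_VALUE restart attempts)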
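The default selection described above can be sketched as a small pure-Scala model with no Flink dependency. The names `Strategy`, `FixedDelay`, and `effective` are hypothetical and exist only for illustration; the 1 s delay is simply the value quoted above, and the rule that an explicitly configured strategy takes precedence is how the Flink documentation describes the defaults.

```scala
object DefaultRestartSketch {
  // Hypothetical model types, not Flink classes.
  sealed trait Strategy
  case object NoRestart extends Strategy
  final case class FixedDelay(attempts: Int, delayMillis: Long) extends Strategy

  // An explicitly configured strategy wins; otherwise the default depends
  // on whether checkpointing has been enabled for the job.
  def effective(explicit: Option[Strategy], checkpointingEnabled: Boolean): Strategy =
    explicit.getOrElse(
      if (checkpointingEnabled) FixedDelay(Int.MaxValue, 1000L) // assumed 1 s delay, as quoted above
      else NoRestart
    )
}
```

For instance, `DefaultRestartSketch.effective(None, checkpointingEnabled = true)` yields a fixed-delay strategy, while passing `Some(DefaultRestartSketch.NoRestart)` keeps restarts disabled even with checkpointing on.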

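There are three restart strategies. The accepted values of restart-strategy are:

    # none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate
    restart-strategy: fixed-delay

Configuring via ExecutionConfig

    import java.util.concurrent.TimeUnit
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    // Limit the number of restarts
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3,                            // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay between two restart attempts
      )
    )

    // Limit the failure rate.
    // With a 5-minute (300 s) interval and a 10 s delay, at most 30 restart
    // attempts fit into one interval. The job fails once the number of
    // failures inside the interval exceeds the configured maximum.
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    batchEnv.setRestartStrategy(
      RestartStrategies.failureRateRestart(
        3,                             // maximum failures allowed within 5 minutes; tune as needed
        Time.of(5, TimeUnit.MINUTES),  // interval over which the failure rate is measured
        Time.of(10, TimeUnit.SECONDS)  // delay between two consecutive restart attempts
      )
    )

Restart strategies

fixed-delay

    # for example
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 2 s

For example: with delay=2s and attempts=3, Flink waits 2 seconds before each restart attempt. An attempt either succeeds or fails and leads to the next attempt. If the job has still not recovered after 3 attempts, the job is declared failed.

failure-rate

    restart-strategy.failure-rate.max-failures-per-interval: 3
    restart-strategy.failure-rate.failure-rate-interval: 5 min
    restart-strategy.failure-rate.delay: 10 s

For example: with failure-rate-interval=5min, max-failures-per-interval=3 and delay=10s, Flink waits 10 seconds before each restart attempt. If more than 3 failures occur within 5 minutes, the job is declared failed.

non-restart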

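No restart strategy is used: the job fails directly and is not restarted.

fallback-restart

The cluster-level restart strategy is used. If no other restart strategy has been defined, this falls back to fixed-delay.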
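The none, fixed-delay and failure-rate rules described in this section can be sketched as a small decision function in plain Scala. This is a simplified model under stated assumptions, not Flink's implementation: the names `RestartModel`, `Strategy` and `decide` are hypothetical, and failures are tracked by their timestamps in milliseconds.

```scala
object RestartModel {
  // Hypothetical model types, not Flink classes.
  sealed trait Strategy
  case object NoRestart extends Strategy
  final case class FixedDelay(attempts: Int, delayMs: Long) extends Strategy
  final case class FailureRate(maxFailuresPerInterval: Int, intervalMs: Long, delayMs: Long)
      extends Strategy

  // failureTimesMs: ascending timestamps of every failure so far,
  // including the one being handled now.
  // Returns Some(delay before the restart) or None if the job should fail.
  def decide(strategy: Strategy, failureTimesMs: Seq[Long]): Option[Long] = {
    require(failureTimesMs.nonEmpty, "at least one failure is required")
    strategy match {
      case NoRestart =>
        None
      case FixedDelay(attempts, delayMs) =>
        // The n-th failure may be retried while n <= attempts.
        if (failureTimesMs.size <= attempts) Some(delayMs) else None
      case FailureRate(max, intervalMs, delayMs) =>
        val now = failureTimesMs.last
        // Count the failures that fall inside the measuring interval.
        val recent = failureTimesMs.count(t => now - t <= intervalMs)
        if (recent <= max) Some(delayMs) else None
    }
  }
}
```

With `FailureRate(3, 300000L, 10000L)`, four failures inside one 5-minute window make `decide` return `None`, whereas the same four failures spread over a longer period are each answered with a 10 s restart delay.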

Failover strategy

Official reference: ci.apache.org/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html

The failover strategy can be set in flink-conf.yaml.

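Restart All Failover Strategy
  • Restarts every task of the job in order to recover from the failure.
Restart Pipelined Region Failover Strategy
  • Restarts only the tasks of the pipelined region affected by the failure, which is much cheaper than restarting all tasks. The execution mode determines how the regions are delimited:
    env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)

Failover strategy          Tasks restarted                    Value of jobmanager.execution.failover-strategy
Restart all                all tasks of the job               full
Restart pipelined region   tasks within a single region       region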
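The difference between the two values can be sketched in plain Scala. This is a deliberately simplified model: `FailoverModel`, `tasksToRestart` and the task names are hypothetical, regions are given as ready-made sets of task names, and only the region that contains the failed task is restarted. According to the Flink documentation the real region strategy may restart further regions as well (for example consumer regions, or producers of result partitions that are no longer available).

```scala
object FailoverModel {
  // regions: an assumed partition of task names into pipelined regions.
  // strategy: "full" or "region", the two configuration values listed above.
  def tasksToRestart(
      strategy: String,
      regions: Seq[Set[String]],
      failedTask: String): Set[String] =
    strategy match {
      case "full" =>
        // Every task of the job is restarted.
        regions.flatten.toSet
      case "region" =>
        // Simplification: only the region containing the failed task.
        regions.find(_.contains(failedTask)).getOrElse(Set.empty[String])
      case other =>
        throw new IllegalArgumentException(s"unknown failover strategy: $other")
    }
}
```

Given the regions `Seq(Set("source-1", "map-1"), Set("source-2", "map-2"))` and a failure of `"map-2"`, the `region` value restarts two tasks where `full` restarts all four.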
A simple checkpoint example

    package com.shufang.state.chekpoint

    import com.shufang.broadcast.People
    import com.shufang.entities.WorkPeople
    import com.shufang.source.MyUDFPeopleSource
    import org.apache.flink.api.common.ExecutionMode
    import org.apache.flink.api.common.restartstrategy.RestartStrategies
    import org.apache.flink.api.common.state.MapStateDescriptor
    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.datastream.BroadcastStream
    import org.apache.flink.streaming.api.environment.CheckpointConfig
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object CheckPointDemo {

      def main(args: Array[String]): Unit = {
        // Obtain the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // Determines how regions are delimited for the region failover strategy
        env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)

        /*
         * ------------------------------ checkpoint configuration ------------------------------
         */
        env.enableCheckpointing(1000) // trigger a checkpoint every 1 s
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // EXACTLY_ONCE is the default
        env.getCheckpointConfig.setCheckpointInterval(1000) // checkpoint interval: 1 s
        env.getCheckpointConfig.setCheckpointTimeout(6000) // a checkpoint that does not finish within 6 s is discarded

        // true:  an error during checkpointing fails the task
        // false: the checkpoint is declined and the task keeps running
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // default is true

        // At most one checkpoint may be in progress at a time; the next one has to
        // wait until the current one completes or times out
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

        // Delete the externalized checkpoint when the job is cancelled
        // (use RETAIN_ON_CANCELLATION to keep it)
        env.getCheckpointConfig.enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

        // Minimum pause before the next checkpoint is triggered; must be >= 0
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(0)

        /*
         * ------------------------------ restart strategy configuration ------------------------------
         * When a task failure happens, Flink needs to restart the failed task and
         * other affected tasks to recover the job to a normal state.
         * The default depends on the checkpoint settings:
         *   checkpointing disabled: no restart
         *   checkpointing enabled:  fixed-delay strategy is used with Integer.MAX_VALUE
         *
         * restart-strategy can be set in flink-conf.yaml or via env.setRestartStrategy()
         */
        // Alternatives (shown here with the RestartStrategies factory methods):
        // env.setRestartStrategy(RestartStrategies.noRestart())
        // env.setRestartStrategy(RestartStrategies.fallBackRestart()) // falls back to fixed-delay
        // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.seconds(4)))
        // env.setRestartStrategy(RestartStrategies.failureRateRestart(10, Time.minutes(5), Time.seconds(10)))
        env.setRestartStrategy(
          RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)))

        val ds: DataStream[WorkPeople] = env.addSource(new MyUDFPeopleSource)
        // Gender lookup data: code -> character ('男' = male, '女' = female)
        val ds1: DataStream[(Int, Char)] = env.fromElements((1, '男'), (2, '女'))

        val describer = new MapStateDescriptor[Int, Char]("genderInfo", classOf[Int], classOf[Char])
        val bcStream: BroadcastStream[(Int, Char)] = ds1.broadcast(describer)

        val resultStream: DataStream[People] = ds.connect(bcStream).process(
          new BroadcastProcessFunction[WorkPeople, (Int, Char), People] {

            // Enrich each record with the gender looked up from the broadcast state
            override def processElement(
                value: WorkPeople,
                ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#ReadOnlyContext,
                out: Collector[People]): Unit = {
              val gender: Char = ctx.getBroadcastState(describer).get(value.genderCode).charValue()
              out.collect(People(value.id, value.name, gender, value.address, value.price))
            }

            // Store every broadcast element in the broadcast state
            override def processBroadcastElement(
                value: (Int, Char),
                ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#Context,
                out: Collector[People]): Unit = {
              ctx.getBroadcastState(describer).put(value._1, value._2)
            }
          }
        )

        ds.print("before:")
        resultStream.print("after:")

        env.execute("checkpoint")
      }
    }
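How the checkpoint interval, the minimum pause and the timeout from the example interact can be sketched with a small plain-Scala model. It is a rough approximation under the assumption that at most one checkpoint runs at a time (as configured above). `CheckpointTimingSketch`, `nextTriggerMs` and `timedOut` are hypothetical helpers, not part of Flink, and all times are in milliseconds.

```scala
object CheckpointTimingSketch {
  // Earliest time the next checkpoint may start, assuming one checkpoint at a
  // time: the interval is counted from the previous start, the minimum pause
  // from the previous end.
  def nextTriggerMs(lastStartMs: Long, lastEndMs: Long, intervalMs: Long, minPauseMs: Long): Long = {
    require(intervalMs > 0, "interval must be positive")
    require(minPauseMs >= 0, "minimum pause must be >= 0")
    math.max(lastStartMs + intervalMs, lastEndMs + minPauseMs)
  }

  // A checkpoint that has been running longer than the timeout is discarded.
  def timedOut(startMs: Long, nowMs: Long, timeoutMs: Long): Boolean =
    nowMs - startMs > timeoutMs
}
```

With the values used in the example (interval 1000, minimum pause 0), a checkpoint that starts at 0 and ends at 300 is followed by the next one at 1000. Raising the minimum pause to 500 for a checkpoint that ends at 800 would move the next trigger to 1300.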
