发布日期:2021-11-12
VIP内容
案例:服务器故障检测报警程序
对于Hadoop运维人员,需要监控生产环境大数据各个组件的状态信息,由于Regionserver经常挂掉,这里模拟监控Regionserver的状态信息,每次Regionserver上线和下线都会发送一个状态信息。这里如果Regionserver挂掉后,服务自启动功能在30秒以内没有将Regionserver拉起(这里只是为了测试使用30秒),那么就进行持续的告警,直到收到上线消息,告警取消。
1)这里消息模拟从socket接收服务器告警消息,消息格式包含三个字段分别是:主机名hostname,告警时间time,状态status(RUNNING服务正常,DEAD服务停止);
2)接收消息之后对数据流按照主机名进行分组,对于状态为DEAD的消息,设置定时器30秒以内如果状态不恢复为RUNNING,则定时进行告警,如果30秒内恢复RUNNING状态,则认为上一条消息是误报,则删除定时器,取消报警。
Scala代码:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* 服务器故障检测报警程序
*/
object KeyedProcessFunDemo2 {
// 服务器信息类型:主机名,时间,状态(RUNNING-正常,DEAD-宕机)
case class MessageInfo(hostname: String, msgTime: String, status: String)
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 并行度1
env.setParallelism(1)
// 源数据流
val stream = env
.socketTextStream("localhost", 9999)
.filter(line => line match{
case null | "" => false
case row => row.split(",").length==3
})
.map(line => {
val lines = line.split(",")
MessageInfo(lines(0), lines(1), lines(2))
})
.keyBy(msg => msg.hostname)
.process(new AlertDownFunction)
.print()
// 执行流程序
env.execute("Process Function")
}
/**
* KeyedProcessFunction的子类
*/
class AlertDownFunction extends KeyedProcessFunction[String, MessageInfo, String] {
/**
* 首先获得由这个处理函数(process function)维护的状态
* 通过 RuntimeContext 访问Flink的keyed state
*/
lazy val lastStatus: ValueState[String] = getRuntimeContext.getState(
new ValueStateDescriptor[String]("lastStatus", classOf[String])
)
lazy val warningTimer: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("warning-timer", classOf[Long])
)
/**
* 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
* @param value 输入元素
* @param ctx 上下文件环境
* @param out
* @throws Exception
*/
override def processElement(value: MessageInfo,
ctx: KeyedProcessFunction[String, MessageInfo, String]#Context,
out: Collector[String]): Unit = {
// 获取当前的状态和上次的定时器时间
val currentStatus = value.status // 当前消息中的状态值
val currentTimer = warningTimer.value // 上次的定时器时间
println("\ncurrentStatus:" + currentStatus) // 当前事件状态
println("lastStatus:" + lastStatus.value) // 上次事件状态
// 连续两次状态都是"DEAD",说明是宕机状态,则新建定时器 30秒后进行告警
if ("DEAD" == currentStatus && "DEAD" == lastStatus.value) {
val timeTs = ctx.timerService.currentProcessingTime + 30000L
ctx.timerService.registerProcessingTimeTimer(timeTs)
warningTimer.update(timeTs) // 状态更新为定时器时间
}
else { // 如果不是连续告警,我们认为是误报警,删除定时器
if ("RUNNING" == currentStatus && "DEAD" == lastStatus.value) {
if (null != currentTimer) ctx.timerService.deleteProcessingTimeTimer(currentTimer)
warningTimer.clear()
}
}
// 状态更新:设当前状态为last status
lastStatus.update(currentStatus)
}
/** 定时器触发后执行的方法
*
* @param timestamp 这个时间戳代表的是该定时器的触发时间
* @param ctx
* @param out
* @throws Exception
*/
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, MessageInfo, String]#OnTimerContext,
out: Collector[String]): Unit = {
// 输出报警信息,Regionserver两次状态监测为"DEAD" 宕机
val hostname = ctx.getCurrentKey
out.collect("主机IP:" + hostname + ",服务器状态监测连续宕机,请排查!")
}
}
Java代码:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/**
* 服务器故障检测报警程序
*/
public class KeyedProcessFunDemo2 {
// POJO类,服务器告警信息类型
public static class MessageInfo {
public String hostname; // 主机名
public String msgTime; // 时间
public String status; // RUNNING 正常,DEAD 宕机
public MessageInfo(){}
public MessageInfo(String hostname,String msgTime,String status){
this.hostname = hostname;
this.msgTime = msgTime;
this.status = status;
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//这里为了便于理解,设置并行度为1
env.setParallelism(1);
// 指定数据源 从socket的9999端口接收数据,先进行了不合法数据的过滤
DataStream<String> sourceDS = env
.socketTextStream("localhost", 9999)
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
if (null == line || "".equals(line)) {
return false;
}
String[] lines = line.split(",", -1);
return lines.length == 3;
}
});
// 做了一个简单的map转换,将数据转换成MessageInfo格式
// 第一个字段代表是主机IP,第二个字段的代表的是消息时间,第三个字段是Regionserver状态
DataStream<String> warningDS = sourceDS
.map(new MapFunction<String, MessageInfo>() {
@Override
public MessageInfo map(String line) throws Exception {
String[] lines = line.split(",");
return new MessageInfo(lines[0], lines[1], lines[2]);
}
})
.keyBy(msg -> msg.hostname)
.process(new AlertDownFunction());
// 打印报警信息
warningDS.print();
// 执行流程序
env.execute("Process Alert Down");
}
/**
* ProcessFunction实现,处理告警信息
* KeyedProcessFunction<K,I,O>
*/
public static class AlertDownFunction
extends KeyedProcessFunction<String, MessageInfo, String> {
// 声明两个状态
private ValueState<String> lastStatus;
private ValueState<Long> warningTimer;
// 初始函数:首先获得之前(上次)已经存储的状态信息
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastStatus = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastStatus", String.class));
warningTimer = getRuntimeContext()
.getState(new ValueStateDescriptor<>("warning-timer", Long.class));
}
// 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
// 获得当前事件的状态,通过与之前保存的状态进行比较
@Override
public void processElement(MessageInfo value, Context ctx, Collector<String> out) throws Exception {
// 获取当前的状态和上次的定时器时间
String currentStatus = value.status; // 当前消息中的状态值
Long currentTimer = warningTimer.value(); // 上次的定时器时间
System.out.println("\ncurrentStatus:" + currentStatus); // 当前事件状态
System.out.println("lastStatus:" + lastStatus.value()); // 上次事件状态
// 连续两次状态都是"DEAD",说明是宕机状态,则新建定时器 30秒后进行告警
if("DEAD".equals(currentStatus) && "DEAD".equals(lastStatus.value())){
long timeTs = ctx.timerService().currentProcessingTime() + 30000L;
ctx.timerService().registerProcessingTimeTimer(timeTs);
warningTimer.update(timeTs); // 状态更新为定时器时间
}
// 如果不是连续告警,我们认为是误报警,删除定时器
else if(("RUNNING".equals(currentStatus) && "DEAD".equals(lastStatus.value()))){
if(null != currentTimer){
ctx.timerService().deleteProcessingTimeTimer(currentTimer);
}
warningTimer.clear();
}
// 状态更新:设当前状态为last status
lastStatus.update(currentStatus);
}
// 定时器
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<String> out) throws Exception {
// 输出报警信息,Regionserver两次状态监测为"DEAD" 宕机
out.collect("主机IP:" + ctx.getCurrentKey() + ",服务器状态监测连续宕机,请排查!");
}
}
}
执行步骤如下。
1)启动nc服务器:
$ nc -lk 9999
2)依次输入以下信息,模拟服务器监控事件:
192.168.1.101,2019-04-07 21:00,RUNNING 192.168.1.101,2019-04-07 21:02,DEAD 192.168.1.101,2019-04-07 21:03,DEAD 192.168.1.101,2019-04-07 21:04,RUNNING
3)执行结果:
currentStatus:RUNNING lastStatus:null currentStatus:DEAD lastStatus:RUNNING currentStatus:DEAD lastStatus:DEAD 主机IP:192.168.1.101, 两次Regionserver状态监测宕机,请监测!