发布日期:2021-11-12 VIP内容

案例:服务器故障检测报警程序

对于Hadoop运维人员,需要监控生产环境大数据各个组件的状态信息,由于Regionserver经常挂掉,这里模拟监控Regionserver的状态信息,每次Regionserver上线和下线都会发送一个状态信息。这里如果Regionserver挂掉后,服务自启动功能在30秒以内没有将Regionserver拉起(这里只是为了测试使用30秒),那么就进行持续的告警,直到收到上线消息,告警取消。

1)这里消息模拟从socket接收服务器告警消息,消息格式包含三个字段分别是:主机名hostname,告警时间time,状态status(RUNNING服务正常,DEAD服务停止);

2)接收消息之后对数据流按照主机名进行分组,对于状态为DEAD的消息,设置定时器30秒以内如果状态不恢复为RUNNING,则定时进行告警,如果30秒内恢复RUNNING状态,则认为上一条消息是误报,则删除定时器,取消报警。

Scala代码:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * 服务器故障检测报警程序
 */
object KeyedProcessFunDemo2 {

  // 服务器信息类型:主机名,时间,状态(RUNNING-正常,DEAD-宕机)
  case class MessageInfo(hostname: String, msgTime: String, status: String)

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 并行度1
    env.setParallelism(1)

    // 源数据流
    val stream = env
      .socketTextStream("localhost", 9999)
      .filter(line => line match{
          case null | "" => false
          case row       => row.split(",").length==3
      })
      .map(line => {
        val lines = line.split(",")
        MessageInfo(lines(0), lines(1), lines(2))
      })
      .keyBy(msg => msg.hostname)
      .process(new AlertDownFunction)
      .print()

    // 执行流程序
    env.execute("Process Function")
  }

  /**
    * KeyedProcessFunction的子类
    */
  class AlertDownFunction extends KeyedProcessFunction[String, MessageInfo, String] {

    /**
      * 首先获得由这个处理函数(process function)维护的状态
      * 通过 RuntimeContext 访问Flink的keyed state
      */
    lazy val lastStatus: ValueState[String] = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastStatus", classOf[String])
    )
    lazy val warningTimer: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("warning-timer", classOf[Long])
    )

    /**
      * 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
      * @param value			输入元素
      * @param ctx			    上下文件环境
      * @param out
      * @throws Exception
      */
    override def processElement(value: MessageInfo,
                                ctx: KeyedProcessFunction[String, MessageInfo, String]#Context,
                                out: Collector[String]): Unit = {

      // 获取当前的状态和上次的定时器时间
      val currentStatus = value.status        		// 当前消息中的状态值
      val currentTimer = warningTimer.value   	// 上次的定时器时间

      println("\ncurrentStatus:" + currentStatus) 	// 当前事件状态
      println("lastStatus:" + lastStatus.value) 		// 上次事件状态

      // 连续两次状态都是"DEAD",说明是宕机状态,则新建定时器 30秒后进行告警
      if ("DEAD" == currentStatus && "DEAD" == lastStatus.value) {
        val timeTs = ctx.timerService.currentProcessingTime + 30000L
        ctx.timerService.registerProcessingTimeTimer(timeTs)
        warningTimer.update(timeTs) 			// 状态更新为定时器时间
      }
      else { // 如果不是连续告警,我们认为是误报警,删除定时器
        if ("RUNNING" == currentStatus && "DEAD" == lastStatus.value) {
          if (null != currentTimer) ctx.timerService.deleteProcessingTimeTimer(currentTimer)
          warningTimer.clear()
        }
      }

      // 状态更新:设当前状态为last status
      lastStatus.update(currentStatus)
    }

    /** 定时器触发后执行的方法
      *
      * @param timestamp 这个时间戳代表的是该定时器的触发时间
      * @param ctx
      * @param out
      * @throws Exception
      */
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, MessageInfo, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      // 输出报警信息,Regionserver两次状态监测为"DEAD" 宕机
      val hostname = ctx.getCurrentKey
      out.collect("主机IP:" + hostname + ",服务器状态监测连续宕机,请排查!")
    }
}

Java代码:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/**
 * 服务器故障检测报警程序
 */
public class KeyedProcessFunDemo2 {

	// POJO类,服务器告警信息类型
	public static class MessageInfo {
		public String  hostname;			// 主机名
		public String  msgTime;			// 时间
		public String status;				// RUNNING 正常,DEAD 宕机

		public MessageInfo(){}

		public MessageInfo(String hostname,String msgTime,String status){
			this.hostname = hostname;
			this.msgTime = msgTime;
			this.status = status;
		}
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		//这里为了便于理解,设置并行度为1
		env.setParallelism(1);

		// 指定数据源 从socket的9999端口接收数据,先进行了不合法数据的过滤
		DataStream<String> sourceDS = env
				.socketTextStream("localhost", 9999)
				.filter(new FilterFunction<String>() {
					@Override
					public boolean filter(String line) throws Exception {
						if (null == line || "".equals(line)) {
							return false;
						}
						String[] lines = line.split(",", -1);
						return lines.length == 3;
					}
				});

		// 做了一个简单的map转换,将数据转换成MessageInfo格式
		// 第一个字段代表是主机IP,第二个字段的代表的是消息时间,第三个字段是Regionserver状态
		DataStream<String> warningDS = sourceDS
				.map(new MapFunction<String, MessageInfo>() {
					@Override
					public MessageInfo map(String line) throws Exception {
						String[] lines = line.split(",");
						return new MessageInfo(lines[0], lines[1], lines[2]);
					}
				})
				.keyBy(msg -> msg.hostname)
				.process(new AlertDownFunction());

		// 打印报警信息
		warningDS.print();

		// 执行流程序
		env.execute("Process Alert Down");
	}

	/**
	 * ProcessFunction实现,处理告警信息
	 * KeyedProcessFunction<K,I,O>
	 */
	public static class AlertDownFunction
			extends KeyedProcessFunction<String, MessageInfo, String> {

		// 声明两个状态
		private ValueState<String> lastStatus;
		private ValueState<Long> warningTimer;

		// 初始函数:首先获得之前(上次)已经存储的状态信息
		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			lastStatus = getRuntimeContext()
					.getState(new ValueStateDescriptor<>("lastStatus", String.class));
			warningTimer = getRuntimeContext()
					.getState(new ValueStateDescriptor<>("warning-timer", Long.class));
		}

		// 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
		// 获得当前事件的状态,通过与之前保存的状态进行比较
		@Override
		public void processElement(MessageInfo value, Context ctx, Collector<String> out) throws Exception {

			// 获取当前的状态和上次的定时器时间
			String currentStatus = value.status;				// 当前消息中的状态值
			Long currentTimer = warningTimer.value();			// 上次的定时器时间

			System.out.println("\ncurrentStatus:" + currentStatus);		// 当前事件状态
			System.out.println("lastStatus:" + lastStatus.value());		// 上次事件状态

			// 连续两次状态都是"DEAD",说明是宕机状态,则新建定时器 30秒后进行告警
			if("DEAD".equals(currentStatus) && "DEAD".equals(lastStatus.value())){
				long timeTs = ctx.timerService().currentProcessingTime() + 30000L;
				ctx.timerService().registerProcessingTimeTimer(timeTs);
				warningTimer.update(timeTs);			// 状态更新为定时器时间
			}
			// 如果不是连续告警,我们认为是误报警,删除定时器
			else if(("RUNNING".equals(currentStatus) && "DEAD".equals(lastStatus.value()))){
				if(null != currentTimer){
					ctx.timerService().deleteProcessingTimeTimer(currentTimer);
				}
				warningTimer.clear();
			}

			// 状态更新:设当前状态为last status
			lastStatus.update(currentStatus);
		}

		// 定时器
		@Override
		public void onTimer(
				long timestamp,
				OnTimerContext ctx,
				Collector<String> out) throws Exception {

			// 输出报警信息,Regionserver两次状态监测为"DEAD" 宕机
			out.collect("主机IP:" + ctx.getCurrentKey() + ",服务器状态监测连续宕机,请排查!");
		}
	}
}

执行步骤如下。

1)启动nc服务器:

$ nc -lk 9999

2)依次输入以下信息,模拟服务器监控事件:

192.168.1.101,2019-04-07 21:00,RUNNING
192.168.1.101,2019-04-07 21:02,DEAD
192.168.1.101,2019-04-07 21:03,DEAD
192.168.1.101,2019-04-07 21:04,RUNNING

3)执行结果:

currentStatus:RUNNING
lastStatus:null

currentStatus:DEAD
lastStatus:RUNNING

currentStatus:DEAD
lastStatus:DEAD

主机IP:192.168.1.101, 两次Regionserver状态监测宕机,请监测!