读取Socket数据源
DataStream API支持从Socket套接字读取数据。我们只需要指定要从其中读取数据的主机和端口号即可。读取Socket套接字的数据源函数定义如下:
- socketTextStream(hostName, port)
- socketTextStream(hostName, port, delimiter):可指定分隔符。
- socketTextStream(hostName, port, delimiter, maxRetry):还可以指定API应该尝试获取数据的最大次数。
【示例】Socket数据源: 流应用程序示例,它接收来自web套接字的单词。
Scala实现:
import org.apache.flink.streaming.api.scala._
object SocketSourceDemo {
val HOST = "192.168.190.133" // host主机
val PORT = 9999 // 端口号
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 连接Socket数据源
val input = env.socketTextStream(HOST, PORT)
// 流数据转换,并打印结果
input
.map(_.toLowerCase)
.flatMap(_.split("\\W+"))
.print
// 触发流程序执行
env.execute("Flink Socket Source")
}
}
Java实现:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketSourceDemo {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 源数据流
String host = "xueai8"; // host主机或IP地址
int port = 9999; // 端口号
DataStream<String> input = env.socketTextStream(host, port);
// 对DataStream进行转换,向map和flatMap传入匿名内容类
input.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
return s.toLowerCase(); // 先转小写
}
}).flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
for(String word : s.split("\\W+")){ // 再分词
collector.collect(word); // 向下游发送
}
}
}).print();
// 执行流程序
env.execute("Flink Socket Source");
}
}
执行过程
1)打开一个终端窗口,执行如下的命令,启动一个netcat服务器,运行在9999端口:
$ nc -lk 9999
2)运行流程序
3)在netcat运行窗口,输入以下内容,并回车:
Good good study day day up
4)在程序执行窗口,可以看到如下计算输出:
5> good 5> good 5> study 6> day 6> day 6> up