Apache Flink的初步试用
Apache Flink是一个开源的支持大规模流式或批量数据处理的平台。
上周有段时间调研了一下Flink,看了一下官方文档和部分代码,重点是研究了下Checkpointing机制。
然而其中的技术细节比较多,作为一个初步调研,本文还是希望通过几个简单的例子,来说明如何将Flink使用起来。
具体地,本文包括如下几个内容:
- 如何在YARN上部署一个Flink集群
- 如何使用Flink消费Kafka上的数据
- Flink的任务异常恢复与任务重启的测试
在YARN上部署一个Flink集群
Flink集群的部署方式有很多种,本文选取了在YARN的部署方式。
测试的YARN集群是CDH版本的,为了避免以后可能出现的错误,笔者很自觉地从代码编译开始。
首先,我们从GitHub拉下项目源码,切换到想要的发行版本,使用如下命令进行编译打包。
然后,在build-target目录下,设定好hadoop或者yarn的相关配置,就可以在YARN上部署一个Flink集群了。
在上述命令中,几个参数的意义如下:
- -n 10 一共启动10个TaskManager节点
- -jm 1024 JobManager的内存大小为1024M
- -tm 2048 TaskManager的内存大小为2048M
- -d 使用detached模式进行部署(部署完成后本地命令行可以退出)
- -qu default 部署到YARN的default队列中
执行完上述命令后,点开ApplicationMaster的链接,可以看到一个Flink Dashboard。
为了能够在后续提交任务到该集群,需要在这个Dashbord上找到并记录下JobManager对应的IP地址和端口号。
使用Flink消费Kafka的消息
连接Kafka并输出
Flink提供了FlinkKafkaConsumer08和FlinkKafkaConsumer09两个接口,分别用来从Apache Kafka 0.8.x和Apache Kafka 0.9.x拉取数据。
此外,本文使用的是Scala 10对应的API,因此需要添加这两个依赖:flink-streaming-scala_2.10和flink-connector-kafka-0.8_2.10。
首先,设定好kafka连接的相关属性:
然后,连接Kafka并使用StringDeserializer输出Message的value:
打包好代码后,用如下命令提交到Flink集群上:
使用Flink备份Kafka的Topic
接下来我们看一个稍微复杂一点的例子:使用Flink将Kafka中消息逐小时地备份到HDFS中。
由于是直接备份消息,所以直接将消息地原始字节拿出来写入就行了,为此可以写一个简单的DeserializationSchema如下所示:
Flink提供了RollingSink,用于将数据以滚动的方式写入到HDFS中。
RollingSink提供了Exactly Once的实现,为了使用它,需要添加flink-connector-filesystem_2.10这个依赖。
由于是直接备份,将Kafka这个Source直接连接到RollingSink这个Sink,就可以完成我们的目标了。
Flink任务的异常与恢复
Flink的Checkpointing机制能够不时的检查和保存Operator的状态。
在任务恢复时,每个Operator都会从最近完成的一个Checkpoint中恢复到当时所保存的状态中。
一个有状态的Operator的例子
下面的例子实现了一个RichFlatMapFunction. 这个operator将每个输入转换成Int,然后将其加到一个状态中去。
如果当前的和大于100,则将当前已经处理的事件个数和当前的和放到下一个阶段中去,然后将当前的和置为0。
Operator产生异常时Job的恢复
首先,需要设定Checkpointing机制和重启策略。
除此以外,我们还需要上述Operator的部分代码,让其能够产生异常:
经过测试,Job能够成功的从异常中恢复。
人工重启任务
Flink也提供了savepoint机制,以在人工重启Job的过程中,保存和恢复任务的状态。