Board logo

标题: 基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现(2) [打印本页]

作者: look_w    时间: 2018-6-24 13:23     标题: 基于 ELK Stack 和 Spark Streaming 的日志处理平台设计与实现(2)

KafkaKafka 是 LinkedIn 开源的分布式消息队列,它采用了独特的消费者-生产者架构实现数据平台各组件间的数据共享。集群概念中的 server 在 Kafka 中称之为 broker,它使用主题管理不同类别的数据,比如 DB2 日志归为一个主题,tomcat 日志归为一个主题。我们使用 Logstash 作为 Kafka 消息的生产者时,output 插件就需要配置好 Kafka broker 的列表,也就是 Kafka 集群主机的列表;相应的,用作 Kafka 消费者角色的 Logstash 的 input 插件就要配置好需要订阅的 Kafka 中的主题名称和 ZooKeeper 主机列表。Kafka 通过将数据持久化到硬盘的 Write Ahead Log(WAL)保证数据可靠性与顺序性,但这并不会影响实时数据的传输速度,实时数据仍是通过内存传输的。Kafka 是依赖于 ZooKeeper 的,它将每组消费者消费的相应 topic 的偏移量保存在 ZooKeeper 中。据称 LinkedIn 内部的 Kafka 集群每天已能处理超过 1 万亿条消息。
图 6. 基于消息订阅机制的 Kafka 架构除了可靠性和独特的 push&pull 架构外,相较于其他消息队列,Kafka 还拥有更大的吞吐量:
图 7. 基于消息持久化机制的消息队列吞吐量比较Spark StreamingSpark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。它将批处理、流处理、即席查询融为一体。Spark 社区也是相当火爆,平均每三个月迭代一次版本更是体现了它在大数据处理领域的地位。
Spark Streaming 不同于 Storm,Storm 是基于事件级别的流处理,Spark Streaming 是 mini-batch 形式的近似流处理的微型批处理。Spark Streaming 提供了两种从 Kafka 中获取消息的方式:
第一种是利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,数据会传输到 Spark 工作节点的执行器中,但是默认配置下这种方法在 Spark Job 出错时会导致数据丢失,如果要保证数据可靠性,需要在 Spark Streaming 中开启Write Ahead Logs(WAL),也就是上文提到的 Kafka 用来保证数据可靠性和一致性的数据保存方式。可以选择让 Spark 程序把 WAL 保存在分布式文件系统(比如 HDFS)中。
第二种方式不需要建立消费者线程,使用 createDirectStream 接口直接去读取 Kafka 的 WAL,将 Kafka 分区与 RDD 分区做一对一映射,相较于第一种方法,不需再维护一份 WAL 数据,提高了性能。读取数据的偏移量由 Spark Streaming 程序通过检查点机制自身处理,避免在程序出错的情况下重现第一种方法重复读取数据的情况,消除了 Spark Streaming 与 ZooKeeper/Kafka 数据不一致的风险。保证每条消息只会被 Spark Streaming 处理一次。以下代码片通过第二种方式读取 Kafka 中的数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
messages.foreachRDD(new Function<JavaPairRDD<String,String>,Void>(){
    public Void call(JavaPairRDD<String, String> v1)
throws Exception {
        v1.foreach(new VoidFunction<Tuple2<String, String>>(){
public void call(Tuple2<String, String> tuple2) {
try{
JSONObject a = new JSONObject(tuple2._2);
...




Spark Streaming 获取到消息后便可以通过 Tuple 对象自定义操作消息,如下图是针对 DB2 数据库日志的邮件告警,生成告警邮件发送到 Notes 邮箱:
图 8. 基于 Spark Streaming 对 DB2 异常日志实现 Notes 邮件告警互联网行业日志处理方案举例介绍与应用1. 新浪新浪采用的技术架构是常见的 Kafka 整合 ELK Stack 方案。Kafka 作为消息队列用来缓存用户日志;使用 Logstash 做日志解析,统一成 JSON 格式输出给 Elasticsearch;使用 Elasticsearch 提供实时日志分析与强大的搜索和统计服务;Kibana 用作数据可视化组件。该技术架构目前服务的用户包括微博、微盘、云存储、弹性计算平台等十多个部门的多个产品的日志搜索分析业务,每天处理约 32 亿条(2TB)日志。
新浪的日志处理平台团队对 Elasticsearch 做了大量优化(比如调整 max open files 等),并且开发了一个独立的 Elasticsearch Index 管理系统,负责索引日常维护任务(比如索引的创建、优化、删除、与分布式文件系统的数据交换等)的调度及执行。为 Elasticsearch 安装了国内中文分词插件 elasticsearch-analysis-ik,满足微盘搜索对中文分词的需求。(见参考资源 2)
2. 腾讯腾讯蓝鲸数据平台告警系统的技术架构同样基于分布式消息队列和全文搜索引擎。但腾讯的告警平台不仅限于此,它的复杂的指标数据统计任务通过使用 Storm 自定义流式计算任务的方法实现,异常检测的实现利用了曲线的时间周期性和相关曲线之间的相关性去定义动态的阈值,并且基于机器学习算法实现了复杂的日志自动分类(比如 summo logic)。
告警平台把拨测(定时 curl 一下某个 url,有问题就告警)、日志集中检索、日志告警(5 分钟 Error 大于 X 次告警)、指标告警(cpu 使用率大于 X 告警)整合进同一个数据管线,简化了整体的架构。(见参考资源 3)
3. 七牛七牛采用的技术架构为 Flume+Kafka+Spark,混部在 8 台高配机器。根据七牛技术博客提供的数据,该日志处理平台每天处理 500 亿条数据,峰值 80 万 TPS。
Flume 相较于 Logstash 有更大的吞吐量,而且与 HDFS 整合的性能比 Logstash 强很多。七牛技术架构选型显然考虑了这一点,七牛云平台的日志数据到 Kafka 后,一路同步到 HDFS,用于离线统计,另一路用于使用 Spark Streaming 进行实时计算,计算结果保存在 Mongodb 集群中。(见参考资源 4)
任何解决方案都不是十全十美的,具体采用哪些技术要深入了解自己的应用场景。就目前日志处理领域的开源组件来说,在以下几个方面还比较欠缺:
结束语大数据时代的运维管理意义重大,好的日志处理平台可以事半功倍的提升开发人员和运维人员的效率。本文通过简单用例介绍了 ELK Stack、Kafka 和 Spark Streaming 在日志处理平台中各自在系统架构中的功能。现实中应用场景繁多复杂、数据形式多种多样,日志处理工作不是一蹴而就的,分析处理过程还需要在实践中不断挖掘和优化,笔者也将致力于 DB2 数据库运行状态更细节数据的收集和更全面细致的监控。




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0