首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Kafka实战解惑(5)

Kafka实战解惑(5)

六、Kafka其他组件6.1 Kafka ConnectKafka 0.9+增加了一个新的特性 Kafka Connect ,可以更方便的创建和管理数据流管道。它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统。Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理。而导出工作则是将数据从Kafka Topic中导出到其它数据存储系统、查询系统或者离线分析系统等,比如数据库、 Elastic Search 、 Apache Ignite 等。
Kafka Connect特性包括:
  • Kafka connector通用框架,提供统一的集成API
  • 同时支持分布式模式和单机模式
  • REST 接口,用来查看和管理Kafka connectors
  • 自动化的offset管理,开发人员不必担心错误处理的影响
  • 分布式、可扩展
  • 流/批处理集成
当前Kafka Connect支持两种分发担保:at least once (至少一次) 和 at most once(至多一次),exactly once将在未来支持,当前已有的Connectors包括:
Connector NameOwnerStatusHDFSconfluent-platform@googlegroups.comConfluentsupportedJDBCconfluent-platform@googlegroups.comConfluentsupportedDebezium - CDC Sourcesdebezium@gmail.comCommunity projectMongoDB Sourcea.patelli@reply.de a.topchyan@reply.deIn progressMQTT Sourcetomasz.pietrzak@evok.lyCommunity projectMySQL Binlog Sourcewushujames@gmail.comIn progressTwitter Sourcerollulus@xs4all.nlIn progressCassandra SinkCassandra SinkCommunity projectElastic Search Sinkksenji@gmail.comCommunity projectElastic Search Sinkhannes.stockner@gmail.comIn progressElastic Search Sinka.patelli@reply.de a.topchyan@reply.deIn progress我们来看一个使用Kafka Connect从一个文件读取数据在传输到另一个文件的例子。
  • 首先在192.168.104.101、192.168.104.102两台服务器上启动Kafka。
  • 在192.168.104.102服务器的Kafka安装目录上,修改connect-standalone.properties文件:
bootstrap.servers=192.168.104.101:9092, 192.168.104.102:9092key.converter=org.apache.Kafka.connect.storage.StringConvertervalue.converter=org.apache.Kafka.connect.storage.StringConverterkey.converter.schemas.enable=falsevalue.converter.schemas.enable=false修改connect-file-source.properties文件:file=/root/data.txttopic=t1修改connect-file-sink.properties文件:file=/root/output.txttopics=t1
  • 在192.168.104.102服务器上启动Kafka-connect
    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • 向/root/data.txt中写入数据,echo “Kafka connect”>> data.txt,可以观察”Kafka connect”被写入到/root/output.txt文件中。
6.2 Kafka StreamKafka Streams是一套类库,嵌入到java应用程序中,它使得Apache Kafka可以拥有流处理的能力,通过使用Kafka Stream API进行业务逻辑处理最后写回Kakfa或者其他系统中。Kafka Stream中有几个重要的流处理概念:严格区分Event time和Process Time、支持窗口函数、应用状态管理。开发者使用Kafka Stream的门槛非常低,比如单机进行一些小数据量的功能验证而不需要在其他机器上启动一些服务(比如在Storm运行Topology需要启动Nimbus和Supervisor,当然也支持Local Mode),Kafka Stream的并发模型可以对单应用多实例进行负载均衡。有了Kafka Stream可以在很多场景下代替Storm、Spark Streaming减少技术复杂度。目前Kafka Stream仍然处于开发阶段,不建议生产环境使用,所以期待正式版发布吧。
6.3 Kafka CamusCamus是Linkedin开源的一个从Kafka到HDFS的数据管道,本质上上Camus是一个运行在Hadoop中的MapReduce程序,调用一些Camus提供的API从Kafka中读取数据然后写入HDFS。Camus2015年已经停止维护了,gobblin是后续产品,camus功能是是Gobblin的一个子集,通过执行MapReduce任务实现从Kafka读取数据到HDFS,而gobblin是一个通用的数据提取框架,可以将各种来源的数据同步到HDFS上,包括数据库、FTP、Kafka等。
七、 Kafka典型应用场景Kafka作为一个消息中间件,最长应用的场景是将数据进行加工后从源系统移动到目的系统,也就是所谓的ETL过程,ETL是一个数据从源头到目的地的移动过程,当然其中也伴随数据清洗。通常数据源头是应用程序所输出的消息、日志、生产数据库数据。应用程序输出消息通常由应用程序主动控制写入Kfaka的行为,而从日志、生产数据库到Kfaka通常由第三方独立应用处理。从日志到Kfaka典型的技术方案如ELK,从生产数据库到Kafka通常可采用如下三种方式:
  • 通过时间戳方式记录数据变更并写入kafka,如使用kettle等ETL工具。
  • 通过触发器方式记录数据变更并写入kafka,如使用kettle等ETL工具。
  • 通过数据库特有特性记录数变更并写入kafka,如Oracle GoldenGate,MySQL Binlog,Postgre SQL Wal,MongoDB Oplog,CouchDB Changes Feed,值得一提的是PostgreSQL 9.4后的Bottled Water是一个非常好用的方案,将PostgreSQL数据同步到Kfaka中。
数据通过Kafka移动到Hadoop通常有如下方案:
  • Kafka -> Flume -> Hadoop Hdfs
  • Kafka -> Gobblin -> Hadoop Hdfs
  • Kafka -> Kafka Hadoop Loader -> Hadoop Hdfs
  • Kafka -> KaBoom -> Hadoop Hdfs
  • Kafka -> Kafka Connect -> Hadoop Hdfs
  • Kafka -> Storm\Spark Streaming -> Hadoop Hdfs
从目前看这些方法都是常用的成熟方案,很多技术也在被一线互联网公司所使用,比如京东内部在使用Gobblin将数据从Kafka同步到Hdfs中,但从长远看Kafka Connect则是最佳方案,毕竟是官方标准出品而且Kafka Connect还在快速的发展。
返回列表