首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Springboot集成使用阿里云kafka详细步骤(1)

Springboot集成使用阿里云kafka详细步骤(1)

集成

springboot版本为1.5.2。
引入kafka-client的jar包

在项目的pom文件中添加kafka-clients并且排除spring-kafka中的kafka-clients。

因为spring-kafka目前最新版本为2.1.2,其依赖的kafka-clients是1.0.x,但Kafka 服务端版本是 0.10,Client 版本建议 0.10,所以此处需排除依赖重新引入,否则一直报错:disconnected

如下:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


新建KafkaAliyunConfiguration类

KafkaAliyunConfiguration.java

package com.biologic.util;

import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.StringUtils;

@Configuration
@EnableKafka
public class KafkaAliyunConfiguration {

    @Value("${kafka.broker.address}")
    private String brokerAddress;

    @Value("${kafka.sample.topic}")
    private String defaultTopic;

    @Value("${kafka.jks.location}")
    private String jksLocation;
   
    @Value("${kafka.sample.retrycount}")
    private String retrycount;
   
   

    public KafkaAliyunConfiguration() {
         //如果用-D 或者其它方式设置过,这里不再设置
           if (null == System.getProperty("java.security.auth.login.config")) {
               //请注意将 XXX 修改为自己的路径
               //这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中
               System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");
           }
                System.out.println("环境变量中已有config文件,kafka配置为:"+System.getProperty("java.security.auth.login.config"));
    }

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        if (StringUtils.isEmpty(jksLocation)) {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaAliyunConfiguration.class.getClassLoader()
                    .getResource("kafka.client.truststore.jks").getPath());
        } else {
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);
        }
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(ProducerConfig.RETRIES_CONFIG, retrycount);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
    }

    @Bean
    public  KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        kafkaTemplate.setDefaultTopic(defaultTopic);
        return kafkaTemplate;
    }
}



此处定义了四个变量,通过配置文件注入:

brokerAddress kafka服务器地址

defaultTopic kafka默认topic

jksLocation JKS文件地址(开发环境无需定义,直接读取resources下的jks,但生产环境需读取jar包外部的jks文件,所以此处需配置路径)

retrycount 重试次数
返回列表