首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Springboot集成使用阿里云kafka详细步骤(2)

Springboot集成使用阿里云kafka详细步骤(2)

配置文件properties中增加相应变量

在application-beta.properties中增加对应配置如下:

kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093

kafka.sample.retrycount=100

kafka.sample.topic=save_sample

kafka.jks.location=/jar/kafka.client.truststore.jks


新建KafkaService发送消息

KafkaService.java

package com.biologic.api.service;


import org.springframework.stereotype.Service;

@Service
public interface KafkaService {


    void sendMessage(String topic, String data);


    void releaseKafkaMsg(String barcode, String chip);

}


KafkaServiceImpl.java

package com.biologic.api.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.biologic.api.service.KafkaService;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

@Service
public class KafkaServiceImpl implements KafkaService {

    @Value("${kafka.sample.topic}")
    private String sampleTopic;

    private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);

    // private final KafkaTemplate<Integer, String> kafkaTemplate;
    //
    // /**
    // * 注入KafkaTemplate
    // * @param kafkaTemplate kafka模版类
    // */
    // @Autowired
    // public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {
    // this.kafkaTemplate = kafkaTemplate;
    // }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String data) {
        LOG.info("kafka sendMessage start");
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);
            }
        });
        LOG.info("kafka sendMessage end");
    }

    public void releaseKafkaMsg(String barcode, String chip) {
        try {
            JSONArray data = new JSONArray();
            JSONObject kafka_sample_state = new JSONObject();
            kafka_sample_state.put("plate_id", chip);
            kafka_sample_state.put("barcode", barcode);
            kafka_sample_state.put("status", "release_report");
            data.add(kafka_sample_state);

            JSONObject sample_list = new JSONObject();
            sample_list.put("sample_list", data.toString());
            sendMessage(sampleTopic, sample_list.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
返回列表