Springboot集成使用阿里云kafka详细步骤(2)
- UID
- 1066743
|
Springboot集成使用阿里云kafka详细步骤(2)
配置文件properties中增加相应变量
在application-beta.properties中增加对应配置如下:
kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093
kafka.sample.retrycount=100
kafka.sample.topic=save_sample
kafka.jks.location=/jar/kafka.client.truststore.jks
新建KafkaService发送消息
KafkaService.java
package com.biologic.api.service;
import org.springframework.stereotype.Service;
@Service
public interface KafkaService {
void sendMessage(String topic, String data);
void releaseKafkaMsg(String barcode, String chip);
}
KafkaServiceImpl.java
package com.biologic.api.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.biologic.api.service.KafkaService;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
@Service
public class KafkaServiceImpl implements KafkaService {
@Value("${kafka.sample.topic}")
private String sampleTopic;
private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);
// private final KafkaTemplate<Integer, String> kafkaTemplate;
//
// /**
// * 注入KafkaTemplate
// * @param kafkaTemplate kafka模版类
// */
// @Autowired
// public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {
// this.kafkaTemplate = kafkaTemplate;
// }
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String data) {
LOG.info("kafka sendMessage start");
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
}
@Override
public void onSuccess(SendResult<String, String> result) {
LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);
}
});
LOG.info("kafka sendMessage end");
}
public void releaseKafkaMsg(String barcode, String chip) {
try {
JSONArray data = new JSONArray();
JSONObject kafka_sample_state = new JSONObject();
kafka_sample_state.put("plate_id", chip);
kafka_sample_state.put("barcode", barcode);
kafka_sample_state.put("status", "release_report");
data.add(kafka_sample_state);
JSONObject sample_list = new JSONObject();
sample_list.put("sample_list", data.toString());
sendMessage(sampleTopic, sample_list.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
} |
|
|
|
|
|