分布式集群环境下调用链路追踪(5)中间件中 Trace 介绍以及实践
- UID
- 1066743
|
分布式集群环境下调用链路追踪(5)中间件中 Trace 介绍以及实践
本文 demo 中所谈到的中间件是基于 Java 语言环境下,与 Kafka、Redis 中间件的集成,Spring Cloud 中已经包含与 Kafka 的集成工具:spring-cloud-starter-stream-kafka,只需在 dependency 中引用加上配置即可,与 Redis 的集成,由于版本还处于 1.0 并且已经被 deprecated 了,所以只能按照 Stream binder 实现方式自行实现。
集成介绍- spring-cloud-starter-stream-kafka:Spring Cloud 中与 Kafka 集成的工具包,即通过集成之后能够将 Trace 信息通过 Kafka Message 传递到下游系统。
- Redis:与 Redis 集成能够传递 Trace 信息,在上游系统发送 Message 到 Redis 之前在 Message 基础上将 Trace 信息封包进去,通过 Redis 传递到下游系统,下游系统收到消息之后再将封包消息进行解包处理,同时将解包出来的 Trace 信息注册到系统中,解包出来的 Message 信息交由下游系统处理,并且需要做到封包处理和解包处理对于 Redis 上、下游模块都是透明的。
- Pub/Sub:与 Redis 集成 Pub/Sub。
Redis 集成思路- 在上游系统在接收包用户消息之后,将 Trace 信息封装包 Message 中,然后再发送给 Redis。
- Message 保存到 Redis。
- 下游系统收到消息之后,对消息进行解包处理,同时将解包出来的 Trace 信息注册到当前系统中,解包出来的 Message 信息交由后续系统处理。
以上方式是通过 Pub/Sub 方式实现,并且需要保持对 Pub/Sub 两端系统都透明;不过本 demo 中 Redis 消息集成仅限于 JSON 消息格式,如清单 9 所示:
清单 9. Message 封包示例1
2
3
4
5
6
7
8
9
10
11
12
13
14
| {
contentType = application / json,
header = {
spanParentSpanId = 1665384374984194801,
spanTraceId = 4676089090076077054,
spanId = -4549244921492974775,
spanProcessId = null,
spanName = publish
},
message = {
"id": "24506",
"name": "abef"
}
}
|
创建 demoe-service 工程- 新建一个基于 Spring Boot Web 的 Gradle 项目,项目名称为:demo-service。
- 项目 build.gradle 配置如下: 清单 10. demo-service Gradle 脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| repositories {
}
dependencies {
compile project(':demo-model')
compile project(':demo-util')
compile('org.springframework.boot:spring-boot-starter-aop')
compile('org.springframework.cloud:spring-cloud-starter-sleuth')
compile('org.springframework.cloud:spring-cloud-sleuth-stream')
compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
compile('org.springframework.boot:spring-boot-starter-data-redis')
compile('org.springframework.kafka:spring-kafka')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
}
|
- 配置 demo-service controller 类(如清单 11 所示),主要处理 demo-web 发送过来的 HTTP 请求,并发送消息到 Kafka、Redis 中间件。 清单 11. demo-service controller 类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @RequestMapping(value = "/testkafka", method = RequestMethod.GET)
public String testkafka(@RequestParam(value = "msg") String msg) throws
JsonProcessingException {
logger.info("Hello Docs Service testkafka Test!");
KafkaModel kafkaModel = new KafkaModel();
kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999)));
kafkaModel.setName(msg);
ObjectMapper jsonMapper = new ObjectMapper();
logger.info("Putting message to kafka topic name : sampletopic");
MessageBuilder<String> messageBuilder =
MessageBuilder.withPayload(jsonMapper.writeValueAsString(kafkaModel));
source.output().send(messageBuilder.build());
return "Message successfully pushed to kafka";
}
@RequestMapping(value = "/testredis", method = RequestMethod.GET)
public String testredis(@RequestParam(value = "msg") String msg) throws
JsonProcessingException {
logger.info("Hello Demo Service testredis Test!");
KafkaModel kafkaModel = new KafkaModel();
kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999)));
kafkaModel.setName(msg);
ObjectMapper jsonMapper = new ObjectMapper();
Object message = jsonMapper.writeValueAsString(kafkaModel);
customerInfoPublisher.publish(message);
return "Message successfully pushed to redis";
}
|
- demoe-service Redis 消息封包处理 (如清单 12 所示),基于 Aspect 实现,以此来实现对发送消息进行封包,同时尽可能实现透明注入。 清单 12. demo-service Message 封包处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
| @Pointcut("execution(* publish(..)) & &
target(com.ibm.demo.service.redis.pubsub.CustomerInfoPublisher)")
public void messageListener() {
// This func is intentionally empty. Nothing special is needed here.
}
@Around("messageListener()")
public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] joinPointArgs = joinPoint.getArgs();
Map<String, Object> messageObj = new HashMap<>();
createSpanIntoTrace(joinPoint);
Map<String, String> spanObj = SleuthHelper.toMap(tracer.getCurrentSpan());
messageObj.put("contentType", "application/json");
messageObj.put("header", spanObj);
RedisMessage redisMessage = new RedisMessage("application/json", spanObj,
joinPointArgs[0]);
ObjectMapper jsonMapper = new ObjectMapper();
Object mesgObj = jsonMapper.writeValueAsString(redisMessage);
Object[] message = new Object[] { mesgObj };
joinPoint.proceed(message);
logger.info("Putting message into redis with message {}", message);
afterSendCompletion(mesgObj, joinPoint.getSignature().getName(), null);
}
private void createSpanIntoTrace(ProceedingJoinPoint joinPoint) {
Span span = startSpan(this.tracer.getCurrentSpan(), joinPoint.getSignature().getName());
SpanBuilder spanBuilder = Span.builder().from(span);
Span currentSpan = spanBuilder.remote(false).build();
this.tracer.continueSpan(currentSpan);
}
public void afterSendCompletion(Object object, String name, Exception ex) {
ObjectMapper objectMapper = new ObjectMapper();
try {
RedisMessage redisMessage = objectMapper.readValue((String)
object.toString(), RedisMessage.class);
Map<String, String> spanMap = redisMessage.getHeader();
Span currentSpan = this.tracer.isTracing() ?
this.tracer.getCurrentSpan() : SleuthHelper.fromMap(spanMap);
if (logger.isDebugEnabled()) {
logger.debug("Completed sending and current span is " +
currentSpan);
}
if (containsServerReceived(currentSpan)) {
if (logger.isDebugEnabled()) {
logger.debug("Marking span with server send");
}
currentSpan.logEvent(Span.SERVER_SEND);
} else if (currentSpan != null) {
if (logger.isDebugEnabled()) {
logger.debug("Marking span with client received");
}
currentSpan.logEvent(Span.CLIENT_RECV);
}
addErrorTag(ex);
if (logger.isDebugEnabled()) {
logger.debug("Closing messaging span " + currentSpan);
}
this.tracer.close(currentSpan);
if (logger.isDebugEnabled()) {
logger.debug("Messaging span " + currentSpan + " successfully
closed");
}
} catch (IOException e) {
logger.error("Parse Messaging span {} with exception {}",
this.tracer.getCurrentSpan(), e);
}
}
private boolean containsServerReceived(Span span) {
if (span == null) {
return false;
}
for (Log log : span.logs()) {
if (Span.SERVER_RECV.equals(log.getEvent())) {
return true;
}
}
return false;
}
private void addErrorTag(Exception ex) {
if (ex != null) {
this.errorParser.parseErrorTags(this.tracer.getCurrentSpan(), ex);
}
}
private Span startSpan(Span span, String name) {
return startSpan(span, name, null);
}
private Span startSpan(Span span, String name, RedisMessage message) {
if (span != null) {
return this.tracer.createSpan(name, span);
}
if (message != null & &
Span.SPAN_NOT_SAMPLED.equals(message.getHeader().get(TraceMessageHeaders.SAMPLED_NA
ME))) {
return this.tracer.createSpan(name, NeverSampler.INSTANCE);
}
return this.tracer.createSpan(name);
}
|
|
|
|
|
|
|