分布式集群环境下调用链路追踪(6)创建 demoe-message 工程
- UID
- 1066743
|
分布式集群环境下调用链路追踪(6)创建 demoe-message 工程
创建 demoe-message 工程- 新建一个基于 Spring Boot Web 的 Gradle 项目,项目名称为:demo-message。
- 项目 build.gradle 配置如下: 清单 13. demo-message Gradle 脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| repositories {
}
dependencies {
compile project(':demo-model')
compile project(':demo-util')
compile('org.springframework.boot:spring-boot-starter-aop')
compile('org.springframework.cloud:spring-cloud-starter-sleuth')
compile('org.springframework.cloud:spring-cloud-sleuth-stream')
compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
compile('org.springframework.boot:spring-boot-starter-data-redis')
compile('org.springframework.kafka:spring-kafka')
compile('org.springframework.boot:spring-boot-starter-web')
compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
}
|
- demoe-message Redis 消息解包处理, (如清单 14 所示),基于 Aspect 实现,以此来实现对接受到的消息进行解包处理,同时保证代码透明注入,请在项目中查看完整源代码。 清单 14. demo-message Message 封包处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
| @Pointcut("execution(* onMessage(..)) & &
target(org.springframework.data.redis.connection.MessageListener)")
public void messageListener() {
// This func is intentionally empty. Nothing special is needed here.
}
@Around("messageListener()")
public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable {
try {
Object[] joinPointArgs = joinPoint.getArgs();
Object[] result = new Object[joinPointArgs.length];
for (int i = 0; i < joinPointArgs.length; i++) {
Object object = joinPointArgs;
ObjectMapper objectMapper = new ObjectMapper();
if (object instanceof Message && jsonParse(object)) {
Message message = (Message) object;
RedisMessage redisMessage = objectMapper.readValue((String)
object.toString(), RedisMessage.class);
logger.debug("Received >> redisMessage {} ", redisMessage);
Map<String, String> spanMap = redisMessage.getHeader();
String traceIdString =
Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.TRACE_ID_NAME)));
String spanId =
Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.SPAN_ID_NAME)));
Span parentSpan = SleuthHelper.fromMap(spanMap);
logger.debug("Received >> trace {} span {}", traceIdString,
spanId);
if (sleuthProperties != null &&
sleuthProperties.isSupportsJoin()) {
SleuthHelper.joinSpan(tracer, spanMap);
logger.debug("Join trace span {}", spanMap);
} else {
SleuthHelper.continueSpan(tracer, spanMap);
logger.debug("Continue trace span {}", spanMap);
}
slf4jSpanLogger.logStartedSpan(parentSpan,
tracer.getCurrentSpan());
logger.debug("Received >> trace {} span {}",
Span.idToHex(tracer.getCurrentSpan().getTraceId()),
Span.idToHex(tracer.getCurrentSpan().getSpanId()));
byte[] channel = message.getChannel();
byte[] body =
SerializationUtils.serialize(redisMessage.getMessage());
logger.debug("Received >> message {}",
SerializationUtils.deserialize(body));
result = new RedisDefaultMessage(channel, body);
} else {
result = object;
}
}
SpanBuilder spanBuilder =
Span.builder().from(this.tracer.getCurrentSpan());
Span currentSpan =
spanBuilder.parents(this.tracer.getCurrentSpan().getParents()).name(joinPoint.getSi
gnature().getName()).remote(false).build();
currentSpan.logEvent("sr");
this.tracer.close(currentSpan);
this.tracer.continueSpan(currentSpan);
joinPoint.proceed(result);
} catch (JsonParseException e) {
logger.error("Join trace span with JsonParseException {} ", e);
} catch (JsonMappingException e) {
logger.error("Join trace span with JsonMappingException {} ", e);
} catch (IOException e) {
logger.error("Join trace span with IOException {}", e);
}
}
private boolean jsonParse(Object object) {
boolean falg = true;
ObjectMapper objectMapper = new ObjectMapper();
try {
objectMapper.readValue((String) object.toString(), RedisMessage.class);
falg = true;
} catch (Exception e) {
falg = false;
}
return falg;
}
|
|
|
|
|
|
|