首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

分布式集群环境下调用链路追踪(6)创建 demoe-message 工程

分布式集群环境下调用链路追踪(6)创建 demoe-message 工程

创建 demoe-message 工程
  • 新建一个基于 Spring Boot Web 的 Gradle 项目,项目名称为:demo-message。
  • 项目 build.gradle 配置如下: 清单 13. demo-message Gradle              脚本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    repositories {
    }

    dependencies {
        compile project(':demo-model')
        compile project(':demo-util')

        compile('org.springframework.boot:spring-boot-starter-aop')
        compile('org.springframework.cloud:spring-cloud-starter-sleuth')
        compile('org.springframework.cloud:spring-cloud-sleuth-stream')
        compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
        compile('org.springframework.boot:spring-boot-starter-data-redis')
        compile('org.springframework.kafka:spring-kafka')
        compile('org.springframework.boot:spring-boot-starter-web')
        compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
    }




  • demoe-message Redis 消息解包处理, (如清单 14 所示),基于 Aspect 实现,以此来实现对接受到的消息进行解包处理,同时保证代码透明注入,请在项目中查看完整源代码。 清单 14. demo-message Message              封包处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
        @Pointcut("execution(* onMessage(..)) & &  
    target(org.springframework.data.redis.connection.MessageListener)")
        public void messageListener() {
            // This func is intentionally empty. Nothing special is needed here.
        }

        @Around("messageListener()")
        public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable {
            try {
                Object[] joinPointArgs = joinPoint.getArgs();
                Object[] result = new Object[joinPointArgs.length];
                for (int i = 0; i < joinPointArgs.length; i++) {
                    Object object = joinPointArgs;
                    ObjectMapper objectMapper = new ObjectMapper();
                    if (object instanceof Message && jsonParse(object)) {
                        Message message = (Message) object;
                        RedisMessage redisMessage = objectMapper.readValue((String)
    object.toString(), RedisMessage.class);
                        logger.debug("Received >> redisMessage {} ", redisMessage);
                        Map<String, String> spanMap = redisMessage.getHeader();
                        String traceIdString =
    Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.TRACE_ID_NAME)));
                        String spanId =
    Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.SPAN_ID_NAME)));
                        Span parentSpan = SleuthHelper.fromMap(spanMap);
                        logger.debug("Received >> trace {} span {}", traceIdString,
    spanId);
                        if (sleuthProperties != null &&
    sleuthProperties.isSupportsJoin()) {
                            SleuthHelper.joinSpan(tracer, spanMap);
                            logger.debug("Join trace span {}", spanMap);
                        } else {
                            SleuthHelper.continueSpan(tracer, spanMap);
                            logger.debug("Continue trace span {}", spanMap);
                        }
                        slf4jSpanLogger.logStartedSpan(parentSpan,
    tracer.getCurrentSpan());
                        logger.debug("Received >> trace {} span {}",
    Span.idToHex(tracer.getCurrentSpan().getTraceId()),
                                Span.idToHex(tracer.getCurrentSpan().getSpanId()));
                        byte[] channel = message.getChannel();
                        byte[] body =
    SerializationUtils.serialize(redisMessage.getMessage());
                        logger.debug("Received >> message {}",
    SerializationUtils.deserialize(body));
                        result = new RedisDefaultMessage(channel, body);
                    } else {
                        result = object;
                    }
                }
                SpanBuilder spanBuilder =
    Span.builder().from(this.tracer.getCurrentSpan());
                Span currentSpan =
    spanBuilder.parents(this.tracer.getCurrentSpan().getParents()).name(joinPoint.getSi
    gnature().getName()).remote(false).build();
                currentSpan.logEvent("sr");
                this.tracer.close(currentSpan);
                this.tracer.continueSpan(currentSpan);
                joinPoint.proceed(result);
            } catch (JsonParseException e) {
                logger.error("Join trace span with JsonParseException {} ", e);
            } catch (JsonMappingException e) {
                logger.error("Join trace span with JsonMappingException {} ", e);
            } catch (IOException e) {
                logger.error("Join trace span with IOException {}", e);
            }
        }

        private boolean jsonParse(Object object) {
            boolean falg = true;
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                objectMapper.readValue((String) object.toString(), RedisMessage.class);
                falg = true;
            } catch (Exception e) {
                falg = false;
            }
            return falg;
        }




返回列表