首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

分布式集群环境下调用链路追踪(5)中间件中 Trace 介绍以及实践

分布式集群环境下调用链路追踪(5)中间件中 Trace 介绍以及实践

本文 demo 中所谈到的中间件是基于 Java 语言环境下,与 Kafka、Redis 中间件的集成,Spring Cloud 中已经包含与 Kafka        的集成工具:spring-cloud-starter-stream-kafka,只需在 dependency 中引用加上配置即可,与 Redis 的集成,由于版本还处于 1.0        并且已经被 deprecated 了,所以只能按照 Stream binder 实现方式自行实现。
集成介绍
  • spring-cloud-starter-stream-kafka:Spring Cloud 中与 Kafka 集成的工具包,即通过集成之后能够将 Trace 信息通过          Kafka Message 传递到下游系统。
  • Redis:与 Redis 集成能够传递 Trace 信息,在上游系统发送 Message 到 Redis 之前在 Message 基础上将 Trace 信息封包进去,通过          Redis 传递到下游系统,下游系统收到消息之后再将封包消息进行解包处理,同时将解包出来的 Trace 信息注册到系统中,解包出来的 Message          信息交由下游系统处理,并且需要做到封包处理和解包处理对于 Redis 上、下游模块都是透明的。
  • Pub/Sub:与 Redis 集成 Pub/Sub。
Redis        集成思路
  • 在上游系统在接收包用户消息之后,将 Trace 信息封装包 Message 中,然后再发送给 Redis。
  • Message 保存到 Redis。
  • 下游系统收到消息之后,对消息进行解包处理,同时将解包出来的 Trace 信息注册到当前系统中,解包出来的 Message 信息交由后续系统处理。
以上方式是通过 Pub/Sub 方式实现,并且需要保持对 Pub/Sub 两端系统都透明;不过本 demo 中 Redis 消息集成仅限于 JSON 消息格式,如清单 9        所示:
清单 9. Message 封包示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
        contentType = application / json,
        header = {
               spanParentSpanId = 1665384374984194801,
               spanTraceId = 4676089090076077054,
               spanId = -4549244921492974775,
               spanProcessId = null,
               spanName = publish
        },
        message = {
               "id": "24506",
               "name": "abef"
        }
}




创建 demoe-service 工程
  • 新建一个基于 Spring Boot Web 的 Gradle 项目,项目名称为:demo-service。
  • 项目 build.gradle 配置如下: 清单 10. demo-service Gradle              脚本
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    repositories {
    }

    dependencies {
        compile project(':demo-model')
        compile project(':demo-util')

        compile('org.springframework.boot:spring-boot-starter-aop')
        compile('org.springframework.cloud:spring-cloud-starter-sleuth')
        compile('org.springframework.cloud:spring-cloud-sleuth-stream')
        compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
        compile('org.springframework.boot:spring-boot-starter-data-redis')
        compile('org.springframework.kafka:spring-kafka')
        compile('org.springframework.boot:spring-boot-starter-web')
        compile('org.springframework.cloud:spring-cloud-sleuth-zipkin')
    }




  • 配置 demo-service controller 类(如清单 11 所示),主要处理 demo-web 发送过来的 HTTP 请求,并发送消息到 Kafka、Redis          中间件。 清单 11. demo-service controller              类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
        @RequestMapping(value = "/testkafka", method = RequestMethod.GET)
        public String testkafka(@RequestParam(value = "msg") String msg) throws
    JsonProcessingException {
            logger.info("Hello Docs Service testkafka Test!");
            KafkaModel kafkaModel = new KafkaModel();
            kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999)));
            kafkaModel.setName(msg);
            ObjectMapper jsonMapper = new ObjectMapper();
            logger.info("Putting message to kafka topic name : sampletopic");
            MessageBuilder<String> messageBuilder =
    MessageBuilder.withPayload(jsonMapper.writeValueAsString(kafkaModel));
            source.output().send(messageBuilder.build());
            return "Message successfully pushed to kafka";
        }

        @RequestMapping(value = "/testredis", method = RequestMethod.GET)
        public String testredis(@RequestParam(value = "msg") String msg) throws
    JsonProcessingException {
            logger.info("Hello Demo Service testredis Test!");
            KafkaModel kafkaModel = new KafkaModel();
            kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999)));
            kafkaModel.setName(msg);
            ObjectMapper jsonMapper = new ObjectMapper();
            Object message = jsonMapper.writeValueAsString(kafkaModel);
            customerInfoPublisher.publish(message);
            return "Message successfully pushed to redis";
        }




  • demoe-service Redis 消息封包处理 (如清单 12 所示),基于 Aspect 实现,以此来实现对发送消息进行封包,同时尽可能实现透明注入。            清单 12. demo-service Message              封包处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
        @Pointcut("execution(* publish(..)) & &  
    target(com.ibm.demo.service.redis.pubsub.CustomerInfoPublisher)")
        public void messageListener() {
            // This func is intentionally empty. Nothing special is needed here.
        }

        @Around("messageListener()")
        public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable {
            Object[] joinPointArgs = joinPoint.getArgs();
            Map<String, Object> messageObj = new HashMap<>();
            createSpanIntoTrace(joinPoint);
            Map<String, String> spanObj = SleuthHelper.toMap(tracer.getCurrentSpan());
            messageObj.put("contentType", "application/json");
            messageObj.put("header", spanObj);
            RedisMessage redisMessage = new RedisMessage("application/json", spanObj,
    joinPointArgs[0]);
            ObjectMapper jsonMapper = new ObjectMapper();
            Object mesgObj = jsonMapper.writeValueAsString(redisMessage);
            Object[] message = new Object[] { mesgObj };
            joinPoint.proceed(message);
            logger.info("Putting message into redis with message {}", message);
            afterSendCompletion(mesgObj, joinPoint.getSignature().getName(), null);
        }

        private void createSpanIntoTrace(ProceedingJoinPoint joinPoint) {
            Span span = startSpan(this.tracer.getCurrentSpan(), joinPoint.getSignature().getName());
            SpanBuilder spanBuilder = Span.builder().from(span);
            Span currentSpan = spanBuilder.remote(false).build();
            this.tracer.continueSpan(currentSpan);
        }

        public void afterSendCompletion(Object object, String name, Exception ex) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                RedisMessage redisMessage = objectMapper.readValue((String)
    object.toString(), RedisMessage.class);
                Map<String, String> spanMap = redisMessage.getHeader();
                Span currentSpan = this.tracer.isTracing() ?
    this.tracer.getCurrentSpan() : SleuthHelper.fromMap(spanMap);
                if (logger.isDebugEnabled()) {
                    logger.debug("Completed sending and current span is " +
    currentSpan);
                }
                if (containsServerReceived(currentSpan)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Marking span with server send");
                    }
                    currentSpan.logEvent(Span.SERVER_SEND);
                } else if (currentSpan != null) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Marking span with client received");
                    }
                    currentSpan.logEvent(Span.CLIENT_RECV);
                }
                addErrorTag(ex);
                if (logger.isDebugEnabled()) {
                    logger.debug("Closing messaging span " + currentSpan);
                }
                this.tracer.close(currentSpan);
                if (logger.isDebugEnabled()) {
                    logger.debug("Messaging span " + currentSpan + " successfully
    closed");
                }
            } catch (IOException e) {
                logger.error("Parse Messaging span {} with exception {}",
    this.tracer.getCurrentSpan(), e);
            }
        }

        private boolean containsServerReceived(Span span) {
            if (span == null) {
                return false;
            }
            for (Log log : span.logs()) {
                if (Span.SERVER_RECV.equals(log.getEvent())) {
                    return true;
                }
            }
            return false;
        }

        private void addErrorTag(Exception ex) {
            if (ex != null) {
                this.errorParser.parseErrorTags(this.tracer.getCurrentSpan(), ex);
            }
        }

        private Span startSpan(Span span, String name) {
            return startSpan(span, name, null);
        }

        private Span startSpan(Span span, String name, RedisMessage message) {
            if (span != null) {
                return this.tracer.createSpan(name, span);
            }
            if (message != null & &  
    Span.SPAN_NOT_SAMPLED.equals(message.getHeader().get(TraceMessageHeaders.SAMPLED_NA
    ME))) {
                return this.tracer.createSpan(name, NeverSampler.INSTANCE);
            }
            return this.tracer.createSpan(name);
        }










返回列表