producer producer 1.启动流程
Producer如何感知要发送消息的broker即brokerAddrTable中的值是怎么获得的,1. 发送消息的时候指定会指定topic,如果producer集合中没有会根据指定topic到namesrv获取topic发布信息TopicPublishInfo,并放入本地集合2. 定时从namesrv更新topic路由信息,
Producer与broker间的心跳
Producer定时发送心跳将producer信息(其实就是procduer的group)定时发送到, brokerAddrTable集合中列出的broker上去Producer发送消息只发送到master的broker机器,在通过broker的主从复制机制拷贝到broker的slave上去
producer 2.如何发送消息Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡
1) Topic下的所有队列如何理解:
[url=][/url]
比如broker1, broker2, borker3三台broker机器都配置了Topic_ABroker1 的队列为queue0 , queue1Broker2 的队列为queue0, queue2, queue3,Broker3 的队列为queue0当然一般情况下的broker的配置都是一样的以上当broker启动的时候注册到namesrv的Topic_A队列为共6个分别为:broker1_queue0, broker1_queue1,broker2_queue0, broker2_queue1, broker2_queue2,broker3_queue0,[url=][/url]
2) Producer如何实现轮询队列:
[url=][/url]
Producer从namesrv获取的到Topic_A路由信息TopicPublishInfo --List<MessageQueue>messageQueueList //Topic_A的所有的队列 --AtomicIntegersendWhichQueue //自增整型 方法selectOneMessageQueue方法用来选择一个发送队列 (++sendWitchQueue)% messageQueueList.size为队列集合的下标 每次获取queue都会通过sendWhichQueue加一来实现对所有queue的轮询 如果入参lastBrokerName不为空,代表上次选择的queue发送失败,这次选择应该避开同一个queue[url=][/url]
3) Producer发消息系统重试:
[url=][/url]
发送失败后,重试几次retryTimesWhenSendFailed = 2发送消息超时sendMsgTimeout = 3000Producer通过selectOneMessageQueue方法获取一个MessagQueue对象 --topic //Topic_A --brokerName //代表发送消息到达的broker --queueId //代表发送消息的在指定broker上指定topic下的队列编号向指定broker的指定topic的指定queue发送消息 发送失败(1)重试次数不到两次(2)发送此条消息花费时间还没有到3000(毫秒), 换个队列继续发送。 [url=][/url]
producer发送普通消息
|