rocketMq 消费者疑问
时间: 2019-05-14来源:开源中国
前景提要
HDC调试需求开发(15万预算),能者速来!>>>
@Override
public void receiveResult(String id, long timeout) {
//计数器
LOGGER.info(">>>开始接收订单结果:{}", id);
CountDownLatch countDownLatch = new CountDownLatch(1);
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);
//从消息队列头开始消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
String result =new String(msgs.get(0).getBody(), "UTF-8");
LOGGER.info(">>>接收到结果:{}",result);
countDownLatch.countDown();
} catch (Exception e) {
LOGGER.error(">>>消息处理异常", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
//设置过滤的top 和tag
defaultMQPushConsumer.subscribe(topic, id);
// 启动消费者 等待结果
defaultMQPushConsumer.start();
if (countDownLatch.await(timeout, TimeUnit.MILLISECONDS)) {
LOGGER.info(">>>接收成功");
return;
}
LOGGER.info(">>>接收超时");
} catch (Exception e) {
LOGGER.error(">>>等待消息异常", e);
} finally {
shutdown(defaultMQPushConsumer);
}

}
private void shutdown(DefaultMQPushConsumer defaultMQPushConsumer) {
try {
defaultMQPushConsumer.shutdown();
} catch (Exception e) {
LOGGER.info(">>>资源释放异常", e);
}
}

有个业务场景: 在一个同步线程A过程中 执行到某个点的时候暂停 需要等待另外一个线程B通知 才继续往下面执行。 单机的话 可以直接使用countDownLatch 实现。但是在分布式环境中 B线程可能在另外一台机器上。所以我引入了rocketMq. 每次根据订单号传给tag 进行过滤。但是这样的话 就需要每次创建和销毁consumer。总是感觉这样做 不好。但是又没有其他方式实现。请rocketMq大神帮忙看看。

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

热门排行