为了保证数据能正确的被处理, 对于spout产生的每一个tuple, storm都会进行跟踪, 这里面涉及到ack/fail的处理, 如果一个tuple处理成功, 会调用spout的ack方法, 如果失败, 会调用fail方法. 而在处理tuple的每一个bolt都会通过OutputCollector来告知storm, 当前bolt处理是否成功. 为了了解OutputCollector的ack/fail与Spout的ack/fail之间的关系, 我调试跟踪了一下storm代码.
IBasicBolt 实现类不关心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail来决定. 其execute方法的BasicOutputCollector参数也没有提供ack/fail方法给你调用. 相当于忽略了该bolt的ack/fail行为. 所以IBasicBolt用来做filter或者简单的计算比较合适.
可以参考BasicBoltExecutor代码里面的实现就可以明白了:
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
LOG.warn("Failed to process tuple", e);
_collector.getOutputter().fail(input);
}
}
在IRichBolt实现类中, 如果OutputCollector.emit(oldTuple, newTuple)这样调用来发射tuple(在storm中称之为anchoring), 那么后面的bolt的ack/fail会影响spout的ack/fail, 如果collector.emit(newTuple)这样来发射tuple(在storm称之为unanchoring), 则相当于断开了后面bolt的ack/fail对spout的影响.spout将立即根据当前bolt前面的ack/fail的情况来决定调用spout的ack/fail. 所以某个bolt后面的bolt的成功失败对你来说不关心, 你可以直接通过这种方式来忽略
中间的某个bolt fail了, 不会影响后面的bolt执行, 但是会立即触发spout的fail. 相当于短路了, 后面bolt虽然也执行了, 但是ack/fail对spout已经无意义了. 也就是说, 只要bolt集合中的任何一个fail了, 会立即触发spout的fail方法. 而ack方法需要所有的bolt调用为ack才能触发.
另外一点, storm只是通过ack/fail机制来告诉应用方bolt中间的处理情况, 对于成功/失败该如何处理, 必须由应用自己来决定, 因为storm内部也没有保存失败的具体数据, 但是也有办法知道失败记录, 因为spout的ack/fail方法会附带一个msgId对象, 我们可以在最初发射tuple的时候将将msgId设置为tuple, 然后在ack/fail中对该tuple进行处理.
这里有个问题, 就是每个bolt执行完之后要显式的调用ack/fail, 否则会出现tuple不释放导致oom. 不知道storm在最初设计的时候, 为什么不将bolt的ack设置为默认调用
参考文档:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
分享到:
相关推荐
storm利用ack保证数据的可靠性,发送失败时进行重发,保证数据不丢失。
• Storm 数据模型(topology) • Storm ack和fail • Storm 批处理 • Storm TOPN • Storm 流程聚合 • Storm DRPC • Storm executor、worker、task之间的关系和调优 • Storm异常解决
使用Storm编程,可以通过调用ack和fail方法来确保一条消息的处理成功或失败。不过当元组被重发时,会发生什么呢?你又该如何砍不会重复计算? Storm0.7.0实现了一个新特性——事务性拓扑,这一特性使消息在语义上...
对于m≥0和n≥0的ACK(m,n)函数定义如下: ACK(0,n)=n+1 ACK(m,0)=ACK(m-1,1) ACK(m,n)=ACK(m-1,ACK(m,n-1)) 程序要求: ⑴ m、n在主程序从键盘输入,输入错误显示“m和n输入错误”。 ⑵ 显示计算结果。
82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK82C55数据传送时序ACK
ACK系列产品升级指南,告诉你如何升级ACK系列产品的版本。
阿里云, 微服务, K8S,ACK
ack-etcd备份资源
nRF24L01,ACK,自动重发,自动应答,测试程序,每秒更新一次成功接收或者发送的数据包个数,注释清晰,代码简洁,具有预编译选项,单个程序包含发射和接收的测试程序,方便调试。
ZigBee实验\6.5.应答ACK帧实验
用c++语言编写 用堆栈的方法实现ack
ACK3116驱动程序 the USB to Virtual Serial ports device driver 串口转USB
NRF24L01无线收发例程,带ACK自动应答数据包,STM32的KEIL源代码,支持二次开发NRF24L01无线收发例程,带ACK自动应答数据包,STM32的KEIL源代码,支持二次开发NRF24L01无线收发例程,带ACK自动应答数据包,STM32的...
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。...里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。
ACK70N-W ACK70N-T 官方中文说明书PDF 大金空气净化器 日本进口
Ackerman递归函数int ack(int m,int n)
ACK测试方案,真个是北京艾瑞的技术文档,很好用的。
ack用法:http://blog.bccn.net/%E9%9D%99%E5%A4%9C%E6%80%9D/13430
DAIKIN大金_ACK70N_空气净化器_中文说明书