spout放在每个executer执行,我们先从spoutExecutors的初始化开始往下看,spoutExecutors是在一个worker中管理其中的tasks,在SpoutExecutors的构造函数中初始化一些组件:taskId,topologyId,spout等,在这个线程中,除了一些常见的属性,可以看到还会去创建并设置两个对象,将待执行的task信息传入:
1、TaskTransfer
2、TaskHeartbeatTrigger
构造完成之后,init方法进行一些初始化,在这里执行spout的open方法同时进行事件注册:
this.spout.open(storm_conf, userTopologyCtx, outputCollector);
LOG.info("Successfully open SpoutExecutors " + idStr);
taskHbTrigger.register();
int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
// wait other bolt is ready
JStormUtils.sleepMs(delayRun * 1000);
if (taskStatus.isRun()) {
spout.activate();
} else {
spout.deactivate();
}
LOG.info(idStr + " is ready ");
}
在spout调用open初始化完成之后,spout需要根据配置文件每10秒读取一次数据,这个是怎么实现的呢?发现在调用open之后,会调用taskHbTrigger.register(),taskHeartbeatTrigger是一个TimerTrigger的继承类,他会根据配置,通过ScheduledExecutorService设置每隔一段时间执行task。
Spout.emit过程:
真正执行emit的是SpoutCollector.sendMsg
public List
sendMsg(String out_stream_id, List
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/204928.html原文链接:https://javaforall.net