spout详解

spout详解spout 放在每个 executer 执行 我们先从 spoutExecuto 的初始化开始往下看 spoutExecuto 是在一个 worker 中管理其中的 tasks 在 SpoutExecuto 的构造函数中初始化一些组件 taskId topologyId spout 等 在这个线程中 除了一些常见的属性 可以看到还会去创建并设置两个对象 将待执行的 task 信息传入 1 TaskTransf

spout放在每个executer执行,我们先从spoutExecutors的初始化开始往下看,spoutExecutors是在一个worker中管理其中的tasks,在SpoutExecutors的构造函数中初始化一些组件:taskId,topologyId,spout等,在这个线程中,除了一些常见的属性,可以看到还会去创建并设置两个对象,将待执行的task信息传入:
1、TaskTransfer
2、TaskHeartbeatTrigger
 
构造完成之后,init方法进行一些初始化,在这里执行spout的open方法同时进行事件注册:
 
this.spout.open(storm_conf, userTopologyCtx, outputCollector);

LOG.info("Successfully open SpoutExecutors " + idStr);

taskHbTrigger.register();

int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);

// wait other bolt is ready
JStormUtils.sleepMs(delayRun * 1000);

if (taskStatus.isRun()) {
spout.activate();
} else {
spout.deactivate();
}

LOG.info(idStr + " is ready ");

}


























































在spout调用open初始化完成之后,spout需要根据配置文件每10秒读取一次数据,这个是怎么实现的呢?发现在调用open之后,会调用taskHbTrigger.register(),taskHeartbeatTrigger是一个TimerTrigger的继承类,他会根据配置,通过ScheduledExecutorService设置每隔一段时间执行task。
 

Spout.emit过程:

真正执行emit的是SpoutCollector.sendMsg
public List 
   
     sendMsg(String out_stream_id, List 
     values, Object message_id, Integer out_task_id, ICollectorCallback callback) { 
      
final long startTime = emitTotalTimer.getTime();
try {
boolean needAck = (message_id != null) && (ackerNum > 0); //needAck满足的两个条件
Long root_id = getRootId(message_id);//如果需要ack,随机生成rootId,并对rootId做一次去重校验
java.util.List out_tasks;

if (out_task_id != null) {
out_tasks = sendTargets.get(out_task_id, out_stream_id, values, null, root_id);
} else {
out_tasks = sendTargets.get(out_stream_id, values, null, root_id);
}
if (out_tasks.size() == 0) {
// don't need send tuple to other task
return out_tasks;
}

List ackSeq = new ArrayList ();
for (Integer t : out_tasks) {
MessageId msgid;
if (needAck) {
// Long as = MessageId.generateId();
Long as = MessageId.generateId(random);
msgid = MessageId.makeRootId(root_id, as);
ackSeq.add(as);
} else {
msgid = MessageId.makeUnanchored();
}

TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
tp.setTargetTaskId(t);
transfer_fn.transfer(tp);
}
sendMsgToAck(out_stream_id, values, message_id, root_id, ackSeq, needAck);
if (callback != null)
callback.execute(out_tasks);
return out_tasks;
} finally {
emitTotalTimer.updateTime(startTime);
}
}























































































































 

转载于:https://www.cnblogs.com/harlenzhang/p/5544964.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/204928.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午7:12
下一篇 2026年3月19日 下午7:13


相关推荐

  • 全平台丝滑联动!Chatbox 玩转 Claude、GPT、DeepSeek 模型配置秘籍

    全平台丝滑联动!Chatbox 玩转 Claude、GPT、DeepSeek 模型配置秘籍

    2026年3月16日
    4
  • CAS原理详解_外燃机工作原理

    CAS原理详解_外燃机工作原理CAS简介CAS的意思是compareandswap,比较并交换。CAS的引入是为了解决java锁机制带来的性能问题。锁机制存在以下问题:(1)在多线程竞争下,加锁、释放锁会导致比较多的上下文切换和调度延时,引起性能问题。(2)一个线程持有锁会导致其它所有需要此锁的线程挂起。(3)如果一个优先级高的线程等待一个优先级低的线程释放锁会导致优先级倒置,引起性能风险。解决线程安全问题volatile是不错的机制,但是volatile不能保证原子性。因此对于同步最终还是要回到锁机制上来。独占锁

    2022年10月16日
    3
  • 什么是java的多态

    什么是java的多态多态分为两种a.编译时多态:方法的重载;b. 运行时多态:JAVA运行时系统根据调用该方法的实例的类型来决定选择调用哪个方法则被称为运行时多态。(我们平时说得多的事运行时多态,所以多态主要也是指运行时多态);上述描述认为重载也是多态的一种表现,不过多态主要指运行时多态。2.运行时多态a.面向对象的三大特性:封装、继承、多态。从一定角度来看,封装和继承几乎都是为多态而准备的。…

    2022年7月7日
    24
  • Vue父组件向子组件传递参数[通俗易懂]

    1、父组件projectBatchsindex.vue//使用:projectId=”this.projectId”传递参数<ProjectBatchEditref=”projectBatchEdit”:projectId=”this.projectId”@on-update=”search”></ProjectBatchEdit>importProj…

    2022年4月7日
    53
  • siamFC_silvahound

    siamFC_silvahound一SiamFC++网络结构及处理流程如下:注意大多数算法对于分类损失都采用交叉熵损失,而SiamFC++在分类分支中计算cls_score与center-nessscore采用了不同的损失函数,cls_score采用focalloss,这样做是为了缓解正负样本不均衡问题;center-nessscore则采用交叉熵loss。最终用这两部分对应元素相乘得到的结果得到更加合理的分类结果,center_ness的作用就是对每一个正样本位置施加权重,离中心进的权重高,离中心远的权重低使得分类更加合

    2026年4月14日
    6
  • 希尔伯特内积空间

    希尔伯特内积空间学习资料 来源 希尔伯特内积空间 知乎 如何理解希尔伯特内积空间 TimXP 的回答 https www zhihu com question 希尔伯特内积空间 我们一般接触的是线性空间 向量空间 首先看线性空间和各种空间之间的关系 1 线性空间 向量空间 线性空间又称作向量空间 关注的是向量的位置 对于一个线性空间 知道基 相当于三维空间中的

    2026年3月26日
    1

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号