storm之spout

storm之spout一 什么是 spoutspout 喷嘴 喷口 即数据从这里发出 spout 是 storm 的数据来源 而 spout 的数据来源又是从其他地方 比如数据库或者消息中间件中流入的 以 Kafka 为例 spout 先从 kafka 中拉取数据 然后封装为一个 tuple 发给下游的 bolt 进行处理 对于 Kafka 来说 spout 是消费者 对于 bolt 来说 spout 是生产者 为什么要用 spout 去拉取消息 而不是直

一、什么是spout

spout:喷嘴、喷口。即数据从这里发出。

spout是storm的数据来源,而spout的数据来源又是从其他地方,比如数据库或者消息中间件中流入的。

以Kafka为例,spout先从kafka中拉取数据,然后封装为一个tuple,发给下游的bolt进行处理。对于Kafka来说,spout是消费者;对于bolt来说spout是生产者。

为什么要用spout去拉取消息,而不是直接由bolt接收推送的数据呢,这中拉模式有什么好处呢?

如果,将数据直接推送给bolt,当数据量突然增加的时候,可能导致某一个bolt瘫痪,继而影响整个topology运行;而当没有数据的时候,整个topolog又处于空闲状态,浪费资源。而由spout去拉取消息则不会出现这样的问题。

二、KafkaSpout

KafkaSpout实现了从Kafka拉取数据为storm提供数据源。并且重新实现了ack机制。一般的我们通过简单的配置就可以使用了。 //kafkaSpout配置 private KafkaSpoutConfig 
  
    kafkaSpoutConfig() { final Fields outputFields = new Fields("topic", "partition", "offset", "timestamp", "key", "msg_from_kafka"); KafkaSpoutConfig 
   
     config; //consumer的配置 Properties props = new Properties(); //默认由kafkaSpout进行ack后才提交(false),如果自动提交,则kafkaspout的ack失效,可能丢失或重复数据 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff( TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2), 1, TimeInterval.seconds(10)); config = KafkaSpoutConfig .builder("ip:9092", "topic_test") //首次消费消息的offset .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) //最后一个参数为输出字段 .setRecordTranslator((r) -> new Values(r.topic(), r.partition(), r.offset(), r.timestamp(), r.key(), r.value()), outputFields) //offset自动提交时间间隔,如果设置了enable.auto.commit=true则无效 .setOffsetCommitPeriodMs(1_000)//1秒 //达到这个值后向提交offset .setMaxUncommittedOffsets(1_000_000)//10万 //group .setGroupId("test-w") //kafka consumer配置 .setProp(props) .setRetry(kafkaSpoutRetryService) .build(); return config; } //拓扑结构 private StormTopology stormTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new ProducerSpout(kafkaSpoutConfig()), 1); builder.setBolt("bolt1", new BoltTest(), 1).shuffleGrouping("spout"); return builder.createTopology(); } 
    
  

kafkaspout的所有配置项:

public static final long DEFAULT_POLL_TIMEOUT_MS = 200L; public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30000L; public static final int DEFAULT_MAX_RETRIES = ; public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = ; public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2000L; public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(2L), , TimeInterval.seconds(10L)); public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0L), TimeInterval.milliSeconds(0L), , TimeInterval.milliSeconds(0L)); private final Map 
  
    kafkaProps; private final Subscription subscription; private final SerializableDeserializer 
   
     keyDes; private final Class 
    > keyDesClazz; private final SerializableDeserializer 
    
      valueDes; private final Class 
     > valueDesClazz; private final long pollTimeoutMs; private final RecordTranslator 
     
       translator; private final long offsetCommitPeriodMs; private final int maxUncommittedOffsets; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; private final KafkaSpoutRetryService retryService; private final long partitionRefreshPeriodMs; private final boolean emitNullTuples; 
      
     
    
  

具体含义在后面会总结。


参考资料:

《storm技术内幕与大数据实战》

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/208843.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 上午10:38
下一篇 2026年3月19日 上午10:39


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号