JStorm usage demo


A JStorm example built around the fieldsGrouping mode. Fields grouping guarantees that tuples with the same value in the specified grouping field are dispatched to the same task. For example, suppose a tuple has two fields, field1 and field2, and field1 is declared as the grouping field. Given two tuples, tuple1 (field1 = a, field2 = b) and tuple2 (field1 = a, field2 = c), both share the same field1 value, so tuple1 and tuple2 are processed by the same task (i.e. the same executor thread).
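Under the hood, fields grouping picks the target task from a hash of the grouping field's value, so equal values always map to the same task index. A minimal sketch of the idea (illustrative only, not JStorm's actual implementation):

import java.util.Arrays;

public class FieldsGroupingSketch {

    // Choose a target task index from the value of the grouping field.
    // Equal values always produce the same index, so they reach the same task.
    static int chooseTask(Object groupingValue, int numTasks) {
        return Math.floorMod(Arrays.asList(groupingValue).hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3;
        // tuple1 and tuple2 both have field1 = "a", so they are routed to the
        // same task, regardless of their other fields.
        System.out.println(chooseTask("a", numTasks));
        System.out.println(chooseTask("a", numTasks)); // same index as above
        System.out.println(chooseTask("b", numTasks)); // may differ
    }
}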

1. Create the spout data source; in this example it continuously pulls data from Redis.

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.demo.config.SpringBeans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Map;
import java.util.UUID;

public class MySpout extends BaseRichSpout {

    private static final Logger logger = LoggerFactory.getLogger(MySpout.class);

    private SpoutOutputCollector collector;
    private String componentId;
    private int taskId;
    private RedisTemplate redisTemplate;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.componentId = context.getThisComponentId();
        this.taskId = context.getThisTaskId();
        this.redisTemplate = SpringBeans.getBean("redisTemplate", RedisTemplate.class);
    }

    public void nextTuple() {
        try {
            // Pop the next record from the "student" list in Redis.
            Object value = redisTemplate.opsForList().leftPop("student");
            logger.info("MySpout nextTuple componentId:{} taskId:{} value:{}", componentId, taskId, value);
            // value = {"name":"zhangsan","address":"hangzhou"}
            if (null == value) {
                Thread.sleep(1000);
                return;
            }
            // Emit with a message id so the tuple can be acked or failed downstream.
            String uuid = UUID.randomUUID().toString();
            collector.emit(new Values(value), uuid);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Declare the fields of the output stream.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("student"));
    }
}
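MySpout obtains the RedisTemplate through a com.demo.config.SpringBeans helper that the original post does not show. A hedged sketch of what such a helper typically looks like (the real class may differ):

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

// Hypothetical sketch of the SpringBeans helper referenced by MySpout.
@Component
public class SpringBeans implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

    // Look up a bean by name and expected type from the stored application context.
    public static <T> T getBean(String name, Class<T> clazz) {
        return context.getBean(name, clazz);
    }
}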

 

2. Create the first bolt.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class FirstBolt implements IRichBolt {

    private Logger logger = LoggerFactory.getLogger(FirstBolt.class);

    private OutputCollector collector;
    private Integer taskId;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.taskId = context.getThisTaskId();
    }

    public void execute(Tuple input) {
        try {
            Map<String, String> student = (Map<String, String>) input.getValue(0);
            // student = {"name":"zhangsan","address":"hangzhou"}
            String address = student.get("address"); // address = "hangzhou"
            // Anchor the new tuple to the input (old) tuple so that acks on the
            // child tuple propagate back to the spout.
            collector.emit(input, new Values(address, student));
            logger.info("FirstBolt execute taskId:{} value:{}", taskId, student);
            // IRichBolt does not ack automatically, so ack the input explicitly.
            collector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void cleanup() {
    }

    // Declare the output fields. "address" is the grouping field; the Fields here
    // must match the Values emitted in execute() above.
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("address", "student"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
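The inline comments show the popped Redis value as a JSON string; if the data really is stored as a string, the raw cast to Map above would throw a ClassCastException. A hedged alternative (assuming Jackson is on the classpath, which the original demo does not mention) that parses the string explicitly:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;

public class StudentJsonParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse a JSON string such as {"name":"zhangsan","address":"hangzhou"}
    // into a Map<String, String> instead of casting the raw tuple value.
    public static Map<String, String> parse(String json) throws Exception {
        return MAPPER.readValue(json, new TypeReference<Map<String, String>>() {});
    }
}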

3. Create the second bolt.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class SecondBolt extends BaseRichBolt {

    private Logger logger = LoggerFactory.getLogger(SecondBolt.class);

    private OutputCollector outputCollector;
    private Integer taskId;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
        this.taskId = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        try {
            String address = input.getString(0); // address = "hangzhou"
            Map<String, String> student = (Map<String, String>) input.getValue(1);
            // student = {"name":"zhangsan","address":"hangzhou"}
            logger.info("SecondBolt execute taskId:{} address:{} student:{}", taskId, address, student);
            // Like IRichBolt, BaseRichBolt does not ack automatically (only the
            // BasicBolt variants do), so ack the input explicitly.
            outputCollector.ack(input);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4. Build the topology.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.JdbcTemplateAutoConfiguration;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication(exclude = {JdbcTemplateAutoConfiguration.class}, scanBasePackages = "com.demo")
public class MyTopology {

    public static void main(String[] args) {
        SpringApplication.run(MyTopology.class, args);

        // All custom topology configuration goes into this Map.
        Map conf = new HashMap();

        // Builder used to assemble the topology.
        TopologyBuilder builder = new TopologyBuilder();

        // Spout parallelism. new MySpout() is the actual spout instance and
        // "my-spout" is its name (the name must not contain spaces).
        int spoutParal = 1;
        builder.setSpout("my-spout", new MySpout(), spoutParal);

        // Bolt parallelism. "first-bolt" is the bolt name, new FirstBolt() the bolt
        // instance. localOrShuffleGrouping("my-spout") means this bolt consumes
        // "my-spout"'s stream, preferring tasks in the same worker and otherwise
        // shuffling tuples across the bolt's tasks.
        int boltParal = 3;
        builder.setBolt("first-bolt", new FirstBolt(), boltParal)
                .localOrShuffleGrouping("my-spout");

        // fieldsGrouping("first-bolt", new Fields("address")) means "second-bolt"
        // consumes "first-bolt"'s stream grouped by the "address" field, so tuples
        // with the same address always go to the same second-bolt task.
        builder.setBolt("second-bolt", new SecondBolt(), boltParal)
                .fieldsGrouping("first-bolt", new Fields("address"));

        // Acker parallelism.
        int ackerParal = 1;
        Config.setNumAckers(conf, ackerParal);

        // Number of workers the whole topology will use.
        int workerNum = 1;
        conf.put(Config.TOPOLOGY_WORKERS, workerNum);

        // Set the topology mode to distributed so it can run on a JStorm cluster.
        conf.put(Config.STORM_CLUSTER_MODE, "distributed");

        // Submit the topology.
        try {
            // Cluster mode:
            // StormSubmitter.submitTopology("first-topology", conf, builder.createTopology());

            // Local mode:
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("first-topology", conf, builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
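To see the demo run locally, the "student" list that MySpout pops from needs some data. A minimal seeding sketch, assuming the same "redisTemplate" bean and a value serializer that round-trips a Map (so the cast in FirstBolt succeeds). With the topology above running, records sharing the same "address" value should appear in the log with the same SecondBolt taskId:

import org.springframework.data.redis.core.RedisTemplate;

import java.util.HashMap;
import java.util.Map;

public class SeedStudentList {

    // Push a few test records onto the "student" list that MySpout pops from.
    public static void seed(RedisTemplate<String, Object> redisTemplate) {
        Map<String, String> zhangsan = new HashMap<String, String>();
        zhangsan.put("name", "zhangsan");
        zhangsan.put("address", "hangzhou");
        redisTemplate.opsForList().rightPush("student", zhangsan);

        Map<String, String> lisi = new HashMap<String, String>();
        lisi.put("name", "lisi");
        lisi.put("address", "hangzhou"); // same address -> same second-bolt task
        redisTemplate.opsForList().rightPush("student", lisi);
    }
}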

 
