Apache Storm使用

Apache Storm使用

大家好,又见面了,我是全栈君。

    Apache Storm 是 Apache 基金会的开源的分布式实时计算系统。与 Hadoop 的批处理相类似,Storm 可以对大量的数据流进行可靠的实时处理,这一过程也称为“流式处理”,是分布式大数据处理的一个重要方向。Storm 支持多种类型的应用,包括:实时分析、在线机器学习、连续计算、分布式RPC(DRPC)、ETL等。Strom 的一个重要特点就是“快速”的数据处理,有 benchmark 显示 Storm 能够达到单个节点每秒百万级 tuple 处理(tuple 是 Storm 的最小数据单元)的速度。快速的数据处理、优秀的可扩展性与容错性、便捷的可操作性与维护性、活跃的社区技术支持,这就是 Storm。

Apache Storm使用

Storm 集群组件

Apache Storm使用

  Storm 集群中包含两类节点:主控节点(Master Node)和工作节点(Work Node)。其分别对应的角色如下:

  •  主控节点(Master Node)上运行一个被称为Nimbus的后台程序,它负责在Storm集群内分发代码,分配任务给工作机器,并且负责监控集群运行状态。Nimbus的作用类似于Hadoop中JobTracker的角色
  • 每个工作节点(Work Node)上运行一个被称为Supervisor的后台程序。Supervisor负责监听从Nimbus分配给它执行的任务,据此启动或停止执行任务的工作进程。每一个工作进程执行一个Topology的子集;一个运行中的Topology由分布在不同工作节点上的多个工作进程组成

    Nimbus 和 Supervisor 节点之间所有的协调工作是通过 Zookeeper 集群来实现的。此外,Nimbus 和 Supervisor 进程都是快速失败(fail-fast)和无状态(stateless)的;Storm 集群所有的状态要么在 Zookeeper 集群中,要么存储在本地磁盘上。这意味着你可以用 kill -9 来杀死 Nimbus 和 Supervisor 进程,它们在重启后可以继续工作。这个设计使得Storm集群拥有不可思议的稳定性。

Storm 部署步骤

  搭建一个Storm集群需要依次完成的安装步骤:

  1. 搭建Zookeeper集群
  2. 安装Storm依赖库(Java、Python)
  3. 下载并解压Storm发布版本
  4. 修改storm.yaml配置文件
  5. 启动Storm各个后台进程

Storm.yaml 配置

 Storm发行版本解压目录下有一个conf/storm.yaml文件,用于配置Storm。默认配置可以在这里查看。conf/storm.yaml中的配置选项将覆盖defaults.yaml中的默认配置。以下配置选项是必须在conf/storm.yaml中进行配置的:

  1. storm.zookeeper.servers: Storm集群使用的Zookeeper集群地址,其格式如下: storm.zookeeper.servers: – “111.222.333.444” – “555.666.777.888” 如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项
  2. storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如: storm.local.dir: “/home/admin/storm/workdir”
  3. java.library.path: Storm使用的本地库加载路径,默认为”/usr/local/lib:/opt/local/lib:/usr/lib”,一般不需要配置
  4. nimbus.host: Storm集群Nimbus机器地址,各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件,如: nimbus.host: “111.222.333.444”
  5. supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。如果需要在每个节点上运行4个workers,可以分别使用6700、6701、6702和6703端口,如: supervisor.slots.ports: – 6700 – 6701 – 6702 – 6703
  6. JVM options: 用于配置Storm使用JVM参数

   [注] yaml 文件的配置使用“-”来表示数据的层次结构,配置项的:后必须有空格,否则该配置项无法识别

  集群配置示例如下:

########### These MUST be filled in for a storm configuration
# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
 storm.zookeeper.servers:
    - "192.168.9.182"
    - "192.168.9.185"
    - "192.168.91.128"
 storm.zookeeper.port: 2181
# storm's work directory
 storm.local.dir: "/home/storm/workdir"
# nimbus.host: "nimbus"
 nimbus.host: "192.168.9.185"
# supervisor's work ports
 supervisor.slots.ports:
    -6700
    -6701
    -6702
    -6703

# #### Netty transport configuration
# transmission protocol
 storm.messaging.transport: "backtype.storm.messaging.netty.Context"
# server's work threads number
 storm.messaging.netty.server_worker_threads: 1
# client's work threads number
 storm.messaging.netty.client_worker_threads: 1
# buffer size
 storm.messaging.netty.buffer_size: 5242880
# max retry times
 storm.messaging.netty.max_retries: 100
# max waiting time(ms)
 storm.messaging.netty.max_wait_ms: 1000
# min waiting time(ms)
 storm.messaging.netty.min_wait_ms: 100

## JVM parameters can be configured here
 nimbus.childopts: "-Xloggc:/home/enjoyor/storm/apache-storm-0.9.3/logs/nimbusGC.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps"
 supervisor.childopts: "-Xloggc:/home/enjoyor/storm/apache-storm-0.9.3/logs/nimbusGC.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps"
 worker.childopts: "-Xloggc:/home/enjoyor/storm/apache-storm-0.9.3/logs/nimbusGC.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps"

Storm 运行

 和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Storm才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Supervisors被重启,运行中的Topologies不会受到影响。以下是启动Storm各个后台进程的方式:

  • Nimbus: 在Storm主控节点上运行 nohup storm nimbus & 启动Nimbus后台程序,并放到后台执行;
  • Supervisor: 在Storm各个工作节点上运行nohup storm supervisor & 启动Supervisor后台程序,并放到后台执行;
  • UI: 在Storm主控节点上运行nohup storm ui & 启动UI后台程序,并放到后台执行,启动后可以通过 http://{nimbus host}:8080 观察集群的worker资源使用情况、Topologies的运行状态等信息。

注意事项

  • Storm后台进程被启动后,将在Storm安装部署目录下的logs/子目录下生成各个进程的日志文件。
  • 经测试,Storm UI必须和Storm Nimbus 部署在同一台机器上,否则UI无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。
  • 为了方便使用,可以将bin/storm加入到系统环境变量中。

至此,Storm集群已经部署、配置完毕,可以向集群提交拓扑运行了。

向Storm 集群提交任务

  1. 启动 Storm Topology:
storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3

  其中,allmycode.jar 是包含 Topology 实现代码的 jar 包,org.me.MyTopology 的 main 方法是 Topology 的入口,arg1、arg2 和 arg3 为 org.me.MyTopology 执行时需要传入的参数。

        2. 停止 Storm Topology:

storm kill {toponame}

  其中,{toponame} 为 Topology 提交到 Storm 集群时指定的 Topology 任务名称。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/108699.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 计算经纬度的距离_经纬度测距

    计算经纬度的距离_经纬度测距PHP根据经纬度坐标计算距离在有些应用中需要用到计算距离的功能,例如附近的商家、离我最近等功能。W为纬度对应的弧度,J为经度对应的弧度,如上图所示下面代码lat是纬度lng是经度看类代码/***根据经纬度算距离,返回结果单位是公里,先纬度,后经度*@param$lat1*@param$lng1*@param$lat2*@param$lng2*@returnfloat|…

    2025年11月26日
    5
  • rtp载荷类型_架体荷载

    rtp载荷类型_架体荷载 1简介在Internet上用分组传送话音的质量不够好的一个重要原因是比较高的丢包率。尤其在广域网中,这个问题相当突出。不幸的是,实时多媒体业务对于延时的要求相当严格,因此不大可能通过重传来解决丢包的问题。正是出于这个原因,大家提出用前向纠错(FEC)来解决Internet上的丢包问题[1][2]。尤其是对于传统纠错码如校验码、RS码、汉明码等的使用引起了很多人的注意。为了能够更好地应用这些纠错码

    2022年8月11日
    5
  • HTML 5 <canvas> 标签

    HTML 5 <canvas> 标签

    2021年10月10日
    38
  • Linux中添加路由_linux添加临时路由命令

    Linux中添加路由_linux添加临时路由命令一:使用route命令添加使用route命令添加的路由,机器重启或者网卡重启后路由就失效了,方法://添加到主机的路由#routeadd–host192.168.1.11deveth0#routeadd–host192.168.1.12gw192.168.1.1//添加到网络的路由#routeadd–net192.168.1.11netmask255.255.255.0eth0#routeadd–net192.168.1.11netm

    2022年10月5日
    2
  • rbac权限管理设计 7表_数据库角色权限表设计

    rbac权限管理设计 7表_数据库角色权限表设计RBAC(Role-BasedAccessControl,基于角色的访问控制),就是用户通过角色与权限进行关联。简单地说,一个用户拥有若干角色,每一个角色拥有若干权限。这样,就构造成“用户-角色-权限”的授权模型。在这种模型中,用户与角色之间,角色与权限之间,一般者是多对多的关系。(如下图)当用户量非常多的时候,逐一的给用户授权角色是一件很痛苦的事情,于是引出组的概念。

    2022年9月29日
    2
  • RabbitMQ的优先级队列「建议收藏」

    RabbitMQ的优先级队列「建议收藏」优先级队列队列需要设置优先级队列,消息需要设置消息的优先级。消费者需要等待消息已经发送到队列中,然后对队列中的消息进行排序,最后再去消费。Map<String,Object>arguments=newHashMap<>();arguments.put(“x-max-priority”,10);//设置优先级队列channel.queueDeclare(QUEUE_NAME,false,false,fal

    2022年9月23日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号