zabbix监控elasticsearch集群「建议收藏」

zabbix监控elasticsearch集群「建议收藏」今天同事负责的es集群发生了脑裂,具体原因还有待查看日志。顺便分享一套zabbix监控es集群的脚本。生产改进与建议:所有监控统一status值,比如0是ok的,1是警告,2是error因为es集群会自己维护整个集群的元数据,因此数据收集不是按节点来的而是整个集群现在的配置是从salt的pillar中获取端口(或者说集群名)然后渲染下面的脚本,然后再自动发现集群下面的节点。建议集群也使…

大家好,又见面了,我是你们的朋友全栈君。

今天同事负责的es集群发生了脑裂,具体原因还有待查看日志。
顺便分享一套zabbix监控es集群的脚本。

生产改进与建议:

  • 所有监控统一status值,比如0是ok的,1是警告,2是error
  • 因为es集群会自己维护整个集群的元数据,因此数据收集不是按节点来的而是整个集群
  • 现在的配置是从salt的pillar中获取端口(或者说集群名)然后渲染下面的脚本,然后再自动发现集群下面的节点。建议集群也使用脚本自动检查,特别是一个主机上有多个属于不同集群的es节点时。
  • es内部gc的平均时间没有计算出来,后期可以加
  • 最后还是脚本中要收集的数据还是用字符串的+,也在此收集更高效的处理方法

架构方面:

  • 初步估计es集群脑裂不外乎两个原因
    • 网络原因,适当调整es集群内节点发现超时时间
    • master节点和data节点混在一起,当data节点由于业务原因hang住时,可能会导致集群将此节点剔除,引起es集群重新选举。
  • 解决:
    • master节点和data节点分离,使用3个master节点,其余做data节点,master节点的jvm配置可以配置小一点。es集群的监控放在master节点上。
#!/usr/bin/env python
#coding:utf-8

from __future__ import division   #必须在第一行
import json,requests,sys,os
import pickle



#在os.popen中执行的时候\b要加一层\
#如果没有指定ip,就使用默认的,用系统命令抓取本地ip地址:
cmd = "ip addr|grep '\\binet\\b'|grep -E 'bond0|eth0'|awk '{print $2}'|awk -F'/' '{print $1}'"
local_ip = os.popen(cmd).read().strip()

#设置一个默认端口9200,这个后期使用自动发现比较好
local_port = 9200


def discovery(local_ip,local_port):
    r = { 
   }
    r['data'] = []

    res = requests.get("http://{0}:{1}/_cat/nodes?v&h=name".format(local_ip,local_port))
    if res.status_code == 200:
        ret = res.text.splitlines()
        for i in range(1,len(ret)):
            r['data'].append({ 
   '{#NODE}':ret[i]})
    return json.dumps(r)


def send(local_ip,local_port):
    r_str = ""
    zbx_sender_cmd = "{0} -c {1} -i {2}"
    zbx_conf = "/usr/local/services/zabbix-3.0.0/etc/zabbix_agentd.conf"
    zbx_sender_file = "/tmp/.zbx_elastic_sender.txt"
    zbx_sender = "/usr/local/services/zabbix-3.0.0/bin/zabbix_sender"
    last_file = "/usr/local/services/zabbix-3.0.0/etc/.zbx_elstic_last_data.txt"

    #需要pickle保存到文件中的字典
    this_data = { 
   }

    #导入历史数据
    if os.path.exists(last_file):
        with open(last_file,"r") as f1:
            last_data = pickle.load(f1)
    else:
        os.popen("touch /usr/local/services/zabbix-3.0.0/etc/.zbx_elstic_last_data.txt")
        last_data = { 
   }
    #print(last_data)

    # 获取node信息的url
    url_node = "http://{0}:{1}/_nodes/stats?pretty".format(local_ip, local_port)
    res_node = requests.get(url_node)
    if res_node.status_code == 200:
        ret_node = res_node.json()
        for node,node_value in ret_node["nodes"].items():
            #print(ret_node["nodes"][node]["name"]) #打印出节点名字
            node_name = node_value["name"]
            this_data[node_name] = { 
   }

            #需要获取docs,segments,get,search,merges,flush,warmer等信息
            #docs,segments
            r_str += "- elastic.indices.docs.count.[{0}] {1}\n".format(node_name,node_value["indices"]["docs"]["count"])
            r_str += "- elastic.indices.docs.deleted.[{0}] {1}\n".format(node_name, node_value["indices"]["docs"]["deleted"])
            r_str += "- elastic.indices.segments.count.[{0}] {1}\n".format(node_name, node_value["indices"]["segments"]["count"])
            r_str += "- elastic.indices.segments.memory.[{0}] {1}\n".format(node_name,node_value["indices"]["segments"]["memory_in_bytes"])

            #indexing,get,search
            indexing_num = node_value["indices"]["indexing"]["index_total"]
            indexing_time = node_value["indices"]["indexing"]["index_time_in_millis"]/1000
            r_str += "- elastic.indices.indexing.total.[{0}] {1}\n".format(node_name, indexing_num)
            this_data[node_name].update({ 
   "indexing_num": indexing_num})
            r_str += "- elastic.indices.indexing.time.[{0}] {1}\n".format(node_name, indexing_time)
            this_data[node_name].update({ 
   "indexing_time": indexing_time})
            if node_name not in last_data.keys() or (indexing_num - last_data[node_name]["indexing_num"]) == 0:
                r_str += "- elastic.indices.indexing.per_time.[{0}] {1}\n".format(node_name,0)
            else:
                r_str += "- elastic.indices.indexing.per_time.[{0}] {1}\n".format(node_name,
                        round((indexing_time - last_data[node_name]["indexing_time"])/(indexing_num - last_data[node_name]["indexing_num"]),3)
                                                                          )

            get_num = node_value["indices"]["get"]["total"]
            get_time = node_value["indices"]["get"]["time_in_millis"]/1000
            r_str += "- elastic.indices.get.total.[{0}] {1}\n".format(node_name, get_num)
            this_data[node_name].update({ 
   "get_num": get_num})
            r_str += "- elastic.indices.get.time.[{0}] {1}\n".format(node_name, get_time)
            this_data[node_name].update({ 
   "get_time": get_time})
            if node_name not in last_data.keys() or (get_num - last_data[node_name]["get_num"]) == 0:
                r_str += "- elastic.indices.get.per_time.[{0}] {1}\n".format(node_name,0)
            else:
                r_str += "- elastic.indices.get.per_time.[{0}] {1}\n".format(node_name,
                        round((get_time - last_data[node_name]["get_time"])/(get_num - last_data[node_name]["get_num"]),3)
                                                                     )

            query_num = node_value["indices"]["search"]["query_total"]
            query_time = node_value["indices"]["search"]["query_time_in_millis"]/1000
            r_str += "- elastic.indices.search.query_total.[{0}] {1}\n".format(node_name, query_num)
            this_data[node_name].update({ 
   "query_num": query_num})
            r_str += "- elastic.indices.search.query_time.[{0}] {1}\n".format(node_name, query_time)
            this_data[node_name].update({ 
   "query_time": query_time})
            if node_name not in last_data.keys() or (query_num - last_data[node_name]["query_num"]) == 0:
                r_str += "- elastic.indices.query.per_time.[{0}] {1}\n".format(node_name,0)
            else:
                r_str += "- elastic.indices.query.per_time.[{0}] {1}\n".format(node_name,
                        round((query_time - last_data[node_name]["query_time"])/(query_num - last_data[node_name]["query_num"]),3)
                                                                         )

            fetch_num = node_value["indices"]["search"]["fetch_total"]
            fetch_time = node_value["indices"]["search"]["fetch_time_in_millis"]/1000
            r_str += "- elastic.indices.search.fetch_total.[{0}] {1}\n".format(node_name, fetch_num)
            this_data[node_name].update({ 
   "fetch_num": fetch_num})
            r_str += "- elastic.indices.search.fetch_time.[{0}] {1}\n".format(node_name,fetch_time)
            this_data[node_name].update({ 
   "fetch_time": fetch_time})
            if node_name not in last_data.keys() or (fetch_num - last_data[node_name]["fetch_num"]) == 0:
                r_str += "- elastic.indices.fetch.per_time.[{0}] {1}\n".format(node_name,0)
            else:
                r_str += "- elastic.indices.fetch.per_time.[{0}] {1}\n".format(node_name,
                        round((fetch_time - last_data[node_name]["fetch_time"])/(fetch_num - last_data[node_name]["fetch_num"]),3)
                                                                         )

            #merges,refresh,flush,warmer
            for oper in ["merges","refresh","flush","warmer"]:
                #这里有点容易混淆
                p_data = { 
   }
                str_num = "{0}_num".format(oper)
                str_time = "{0}_time".format(oper)
                p_data[str_num] = node_value["indices"][oper]["total"]
                p_data[str_time] = node_value["indices"][oper]["total_time_in_millis"]/1000

                r_str += "- elastic.indices.{0}.total.[{1}] {2}\n".format(oper,node_name, p_data[str_num])
                this_data[node_name].update({ 
   "{0}_num".format(oper): p_data[str_num]})
                r_str += "- elastic.indices.{0}.time.[{1}] {2}\n".format(oper,node_name, p_data[str_time])
                this_data[node_name].update({ 
   "{0}_time".format(oper): p_data[str_time]})
                if node_name not in last_data.keys() or (p_data[str_num] - last_data[node_name][str_num]) == 0:
                    r_str += "- elastic.indices.{0}.per_time.[{1}] {2}\n".format(oper,node_name, 0)
                else:
                    r_str += "- elastic.indices.{0}.per_time.[{1}] {2}\n".format(oper,node_name,
                                round((p_data[str_time] - last_data[node_name][str_time]) / (p_data[str_num] - last_data[node_name][str_num]),3)
                                                                               )

            #jvm基本
            r_str += "- elastic.jvm.heap_max_in_bytes.[{0}] {1}\n".format(node_name,node_value["jvm"]["mem"]["heap_max_in_bytes"])
            r_str += "- elastic.jvm.heap_used_in_bytes.[{0}] {1}\n".format(node_name,node_value["jvm"]["mem"]["heap_used_in_bytes"])
            r_str += "- elastic.jvm.threads.[{0}] {1}\n".format(node_name,node_value["jvm"]["threads"]["count"])
            r_str += "- elastic.jvm.buffer_pools.used_in_bytes.[{0}] {1}\n".format(node_name,node_value["jvm"]["buffer_pools"]["direct"]["used_in_bytes"])
            r_str += "- elastic.jvm.buffer_pools.total_capacity_in_bytes.[{0}] {1}\n".format(node_name,node_value["jvm"]["buffer_pools"]["direct"]["total_capacity_in_bytes"])

            #jvm垃圾回收的两个总量,要用减法。后面改进要计算出平均垃圾回收的时间
            r_str += "- elastic.jvm.gc.young.num.[{0}] {1}\n".format(node_name,node_value["jvm"]["gc"]["collectors"]["young"]["collection_count"])
            r_str += "- elastic.jvm.gc.young.time.[{0}] {1}\n".format(node_name,node_value["jvm"]["gc"]["collectors"]["young"]["collection_time_in_millis"]/1000)
            r_str += "- elastic.jvm.gc.old.num.[{0}] {1}\n".format(node_name,node_value["jvm"]["gc"]["collectors"]["old"]["collection_count"])
            r_str += "- elastic.jvm.gc.old.time.[{0}] {1}\n".format(node_name,node_value["jvm"]["gc"]["collectors"]["old"]["collection_time_in_millis"]/1000)

            for m,n in node_value["jvm"]["mem"]["pools"].items():
                r_str += "- elastic.jvm.{0}.used_in_bytes.[{1}] {2}\n".format(m,node_name,n["used_in_bytes"])
                r_str += "- elastic.jvm.{0}.max_in_bytes.[{1}] {2}\n".format(m,node_name,n["max_in_bytes"])

            #thread_pool,这个东西正常情况下没什么,有些时候还是能发现很多问题的
            for k,v in node_value["thread_pool"].items():
                r_str += "- elastic.thread_pool.{0}.threads.[{1}] {2}\n".format(k,node_name,v[u"threads"])
                r_str += "- elastic.thread_pool.{0}.threads.queue.[{1}] {2}\n".format(k,node_name,v[u"queue"])

            #http和script,其中script是总的数据,做减法
            r_str += "- elastic.http.current_open.[{0}] {1}\n".format(node_name,node_value["http"]["current_open"])
            r_str += "- elastic.script.compilations.[{0}] {1}\n".format(node_name,node_value["script"]["compilations"])

            #处理每种操作的平均时间time差值/total差值
            with open(last_file,"w") as f2:
                pickle.dump(this_data,f2)



    else:
        sys.stderr.write("Fetch node info error!")


    # 获取集群信息的url
    url_cluster = "http://{0}:{1}/_cluster/health".format(local_ip, local_port)
    res_cluster = requests.get(url_cluster)
    if res_cluster.status_code == 200:
        ret_cluster = res_cluster.json()

        #print(ret_cluster)
        #绿是0,黄是1,红是2
        if ret_cluster[u'status'] == u"green":
            status = 0
        elif ret_cluster[u'status'] == u"yellow":
            status = 1
        else:
            status = 2

        r_str +=  "- elastic.cluster.status {0}\n".format(status)
        r_str += "- elastic.cluster.non {0}\n".format(ret_cluster[u"number_of_nodes"])
        r_str += "- elastic.cluster.us {0}\n".format(ret_cluster[u"unassigned_shards"])
        r_str += "- elastic.cluster.nopt {0}\n".format(ret_cluster[u"number_of_pending_tasks"])
        r_str += "- elastic.cluster.noiff {0}\n".format(ret_cluster[u"number_of_in_flight_fetch"])
        r_str += "- elastic.cluster.aps {0}\n".format(ret_cluster[u"active_primary_shards"])
        r_str += "- elastic.cluster.tmwiqm {0}\n".format(ret_cluster[u"task_max_waiting_in_queue_millis"])
        r_str += "- elastic.cluster.rs {0}\n".format(ret_cluster[u"relocating_shards"])
        r_str += "- elastic.cluster.aspan {0}\n".format(ret_cluster[u"active_shards_percent_as_number"])
        r_str += "- elastic.cluster.as {0}\n".format(ret_cluster[u"active_shards"])
        r_str += "- elastic.cluster.is {0}\n".format(ret_cluster[u"initializing_shards"])
        r_str += "- elastic.cluster.dus {0}\n".format(ret_cluster[u"delayed_unassigned_shards"])
        r_str += "- elastic.cluster.nodn {0}\n".format(ret_cluster[u"number_of_data_nodes"])
    else:
        sys.stderr.write("Fetch node info error!")

    with open(zbx_sender_file,"w") as f:
        f.write(r_str)


    send_ret = os.popen(zbx_sender_cmd.format(zbx_sender, zbx_conf, zbx_sender_file))
    #print(zbx_sender_cmd.format(zbx_sender, zbx_conf, zbx_sender_file))
    if "failed: 0" in send_ret.read():  #这一步,用一个普通的item来触发,并返回执行结果,1是正常的,0是发送异常
        print(1)
    else:
        print(0)



if __name__ == "__main__":
    if len(sys.argv) == 2 and sys.argv[1]=="discovery":
        ret = discovery(local_ip,local_port)
        print(ret)
    elif len(sys.argv) == 1:
        send(local_ip,local_port)
    else:
        sys.stderr.write("Args is wrong!")

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/139561.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 设计模式:组合模式

    设计模式:组合模式

    2022年1月18日
    43
  • CentOS7 安装 Python 3.9.0[通俗易懂]

    CentOS7 安装 Python 3.9.0[通俗易懂]文章目录1.安装编译相关工具2.创建Python文件夹下载安装包3.编译安装4.创建软连接5.验证1.安装编译相关工具安装开发库yum-ygroupinstall”Developmenttools”安装依赖环境yum-yinstallzlibzlib-develbzip2-developenssl-develncurses-develsqlite-develreadline-develtk-develgdbm-develdb4-devel

    2022年9月24日
    4
  • Scrapy爬虫案例-淘宝比价定向爬虫学习笔记

    Scrapy爬虫案例-淘宝比价定向爬虫学习笔记说明Scrapy爬虫案例-淘宝比价定向爬虫学习笔记学习教程:Python网络爬虫与信息提取授课老师:嵩天官方网站:https://python123.io教程链接:https://python123.io/index/courses/804“淘宝比价定向爬虫”实例功能描述目标:获取淘宝搜索页面的信息,提取其中的商品名称和价格理解:淘宝的搜索接口翻页的处理技术路…

    2022年6月26日
    26
  • radius认证服务器ip该怎么填_radius认证服务器拒绝原因

    radius认证服务器ip该怎么填_radius认证服务器拒绝原因1.AAA和Radius概述  AAA是验证授权和记账Authentication,Authorization,andAccounting的简称。它是运行于NAS上的客户端程序,它提供了一个用来对验证、授权和记账这三种安全功能进行配置的一致的框架。AAA的配置实际上是对网络安全的一种管理,这里的网络安全主要指访问控制,包括哪些用户可以访问网络服务器,具有访问权的用户可以得到哪些服务,如何

    2025年8月3日
    7
  • 解惑4:java是值传递还是引用传递

    解惑4:java是值传递还是引用传递一、概述曾经纠结了很久java的参数传递方式是什么样的,后面粗略的了解了一鳞半爪以后有了大概的印象:“传参数就是值传递,传对象就是引用传递”,后面进一步查找了相关资料和文章以后,发现这么理解是不正确

    2022年8月16日
    7
  • el-table高度自适应_镶嵌html如何自适应

    el-table高度自适应_镶嵌html如何自适应分析如下图(此方案中使用的是ElementTable官网copy的代码(多用于OA,CMS,ERP这类系统中)如上图大体目前没有问题,但是存在细节问题那就是在table在滚动的过程中表头没有了如果说这里的列比较多,用户需要查看的数据在最后面,每次某个列的数据对应的是什么意思(尤其是表格数字比较多的话,非常恼火),需要上下来回滚动table内容才能解决所以说我们要解决的就是表头固定①(标记…

    2025年9月26日
    2

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号