import os import requests import hashlib import json from typing import Optional, Union class api_Temperature: def __init__(self): self.session = requests.Session() self.base_url = "http://192.169.200.5:3900" self.base_url = os.getenv("ROOM_API_URL") # 获取加盐值 salt_url = f"{self.base_url}/api/Plugin.Base/Log/Salt" try: response = self.session.get(salt_url) response.raise_for_status() re_body = response.json() # print(f"re_body: {re_body}") fixed_salt = re_body.get("data", {}).get("FixedSalt") temp_salt = re_body.get("data", {}).get("TempSalt") # print(f"fixed_salt: {fixed_salt}") # print(f"temp_salt: {temp_salt}") # 登录 login_url = f"{self.base_url}/api/Plugin.Base/Log/In" account = os.getenv("ROOM_API_USERNAME") password = os.getenv("ROOM_API_USERPASS") remember_me = "true" # 计算密文 md5(md5({账号}+{固定加盐值}+{密码})+{临时加盐值}) password_step1 = self.md5_hash(account + fixed_salt + password) encrypted_password = self.md5_hash(password_step1 + temp_salt) post_data = { "Account": account, "Password": encrypted_password, "RememberMe": remember_me } login_response = self.session.post(login_url, data=post_data) login_response.raise_for_status() # 输出登陆结果 print("=" * 50) #Agent 智能体 print("登录结果详情:") # print(f"请求URL: {login_url}") # print(f"请求方法: POST") # print(f"请求体: {login_response.request.body}") print(f"响应状态码: {login_response.status_code}") print(f"响应内容: {login_response.text}") print("=" * 50) except requests.exceptions.RequestException as e: print(f"初始化失败: {e}") @staticmethod def md5_hash(text: str) -> str: """MD5哈希计算""" return hashlib.md5(text.encode('utf-8')).hexdigest() def get_temperature_value(self, object_id: str) -> float: """取单个温度值""" re_value = 0.0 url = f"{self.base_url}/api/Plugin.Data/Property/RealTimeData" params = { "Clazz": "Plugin.Env.Model.Device", "objectId": object_id, "provider": "Plugin.Env.Providers.Data.Device.Property_Point", "propertyId": "HT_Temp" } try: response = self.session.get(url, params=params) response.raise_for_status() re_body = response.json() # 屏幕打印日志 #print(f"[] {json.dumps(re_body)}") ht_temp_value = re_body.get("data", {}).get("value") if ht_temp_value: re_value = float(ht_temp_value) except (requests.exceptions.RequestException, ValueError) as e: print(f"获取温度值失败: {e}") return re_value def get_leak_judge_value(self, object_id: str, property_id: str) -> bool: """取单个漏水判定""" url = f"{self.base_url}/api/Plugin.Data/Property/RealTimeData" params = { "Clazz": "Plugin.Env.Model.Device", "objectId": object_id, "provider": "Plugin.Env.Providers.Data.Device.Property_Point", "propertyId": property_id } try: response = self.session.get(url, params=params) response.raise_for_status() re_body = response.json() #print(json.dumps(re_body)) mk_judge_value = re_body.get("data", {}).get("value") re_value = float(mk_judge_value) return re_value == 1 except (requests.exceptions.RequestException, ValueError) as e: print(f"获取漏水判定失败: {e}") return False def get_leak_locate_value(self, object_id: str, property_id: str) -> float: """取单个漏水位置""" re_value = 0.0 url = f"{self.base_url}/api/Plugin.Data/Property/RealTimeData" params = { "Clazz": "Plugin.Env.Model.Device", "objectId": object_id, "provider": "Plugin.Env.Providers.Data.Device.Property_Point", "propertyId": property_id } try: response = self.session.get(url, params=params) response.raise_for_status() re_body = response.json() #print(json.dumps(re_body)) mk_locate_value = re_body.get("data", {}).get("value") re_value = float(mk_locate_value) except (requests.exceptions.RequestException, ValueError) as e: print(f"获取漏水位置失败: {e}") return re_value def get_all_value(self) -> str: """获取所有值""" # 分院信息 SD_Temp_Main = 0.0 SD_Temp_UPS = 0.0 # 获取温度值 SD_Temp_Main = self.get_temperature_value("30.wsd5") SD_Temp_UPS = self.get_temperature_value("30.wsd1") # 获取漏水判断值 SD_Leak_Judge_Main1 = self.get_leak_judge_value("30.ls1", "Leak_Judge") SD_Leak_Judge_Main2 = self.get_leak_judge_value("30.ls2", "Leak_Judge") SD_Leak_Locate_Main1 = 0.0 SD_Leak_Locate_Main2 = 0.0 # 处理漏水定位逻辑[1](@ref) if SD_Leak_Judge_Main1: SD_Leak_Locate_Main1 = self.get_leak_locate_value("30.ls1", "Leak_Locate") if SD_Leak_Judge_Main2: SD_Leak_Locate_Main2 = self.get_leak_locate_value("30.ls2", "Leak_Locate") if SD_Leak_Locate_Main2 > SD_Leak_Locate_Main1: SD_Leak_Locate_Main1 = SD_Leak_Locate_Main2 #获取总院信息 LY_Temp_Main = 0.0 LY_Temp_UPS = 0.0 # 获取温度值[2](@ref) LY_Temp_Main = self.get_temperature_value("32.wsd3") LY_Temp_UPS = self.get_temperature_value("32.wsd1") # 获取漏水判断值 LY_Leak_Judge_Main = self.get_leak_judge_value("32.mk", "Path0") LY_Leak_Judge_UPS = self.get_leak_judge_value("32.mk", "Path1") # 构建分院报告 msgBody = "分院院区" if SD_Temp_Main == 0: msgBody += ",中心机房温度探测器故障" if SD_Temp_UPS == 0: msgBody += ",电源机房温度探测器故障" if SD_Temp_Main > 0: msgBody += f",中心机柜温度{SD_Temp_Main}" if SD_Temp_UPS > 0: msgBody += f",电源机柜温度{SD_Temp_UPS}" # 漏水状态判断 if not SD_Leak_Judge_Main1 and not SD_Leak_Judge_Main2: msgBody += ",中心机柜无漏水" if SD_Leak_Judge_Main1 or SD_Leak_Judge_Main2: msgBody += f",中心机柜漏水,漏水位置传感绳第{SD_Leak_Locate_Main1}米处" msgBody += "。" msgBody += " 总院院区" # 构建总院院区报告 if LY_Temp_Main == 0: msgBody += ",中心机柜温度探测器故障" if LY_Temp_UPS == 0: msgBody += ",电源机柜温度探测器故障" if LY_Temp_Main > 0: msgBody += f",中心机柜温度{LY_Temp_Main}" if LY_Temp_UPS > 0: msgBody += f",电源机柜温度{LY_Temp_UPS}" # 漏水状态判断 if not LY_Leak_Judge_Main: msgBody += ",中心机柜无漏水" if LY_Leak_Judge_Main: msgBody += ",中心机柜漏水" if not LY_Leak_Judge_UPS: msgBody += ",电源机柜无漏水" if LY_Leak_Judge_UPS: msgBody += ",电源机柜漏水" msgBody += "。" return msgBody if __name__ == "__main__": # 创建API实例(会自动登录) api = api_Temperature() # 获取所有值 getall = api.get_all_value() print(getall)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/235420.html原文链接:https://javaforall.net
