Android MQTT客户端

Android MQTT客户端目的是为了实时监控室外广告屏的亮度 界面 声音 开关机等等 因为室外的网络情况是随时可变的 所以采用的 MQTT 协议 作为 Android 客户端来说因为用 MQTT 发送消息太繁琐 我们采用的是客户端只接收命令 然后用 Http 进行数据反馈 这个项目近期也做完了 故记录一下 第一步 导入在线库 mqtt 包导入 implementati org eclipse paho org eclipse paho client mqttv3 1 1 0 implementati org ecli

目的是为了实时监控室外广告屏的亮度,界面,声音,开关机等等…… 因为室外的网络情况是随时可变的,所以采用的MQTT协议,作为Android客户端来说因为用MQTT发送消息太繁琐,我们采用的是客户端只接收命令,然后用Http进行数据反馈,这个项目近期也做完了,故记录一下。

第一步:导入在线库

// mqtt 包导入 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

第二步:声明广播(这个是在线库自带的 直接这么写就行,不用自己写这个service)

 
 

第三步:MqttManager的全部代码

public class MqttManager { public final static String TAG = MqttManager.class.getSimpleName(); @SuppressLint("StaticFieldLeak") private static volatile MqttManager mInstance = null; private MqttCallback mCallback; public MqttClient client; private MqttConnectOptions conOpt; private Context context; public String[] topic; private MqttManager(Context context) { mCallback = new MqttCallbackBus(context); this.context = context; } public static MqttManager getInstance(Context context) { if (mInstance == null) { synchronized (MqttManager.class) { if (mInstance == null) { mInstance = new MqttManager(context); } } } return mInstance; } / * 释放单例, 及其所引用的资源 */ public static void release() { try { if (mInstance != null) { mInstance.disConnect(); mInstance = null; } } catch (Exception e) { e.printStackTrace(); } } / * 创建Mqtt 连接 * * @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863) * @param userName 用户名 * @param password 密码 * @return */ public boolean createConnect(String brokerUrl, String userName, String password) { topic = new String[]{PUBLISH_TOPIC}; String deviceId = Utils.getDeviceNo(); if (client != null && client.isConnected()) { return true; } boolean flag = false; try { conOpt = new MqttConnectOptions(); conOpt.setCleanSession(true); //不接收离线期间的消息 每次都是重新登陆 conOpt.setAutomaticReconnect(true); //自动重连 if (!TextUtils.isEmpty(password)) { conOpt.setPassword(password.toCharArray()); } if (!TextUtils.isEmpty(userName)) { conOpt.setUserName(userName); } client = new MqttClient(brokerUrl, deviceId, new MemoryPersistence()); client.setCallback(mCallback); flag = doConnect(); client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); //重连 String msg = "exception_reconnect"; EventBus.getDefault().post(msg); } return flag; } / * 建立连接 * * @return */ private boolean doConnect() { boolean flag = false; if (client != null) { try { client.connect(conOpt); flag = true; } catch (Exception e) { e.printStackTrace(); } } return flag; } public boolean publish(String topicName, int qos, byte[] payload) { boolean flag = false; try { if (client == null) { createConnect(HOST, USERNAME, PASSWORD); } if (!client.isConnected()) { this.reconnect(); } MqttMessage message = new MqttMessage(payload); message.setQos(qos); client.publish(topicName, message); flag = true; } catch (MqttException e) { e.printStackTrace(); } return flag; } private boolean subscribe(String topicName, int qos) { boolean flag = false; if (client != null && client.isConnected()) { try { client.subscribe(topicName, qos); flag = true; } catch (MqttException e) { e.printStackTrace(); } } return flag; } private boolean subscribe(String[] topicName, int qos[]) { boolean flag = false; if (client != null && client.isConnected()) { try { client.subscribe(topicName, qos); flag = true; } catch (MqttException e) { e.printStackTrace(); } } return flag; } / * 取消连接 * * @throws MqttException */ public void disConnect() throws MqttException { if (client != null && client.isConnected()) { client.disconnect(); } } / * 关闭连接 */ public void close() { if (client != null && client.isConnected()) { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } / * 重新连接 */ private void reconnect() { if (client != null && !client.isConnected()) { try { client.setCallback(mCallback); client.connect(conOpt); client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } } }

里面用eventBus做了下重连处理,后续响应如下

boolean isHavNet = isConnectIsNomarl(); //判断是否有网 if (isHavNet) { handler.removeMessages(4); MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD); }else{ handler.sendEmptyMessageDelayed(4, 10000);//无网等待有网再连 }

第四步:mqtt消息回调类

public class MqttCallbackBus implements MqttCallbackExtended { private Context mContext; public MqttCallbackBus(Context context) { this.mContext = context; } @Override public void connectComplete(boolean reconnect, String serverURI) { Log.e("MqttCallbackBus", "MQTT_connectComplete:"); //断开连接必须重新订阅才能收到之前订阅的session连接的消息 if(reconnect){ Log.e("MqttCallbackBus_重连订阅主题", "MQTT_connectComplete:"); //这里是发送消息去重新订阅 String msg = "reconnect"; EventBus.getDefault().postSticky(msg); } } @Override public void connectionLost(Throwable cause) { //掉线 Log.e("MqttCallbackBus>>>", "MQTT_connectionLost 掉线原因:"+cause.getMessage()); cause.printStackTrace(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { MqttReceivedMessage msg = (MqttReceivedMessage) message; String s = msg.toString(); Log.e("MQTT_messageArrived_msg", "MQTT_msg"+topic); HashMap hs = new Gson().fromJson(s, HashMap.class); EventBus.getDefault().post(hs); //抛出收到的消息 eventbus响应做自己的业务处理 } @Override public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 token.isComplete() Log.e("MqttCallbackBus>>>", "MQTT_deliveryComplete:"); } }

其中connectComplete的重连回调reconnect = true 的eventbus的响应是:

//重连 能进重连 说明必有网了 不必再else 定时重试网络 String[] topic = new String[]{PUBLISH_TOPIC}; try { MqttManager.getInstance(BaseApplication.getContext()).client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); }

MainActivity的使用如下:

//网络变化监听 首次连接mqtt boolean isHavNet = isConnectIsNomarl(); if (isHavNet) { handler.removeMessages(1); MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD); } else { handler.sendEmptyMessageDelayed(1, 5000); }

MQTT全局的配置参数:

Android MQTT客户端

 

附赠个判断是否有网的工具类

/ * 判断网络是否连接 */ private boolean isConnectIsNomarl() { ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE); @SuppressLint("MissingPermission") NetworkInfo info = connectivityManager.getActiveNetworkInfo(); if (info != null && info.isConnected()) { String name = info.getTypeName(); return true; } else { return false; } }
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/228484.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月16日 下午6:54
下一篇 2026年3月16日 下午6:55


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号