[C#, WPF]MQTTクライアントの基本(M2Mqtt)

スポンサーリンク
.Net FrameWork

はじめに

本記事は、C#でMqttクライアント(M2Mqtt)を実装した時のメモ書きです。
今回は接続、切断などの基本的な部分の確認です。

M2Mqttライブラリをインストールするところから順番に確認していきます。
broker(サーバー側)はmosquittoを使います。

以下で紹介するコードの環境はWindows10でWPFアプリを使っています。

mosquittoブローカー起動

インストールは以下から。
今回はWindows10なのでWindows64bit版をインストールします。

Download
Source mosquitto-2.0.18.tar.gz (GPG signature) Git source code repository (github.com) Older downloads are available at ...

インストール後は、ディレクトリの移動

cd C:\Program Files\mosquitto

移動後、mosquittto起動

mosquitto.exe -v

起動に成功した場合、以下のような表示が続きに表示されます。

1680622327: mosquitto version 2.0.15 starting
1680622327: Using default config.
1680622327: Starting in local only mode. Connections will only be possible from clients running on this machine.
1680622327: Create a configuration file which defines a listener to allow remote access.
1680622327: For more details see https://mosquitto.org/documentation/authentication-methods/
1680622327: Opening ipv4 listen socket on port 1883.
1680622327: Opening ipv6 listen socket on port 1883.
1680622327: mosquitto version 2.0.15 running

ということでブローカー(サーバー側)の用意はこれで完了です。
次は、WPFでM2Mqttライブラリをインストールして接続する処理などを用意してきます。

プロジェクト作成

VisualStudioで新規の「WPFアプリ(.Net Framework)」を作成します。

M2Mqttインストール

NuGetの管理コンソールを開いて、「M2Mqtt」で検索しインストールを行います。

MqttClientクラス作成の基本構成

まずは、MqttClientクラスを作成。
クラス名は「MqttListener」としておきます。(適当なので各々クラス名は変更して)
※名前空間の追加忘れないでください(using uPLibrary.Networking.M2Mqtt;)

using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace WpfApp1
{
    public class MqttListener
    {
        //Mqttクライアント
        private MqttClient _client;       
    }
}

今回はMyMqttClientクラスに以下のメソッドを用意していきます。

  • Connect()・・・接続
  • Disconnect()・・・切断
  • Subscribe()・・・受信
  • Publish()・・・送信
  • AddTopic()・・・トピック追加
  • RemoveTopic()・・・トピック削除
  • CheckConnectStatus()・・・接続状態確認

Connect()とDisconnect()

接続

/// <summary>
/// 接続
/// </summary>
/// <param name="host">ホスト名</param>
/// <param name="port">ポート番号</param>
/// <param name="username">ユーザー名</param>
/// <param name="userpass">ユーザーパス</param>
public bool Connect(string host, int port, string username, string userpass)
{
    try
    {
        _client = new MqttClient(host, port, false, null, null, MqttSslProtocols.None);
        if (_client == null)
        {
            return false;
        }
        byte ret = _client.Connect(Guid.NewGuid().ToString(), username, userpass);
        //受信ハンドラ追加 
        //MqttReceived()は後述
        _client.MqttMsgPublishReceived += MqttReceived;
        return true;
    }
    catch
    {
        return false;
    }           
}

とりあえずここでは、接続までです。
受信とトピックの追加はあとで行います。

切断

/// <summary>
/// 切断
/// </summary>
public void Disconnect()
{
    if (_client != null)
    {
        _client.Disconnect();
        _client = null;
    }
}

Subscribe()

受信したメッセージを処理する関数です。

/// <summary>
/// 受信ハンドラ
/// </summary>
private void MqttReceived(object sender, MqttMsgPublishEventArgs e)
{
    string topic    = e.Topic;
    string message  = Encoding.ASCII.GetString(e.Message);
    //メッセージを使った処理
    Console.WriteLine($"[topic]{topic} [msg]{message}");
}

AddTopic()とRemoveTopic()

トピック追加するときの気を付ける箇所が1つあります。
このあと、トピック追加するときにMqttQosというものを設定します。

MqttQos

Mqttの通信の品質のことです3段階あります。

Qos0送信する回数は1回のみ(エラーで届かない可能性もある)。
メッセージが届くことが保証されていません。
Qos1エラーなどでメッセージが届かなかった場合に再送される仕組みが含まれている
送信側はメッセージを送ると、受信側からACKが届くのを待つ。
メッセージは少なくとも1回は届く。
但し、重複チェックの機能はないため複数回メッセージが届いてしまう可能性もある。
Qos2メッセージが必ず1回届く。そのために再送と重複チェックを行う仕組みが含まれている。

m2mqttではこんな感じです。

再送処理重複チェックm2mqtt
Qos0××MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE
Qos1×MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE
Qos2MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE

登録トピックの追加

ここでは、トピックの追加とあわせて、先ほど説明したMqttQosの設定も併せて行います。
以下では、qos0(MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE)をセットしています。

/// <summary>
/// トピック追加
/// </summary>
/// <param name="topic">追加トピック</param>
public bool AddTopic(string topic)
{
    if (_client == null)
    {
        return false;
    }
    _client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
    return true;
}

登録トピックの削除

/// <summary>
/// トピック削除
/// </summary>
/// <param name="topic">削除トピック</param>
public bool RemoveTopic(string topic)
{
    if (_client == null)
    {
        return false;
    }
    var ret = _client.Unsubscribe(new string[] { topic });
    return true;
}

Publish()

/// <summary>
/// 送信
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
/// <returns></returns>
public bool Publish(string topic, string message)
{
    if (_client == null)
    {
        return false;
    }
    _client.Publish(topic, Encoding.UTF8.GetBytes(message));
    return true;
}

接続状態確認

/// <summary>
/// 接続状態チェック
/// </summary>
/// <returns></returns>
public bool CheckConnectStatus()
{
    return _client.IsConnected;
}

MqttListenerクラス全体

上記で説明したものをまとめたものです。
例外処理やエラー処理などはほぼ入れていませんので利用される場合にはそれらを追加してください。

using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
namespace WpfApp1
{
    public class MqttListener
    {
        //Mqttクライアント
        private MqttClient _client;
        
        /// <summary>
        /// コンストラクタ
        /// </summary>
        public MqttListener()
        {
            //処理なし
        }
        /// <summary>
        /// 接続
        /// </summary>
        /// <param name="host">ホスト名</param>
        /// <param name="port">ポート番号</param>
        /// <param name="username">ユーザー名</param>
        /// <param name="userpass">ユーザーパス</param>
        public bool Connect(string host, int port, string username, string userpass)
        {
            try
            {
                _client = new MqttClient(host, port, false, null, null, MqttSslProtocols.None);
                if (_client == null)
                {
                    return false;
                }
                byte ret = _client.Connect(Guid.NewGuid().ToString(), username, userpass);
                //受信ハンドラ追加 
                //MqttReceived()は後述
                _client.MqttMsgPublishReceived += MqttReceived;
                return true;
            }
            catch
            {
                return false;
            }
        }
        /// <summary>
        /// 切断
        /// </summary>
        public void Disconnect()
        {
            if (_client != null)
            {
                _client.Disconnect();
                _client = null;
            }
        }
        /// <summary>
        /// 受信ハンドラ
        /// </summary>
        private void MqttReceived(object sender, MqttMsgPublishEventArgs e)
        {
            string topic = e.Topic;
            string message = Encoding.ASCII.GetString(e.Message);
            //メッセージを使った処理
            Console.WriteLine($"[topic]{topic} [msg]{message}");
        }
        /// <summary>
        /// トピック追加
        /// </summary>
        /// <param name="topic">追加トピック</param>
        public bool AddTopic(string topic)
        {
            if (_client == null)
            {
                return false;
            }
            _client.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
            return true;
        }
        /// <summary>
        /// トピック削除
        /// </summary>
        /// <param name="topic">削除トピック</param>
        public bool RemoveTopic(string topic)
        {
            if (_client == null)
            {
                return false;
            }
            var ret = _client.Unsubscribe(new string[] { topic });
            return true;
        }
        /// <summary>
        /// 送信
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        public bool Publish(string topic, string message)
        {
            if (_client == null)
            {
                return false;
            }
            _client.Publish(topic, Encoding.UTF8.GetBytes(message));
            return true;
        }
    }
}

m2mqtt example

上記で紹介、作成したMqttListenerクラスの利用サンプルです。

bool ret;
//オブジェクト作成
MqttListener listener = new MqttListener();
//接続
//※ホスト名のところで指定しているIPアドレスなどは任意に書き換え
ret = listener.Connect("192.168.10.242", 1883, "username", "userpass");
if (!ret)
{
    //接続失敗
    return;
}
            
//トピック追加
listener.AddTopic("testA");
//トピックさらに追加
listener.AddTopic("testB");
//トピック削除したい
listener.RemoveTopic("testB");
//接続が切れるとループを抜ける
while (listener.CheckConnectStatus())
{
}
//切断
listener.Disconnect();

Mqttブローカーとの切断を検出

方法は2つあります。
1つは接続から別スレッドで行い、whileでIsConnectedプロパティをチェックする。
もう1つは、タイマー処理などで指定秒毎にIsConnectedプロパティをチェックする。

1つ目の別スレッドで接続して切断するまでの処理を行うのが無難かな…

タイトルとURLをコピーしました
デスクトップ用コンテンツ
タブレット用コンテンツ
モバイル用コンテンツ