import unittest import settings import time import mosquitto import serial def on_message(mosq, obj, msg): obj.message_queue.append(msg) class mqtt_basic(unittest.TestCase): message_queue = [] @classmethod def setUpClass(self): self.client = mosquitto.Mosquitto("pubsubclient_ut", clean_session=True,obj=self) self.client.connect(settings.server_ip) self.client.on_message = on_message self.client.subscribe("outTopic",0) @classmethod def tearDownClass(self): self.client.disconnect() def test_one(self): i=30 while len(self.message_queue) == 0 and i > 0: self.client.loop() time.sleep(0.5) i -= 1 self.assertTrue(i>0, "message receive timed-out") self.assertEqual(len(self.message_queue), 1, "unexpected number of messages received") msg = self.message_queue[0] self.assertEqual(msg.mid,0,"message id not 0") self.assertEqual(msg.topic,"outTopic","message topic incorrect") self.assertEqual(msg.payload,"hello world") self.assertEqual(msg.qos,0,"message qos not 0") self.assertEqual(msg.retain,False,"message retain flag incorrect")