#include
#include "MQTTClient.h"
//#include "led.h"
#include "DomoticzMessageParser.h"
#include "LED0.h"
#include "LED1.h"
#include "HardwareControl.h"
//#define __DEBUG
#include "dprintf.h"
struct opts_struct
{
char* clientid;
int nodelimiter;
char* delimiter;
enum QoS qos;
char* username;
char* password;
char* host;
int port;
int showtopics;
} opts =
{
(char*)"subscriber on STM32", 0, (char*)"\n", QOS2, NULL, NULL, (char*)"192.168.1.230", 1883, 0
};
int is_over;
void quit_domoticz_thread(void)
{
is_over = 1;
}
//================== Added 2017-Apr-27 8:06:53 start ==================
//处理链接的状态机结构
typedef struct
{
enum{
UNCONNECTED = 0,
NETWORK_CONNECTED=1,
MQTT_CONNECTED=2,
SUBSCRIBING_SUCCESS=3,
WAIT_TIME_OUT=4,
//SUBSCRIBING_FAILURE = 4
}state;
int timeout_s;//超时时间,单位为秒
int times_count;//累计连续尝试链接次数,在链接成功后清零
}Connection_t;
void connect_time_out(void * data)
{
Connection_t* con =(Connection_t*)data;
if(con && con->timeout_s>0)
{
con->timeout_s --;
}
}
#define NETWORK_CONNECT_TIMEOUT 5//5s
#define MAX_NETWORK_CONNECT_TIMES 5
#define MQTT_CONNECT_TIMEOUT 1//1s
#define MAX_MQTT_CONNECT_TIMES 5
#define SUBSCRIB_TIMEOUT 1//1s
#define MAX_SUBSCRIB_TIMES 5
#define MAX_NO_PING_RESPONS_TIMES 10
Connection_t connection={UNCONNECTED,NETWORK_CONNECT_TIMEOUT,0};
//================== Added 2017-Apr-27 8:06:53 end ===================
void messageArrived(MessageData* md)
{
MQTTMessage* message = md->message;
#if 0/* Commented @ 2017-Apr-23 1:18:29 */
if (opts.showtopics)
rt_kprintf("%.*s\t", md->topicName->lenstring.len, md->topicName->lenstring.data);
if (opts.nodelimiter)
rt_kprintf("%.*s", (int)message->payloadlen, (char*)message->payload);
else
rt_kprintf("%.*s%s", (int)message->payloadlen, (char*)message->payload, opts.delimiter);
#endif/* Commented */
dprintf("payloadlen=%d,%s\n",(int)message->payloadlen,(char*)message->payload);
dprintf("strlen(payload)=%d\n",strlen((char*)message->payload));
//fflush(stdout);
((char*)message->payload)[(int)message->payloadlen]=0;
ParseDomoticzMessage((char*)message->payload);
//dprintf("MSG: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
}
void set_host(char *host)
{
opts.host = host;
}
#define MAX_BUF_SIZE 512
void domoticz_thread_entry(void* parameter)
{
int rc = 0;
unsigned char buf[MAX_BUF_SIZE]={0};//buf[100];
unsigned char readbuf[MAX_BUF_SIZE]={0};//readbuf[100];
char* topic = "domoticz/out";
Network n;
MQTTClient c;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
rt_timer_t timer = rt_timer_create("connect_timer", connect_time_out, &connection, RT_TICK_PER_SECOND, RT_TIMER_FLAG_PERIODIC);
is_over = 0;
rt_kprintf("domoticz_thread_entry\n");
//================== Added 2017-Apr-26 2:36:53 start ==================
RegisterHardware(Create_LED0(),1);
RegisterHardware(Create_LED1(),6);
OpenHardwares();
SetupDomoitczMessageParser();
SetEnableParseItem(KEY_IDX);
SetEnableParseItem(KEY_NVALUE);
SetEnableParseItem(KEY_SWITCH_TYPE);
initHardwareSettings();
//================== Added 2017-Apr-26 2:36:53 end ===================
data.willFlag = 0;
data.MQTTVersion = 3;
data.clientID.cstring = opts.clientid;
data.username.cstring = opts.username;
data.password.cstring = opts.password;
data.keepAliveInterval = 10;
data.cleansession = 0;
rt_timer_start(timer);
while(!is_over)
{
switch(connection.state)
{
case UNCONNECTED:
//dprintf("\n");
if(connection.timeout_s>0)
continue;
dprintf("state = UNCONNECTED\n");
rt_kprintf("Connecting to %s:%d\n", opts.host, opts.port);
NetworkInit(&n);
rc = NetworkConnect(&n, opts.host, opts.port);
dprintf("rc=%d\n",rc);
if(rc==SUCCESS)
{//socket ????
connection.state = NETWORK_CONNECTED;
connection.times_count = 0;
rt_kprintf("NetworkConnect ok!\n");
//MQTTClientInit(&c, &n, 1000, buf, sizeof(buf), readbuf, sizeof(readbuf));
MQTTClientInit(&c, &n, 1000, buf,MAX_BUF_SIZE, readbuf, MAX_BUF_SIZE);
}
else if(connection.times_count
{dprintf("\n");
connection.times_count++;
connection.timeout_s = NETWORK_CONNECT_TIMEOUT;
}
else
{//reboot system,restart
dprintf("\n");
connection.times_count=0;
connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
//usleep(30*1000000);//30????
//goto exit;
}
break;
case NETWORK_CONNECTED:
if(connection.timeout_s>0)
continue;
dprintf("\n");
rt_kprintf("state = NETWORK_CONNECTED\n");
rc = MQTTConnect(&c, &data);
dprintf("rc=%d\n",rc);
if(rc == SUCCESS)
{
connection.state = MQTT_CONNECTED;
connection.times_count = 0;
connection.timeout_s = 0;
//printf("MQTTConnected!\n");
rt_kprintf("MQTTConnected! Subscribing to %s\n", topic);
}
else if(connection.times_count
{
//printf("MQTTConnect times=%d, err:%d! \n",connection.times_count,rc);
rt_kprintf("MQTTConnect times=%d, err:%d!\n",connection.times_count,rc);
connection.times_count++;
connection.timeout_s = MQTT_CONNECT_TIMEOUT;
}
else
{//????network??
dprintf("\n");
NetworkDisconnect(&n);
connection.state = UNCONNECTED;
connection.times_count=0;
connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
}
break;
case MQTT_CONNECTED:
if(connection.timeout_s>0)
continue;
dprintf("state = MQTT_CONNECTED\n");
rc = MQTTSubscribe(&c, topic, opts.qos, messageArrived);
dprintf("rc=%d\n",rc);
if(rc == SUCCESS)
{
rt_kprintf("Subscribed %s\n", topic);
connection.state = SUBSCRIBING_SUCCESS;
connection.times_count = 0;
}
else if(connection.times_count
{
rt_kprintf("MQTTSubscribe times=%d, err:%d! \n",connection.times_count,rc);
connection.times_count++;
connection.timeout_s = MQTT_CONNECT_TIMEOUT;
}
else
{
if(MQTTIsConnected(&c)==1)
MQTTDisconnect(&c);
else
MQTTCleanSession(&c);
NetworkDisconnect(&n);
connection.state = UNCONNECTED;
connection.times_count=0;
connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
}
break;
case SUBSCRIBING_SUCCESS:
MQTTYield(&c, 1000);
if(c.ping_timeout_times>=MAX_NO_PING_RESPONS_TIMES+1 || MQTTIsConnected(&c)==0)
{
if(MQTTIsConnected(&c)==1)
MQTTDisconnect(&c);
else
MQTTCleanSession(&c);
NetworkDisconnect(&n);
connection.state = UNCONNECTED;
connection.times_count = 0;
connection.timeout_s = NETWORK_CONNECT_TIMEOUT*6;
rt_kprintf("we will reconnect after %d senconds\n",connection.timeout_s);
}
else
{
connection.times_count = 0;
}
break;
default:
rt_kprintf("satte = default,err!\n");
break;
}
}
exit:
rt_kprintf("Stopping\n");
MQTTDisconnect(&c);
NetworkDisconnect(&n);
rt_timer_delete(timer);
}
void domoticz_thread_init(void)
{
rt_thread_t domoticz_thread;
domoticz_thread = rt_thread_create("DomoticzThread", domoticz_thread_entry, RT_NULL,
0xf00, 28, 10);
if (domoticz_thread != RT_NULL)
rt_thread_startup(domoticz_thread);
}
#ifdef RT_USING_FINSH
#include
FINSH_FUNCTION_EXPORT(set_host, set domoticz host ip addr);
FINSH_FUNCTION_EXPORT(domoticz_thread_init,to run domoticz thread );
FINSH_FUNCTION_EXPORT(quit_domoticz_thread,quit domoticz thread );
#endif