IoT in The Cloud

ThatCat Dev
9 min readFeb 7, 2023

--

This is a continuation of a previous article I wrote a while back in which I set up a solution to control my AC from a website. There have been quite some improvements, such as adding authentication, moving from using VPN to using MQTT, allowing me to communicate with the IoT device directly, and utilizing Kafka to push data into Mongodb for keeping a record of temperature and humidity changes. You can view my previous article here.

Moving to MQTT

With the update to OpenVPN causing changes in the config, thus breaking the connection to the OpenVPN server, there was a need to look for a better solution. Enter MQTT. MQTT is just a pub/sub.

“Pub/Sub is an asynchronous and scalable messaging service that decouples services producing messages from services processing those messages.”

MQTT is a good fit as the IoT device initiates the connection. I do not have to reach into my home network to access my IoT device, and I can securely access it by securing the MQTT server with TLS and authentication (TLS not enabled in this article). There are also enough libraries and support for MQTT for IoT devices. There are other benefits to MQTT, such as smaller payloads for more messages and sending/receiving messages are faster due to not having to open a new connection. You can see more details here.

Infrastructure

Having decided on MQTT, we can utilize other event-driven architecture. Moving from a pulling system (with HTTP, we pulled data from the ESP8266, such as room temperature and humidity), we can move to a push system (ESP8266 will push data to our service, and we will push actions to the ESP8266).

We can add a few more requirements:

  • Be able to turn on/off the AC from the web and talk directly with the IoT device.
  • Get the temperature at an interval of every minute and save it.
  • Be able to display a chart of the changes in temperature/humidity.

To satisfy these requirements, we will add two more pieces:

  • Kafka: To handle messages from MQTT (more on that later)
  • MongoDB: To save to a database with fast access (we could move to something better like InfluxDB or connect to Prometheus?)

Below is a diagram of the new architecture:

Architecture

Why Kafka

MQTT is great; however, if you lose connection or the downstream service crashes, you also lose whatever messages came in at that moment. With Kafka, a message stays until it is committed. So if a downstream service crashes, it can resume where it left off, and the message is not lost. Additionally, there are many open-source connectors to connect services together (we can directly put data we get in MQTT into MongoDB). We can only expand from the options Kafka provides us, such as aggregating multiple sensors and devices and importing them all into MongoDB.

Building the Infrastructure

For building the infrastructure, we already have some parts:

  • Frontend application
  • Backend application

Let's start with MQTT. For MQTT, I have decided to use HiveQM as they provide a helm chart, which is easy to deploy. However, you are limited to the number of connections you may have (25 connections). You can find the helm chart I used here.

Below are the values I used to deploy.

Notes:

  • pspEnabled to false due to Kubernetes version 1.25+ (see details here)
  • Added annotations to ports, as these need to be tied to a new IP address with MetalLB
operator:
# Deploy a custom resource based on the hivemq section below. Set to false if you want to create a HiveMQCluster object yourself.
# By setting this to false, the operator will not start a HiveMQ cluster when deployed.
deployCr: false
global:
rbac:
create: true
# Create a PodSecurityPolicy, cluster role, role binding and service account for the HiveMQ pods and assign the service account to them.
pspEnabled: false
hivemq:
nodeCount: "2"
security:
allowEmptyClientId: false
ports:
- name: "mqtt"
port: 1883
expose: true
patch:
- '[{"op":"add","path":"/spec/selector/hivemq.com~1node-offline","value":"false"},{"op":"add","path":"/metadata/annotations","value":{"service.spec.externalTrafficPolicy":"Local", "metallb.universe.tf/allow-shared-ip": "key-to-share-192.168.9.85"}}]'
# If you want Kubernetes to expose the MQTT port
- '[{"op":"add","path":"/spec/type","value":"LoadBalancer"},{"op":"add","path":"/spec/loadBalancerIP","value":"192.168.9.85"}]'

To create a HiveMQ cluster, you can follow the below:

Notes:

  • RBAC extension used to support authenticating
apiVersion: hivemq.com/v1
kind: HiveMQCluster
metadata:
name: hivemq-cluster1
spec:
hivemqVersion: 4.3.3
nodeCount: 3
cpu: '2000m'
memory: '4096M'
logLevel: INFO
mqtt:
maxQos: 2
monitoring:
enabled: true
dedicated: false
env:
- name: HIVEMQ_ALLOW_ALL_CLIENTS
value: "false"
extensions:
- name: hivemq-file-rbac-extension
extensionUri: https://github.com/hivemq/hivemq-file-rbac-extension/releases/download/4.5.3/hivemq-file-rbac-extension-4.5.3.zip
enabled: true
configMap: rbac-extension
ports:
- name: "mqtt"
port: 1883
expose: true
patch:
- '[{"op":"add","path":"/spec/selector/hivemq.com~1node-offline","value":"false"},{"op":"add","path":"/metadata/annotations","value":{"service.spec.externalTrafficPolicy":"Local", "metallb.universe.tf/allow-shared-ip": "key-to-share-192.168.9.85"}}]'
# If you want Kubernetes to expose the MQTT port
- '[{"op":"add","path":"/spec/type","value":"LoadBalancer"},{"op":"add","path":"/spec/loadBalancerIP","value":"192.168.9.85"}]'
- name: "cc"
port: 8080
expose: true
patch:
- '[{"op":"add","path":"/spec/sessionAffinity","value":"ClientIP"}]'
# If you want Kubernetes to expose the HiveMQ control center via a load balancer.
# Warning: You should consider configuring proper security and TLS beforehand. Ingress may be a better option here.
# - '[{"op":"add","path":"/spec/type","value":"LoadBalancer"}]'

Now that the MQTT is set up, we must set up our backend to push or subscribe to the MQTT broker.

import { Module } from '@nestjs/common'
import { ConfigModule } from '../config/config.module'
import { ConfigService } from '../config/config.service'
import { MQTTConfig } from './mqtt.config'
import { MQTTService } from './mqtt.service'

const MQTTFactory = {
provide: 'MQTT',
imports: [ConfigModule.register(MQTTConfig)],
useFactory: async (
optionsProvider: ConfigService<MQTTConfig>,
) => {
const mqttService = new MQTTService(
optionsProvider,
)
await mqttService.connect()
return mqttService
},
inject: [ConfigService],
}

@Module({
imports: [ConfigModule.register(MQTTConfig)],
providers: [MQTTFactory],
exports: ['MQTT'],
})
export class MQTTModule {}
import { Injectable } from '@nestjs/common';
import { ConfigService } from '../config/config.service';
import { MQTTConfig } from './mqtt.config';
import * as mqtt from 'mqtt';

@Injectable()
export class MQTTService {
private client: mqtt.MqttClient;
constructor(private readonly config: ConfigService<MQTTConfig>) {}

connect(): void {
this.client = mqtt.connect(`mqtt://${this.config.env.MQTT_HOST}:${this.config.env.MQTT_PORT}`, {
username: this.config.env.MQTT_USERNAME,
password: this.config.env.MQTT_PASSWORD,
});
this.client.on('connect', () => {
console.log('connected');
});
this.client.on('error', (error) => {
console.log('client', error); // never fires
});
}

publish(topic: string, message: string): void {
this.client.publish(topic, message);
}

subscribe(topic: string): void {
this.client.subscribe(topic);
}

}

You can see the full MQTT module used here.

Inside the ac module service, we can push messages:

async updateAC(data: IACData): Promise<void> {
const query: string = queryString.stringify(data)
this.mqttService.publish('ac', JSON.stringify(data))

return this.saveState(data)
}

We can test this is working by using the below script taken from HiveMQ:

/*
* Copyright 2021 HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import axios from 'axios';
import mqtt from 'mqtt';
import * as queryString from 'query-string'

// your credentials
const options = {
username: '',
password: '',
};

// connect to your cluster, insert your host name and port
const client = mqtt.connect('mqtt://hivemq.myhost.com:1883', options);



// prints a received message
client.on('message', async function(topic: string, message: Buffer) {
console.log(topic, message.toString()); // need to convert the byte array to string
});

// reassurance that the connection worked
client.on('connect', () => {
console.log('Connected!');
});

// prints an error message
client.on('error', (error) => {
console.log('Error:', error);
});

// subscribe and publish to the same topic
client.subscribe('ac');

Setting up the ESP8266

Now that our backend can send out commands to the MQTT broker, we need to set up our ESP8266 to listen to these. Using https://github.com/marvinroger/async-mqtt-client, this is quite straightforward:

void connectToMqtt() {
Serial.println("Connecting to MQTT...");
mqttClient.connect();
}

void onWifiConnect() {
Serial.println("Connected to Wi-Fi.");
connectToMqtt();
}

void onWifiDisconnect(const WiFiEventStationModeDisconnected& event) {
Serial.println("Disconnected from Wi-Fi.");
mqttReconnectTimer.detach(); // ensure we don't reconnect to MQTT while reconnecting to Wi-Fi
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason) {
Serial.println("Disconnected from MQTT.");

if (WiFi.isConnected()) {
mqttReconnectTimer.once(2, connectToMqtt);
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
}

void onMqttUnsubscribe(uint16_t packetId) {
Serial.println("Unsubscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}

void onMqttPublish(uint16_t packetId) {
Serial.println("Publish acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
}

void publishRoomConditions() {
String data = getRoomConditions();
mqttClient.publish("roomconditions", 1, true, data.c_str());
}

void onMqttConnect(bool sessionPresent) {
Serial.println("Connected to MQTT.");
Serial.print("Session present: ");
Serial.println(sessionPresent);
mqttClient.subscribe("ac", 1);
}

void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
Serial.println("Publish received.");
Serial.print(" topic: ");
Serial.println(topic);
Serial.print(" qos: ");
Serial.println(properties.qos);
Serial.print(" dup: ");
Serial.println(properties.dup);
Serial.print(" retain: ");
Serial.println(properties.retain);
Serial.print(" len: ");
Serial.println(len);
Serial.print(" index: ");
Serial.println(index);
Serial.print(" total: ");
Serial.println(total);
Serial.println(" payload: ");
// payload to string
String payloadString = "";
for (int i = 0; i < len; i++) {
payloadString += (char)payload[i];
}
Serial.println(payloadString);
// topic to string
String str = "";
for (int i = 0; i < strlen(topic); i++) {
str += (char)topic[i];
}
Serial.println(str);
if (str == "ac") {
Serial.println("Setting AC");
setAC(payloadString);
}
}
void setup(void) {
mqttClient.onConnect(onMqttConnect);
mqttClient.onDisconnect(onMqttDisconnect);
mqttClient.onSubscribe(onMqttSubscribe);
mqttClient.onUnsubscribe(onMqttUnsubscribe);
mqttClient.onMessage(onMqttMessage);
mqttClient.onPublish(onMqttPublish);
mqttClient.setCredentials("admin-user", "admin-password");
mqttClient.setServer(MQTT_HOST, MQTT_PORT);
wifiConnectionManager.setOnConnectCallback(onWifiConnect);
wifiConnectionManager.setup();
}

Keep in mind, since this is async, and if you use the IRremoteESP8266 library, you will need to set the: ALLOW_DELAY_CALLS=false, which can be set by providing this as an argument for build flags. Below is a snippet of my platformio.ini file.

; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html

[env:nodemcu]
platform = espressif8266
board = nodemcu
framework = arduino
lib_ldf_mode = deep+
upload_port = /dev/cu.usbserial*
build_type = debug
monitor_filters = esp8266_exception_decoder
build_flags =
'-DUMM_POISON_CHECK'
'-D ALLOW_DELAY_CALLS=false'
board_build.filesystem = littlefs
board_build.ldscript = eagle.flash.4m2m.ld
monitor_speed = 115200
lib_deps =
;Async Timer
https://github.com/Aasim-A/AsyncTimer
;Async MQTT Dependency
https://github.com/me-no-dev/ESPAsyncTCP
;Async MQTT client
https://github.com/marvinroger/async-mqtt-client
;WIFI MANAGER
https://github.com/tzapu/WiFiManager

;IRREMOTE
https://github.com/markszabo/IRremoteESP8266

;DHT Sensor
https://github.com/adafruit/DHT-sensor-library

;ArduinoJSON
https://github.com/bblanchon/ArduinoJson.git

;ESPAsyncWebServer
https://github.com/me-no-dev/ESPAsyncWebServer

;AsyncTCP
https://github.com/me-no-dev/AsyncTCP

With this, you can now subscribe to events from MQTT and handle them to turn on or off the AC.

The Final Touches

At this point, we have a working system where we can control our AC from a webpage and could stop here. However, if you are like me, you want to do a little more. I like graphs.

Temperature in Celcius
Humidity Percentage

But, to create graphs, we need data. I have a module on my ESP8266 that can get the temperature and humidity from the air. Since we are using MQTT, we can just push this data to MQTT and save it to a database. As mentioned before, we will use Kafka to connect MQTT and MongoDB to save this data. Our backend can then communicate with MongoDB to get this data to build a graph. We can easily do this by using https://github.com/Aasim-A/AsyncTimer.

#include "AsyncTimer.h"

AsyncTimer t;

void onMqttConnect(bool sessionPresent) {
t.setInterval([]() {
publishRoomConditions();
}, 60000);
}

void loop(void) {
t.handle();
}

This will send a message to roomconditions every minute. Using Kafka, we can use a connector and a sink (MQTT is the connector, MongoDB is the sink). I have deployed Kafa with Strimzi. It is quite involved, so here is a reference to their docs: https://strimzi.io/documentation/. You can also use Confluent Cloud if you like; however, they do not have as many open-source connectors.

Example Strimzi connector is below:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mqtt-connector
annotations:
strimzi.io/use-connector-resources: "true"
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.mqtt5source.CamelMqtt5sourceSourceConnector
tasksMax: 2
config:
topics: roomconditions
group.id: my-connect-cluster
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
tasksMax: 2
camel.kamelet.mqtt5-source.topic: roomconditions
camel.kamelet.mqtt5-source.brokerUrl: tcp://hivemq-hivemq-cluster1-mqtt.hivemq.svc.cluster.local:1883
camel.kamelet.mqtt5-source.username: admin-user
camel.kamelet.mqtt5-source.password: admin-password
camel.kamelet.mqtt5-source.clientId: roomconditions

Repos for backend, frontend, ESP8266:

--

--

ThatCat Dev
ThatCat Dev

Written by ThatCat Dev

Professional Software Engineer

No responses yet