Building a real-time messaging server in Go

Often, I need a simple real-time server and web interface that shows real-time events. Although there are many options available like socket.io for Node.js or services like Azure SignalR and PubNub, I decided to create a real-time server in Go with a simple web front-end:

The impressive UI of the real-time web front-end

For a real-time server in Go, there are several options. You could use Gorilla WebSocket, for which there is an excellent tutorial, together with native WebSockets in the browser. There’s also Glue. However, if you want to use the socket.io client, you can use https://github.com/googollee/go-socket.io. It is an implementation of socket.io, although not a complete one. For production scenarios, I recommend using socket.io with Node.js because it is heavily used, has more features and is better documented.

With that out of the way, let’s take a look at the code. Some things to note in advance:

  • the code uses the concept of rooms (as in a chat room); clients can join a room and only see messages for that room; you can use that concept to create a “room” for a device and only subscribe to messages for that device
  • the code uses the excellent https://github.com/mholt/certmagic to enable https via a Let’s Encrypt certificate (DNS-01 verification)
  • the code uses Redis as the back-end; applications send messages to Redis via a PubSub channel; the real-time Go server checks for messages via a subscription to one or more Redis channels

The code is over at https://github.com/gbaeke/realtime-go.

Server

Let’s start with the imports. Naturally we need Redis support, the actual go-socket.io packages and certmagic. The cloudflare package is needed because my domain baeke.info is managed by CloudFlare. The package gives certmagic the ability to create the verification record that Let’s Encrypt will check before issuing the certificate:

import (
    "log"
    "net/http"
    "os"

    "github.com/go-redis/redis"
    socketio "github.com/googollee/go-socket.io"
    "github.com/mholt/certmagic"
    "github.com/xenolf/lego/providers/dns/cloudflare"
)

Next, the code checks if the RTHOST environment variable is set. RTHOST should contain the hostname you request the certificate for (e.g. rt.baeke.info).

Let’s check the block of code that sets up the Redis connection.

// redis connection
client := redis.NewClient(&redis.Options{
    Addr: getEnv("REDISHOST", "localhost:6379"),
})

// subscribe to all channels
pubsub := client.PSubscribe("*")
_, err := pubsub.Receive()
if err != nil {
    panic(err)
}

// messages received on a Go channel
ch := pubsub.Channel()

First, we create a new Redis client. We either use the address in the REDISHOST environment variable or default to localhost:6379. I will later run this server on Azure Container Instances (ACI) in a multi-container setup that also includes Redis.

With the call to PSubscribe, a pattern subscribe is used to subscribe to all PubSub channels (*). If the subscribe succeeds, a Go channel is set up to actually receive messages on.

Now that the Redis connection is configured, let’s turn to socket.io:

server, err := socketio.NewServer(nil)
if err != nil {
    log.Fatal(err)
}

server.On("connection", func(so socketio.Socket) {
    log.Printf("New connection from %s ", so.Id())

    so.On("channel", func(channel string) {
        log.Printf("%s joins channel %s\n", so.Id(), channel)
        so.Join(channel)
    })

    so.On("disconnection", func() {
        log.Printf("disconnect from %s\n", so.Id())
    })
})

The above code is pretty simple. We create a new socket.io server and subsequently set up event handlers for the following events:

  • connection: code that runs when a web client connects; gives us the socket the client connects on, which is further used by the channel and disconnection handlers
  • channel: this handler runs when a client sends a message of the chosen type channel; the channel contains the name of the socket.io room to join; this is used by the client to indicate what messages to show (e.g. just for device01); in the browser, the client sends a channel message that contains the text “device01”
  • disconnection: code to run when the client disconnects from the socket

Naturally, something crucial is missing. We need to check Redis for messages in Redis channels and broadcast them to matching socket.io “channels”. This is done in a goroutine that runs concurrently with the main code:

 go func(srv *socketio.Server) {
   for msg := range ch {
      log.Println(msg.Channel, msg.Payload)
      srv.BroadcastTo(msg.Channel, "message", msg.Payload)
   }
 }(server)

The anonymous function accepts a parameter of type *socketio.Server. We use the BroadcastTo method of socketio.Server to broadcast messages arriving on the Redis PubSub channels to matching socket.io channels. Note that we send a message of type “message” so the client will have to check for “message” coming in as well. Below is a snippet of client-side code that does that. It adds messages to the messages array defined on the Vue.js app:

socket.on('message', function(msg){
    app.messages.push(msg)
})
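To make the matching between Redis channels and socket.io rooms concrete, here is a small stand-alone sketch. It is not how go-socket.io implements rooms; the hub type and its methods are hypothetical and only illustrate that a message published on a channel reaches just the clients that joined the room with the same name:

```go
package main

import (
	"fmt"
	"sort"
)

// hub is a toy stand-in for socket.io rooms: room name -> set of client IDs.
// It is not safe for concurrent use; a real server would need locking.
type hub struct {
	rooms map[string]map[string]bool
}

func newHub() *hub {
	return &hub{rooms: make(map[string]map[string]bool)}
}

// join adds a client to a room, comparable to so.Join(channel) in the server.
func (h *hub) join(room, clientID string) {
	if h.rooms[room] == nil {
		h.rooms[room] = make(map[string]bool)
	}
	h.rooms[room][clientID] = true
}

// recipients returns, in sorted order, the clients that would receive a
// message published on the Redis channel with the same name as the room.
func (h *hub) recipients(channel string) []string {
	ids := make([]string, 0, len(h.rooms[channel]))
	for id := range h.rooms[channel] {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	h := newHub()
	h.join("device01", "client-a")
	h.join("device01", "client-b")
	h.join("device02", "client-c")

	// A message on Redis channel "device01" only reaches clients in that room.
	fmt.Println(h.recipients("device01"))
	fmt.Println(h.recipients("device03"))
}
```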

The rest of the server code basically configures certmagic to request the Let’s Encrypt certificate and sets up the http handlers for the static web client and the socket.io server:

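// certificate magic
certmagic.Agreed = true
// staging CA; the code must be changed to use the production CA
certmagic.CA = certmagic.LetsEncryptStagingCA

// DNS provider that creates the DNS-01 verification record;
// named provider to avoid shadowing the cloudflare package
provider, err := cloudflare.NewDNSProvider()
if err != nil {
    log.Fatal(err)
}

certmagic.DNSProvider = provider

mux := http.NewServeMux()
mux.Handle("/socket.io/", server)
mux.Handle("/", http.FileServer(http.Dir("./assets")))

// HTTPS blocks while serving; log the error if it ever returns
log.Fatal(certmagic.HTTPS([]string{rthost}, mux))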

Let’s try it out! The GitHub repository contains a file called multi.yaml, which deploys both the socket.io server and Redis to Azure Container Instances. The following images are used:

  • gbaeke/realtime-go-le: built with this Dockerfile; the image has a size of merely 14MB
  • redis: the official Redis image

To make it work, you will need to update the environment variables in multi.yaml with the domain name and your CloudFlare credentials. If you do not use CloudFlare, you can use one of the other providers. If you want to use the Let’s Encrypt production CA, you will have to change the code, rebuild the container, store it in your registry and modify multi.yaml accordingly.

In Azure Container Instances, the following is shown:

socket.io and Redis container in ACI

To test the setup, I can send a message with redis-cli, from a console to the realtime-redis container:

Testing with redis-cli in the Redis container

You should be aware that using CertMagic with ephemeral storage is NOT a good idea due to potential Let’s Encrypt rate limiting. You should store the requested certificates in persistent storage like an Azure File Share and mount it at /.local/share/certmagic!

Client

The client is a Vue.js app. It was not created with the Vue CLI, so it just grabs the Vue.js library from the content delivery network (CDN) and has all logic in a single page. The socket.io library (v1.3.7) is also pulled from the CDN. The socket.io client code is kept to a minimum for demonstration purposes:

var socket = io();
socket.emit('channel','device01');
socket.on('message', function(msg){
    app.messages.push(msg)
})

When the page loads, the client emits a channel message to the server with a payload of device01. As you have seen in the server section, the server reacts to this message by joining this client to a socket.io room, in this case with name device01.

Whenever the client receives a message from the server, it adds the message to the messages array which is bound to a list item (li) with a v-for directive.

Surprisingly easy, no? With a few lines of code you have a fully functional real-time messaging solution!

Deploying Azure Cognitive Services Containers with IoT Edge

Introduction

Azure Cognitive Services is a collection of APIs that make your applications smarter. Some of those APIs are listed below:

  • Vision: image classification, face detection (including emotions), OCR
  • Language: text analytics (e.g. key phrase or sentiment analysis), language detection and translation

To use one of the APIs you need to provision it in an Azure subscription. After provisioning, you will get an endpoint and API key. Every time you want to classify an image or detect sentiment in a piece of text, you will need to post an appropriate payload to the cloud endpoint and pass along the API key as well.

What if you want to use these services but you do not want to pass your payload to a cloud endpoint for compliance or latency reasons? In that case, the Cognitive Services containers can be used. In this post, we will take a look at the Text Analytics containers, specifically the one for Sentiment Analysis. Instead of deploying the container manually, we will deploy the container with IoT Edge.

IoT Edge Configuration

To get started, create an IoT Hub. The free tier will do just fine. When the IoT Hub is created, create an IoT Edge device. Next, configure your actual edge device to connect to IoT Hub with the connection string of the device you created in IoT Hub. Microsoft have a great tutorial to do all of the above, using a virtual machine in Azure as the edge device. The tutorial I linked to is the one for an edge device running Linux. When finished, the device should report its status to IoT Hub:

If you want to install IoT Edge on an existing device like a laptop, follow the procedure for Linux x64.

Once you have your edge device up and running, you can use the following command to obtain the status of your edge device: sudo systemctl status iotedge. The result:

Deploy Sentiment Analysis container

With the IoT Edge daemon up and running, we can deploy the Sentiment Analysis container. In IoT Hub, select your IoT Edge device and select Set modules:

In Set Modules you have the ability to configure the modules for this specific device. Modules are always deployed as containers and they do not have to be specifically designed or developed for use with IoT Edge. In the three-step wizard, add the Sentiment Analysis container in the first step. Click Add and then select IoT Edge Module. Provide the following settings:

Although the container can freely be pulled from the Image URI, the container needs to be configured with billing info and an API key. In the Billing environment variable, specify the endpoint URL for the API you configured in the cloud. In ApiKey set your API key. Note that the container always needs to be connected to the cloud to verify that you are allowed to use the service. Remember that although your payload is not sent to the cloud, your container usage is. The full container create options are listed below:

{
  "Env": [
    "Eula=accept",
    "Billing=https://westeurope.api.cognitive.microsoft.com/text/analytics/v2.0",
    "ApiKey=<yourKEY>"
  ],
  "HostConfig": {
    "PortBindings": {
      "5000/tcp": [
        {
          "HostPort": "5000"
        }
      ]
    }
  }
}

In HostConfig we ask the container runtime (Docker) to map port 5000 of the container to port 5000 of the host. You can specify other create options as well.

On the next page, you can configure routing between IoT Edge modules. Because we do not use actual IoT Edge modules, leave the configuration as shown below:

Now move to the last page in the Set Modules wizard to review the configuration and click Submit.

Give the deployment some time to finish. After a while, check your edge device with the following command: sudo iotedge list. Your TextAnalytics container should be listed. Alternatively, use sudo docker ps to list the Docker containers on your edge device.

Testing the Sentiment Analysis container

If everything went well, you should be able to go to http://localhost:5000/swagger to see the available endpoints. Open Sentiment Analysis to try out a sample:

You can use curl to test as well:

curl -X POST "http://localhost:5000/text/analytics/v2.0/sentiment" -H  "accept: application/json" -H  "Content-Type: application/json-patch+json" -d "{  \"documents\": [    {      \"language\": \"en\",      \"id\": \"1\",      \"text\": \"I really really despise this product!! DO NOT BUY!!\"    }  ]}"

As you can see, the API expects a JSON payload with a documents array. Each document object has three fields: language, id and text. When you run the above command, the result is:

{"documents":[{"id":"1","score":0.0001703798770904541}],"errors":[]}

In this case, the text I really really despise this product!! DO NOT BUY!! clearly results in a very bad score. As you might have guessed, 0 is the absolute worst and 1 is the absolute best.

Just for fun, I created a small Go program to test the API:

The Go program can be found here: https://github.com/gbaeke/sentiment. You can download the executable for Linux with: wget https://github.com/gbaeke/sentiment/releases/download/v0.1/ta. Make ta executable and use ./ta --help for help with the parameters.
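The request and response shapes described above can also be handled with the Go standard library. The following is a minimal sketch, not the code from the linked repository; the type and function names are made up for illustration, and the response is parsed offline from the sample output shown earlier rather than fetched from a running container:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// document mirrors one entry of the documents array expected by the API.
type document struct {
	Language string `json:"language"`
	ID       string `json:"id"`
	Text     string `json:"text"`
}

type request struct {
	Documents []document `json:"documents"`
}

// response holds only the fields used here; other fields are ignored.
type response struct {
	Documents []struct {
		ID    string  `json:"id"`
		Score float64 `json:"score"`
	} `json:"documents"`
}

// buildRequest wraps a single English text in the documents array.
func buildRequest(text string) ([]byte, error) {
	return json.Marshal(request{Documents: []document{{Language: "en", ID: "1", Text: text}}})
}

// parseScore extracts the score of the first document in a response body.
func parseScore(body []byte) (float64, error) {
	var r response
	if err := json.Unmarshal(body, &r); err != nil {
		return 0, err
	}
	if len(r.Documents) == 0 {
		return 0, errors.New("response contains no documents")
	}
	return r.Documents[0].Score, nil
}

func main() {
	payload, err := buildRequest("I really really despise this product!! DO NOT BUY!!")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	// Sample response body from the curl test, parsed without a network call.
	sample := []byte(`{"documents":[{"id":"1","score":0.0001703798770904541}],"errors":[]}`)
	score, err := parseScore(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println("score:", score)
}
```

To call the container for real, the payload could be sent with http.Post from net/http to the same URL and with the same Content-Type header as in the curl command.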

Summary

IoT Edge is a great way to deploy containers to edge devices running Linux or Windows. Besides deploying actual IoT Edge modules, you can deploy any container you want. In this post, we deployed a Cognitive Services container that does Sentiment Analysis at the edge.

IoT Hub Scaling

When you work with Azure IoT Hub, it is not always easy to tell what will happen when you reach the limits of IoT Hub and what to do when you reach those limits. As a reminder, recall that the scale of IoT Hub is defined by its tier and the number of units in the tier. There are three paid tiers, besides the free tier:

image

Although these tiers make it clear how many messages you can send, other limits such as the number of messages per second cannot be seen here. To get an idea of the number of messages you can send and the sustained throughput, see https://azure.microsoft.com/en-us/documentation/articles/iot-hub-scaling/#device-to-cloud-and-cloud-to-device-message-throughput

The specific burst performance numbers can be found here: https://azure.microsoft.com/en-us/documentation/articles/iot-hub-devguide-quotas-throttling/. Typically, the limit you are concerned with is the number of device-to-cloud sends, which is as follows:

  • S1: 12/sec/unit (but you get at least 100/sec in total; not per unit obviously); 10 units give you 120/sec and not 100+120/sec
  • S2: 120/sec/unit
  • S3: 6000/sec/unit

Now suppose you are thinking about deploying 300 devices that each send data every half second. Which tier should you use and how many units? You need to send 300 × 2 = 600 messages per second, so 5 units of S2 (5 × 120 = 600) will suffice. You could also take 50 units of S1 (50 × 12 = 600) for the same performance and price. With 5 units of S2, though, you can send more messages per day.

Now it would be nice to test the above in advance. At ThingTank we use Docker containers for this and we schedule them with Rancher, a great and easy to use Docker orchestration tool. If you want to try it, just use the container you can find on Docker Hub or the new Docker Store (still in beta). Just search for gbaeke and you will find the following container:

image

If you want to check out the code (warning: written hastily!), you can find it on GitHub here: https://github.com/xyloscloudservices/docker-itproceed. It is a simple Node.js script that uses the Azure IoT Hub libraries to create a new device in the registry with a GUID for the name. Afterwards, the code sends a simple JSON payload to IoT Hub every half a second.

To use the script, start it as follows with three parameters:

app.js IoT_Hub_Short_Name IoT_Hub_Connection_String millis

Note: the millis parameter is the number of milliseconds to wait between sends.

Now you can run the containers in Rancher (for instance). I won’t go into the details of how to add Docker Hosts to Rancher and how to create a new Stack (as they call it). Alternatively, you can run the containers on Azure Container Service or similar solutions.

In the PowerBI chart below, you see the eventcount per five seconds, which is around 420-440 events. That is a bit lower than the 500 you would expect from one S1 unit (the minimum of 100 messages/sec × 5 seconds):

image

Note: the spike you see happens right after the launch of the 300 containers; throttling quickly kicks in.

When switched to 5 S2 units, the graph looks as follows:

image

You see the eventcount jump to 3000 (near the end), which is what you would expect: 300 containers send 600 messages per second, or 3000 messages per 5 seconds, which is possible with 5 S2 units that deliver 120 messages/sec/unit.

You really need to consider whether you have to send data every half second or every second. For our ThingTank Air Quality solution, we take measurements every second but aggregate them over a minute at the edge. When devices send only once a minute, 5 S2 units can serve thousands of devices before you reach the limits of IoT Hub!
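Aggregating at the edge can be as simple as averaging the per-second samples of each minute and sending only the result. The sketch below assumes a plain arithmetic mean and made-up sample values; how the ThingTank solution actually aggregates is not described here:

```go
package main

import "fmt"

// mean returns the arithmetic mean of the samples, or 0 for an empty slice.
func mean(samples []float64) float64 {
	if len(samples) == 0 {
		return 0
	}
	sum := 0.0
	for _, s := range samples {
		sum += s
	}
	return sum / float64(len(samples))
}

// aggregate groups samples into windows of the given size and returns one
// mean per full window; a trailing partial window is dropped.
func aggregate(samples []float64, window int) []float64 {
	if window <= 0 {
		return nil
	}
	var out []float64
	for start := 0; start+window <= len(samples); start += window {
		out = append(out, mean(samples[start:start+window]))
	}
	return out
}

func main() {
	// Two minutes of hypothetical per-second measurements.
	var samples []float64
	for i := 0; i < 120; i++ {
		samples = append(samples, 400+float64(i%10))
	}
	// 120 samples become 2 messages instead of 120.
	fmt.Println("messages to send:", len(aggregate(samples, 60)))
}
```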

Bots in an IoT context

At ThingTank (@thingtankBE), we are constantly looking to expose IoT data in different ways. A chat bot can be a great way to ask for device measurements or even instruct devices to perform actions. In this post, I will describe a bot that gets air quality data for a meeting room via Slack.

I chose to write the bot in Node.js for simplicity and publish it to Azure’s App Service. The basics about writing a bot with Node.js can be found in the documentation of Microsoft’s Bot Framework here: https://docs.botframework.com/en-us/node/builder/overview.

Our bot is really simple for now. After getting the basics up and running, the bot can be enhanced with a natural language interface. What we want to do now:

  • Set the room name and save it in the session (userData)
  • Change the room name and save it in the session
  • Simple help: list the commands you can use
  • Get air quality measurements (a subset)

To achieve the above, you use dialogs, intents and some simple regular expressions. Check out the source code to see how it is done (remember, this is a basic script to get it working at a minimum). The basic logic is as follows:

  • if the intent is unknown, check if the room name is set; if not, switch to the /roomName dialog that asks for the room name and stores it in session.userData
  • if the intent matches commands, respond with a list of commands
  • if the intent matches change room, switch to the /roomName dialog that asks for the room name
  • if the intent matches air quality, get the measurements for the selected room using the getRoom function in an external module airq.js; our real-time air quality data comes from a pubsub channel and the getRoom function just retrieves it from there
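The routing logic in the list above can be expressed independently of the Bot Framework. The following Go sketch is only an illustration of the decision order; the patterns for commands and air quality, the action names, and the behaviour for an unknown intent when a room is already set are assumptions, since only the change room expression appears in this post:

```go
package main

import (
	"fmt"
	"regexp"
)

// Hypothetical patterns; only "change room" is taken from the bot's code.
var (
	commandsRe   = regexp.MustCompile(`(?i)^commands`)
	changeRoomRe = regexp.MustCompile(`(?i)^change room`)
	airQualityRe = regexp.MustCompile(`(?i)^air quality`)
)

// route returns the action the bot would take for a message, given the
// room name currently stored for the user ("" when none is set).
func route(message, roomName string) string {
	switch {
	case commandsRe.MatchString(message):
		return "list commands"
	case changeRoomRe.MatchString(message):
		return "ask room name"
	case airQualityRe.MatchString(message):
		return "get air quality"
	case roomName == "":
		// Unknown intent and no room yet: start the room name dialog.
		return "ask room name"
	default:
		// Assumption: an unknown intent with a room set needs no dialog.
		return "unknown"
	}
}

func main() {
	fmt.Println(route("hello", ""))
	fmt.Println(route("Change room", "meeting-1"))
	fmt.Println(route("air quality", "meeting-1"))
}
```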

Writing an intent is very simple. The change room intent for instance:

intents.matches(/^change room/i, [
    function (session) {
        session.send("Ok, let's change the room name...");
        session.beginDialog('/roomName');
    },
    function (session, results) {
        session.send('Changed room to %s', session.userData.roomName);
    }
]);

If you look at the source code, you will see we use the ChatConnector. When you first start writing your bot, I recommend you use the ConsoleConnector instead. You can then simply run your bot with node .js and interact with it from the command line. In our case, we use the ChatConnector so you should use the Bot Framework Channel Emulator from here to interact with and test your bot.

image

To get the emulator working, you need to obtain an App Id and App Password from Microsoft and make sure you use those in both your bot source code and the emulator. In the source code, these two values come from environment variables. Note that for local testing, you can leave these values blank.

Now it’s time to publish the bot on the web so we can register it with Microsoft and then enable it on Slack. To publish the bot, use the instructions here. You will use the Azure CLI and git to make this work so be sure to install both on your machine. After the bot is installed and running on App Service, set the environment variables for App Id and App Password in the website properties. Next, you can test your bot using the Channel Emulator.

Important: when you test your bot in the cloud using the Channel Emulator, be sure to use ngrok as specified here: https://docs.botframework.com/en-us/tools/bot-framework-emulator/#using-the-emulator-with-ngrok-to-interact-with-your-bot-in-the-cloud.

Now that we have the bot running, it’s time to register it with Microsoft at https://dev.botframework.com/bots/new. As part of the registration process, you need to supply the URL to your bot in the cloud and obtain a new App Id and App Password. Update the website settings with these new values. After registration, you get:

image

From the above page, you can test your bot and add other channels. One of those channels is Slack. When you add Slack as a channel, you will be guided to create an app in Slack, authenticate, and of course, create a Slack bot. In Slack, you will get something like:

image

To summarize:

  • Creating a simple bot with the Bot Framework is easy; the fun starts when you want to enable things like natural language processing
  • When you deploy your bot to the cloud and want to test it with the Channel Emulator, use ngrok
  • When you want to deploy the bot to Slack, register the bot with Microsoft and simply add Slack as a channel