Further improvements to the IoT Hub to TimescaleDB Azure Function

In the post Improving an Azure Function that writes IoT Hub data to TimescaleDB, we added some improvements to an Azure Function that uses the Event Hub trigger to write messages from IoT Hub to TimescaleDB:

  • use of the Event Hub enqueuedTime timestamp instead of NOW() in the INSERT statement (yes, I know, using NOW() did not make sense 😉)
  • make the code idempotent to handle duplicates (basically do nothing when a unique constraint is violated)

In general, I prefer to use application time (time at the event publisher) versus the time the message was enqueued. If you don’t have that timestamp, enqueuedTime is the next best thing.
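As a refresher, the idempotent INSERT from that post looks roughly like this with the pg module (the conditions table and its columns are illustrative, and a unique constraint on time and device is assumed):

// Illustrative sketch: idempotent INSERT with the pg module. Assumes the
// hypertable "conditions" has a unique constraint on (time, device) and
// that "pool" is a connected pg Pool.
async function insertCondition(pool, enqueuedTimeUtc, message) {
  await pool.query(
    `INSERT INTO conditions (time, device, temperature, humidity)
     VALUES ($1, $2, $3, $4)
     ON CONFLICT DO NOTHING`, // silently skip duplicate (time, device) rows
    [enqueuedTimeUtc, message.device, message.temperature, message.humidity]
  );
}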

How can we optimize the function even further? Read on about the cardinality setting!

Event Hub trigger cardinality setting

Our JavaScript Azure Function has its settings in function.json. For reference, here is its content:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "IoTHubMessages",
      "direction": "in",
      "eventHubName": "hub-pg",
      "connection": "EH",
      "cardinality": "one",
      "consumerGroup": "pg"
    }
  ]
}

Clearly, the function uses the eventHubTrigger for an Event Hub called hub-pg. In connection, EH refers to an Application Setting that contains the connection string to the Event Hub. Yes, I excel at naming stuff! The Event Hub has a consumer group called pg defined, which this function uses.

The cardinality setting is currently set to “one”, which means that the function can only process one message at a time. As a best practice, you should use a cardinality of “many” in order to process batches of messages. A setting of “many” is the default.

To make the required change, modify function.json and set cardinality to “many”. You will also have to modify the Azure Function itself to process a batch of messages instead of just one.
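With that change, function.json becomes:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "IoTHubMessages",
      "direction": "in",
      "eventHubName": "hub-pg",
      "connection": "EH",
      "cardinality": "many",
      "consumerGroup": "pg"
    }
  ]
}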

Processing batches of messages

With cardinality set to many, the IoTHubMessages parameter of the function is now an array. To retrieve the enqueuedTime from the messages, grab it from the enqueuedTimeUtcArray array using the index of the current message. Notice I also switched to JavaScript template literals to make the query a bit more readable.
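A sketch of what the batch-processing function might look like (the message fields and the conditions table are again illustrative):

// Sketch of the batched function (index.js).
const { Pool } = require('pg');
const pool = new Pool({ /* connection settings, e.g. from process.env */ });

module.exports = async function (context, IoTHubMessages) {
  // With cardinality "many", IoTHubMessages is an array of messages.
  for (let i = 0; i < IoTHubMessages.length; i++) {
    const message = IoTHubMessages[i];
    // The enqueued time of each message is available at the same index
    // of context.bindingData.enqueuedTimeUtcArray.
    const enqueuedTimeUtc = context.bindingData.enqueuedTimeUtcArray[i];
    await pool.query(
      `INSERT INTO conditions (time, device, temperature, humidity)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT DO NOTHING`,
      [enqueuedTimeUtc, message.device, message.temperature, message.humidity]
    );
  }
};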

The number of messages in a batch is controlled by maxBatchSize in host.json. By default, it is set to 64. Another setting, prefetchCount, determines how many messages are retrieved and cached before being sent to your function. When you change maxBatchSize, it is recommended to set prefetchCount to twice the maxBatchSize value. For instance:

{
  "version": "2.0",
  "extensions": {
    "eventHubs": {
      "batchCheckpointFrequency": 1,
      "eventProcessorOptions": {
        "maxBatchSize": 128,
        "prefetchCount": 256
      }
    }
  }
}

It’s great to have these options, but how should you set them? As always, the answer is: it depends!


A great resource to get a feel for what these settings do is this article. It also comes with a Power BI report that lets you adjust the parameters and see the results of load tests.

Conclusion

In this post, we used the function.json cardinality setting of “many” to process a batch of messages per function call. By default, Azure Functions uses batches of up to 64 messages. With the host.json settings maxBatchSize and prefetchCount, that behavior can be tuned to better handle your scenario.

Azure Functions with Consumption Plan on Linux

In a previous post, I talked about saving time-series data to TimescaleDB, which is an extension on top of PostgreSQL. The post used an Azure Function with an Event Hub trigger to save the data in TimescaleDB with a regular INSERT INTO statement.

The Function App used the Windows runtime, which gave me networking errors (ECONNRESET) when connecting to PostgreSQL. I often encounter such issues with the Windows runtime. In general, for Node.js, I try to stick to the Linux runtime whenever possible. In this post, we will run the same code in a Function App that uses the Linux runtime in a Consumption Plan.

Make sure Azure CLI is installed and that you are logged in. First, create a Storage Account:

az storage account create --name gebafuncstore --location westeurope --resource-group funclinux --sku Standard_LRS

Next, create the Function App. It references the storage account you created above:

az functionapp create --resource-group funclinux --name funclinux --os-type Linux --runtime node --consumption-plan-location westeurope --storage-account gebafuncstore

You can also use a script to achieve the same results. For an example, see
https://docs.microsoft.com/en-us/azure/azure-functions/scripts/functions-cli-create-serverless.

Now, in the Function App, set the following Application Settings. These settings will be used in the code we will deploy later.

  • host: hostname of the PostgreSQL server (e.g. servername.postgres.database.azure.com)
  • user: user name (e.g. user@servername)
  • password
  • database: name of the PostgreSQL database
  • EH: connection string to the Event Hub interface of your IoT Hub; if you are unsure how to set this, see this post

You can set the above values from the Azure Portal:

Application Settings of the Function App

The function code uses the first four Application Settings via process.env:

Using Application Settings in JavaScript
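A minimal sketch of that pattern with the pg module (the port and ssl values are assumptions for Azure Database for PostgreSQL):

const { Pool } = require('pg');

// Application Settings are exposed to Node.js functions via process.env
const pool = new Pool({
  host: process.env.host,         // e.g. servername.postgres.database.azure.com
  user: process.env.user,         // e.g. user@servername
  password: process.env.password,
  database: process.env.database,
  port: 5432,                     // default PostgreSQL port (assumption)
  ssl: true                       // Azure Database for PostgreSQL enforces SSL
});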

The application setting EH is used to reference the Event Hub in function.json:

function.json with Event Hub details such as the connection, cardinality and the consumerGroup

Now let’s get the code from my GitHub repo into the Azure Function. First, install Azure Functions Core Tools 2.x. Next, create a folder called funcdemo. In that folder, run the following commands:

git clone https://github.com/gbaeke/pgfunc.git
cd pgfunc
npm install
az login
az account show

The npm install command installs the pg module as defined in package.json. The last two commands log you in and show the active subscription. Make sure that subscription contains the Function App you deployed above. Now run the following command:

func init

Answer the questions: we use Node and JavaScript. You should now have a local.settings.json file that sets FUNCTIONS_WORKER_RUNTIME to node. If you do not have that file, the next command will throw an error.
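For reference, a minimal local.settings.json looks something like this:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureWebJobsStorage": ""
  }
}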

Now issue the following command to package and deploy the function to the Function App we created earlier:

func azure functionapp publish funclinux

This should result in the following feedback:

Feedback from function deployment

You should now see the function in the Function App:

Deployed function

To verify that the function works as expected, I started my IoT Simulator with 100 devices that send data every 5 seconds. I also deleted all the existing data from the TimescaleDB hypertable. The Live Metrics stream shows the results. In this case, the function is running smoothly without connection reset errors. The consumption plan spun up 4 servers:

Live Metrics Stream of IoT Hub to PostgreSQL function