node-red

3 min read

To store sensor data from my mqtt message broker I use influxdb. Influxdb is a time series database to make historical analysis, predictive analysis and anomaly detection. The nice thing about it, its open source (MIT) and easy to install, though not so easy to configure or manage.

In this post I will describe how I use influxdb to archive my data, I will describe how the data goes into influxdb and how I use the retention policies and continuous queries to reduce the amount of data points over time.

Influxdb - A brief summary

Influxdb uses a totally different approach as relational database (e.g. mysql), it is a so called NoSQL (Not only SQL) database.
In influxdb everything revolves around time. Time is the central concept in influxdb hence all data in the database are associated with a corresponding timestamp.
Further the data points have at least one field set. Field sets are used to store data in key-value pairs, with the field key as a string describing the value. Each data point can have multiple of such field sets.
Beside that all data points can optionally have multiple tags as another key-value pair. They are used to store further meta data to semantically describe the data point. and can help you to create easy queries.
All the data points are aggregated in a measurement which acts as container for data points and are conceptional similar to tables. They are identified by a name.

Here is a short example from my influxdb database, at the moment I only use one field for each measurement, but this may change in the future:

> select * from temperature LIMIT 10
name: temperature
-----------------
time				heatings	places		rooms		value
2016-11-12T17:15:29.233Z			Moensheim			1.7
2016-11-12T17:15:30.945Z			Moensheim	Schlafzimmer	17.1
2016-11-12T17:15:30.951Z	Essbereich	Moensheim	Wohnzimmer	22
2016-11-12T17:15:30.97Z		Heizung		Moensheim	Schlafzimmer	17.1
2016-11-12T17:15:31.086Z			Moensheim	Wohnzimmer	22
2016-11-12T17:16:29.886Z	Heizung		Moensheim	Bad		15.5
2016-11-12T17:16:29.892Z			Moensheim	Bad		15.5
2016-11-12T17:16:33.482Z			Moensheim			0.7
2016-11-12T17:17:36.856Z	Heizung		Moensheim	Schlafzimmer	17.1
2016-11-12T17:17:36.859Z			Moensheim	Schlafzimmer	17.1

Each measurement belongs to a specific retention policy. The policy defines how long data points are stored and when they should be discarded to free up space. They also define how many copies of the data should be saved if you have a cluster.

Every data point with the same retention policy, measurement and tag set belongs to the same series.

Retention policies and continuous queries

Each database can have multiple retention policies, one of them has to be set as default where all new points are stored. I use three retention policies to store my measurements:

  • default: Retention: 1 week;
  • a_year: Retention: 1 year; Data point interval: 1h;
  • infinite: Retention: infinite; Data point interval: 1d;

All new data points are saved in the default retention policy and are deleted after 1 week. To move points between retention policies you can use continuous queries or use the INTO clause in a normal query. Both require a function, an INTO clause and a GROUP BY time() clause.

Continuous queries are executed in the time interval defined in the GROUP BY time() clause. Here are the continuous queries I use to move data from the default retention policy to the a_year and the infinite retention policy:

name: metrics
-------------
name	query
cq_1d	CREATE CONTINUOUS QUERY cq_1d ON metrics BEGIN SELECT mean(*) INTO metrics.infinite.:MEASUREMENT FROM metrics."default"./.*/ GROUP BY time(1d), * END
cq_1h	CREATE CONTINUOUS QUERY cq_1h ON metrics BEGIN SELECT mean(*) INTO metrics.a_year.:MEASUREMENT FROM metrics."default"./.*/ GROUP BY time(1h), * END

The goal of the continuous query is to reduce the amount of data stored. There for the mean function together with the GROUP BY time() clause is used. This means for the provided example in the Listing above, that every hour the mean value over the last hour will be calculated for every series in the default retention policy. The calculated value will then be moved to the a_year retention policy and saved for one year. The second contininuous query will calculate the same but for the last day.
The raw data in the default retention policy will be deleted after one weak, at this moment only the aggregated data will be available.

How the data get into influxdb

As priviously described the central data hub in my system is a MQTT broker. The data flow is managed by node-RED, hence the data points are added with a specific flow in node-RED, that can be seen in the screenshot.

Input node is a mqtt node to subscribe to everything in [Places]/# followed by a function to transform the mqtt topic into tags:

var regexTags = /(?:\[([\w]*)\]\/([^\/]*)?)/g;
var regexMeasurement = /^(?:\[[\w]+\]\/[\w]+\/)*([\w]+)$/g;

if (measurement = regexMeasurement.exec(msg.topic)) {
    msg.measurement = measurement[1];
    
    msg.tags = {};
    while(match = regexTags.exec(msg.topic))
    {
        msg.tags[match[1].toLowerCase()] = match[2];
    }
    return msg;
}

In the following function the message is formated into the database format:

node.send(
    {
        topic: msg.topic,
        payload: [
            JSON.parse(msg.payload),
            msg.tags
        ],
        measurement: msg.measurement
    }
); 

After filtering out some unwanted measurements the data is written into the influxdb database.

Jochen Scheib

Read more posts by this author.