influxDB


influxdb相关知识

influxDB是用golang开发的一款开源的时序库。所谓时序库,就是其中存放的数据与时间 有着密切的关系。现实中某些场景,把时间作为x轴,数值作为y轴,定时的不断采样,将 数据绘制成二维坐标的点,连点成线,以此进行数据的统计和分析预测。

influxDB可应用在包括运维监视,应用程序度量,存储无联网传感器数据和实时分析系统等 场景下的存储海量时间戳的时序库。通过配置,influxDB可以保存定义时间长度的数据, 自动删除已超期的数据存储系统。

InfluxDB提供与传统关系型数据库几乎一样的SQL语句来查询数据。

influxdb 相关名词

  • database: 数据库
  • measurement: 数据库中的表
  • points: 表中的行

point由时间戳(time), 数据(field)和标签(tags)组成。

time: 每条记录的时间戳,可自己生成,也可由influxdb自动生成 fields: 各种记录的数值 tags: 标签。在influxDB中,tag+表名一起作为数据库的索引,是k-v形式。


influxdb时序库

influxdb 官网 influxdb github


安装

1
2
3
wget https://dl.influxdata.com/influxdb/releases/influxdb_1.5.1_amd64.deb

sudo dpkg -i influxdb_1.5.1_amd64.deb  

默认配置文件:

1
/etc/influxdb/influxdb.conf  

安装成功之后,启动服务:

1
systemctl start influxdb  

注意: 如果改变了配置,比如data, meta, wal等文件路径时,要确保infludb对此有读写权限。可执行chown -R influxdb:influxdb xxx, 否则可能导致启动不了


CLI

influxd程序对外提供HTTP API。可使用curl与其进行交互。同时还提供influx客户端程序。 influx实际对一系列HTTP API进行CLI形式的封装转换(默认向127.0.0.1:8086发送请求)。

以下交互在influx shell中进行。

  • 指定返回RFC3339(YYYY-MM-DDTHH:MM:SS.nnnnnnnnZ)响应时间戳响应格式

    influx -precision rfc3339

  • 创建库

    create database <db-name>

  • 查看数据库

    show databases

    注意: _internal库是influx内部使用的库,内部存储influx运行时的必要数据

  • 切换库

    use <db-name>

  • 删除库

    drop database <db-name>

  • 查看数据库中的表

    show measurements

  • 删除表

    drop measurement <measurement-name>

  • 插入数据

    influxdb使用Line Protocol语法向某个measurement中插入一条数据(Point).

    Line Protocol Format: 关于Line Protocol,InfluxDB有详细文档说明。InfluxDB Line Protocol reference

    <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]

    example:

    insert cpu,host=serverA,region=us_wese value=0.64

    insert temperature,machine=unit42,type=assembly external=25,internal=37

    其中: cpu, temperature为measurement名称; host,region和machine,type为tags; value和external,internal为value

  • 查询记录

    select "host", "region", "value" from "cpu"

    select * from cpu limit 1

    select * from cpu where value>0.9

    注意: 如果使用*而不带任何条件(where,limit等)限定,当表数据量大的时候会造成性能问题

    更多的查询语句可在Influx Query Language reference了解.


influx聚合函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
- count()

返回某个field字段非空的数量

`select count(<field>) from <measurement> [where <stuff>] [group by <stuff>]`


- distinct()

返回一个字段(field)的唯一值

`select distinct(<field>) from <measurement> [where <stuff>] [group by <stuff>]`


- mean()

返回一个字段(field)中的值的算术平均值。字段类型必须是长整形或float64.

`select mean(<field>) from <measurement> [where <stuff>][group by <stuff>]`


- median()

从单个field中的排序值返回中间值(中位数)。字段值的类型必须是长整形或float64格式。

`select median(<field>) from <measurement> [where <stuff>][group by <stuff>]`


- spread()

返回字段的最小值和最大值之间的差值。数据的类型必须是长整形或float64.

`select spread(<field>) from <measurement> [where <stuff>][group by <stuff>]`


- sum()

返回一个字段中的所有值的和。字段的类型必须是整形或float64.

`select sum(<field>) from <measuremt> [where <stuff>][group by <stuff>]`


- top()

返回一个字段中最大的N个值,字段类型必须是长整形或float64类型。

`select top(<field>[,<tag>],<N>)[,<tag>|<field>][INTO_clause] from ...`


- bottom()

返回字段中最小的N个值。

`select bottom(<field>[,<tag>],<N>)[,<tag>|<field>][INTO_clause] from ...`


- first()

返回一个字段中最旧的值。

`select first(<field>)[,tag] from <measurement> [where] [group by]`


- last()

返回一个字段中最新的取值。

`select last(<field>)[,tag] from <measurement> [where] [group by]`


- max()

返回一个字段中的最大值。

`select max(<field>)[,<tag>] from <measurement> [where] [group by]`


- min()

返回一个字段中的最小值。

`select max(<field>)[,<tag>] from <measurement> [where] [group by]`


- precentile()

返回排序值排位为N的百分值。 百分值是介于100到0之间的整数或浮点数,包括100.

`select precentile(<field>, <N>)[,<tag>] from <measurement> [where] [group by]`


- derivative()

返回一个字段在series中的变化率。

influx会计算按照时间进行排序的字段值之间的差异,并将这些结果转化为单位变化率。
其中,单位可以指定,默认为1s。

`select derivative(<field>,[unit]) from <measuremant> [where]`

此函数可搭配聚合函数完成更复杂的计算。更详细内容可在influxDB文档中了解。


- difference()

返回一个字段中连续的时间值之间的差异。字段类型必须是长整形或float64.

`select differnece(<field>) from <measurement> [where]`

`select difference(<function>(field)) from <measurement> [where] [group by time(<time_interval>)]`


- elapsed()

返回一个字段在连续的时间间隔的差异,间隔单位可选,默认为1纳秒。

`select elasped(<field>, <unit>) from <measurement> [where]`


- moving_average()

返回一个连续字段值的移动平均值,字段类型必须是长整形或float64类型。

`select moving_average(<field>,<window>) from <measurement> [where]`

`select moving_average(<function>(<field>), <window>) from <measurement> [where] [group by time(<time_interval>)]`


- non_negative_derivative()

返回一个series中的一个字段中值的非负速率。


- stddev()

返回一个字段中的值的标准偏差。值的类型必须是长整形或float64类型。

select stddev(<field>) from <measurement> [where] [group by]

influx数据管理

influx吞吐量高达每秒几十万。长期运行势必会引起存储问题。

influx提供了两个特性来解决海量数据存储问题: Continuous Queries(CQ), Retention Policies(RP)。

Continuous Queries:

CQ主要用在将数据归档,主要是损失精度为代价来换取较低的系统空间占用率。

CQ是一组作用于某个数据库上的自动定时启动的语句,语句中必须包含SELECT和GROUP BY time()关键词。

Retention Policy:

RP用来描述InfluxDB保存数据的时长。InfluxDB在存储的数据时间戳与本地系统时间戳以及RP DURATION 三者之间对比决定是否该清除数据。一个数据库可同时拥有多个RP,但每个RP只属于某个数据库。


Python操作InfluxDB


安装influxdb-python

1
2
pip install influxdb
pip install --upgrade influxdb

examples

借助influx-python库,我们写一个简单的获取cpu利用率的脚本:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import psutil
from influxdb import InfluxDBClient


class InfluxWrap(object):

    def __init__(self, database, host='localhost', port=8086, **kwargs):
        self._cli = InfluxDBClient(host=host, port=port, database=database, **kwargs)
        super(InfluxWrap, self).__init__()

    def __enter__(self):
        return self

    def __exit__(self, exec_type, exec_val, exec_traceback):
        if exec_type is not None and self._cli:
            self._cli.close()

    @property
    def handler(self):
        return self._cli


if __name__ == "__main__":
    while True:
        data = [{
            "measurement": "cpu",
            "tags": {
                "host": "server01",
                "region": "us-west",
            },
            "fields":{
                "value": psutil.cpu_percent(None) # 获取cpu利用率
            }
        }]

        with InfluxWrap("mydb") as mydb:
            if not mydb.handler.write_points(data):
                print 'insert failed'

            query_smt = 'select * from cpu where time > now() - 5s'

            result_list = mydb.handler.query(query_smt)

            print "response: "
            for result in result_list:
                for o in result:
                    print o

        time.sleep(5)