rcmdnk's blog

HiLetgo BME280 温度 センサー 湿度 センサー 気圧 センサー Arduino センサー 大気圧センサ 温湿度センサー ブレイクアウト Arduinoに対応 [並行輸入品]

Google Spreadsheetsに色々とりあえずでもいいから保存していこう、ということで Raspberry PiでBME280を使って測定している温度、湿度、気圧の値を 保存するようにしました。

Raspberry Pi & BME280

BME280は温度、湿度、気圧を測定できるセンサーで、 Raspberry Piとかに接続して測定するときによく使われるものです。

ここにあるようにこれまでは Blynkというスマホからコンピューターへ命令を送ったり コンピューターから情報を取得したりするアプリを使っていました。

Blynk

2020-10-13-computer-raspberrypi-iot-google.md

こんな感じで簡単にグラフ表示が出来る。

とりあえずこれで満足してたんですが、 BlynkではBlynkのサーバーに情報をアップロードして、 それをスマホから見ている感じ。

Raspberry Pi側で1分置きに計測して送っていますが、 Blynkアプリ上だと1年分まで見れます。

Blynkの情報はスマホからExportしたり、 API を使って履歴を取ってこれたりします。

なので1年分は取ってこれるのかな、と思ったら どうやら1週間分しか取得することは出来ないようです。

Only getting a week of history when retrieving CSV - Need Help With My Project - Blynk Community

Pin history data - HTTP RESTful API - Need Help With My Project - Blynk Community

2年以上前の話で、改善するよ、とか、1ヶ月までは伸ばすつもりだよ、とか 答えてますが現時点でも1週間なので多分変わらないでしょう。 (APIでもアプリからExportしても同じ。)

ということで、見えてるのに過去の情報は取れない、という状態。

やはりせっかくなので記録として残したいのでSpreadsheetsに書いておきたい、と思った次第です。

記録方法

方法としては、

  1. Blynkサーバーにある情報を1日1回位取得して新しい分だけSpreadsheetsにアップデートで加える
  2. Raspberry Piから直接Google SpreadsheetsのAPIを使って書き込む

が考えられます。

1の方法に関しては、Blynkが API を提供してくれてるので、 NatureRemoの情報を記録 したのと同じ様なことをGoogle Apps Scriptでやればいいだけ。

ただ、今後Blynkを使い続けるかわからないのと、 Blynkの無料枠で使おうと思うと、上の様なチャートは結構無料枠内を埋めてしまい、 他のボタンとかが作りづらい、というので将来的に別のボタンに変更するかも、 という点から2の直接書き込む方法にしました。

計測などにちょっと無駄な重複が出ますが、 ちょっと見た感じではRaspberry Pi Zeroでも全然負荷になってないようなので 別々にしておきます。

BlynkがC++なプログラムでGoogleのAPIのパッケージとかがC++用とかなくてちょっと面倒だった、というのもあります。

1を使うと1週間分は遡れますが、まあそれくらい別にいいか、と。 (手で入れても良かったのですが面倒だったのでしなかった。)

Google Spreadsheets API

Pythonを使ったスクリプトを作成します。

Python Quickstart Sheets API Google Developers

Google公式のSpreadsheetsのAPIですが、 Python用の google-api-python-client というパッケージを 直接使おうと思うとPythonとかでも結構Spreadsheetsだけのためでも 書く量が多くなってしまいます。

ちょっと探したところ、 gspread というのがシンプルに書けて使い勝手がよさそうだったので使ってみました。

gspreadではサービスアカウントを使った認証も可能で、 自動で動かすのはそちらの方が便利なのでサービスアカウントを使います。

For Bots: Using Service Account

Sheetに書き込むだけなら簡単に

1
2
3
4
import gspread
gc = gspread.service_account()
wks = gc.open("Where is the money Lebowski?").sheet1
wks.update('A1', [[1, 2], [3, 4]])

だけで書けてしまいます。

書き込みスクリプト

こんな感じのスクリプトを作ります。

Spreadsheets書き込み部分は gspreadのドキュメント 参照。

Spreadsheetsに関しては先に作成しておいて、ID 1 を取得しておいてSHEET_KEYに設定します。

また、Livingという名前のシートを作っておきます。

このあたりはgspreadからAPI経由で作成することも出来ますが、 今回はその辺は省いて直接作っておくようにしています。

BME280に関しては 製造元のSwitch ScienceがPythonの例(BME280/bme280_sample.py) を公開してくれているのでそれを参考にして作っています。 (ほぼコピペ。)

bme280.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#!/usr/bin/env python3

import os
import sys
from datetime import datetime, timedelta, timezone
from smbus2 import SMBus

SHEET_KEY = "<SHEET_KEY>"
CREDENTIALS = "~/.config/gspread/credentials.json"
WORKSHEET_NAME = "Living"


class SpreadSheets():
    def __init__(self, sheet_key, worksheet_name, credentials):
        self.sheet_key = sheet_key
        self.worksheet_name = worksheet_name
        self.credentials = credentials

    def get_credentials(self):
        import gspread
        return gspread.service_account(self.credentials)

    def get_sheet(self):
        gc = self.get_credentials()
        return gc.open_by_key(self.sheet_key)

    def get_worksheet(self):
        sh = self.get_sheet()
        return sh.worksheet(self.worksheet_name)

    def write(self):
        bme280 = BME280()
        data = bme280.read_data()

        worksheet = self.get_worksheet()
        worksheet.append_row(data)


class BME280():
    def __init__(self, bus_number=1, i2c_address=0x76, my_timedelta=9):
        self.bus_number = bus_number
        self.i2c_address = i2c_address
        self.my_timedelta = my_timedelta

        self.bus = SMBus(self.bus_number)

        self.dig_t = []
        self.dig_p = []
        self.dig_h = []

        self.t_fine = 20.00 * 5120.0

    def write_reg(self, reg_address, data):
        self.bus.write_byte_data(self.i2c_address, reg_address, data)

    def get_calib_param(self):
        calib = []

        for i in range(0x88, 0x88+24):
            calib.append(self.bus.read_byte_data(self.i2c_address, i))
        calib.append(self.bus.read_byte_data(self.i2c_address, 0xA1))
        for i in range(0xE1, 0xE1+7):
            calib.append(self.bus.read_byte_data(self.i2c_address, i))

        self.dig_t.append((calib[1] << 8) | calib[0])
        self.dig_t.append((calib[3] << 8) | calib[2])
        self.dig_t.append((calib[5] << 8) | calib[4])
        self.dig_p.append((calib[7] << 8) | calib[6])
        self.dig_p.append((calib[9] << 8) | calib[8])
        self.dig_p.append((calib[11] << 8) | calib[10])
        self.dig_p.append((calib[13] << 8) | calib[12])
        self.dig_p.append((calib[15] << 8) | calib[14])
        self.dig_p.append((calib[17] << 8) | calib[16])
        self.dig_p.append((calib[19] << 8) | calib[18])
        self.dig_p.append((calib[21] << 8) | calib[20])
        self.dig_p.append((calib[23] << 8) | calib[22])
        self.dig_h.append(calib[24])
        self.dig_h.append((calib[26] << 8) | calib[25])
        self.dig_h.append(calib[27])
        self.dig_h.append((calib[28] << 4) | (0x0F & calib[29]))
        self.dig_h.append((calib[30] << 4) | ((calib[29] >> 4) & 0x0F))
        self.dig_h.append(calib[31])

        for i in range(1, 2):
            if self.dig_t[i] & 0x8000:
                self.dig_t[i] = (-self.dig_t[i] ^ 0xFFFF) + 1

        for i in range(1, 8):
            if self.dig_p[i] & 0x8000:
                self.dig_p[i] = (-self.dig_p[i] ^ 0xFFFF) + 1

        for i in range(0, 6):
            if self.dig_h[i] & 0x8000:
                self.dig_h[i] = (-self.dig_h[i] ^ 0xFFFF) + 1

    def print_data(self):
        now, temp, pres, hum = self.read_data()
        print(f"datetime : {now}")
        print(f"temperature : {temp:.2f} degree")
        print(f"pressure    : {pres:.2f} hPa")
        print(f"humidity    : {hum:.2f} %")

    def read_data(self):
        self.setup()
        self.get_calib_param()

        now = datetime.now(timezone(timedelta(hours=self.my_timedelta))
                           ).strftime('%Y-%m-%d %H:%M:%S')
        data = []
        for i in range(0xF7, 0xF7+8):
            data.append(self.bus.read_byte_data(self.i2c_address, i))
        pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        hum_raw = (data[6] << 8) | data[7]

        temp = self.compensate_t(temp_raw)
        pres = self.compensate_p(pres_raw)
        hum = self.compensate_h(hum_raw)

        return now, temp, pres, hum

    def compensate_t(self, adc_t):
        v1 = (adc_t / 16384.0 - self.dig_t[0] / 1024.0) * self.dig_t[1]
        v2 = (adc_t / 131072.0 - self.dig_t[0] / 8192.0)\
            * (adc_t / 131072.0 - self.dig_t[0] / 8192.0) * self.dig_t[2]
        self.t_fine = v1 + v2
        return self.t_fine / 5120.0

    def compensate_p(self, adc_p):
        pressure = 0.0

        v1 = (self.t_fine / 2.0) - 64000.0
        v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * self.dig_p[5]
        v2 = v2 + ((v1 * self.dig_p[4]) * 2.0)
        v2 = (v2 / 4.0) + (self.dig_p[3] * 65536.0)
        v1 = (((self.dig_p[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)
              + ((self.dig_p[1] * v1) / 2.0)) / 262144
        v1 = ((32768 + v1) * self.dig_p[0]) / 32768

        if v1 == 0:
            return -1
        pressure = ((1048576 - adc_p) - (v2 / 4096)) * 3125
        if pressure < 0x80000000:
            pressure = (pressure * 2.0) / v1
        else:
            pressure = (pressure / v1) * 2
        v1 = (self.dig_p[8] * (((pressure / 8.0) * (pressure / 8.0))
                               / 8192.0)) / 4096
        v2 = ((pressure / 4.0) * self.dig_p[7]) / 8192.0
        pressure = pressure + ((v1 + v2 + self.dig_p[6]) / 16.0)

        return pressure / 100

    def compensate_h(self, adc_h):
        var_h = self.t_fine - 76800.0
        if var_h != 0:
            var_h = (adc_h - (
                self.dig_h[3] * 64.0 + self.dig_h[4]/16384.0 * var_h
            )) * (
                self.dig_h[1] / 65536.0 * (
                    1.0 + self.dig_h[5] / 67108864.0 * var_h
                    * (1.0 + self.dig_h[2] / 67108864.0 * var_h)))
        else:
            return -1
        var_h = var_h * (1.0 - self.dig_h[0] * var_h / 524288.0)
        if var_h > 100.0:
            var_h = 100.0
        elif var_h < 0.0:
            var_h = 0.0
        return var_h

    def setup(self):
        # reset default t_fine
        self.t_fine = 20.00 * 5120.0

        osrs_t = 1                      # Temperature oversampling x 1
        osrs_p = 1                      # Pressure oversampling x 1
        osrs_h = 1                      # Humidity oversampling x 1
        mode = 3                        # Normal mode
        t_sb = 5                        # Tstandby 1000ms
        is_filter = 0                   # Filter off
        spi3w_en = 0                    # 3-wire SPI Disable

        ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
        config_reg = (t_sb << 5) | (is_filter << 2) | spi3w_en
        ctrl_hum_reg = osrs_h

        self.write_reg(0xF2, ctrl_hum_reg)
        self.write_reg(0xF4, ctrl_meas_reg)
        self.write_reg(0xF5, config_reg)


def main(argv=None):
    if argv is None:
        argv = []
    if len(argv) < 2:
        bme280 = BME280()
        bme280.print_data()
    else:
        sh = SpreadSheets(sheet_key=SHEET_KEY,
                          worksheet_name=WORKSHEET_NAME,
                          credentials=CREDENTIALS)
        sh.write()


if __name__ == '__main__':
    try:
        main(sys.argv)
    except KeyboardInterrupt:
        pass

Raspberry Piで定期実行

systemdを使った定期実行を設定します。

上にあるbme280.pyをRaspberry Piの/home/pi/scripts/bme280.pyに置いて、 以下の様な2つのsystemd用ファイルを用意し、 /etc/systemd/system/に設置します。

/etc/systemd/system/bme280.service
1
2
3
4
5
6
7
8
9
[Unit]
Description = BME280 logging

[Service]
Type = oneshot
ExecStart = /home/pi/scripts/bme280.py

[Install]
WantedBy=multi-user.target
/etc/systemd/system/bme280.timer
1
2
3
4
5
6
7
8
9
[Unit]
Description = BME280 logging timer

[Timer]
OnCalendar = *-*-* *:*:00
AccuracySec = 1s

[Install]
WantedBy = timers.target

1分毎に計測して記録する設定。

設置後、

$ cmd sudo systemctl enable bme280.timer
$ cmd sudo systemctl restart bme280.timer

として有効にしてタイマーを開始させれば計測し、Spreadsheetsに記録していってくれます。

Sponsored Links
  1. Spreadsheetsを開いときのURL

    https://docs.google.com/spreadsheets/d/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX/edit
    

    となってるXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXとなってる部分がSpreadsheetsのID。

Sponsored Links

« systemdでcronジョブの代わりの設定をする