Raspberry Pi & BME280
BME280は温度、湿度、気圧を測定できるセンサーで、
Raspberry Piとかに接続して測定するときによく使われるものです。
ここにあるようにこれまでは
Blynkというスマホからコンピューターへ命令を送ったり
コンピューターから情報を取得したりするアプリを使っていました。
Blynk
こんな感じで簡単にグラフ表示が出来る。
とりあえずこれで満足してたんですが、
BlynkではBlynkのサーバーに情報をアップロードして、
それをスマホから見ている感じ。
Raspberry Pi側で1分置きに計測して送っていますが、
Blynkアプリ上だと1年分まで見れます。
Blynkの情報はスマホからExportしたり、
API
を使って履歴を取ってこれたりします。
なので1年分は取ってこれるのかな、と思ったら
どうやら1週間分しか取得することは出来ないようです。
Only getting a week of history when retrieving CSV - Need Help With My Project - Blynk Community
Pin history data - HTTP RESTful API - Need Help With My Project - Blynk Community
2年以上前の話で、改善するよ、とか、1ヶ月までは伸ばすつもりだよ、とか
答えてますが現時点でも1週間なので多分変わらないでしょう。
(APIでもアプリからExportしても同じ。)
ということで、見えてるのに過去の情報は取れない、という状態。
やはりせっかくなので記録として残したいのでSpreadsheetsに書いておきたい、と思った次第です。
記録方法
方法としては、
- Blynkサーバーにある情報を1日1回位取得して新しい分だけSpreadsheetsにアップデートで加える
- Raspberry Piから直接Google SpreadsheetsのAPIを使って書き込む
が考えられます。
1の方法に関しては、Blynkが
API
を提供してくれてるので、
NatureRemoの情報を記録
したのと同じ様なことをGoogle Apps Scriptでやればいいだけ。
ただ、今後Blynkを使い続けるかわからないのと、
Blynkの無料枠で使おうと思うと、上の様なチャートは結構無料枠内を埋めてしまい、
他のボタンとかが作りづらい、というので将来的に別のボタンに変更するかも、
という点から2の直接書き込む方法にしました。
計測などにちょっと無駄な重複が出ますが、
ちょっと見た感じではRaspberry Pi Zeroでも全然負荷になってないようなので
別々にしておきます。
BlynkがC++なプログラムでGoogleのAPIのパッケージとかがC++用とかなくてちょっと面倒だった、というのもあります。
1を使うと1週間分は遡れますが、まあそれくらい別にいいか、と。
(手で入れても良かったのですが面倒だったのでしなかった。)
Google Spreadsheets API
Pythonを使ったスクリプトを作成します。
Python Quickstart Sheets API Google Developers
Google公式のSpreadsheetsのAPIですが、
Python用の
google-api-python-client
というパッケージを
直接使おうと思うとPythonとかでも結構Spreadsheetsだけのためでも
書く量が多くなってしまいます。
ちょっと探したところ、
gspread
というのがシンプルに書けて使い勝手がよさそうだったので使ってみました。
gspreadではサービスアカウントを使った認証も可能で、
自動で動かすのはそちらの方が便利なのでサービスアカウントを使います。
For Bots: Using Service Account
Sheetに書き込むだけなら簡単に
1
2
3
4
| import gspread
gc = gspread.service_account()
wks = gc.open("Where is the money Lebowski?").sheet1
wks.update('A1', [[1, 2], [3, 4]])
|
だけで書けてしまいます。
書き込みスクリプト
こんな感じのスクリプトを作ります。
Spreadsheets書き込み部分は
gspreadのドキュメント
参照。
Spreadsheetsに関しては先に作成しておいて、ID
1
を取得しておいてSHEET_KEY
に設定します。
また、Living
という名前のシートを作っておきます。
このあたりはgspread
からAPI経由で作成することも出来ますが、
今回はその辺は省いて直接作っておくようにしています。
また、以下を参考にして書き込みのためのService Accountを作成し、credentials.jsonを取得しておきます。また、作成されたclient_email
を書き込むSpreadsheetsの共有から編集者として追加しておきます。
Authentication — gspread 3.6.0 documentation
BME280に関しては
製造元のSwitch ScienceがPythonの例(BME280/bme280_sample.py)
を公開してくれているのでそれを参考にして作っています。
(ほぼコピペ。)
bme280.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
| #!/usr/bin/env python3
import os
import sys
from datetime import datetime, timedelta, timezone
from smbus2 import SMBus
SHEET_KEY = "<SHEET_KEY>"
CREDENTIALS = "~/.config/gspread/credentials.json"
WORKSHEET_NAME = "Living"
class SpreadSheets():
def __init__(self, sheet_key, worksheet_name, credentials):
self.sheet_key = sheet_key
self.worksheet_name = worksheet_name
self.credentials = credentials
def get_credentials(self):
import gspread
return gspread.service_account(self.credentials)
def get_sheet(self):
gc = self.get_credentials()
return gc.open_by_key(self.sheet_key)
def get_worksheet(self):
sh = self.get_sheet()
return sh.worksheet(self.worksheet_name)
def write(self):
bme280 = BME280()
data = bme280.read_data()
worksheet = self.get_worksheet()
worksheet.append_row(data)
class BME280():
def __init__(self, bus_number=1, i2c_address=0x76, my_timedelta=9):
self.bus_number = bus_number
self.i2c_address = i2c_address
self.my_timedelta = my_timedelta
self.bus = SMBus(self.bus_number)
self.dig_t = []
self.dig_p = []
self.dig_h = []
self.t_fine = 20.00 * 5120.0
def write_reg(self, reg_address, data):
self.bus.write_byte_data(self.i2c_address, reg_address, data)
def get_calib_param(self):
calib = []
for i in range(0x88, 0x88+24):
calib.append(self.bus.read_byte_data(self.i2c_address, i))
calib.append(self.bus.read_byte_data(self.i2c_address, 0xA1))
for i in range(0xE1, 0xE1+7):
calib.append(self.bus.read_byte_data(self.i2c_address, i))
self.dig_t.append((calib[1] << 8) | calib[0])
self.dig_t.append((calib[3] << 8) | calib[2])
self.dig_t.append((calib[5] << 8) | calib[4])
self.dig_p.append((calib[7] << 8) | calib[6])
self.dig_p.append((calib[9] << 8) | calib[8])
self.dig_p.append((calib[11] << 8) | calib[10])
self.dig_p.append((calib[13] << 8) | calib[12])
self.dig_p.append((calib[15] << 8) | calib[14])
self.dig_p.append((calib[17] << 8) | calib[16])
self.dig_p.append((calib[19] << 8) | calib[18])
self.dig_p.append((calib[21] << 8) | calib[20])
self.dig_p.append((calib[23] << 8) | calib[22])
self.dig_h.append(calib[24])
self.dig_h.append((calib[26] << 8) | calib[25])
self.dig_h.append(calib[27])
self.dig_h.append((calib[28] << 4) | (0x0F & calib[29]))
self.dig_h.append((calib[30] << 4) | ((calib[29] >> 4) & 0x0F))
self.dig_h.append(calib[31])
for i in range(1, 2):
if self.dig_t[i] & 0x8000:
self.dig_t[i] = (-self.dig_t[i] ^ 0xFFFF) + 1
for i in range(1, 8):
if self.dig_p[i] & 0x8000:
self.dig_p[i] = (-self.dig_p[i] ^ 0xFFFF) + 1
for i in range(0, 6):
if self.dig_h[i] & 0x8000:
self.dig_h[i] = (-self.dig_h[i] ^ 0xFFFF) + 1
def print_data(self):
now, temp, pres, hum = self.read_data()
print(f"datetime : {now}")
print(f"temperature : {temp:.2f} degree")
print(f"pressure : {pres:.2f} hPa")
print(f"humidity : {hum:.2f} %")
def read_data(self):
self.setup()
self.get_calib_param()
now = datetime.now(timezone(timedelta(hours=self.my_timedelta))
).strftime('%Y-%m-%d %H:%M:%S')
data = []
for i in range(0xF7, 0xF7+8):
data.append(self.bus.read_byte_data(self.i2c_address, i))
pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
hum_raw = (data[6] << 8) | data[7]
temp = self.compensate_t(temp_raw)
pres = self.compensate_p(pres_raw)
hum = self.compensate_h(hum_raw)
return now, temp, pres, hum
def compensate_t(self, adc_t):
v1 = (adc_t / 16384.0 - self.dig_t[0] / 1024.0) * self.dig_t[1]
v2 = (adc_t / 131072.0 - self.dig_t[0] / 8192.0)\
* (adc_t / 131072.0 - self.dig_t[0] / 8192.0) * self.dig_t[2]
self.t_fine = v1 + v2
return self.t_fine / 5120.0
def compensate_p(self, adc_p):
pressure = 0.0
v1 = (self.t_fine / 2.0) - 64000.0
v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * self.dig_p[5]
v2 = v2 + ((v1 * self.dig_p[4]) * 2.0)
v2 = (v2 / 4.0) + (self.dig_p[3] * 65536.0)
v1 = (((self.dig_p[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)
+ ((self.dig_p[1] * v1) / 2.0)) / 262144
v1 = ((32768 + v1) * self.dig_p[0]) / 32768
if v1 == 0:
return -1
pressure = ((1048576 - adc_p) - (v2 / 4096)) * 3125
if pressure < 0x80000000:
pressure = (pressure * 2.0) / v1
else:
pressure = (pressure / v1) * 2
v1 = (self.dig_p[8] * (((pressure / 8.0) * (pressure / 8.0))
/ 8192.0)) / 4096
v2 = ((pressure / 4.0) * self.dig_p[7]) / 8192.0
pressure = pressure + ((v1 + v2 + self.dig_p[6]) / 16.0)
return pressure / 100
def compensate_h(self, adc_h):
var_h = self.t_fine - 76800.0
if var_h != 0:
var_h = (adc_h - (
self.dig_h[3] * 64.0 + self.dig_h[4]/16384.0 * var_h
)) * (
self.dig_h[1] / 65536.0 * (
1.0 + self.dig_h[5] / 67108864.0 * var_h
* (1.0 + self.dig_h[2] / 67108864.0 * var_h)))
else:
return -1
var_h = var_h * (1.0 - self.dig_h[0] * var_h / 524288.0)
if var_h > 100.0:
var_h = 100.0
elif var_h < 0.0:
var_h = 0.0
return var_h
def setup(self):
# reset default t_fine
self.t_fine = 20.00 * 5120.0
osrs_t = 1 # Temperature oversampling x 1
osrs_p = 1 # Pressure oversampling x 1
osrs_h = 1 # Humidity oversampling x 1
mode = 3 # Normal mode
t_sb = 5 # Tstandby 1000ms
is_filter = 0 # Filter off
spi3w_en = 0 # 3-wire SPI Disable
ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
config_reg = (t_sb << 5) | (is_filter << 2) | spi3w_en
ctrl_hum_reg = osrs_h
self.write_reg(0xF2, ctrl_hum_reg)
self.write_reg(0xF4, ctrl_meas_reg)
self.write_reg(0xF5, config_reg)
def main(argv=None):
if argv is None:
argv = []
if len(argv) < 2:
bme280 = BME280()
bme280.print_data()
else:
sh = SpreadSheets(sheet_key=SHEET_KEY,
worksheet_name=WORKSHEET_NAME,
credentials=CREDENTIALS)
sh.write()
if __name__ == '__main__':
try:
main(sys.argv)
except KeyboardInterrupt:
pass
|
Raspberry Piで定期実行
systemdを使った定期実行を設定します。
上にあるbme280.pyをRaspberry Piの/home/pi/scripts/bme280.pyに置いて、
以下の様な2つのsystemd用ファイルを用意し、
/etc/systemd/system/に設置します。
/etc/systemd/system/bme280.service
1
2
3
4
5
6
7
8
9
| [Unit]
Description = BME280 logging
[Service]
Type = oneshot
ExecStart = /home/pi/scripts/bme280.py
[Install]
WantedBy=multi-user.target
|
/etc/systemd/system/bme280.timer
1
2
3
4
5
6
7
8
9
| [Unit]
Description = BME280 logging timer
[Timer]
OnCalendar = *-*-* *:*:00
AccuracySec = 1s
[Install]
WantedBy = timers.target
|
1分毎に計測して記録する設定。
設置後、
$ cmd sudo systemctl enable bme280.timer
$ cmd sudo systemctl restart bme280.timer
として有効にしてタイマーを開始させれば計測し、Spreadsheetsに記録していってくれます。