皆様こんにちは。ユニファの赤沼です。ユニファでは着々とメンバーが増え、できることも増えてきて嬉しい限りなのですが、反面オフィスが手狭になってきて、換気が不十分で空気の悪さが気になるようにもなってきました。二酸化炭素濃度が上がると生産性にも悪影響があるという事でチームラボさんも測定と改善に取り組まれているようです。
そこで試しに空気品質センサを使って空気品質モニターを自作してみました。今回使ったセンサはスイッチサイエンスさんで販売されている、 CSS811 というセンサモジュールを使った SparkFun 製のブレークアウトボードです。
Hookup
ブレークアウトボードのチュートリアルは SparkFun の下記サイトで公開されていますので、これを参考に回路を組んでいきます。
CCS811 Air Quality Breakout Hookup Guide - learn.sparkfun.com
正確な測定を行うには温度センサで測定した温度を使用する必要があるのですが、今回はひとまず単体で使用してみます。
チュートリアルでは Arduino の互換ボードを使用していますが、 Raspberry Pi の方が色々と融通が聞かせやすいので、 Raspberry Pi Zero W を使用してみます。また、CCS811 ではインタラプトモードを使用する事で、データが発生した時だけセンサモジュールを稼働させて省電力で使用することができますが、今回は常時電源に接続して使用する想定なので、インタラプトモードは使わずにシンプルな構成にしています。回路図は下記の通りです。
CCS811 は I2C による接続なので、電源と GND 以外は SDA(Serial Data), SCL(Serial Clock) の2本を使用しています。 CCS811 の動作電圧は 3.3V なので、電源は Raspberry Pi の 3.3V ピンから供給します。実際に組んでみた回路は下記のようになりました。
I2C の使用設定
Raspberry Pi ではデフォルトでは I2C が使える設定になっていませんので、 raspi-config から設定を行います。
$ sudo raspi-config
メニュー画面が表示されたら Interfacing Options
を選択します。
I2C
の項目を有効にすることで I2C が使用できるようになります。
次に設定ファイル /boot/config.txt
に下記の1行を追記します。
dtparam=i2c_baudrate=10000
ここまでの設定ができたら一度 Raspberry Pi を再起動します。再起動後に i2cdetect
コマンドで下記のようにセンサが認識されていれば使用可能な状態になっています。このブレークアウトボードの I2C アドレスは 0X5B です。
$ sudo i2cdetect -y 1 0 1 2 3 4 5 6 7 8 9 a b c d e f 00: -- -- -- -- -- -- -- -- -- -- -- -- -- 10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 20: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 30: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 50: -- -- -- -- -- -- -- -- -- -- -- 5b -- -- -- -- 60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 70: -- -- -- -- -- -- -- --
プログラムの実装には Python 3 を使います。
$ python -V
Python 3.6.5
Python 3 系で I2C を使用するには smbus2 をインストールする必要がありますので、 pip でインストールします。
$ pip install smbus2
プログラム実装
それではプログラムを実装していきたいと思います。 SparkFun では Arduino 用のライブラリは提供していますが、 Python 用のライブラリやサンプルは提供されていないようです。ですが CCS811 を使用したブレークアウトボードは Adafruit からも販売されていて、そちらでは Python 向けのライブラリやサンプルが Github の下記リポジトリで提供されていましたので、そちらから必要な部分を抜き出す形で実装してみました。
まずは CCS811 を直接扱うクラスの実装です。基本的な操作方法としては、該当の I2C デバイスのアドレスを指定し、操作に応じたレジスタに対してバイトデータやブロックデータの読み書きをしています。初期化時に self.writeList(CCS811_BOOTLOADER_APP_START, [])
でセンサモジュールの動作を開始させ、 self.disableInterrupt()
でインタラプトモードをOFFにしています。また、 self.setDriveMode(mode)
でデータの取得間隔を指定しています。
#!/usr/bin/env python import os import smbus2 as smbus from collections import OrderedDict from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter from time import sleep CCS811_ADDRESS = 0x5B CCS811_STATUS = 0x00 CCS811_MEAS_MODE = 0x01 CCS811_ALG_RESULT_DATA = 0x02 CCS811_HW_ID = 0x20 CCS811_DRIVE_MODE_IDLE = 0x00 CCS811_DRIVE_MODE_1SEC = 0x01 CCS811_DRIVE_MODE_10SEC = 0x02 CCS811_DRIVE_MODE_60SEC = 0x03 CCS811_DRIVE_MODE_250MS = 0x04 CCS811_BOOTLOADER_APP_START = 0xF4 CCS811_HW_ID_CODE = 0x81 class CCS811: LOG_FILE = '{script_dir}/logs/ccs811.log'.format( script_dir = os.path.dirname(os.path.abspath(__file__)) ) def __init__(self, mode = CCS811_DRIVE_MODE_1SEC, address = CCS811_ADDRESS): self.init_logger() if mode not in [CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC, CCS811_DRIVE_MODE_250MS]: raise ValueError('Unexpected mode value {0}. Set mode to one of CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC or CCS811_DRIVE_MODE_250MS'.format(mode)) self._address = address self._bus = smbus.SMBus(1) self._status = Bitfield([('ERROR' , 1), ('unused', 2), ('DATA_READY' , 1), ('APP_VALID', 1), ('unused2' , 2), ('FW_MODE' , 1)]) self._meas_mode = Bitfield([('unused', 2), ('INT_THRESH', 1), ('INT_DATARDY', 1), ('DRIVE_MODE', 3)]) self._error_id = Bitfield([('WRITE_REG_INVALID', 1), ('READ_REG_INVALID', 1), ('MEASMODE_INVALID', 1), ('MAX_RESISTANCE', 1), ('HEATER_FAULT', 1), ('HEATER_SUPPLY', 1)]) self._TVOC = 0 self._eCO2 = 0 if self.readU8(CCS811_HW_ID) != CCS811_HW_ID_CODE: raise Exception("Device ID returned is not correct! Please check your wiring.") self.writeList(CCS811_BOOTLOADER_APP_START, []) sleep(0.1) if self.checkError(): raise Exception("Device returned an Error! Try removing and reapplying power to the device and running the code again.") if not self._status.FW_MODE: raise Exception("Device did not enter application mode! If you got here, there may be a problem with the firmware on your sensor.") self.disableInterrupt() self.setDriveMode(mode) def init_logger(self): self._logger = getLogger(__class__.__name__) file_handler = FileHandler(self.LOG_FILE) formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) self._logger.addHandler(file_handler) self._logger.setLevel(DEBUG) def disableInterrupt(self): self._meas_mode.INT_DATARDY = 1 self.write8(CCS811_MEAS_MODE, self._meas_mode.get()) def setDriveMode(self, mode): self._meas_mode.DRIVE_MODE = mode self.write8(CCS811_MEAS_MODE, self._meas_mode.get()) def available(self): self._status.set(self.readU8(CCS811_STATUS)) if not self._status.DATA_READY: return False else: return True def readData(self): if not self.available(): return False else: buf = self.readList(CCS811_ALG_RESULT_DATA, 8) self._eCO2 = (buf[0] << 8) | (buf[1]) self._TVOC = (buf[2] << 8) | (buf[3]) if self._status.ERROR: return buf[5] else: return 0 def getTVOC(self): return self._TVOC def geteCO2(self): return self._eCO2 def checkError(self): self._status.set(self.readU8(CCS811_STATUS)) return self._status.ERROR def readU8(self, register): result = self._bus.read_byte_data(self._address, register) & 0xFF self._logger.debug("Read 0x%02X from register 0x%02X", result, register) return result def write8(self, register, value): value = value & 0xFF self._bus.write_byte_data(self._address, register, value) self._logger.debug("Wrote 0x%02X to register 0x%02X", value, register) def readList(self, register, length): results = self._bus.read_i2c_block_data(self._address, register, length) self._logger.debug("Read the following from register 0x%02X: %s", register, results) return results def writeList(self, register, data): self._bus.write_i2c_block_data(self._address, register, data) self._logger.debug("Wrote to register 0x%02X: %s", register, data) class Bitfield: def __init__(self, _structure): self._structure = OrderedDict(_structure) for key, value in self._structure.items(): setattr(self, key, 0) def get(self): fullreg = 0 pos = 0 for key, value in self._structure.items(): fullreg = fullreg | ( (getattr(self, key) & (2**value - 1)) << pos ) pos = pos + value return fullreg def set(self, data): pos = 0 for key, value in self._structure.items(): setattr(self, key, (data >> pos) & (2**value - 1)) pos = pos + value
CO2濃度が閾値を超えた時に Slack に通知するためのクラスも下記のように実装しておきます。ユーザ名やテキストなどを指定して Slack の Webhook にポストします。
#!/usr/bin/env python import json import requests class SlackNotifier: def __init__(self, webhook_url): self.webhook_url = webhook_url def notify(self, fallback, text, username, icon_emoji, channel, color): headers = { 'Content-Type': 'application/json' } payload = { 'username': username, 'channel': channel, 'icon_emoji': icon_emoji, 'attachments': [ { 'fallback': fallback, 'text': text, 'color': color } ] } response = requests.post(self.webhook_url, json = payload, headers = headers) return { 'status_code': response.status_code, 'text': response.text }
そして上記のクラスをしようするメインのスクリプトを実装します。 CCS811 の初期化後にセンサが有効になったら、 self._ccs811.readData()
で現在のCO2濃度などを計算し、計算後のデータを self._ccs811.geteCO2()
で取得しています。取得した CO2 濃度から現在のステータスを判別し、ステータスが変わった場合には Slack に通知します。
#!/usr/bin/env python import os import sys from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter from time import sleep import slack_notifier from CCS811 import CCS811 class AirConditionMonitor: CO2_PPM_THRESHOLD_1 = 1000 CO2_PPM_THRESHOLD_2 = 2000 CO2_LOWER_LIMIT = 400 CO2_HIGHER_LIMIT = 8192 CO2_STATUS_CONDITIONING = 'CONDITIONING' CO2_STATUS_LOW = 'LOW' CO2_STATUS_HIGH = 'HIGH' CO2_STATUS_TOO_HIGH = 'TOO HIGH' CO2_STATUS_ERROR = 'ERROR' SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX' SLACK_USERNAME = 'AIR_CONDITION_MONITOR' SLACK_EMOJI_ICON = ':loudspeaker:' SLACK_CHANNEL = '#air_condition_monitor' LOG_FILE = '{script_dir}/logs/air_condition_monitor.log'.format( script_dir = os.path.dirname(os.path.abspath(__file__)) ) def __init__(self): self._ccs811 = CCS811() self.slack_notifier = slack_notifier.SlackNotifier(self.SLACK_WEBHOOK_URL) self.co2_status = self.CO2_STATUS_LOW self.init_logger() def init_logger(self): self._logger = getLogger(__class__.__name__) file_handler = FileHandler(self.LOG_FILE) formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) self._logger.addHandler(file_handler) self._logger.setLevel(DEBUG) def notify_to_slack(self, co2, co2_status): if co2_status == self.CO2_STATUS_ERROR: fallback = "Something Went Wrong..." text = fallback color = 'danger' else: fallback = "CO2 level is {0}: {1:,}ppm".format(co2_status, co2) text = fallback color = 'good' if co2_status == self.CO2_STATUS_HIGH: color = 'warning' elif co2_status == self.CO2_STATUS_TOO_HIGH: color = 'danger' res = self.slack_notifier.notify( fallback, text, self.SLACK_USERNAME, self.SLACK_EMOJI_ICON, self.SLACK_CHANNEL, color ) self._logger.debug( 'Notified to Slack. STATUS_CODE - {status_code}, MSG - {msg}'.format( status_code = res['status_code'], msg = res['text'] ) ) def status(self, co2): if co2 < self.CO2_LOWER_LIMIT or co2 > self.CO2_HIGHER_LIMIT: return self.CO2_STATUS_CONDITIONING elif co2 < self.CO2_PPM_THRESHOLD_1: return self.CO2_STATUS_LOW elif co2 < self.CO2_PPM_THRESHOLD_2: return self.CO2_STATUS_HIGH else: return self.CO2_STATUS_TOO_HIGH def execute(self): while not self._ccs811.available(): pass while True: if not self._ccs811.available(): sleep(1) continue try: if not self._ccs811.readData(): co2 = self._ccs811.geteCO2() co2_status = self.status(co2) if co2_status == self.CO2_STATUS_CONDITIONING: self._logger.debug("Under Conditioning...") sleep(2) continue if co2_status != self.co2_status: self.notify_to_slack(co2, co2_status) self.co2_status = co2_status self._logger.info("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC())) else: self._logger.error('ERROR!') while True: pass except: self._logger.error(sys.exc_info()) self.notify_to_slack(-1, self.CO2_STATUS_ERROR) sleep(2) if __name__ == '__main__': air_condition_monitor = AirConditionMonitor() air_condition_monitor.execute()
動作確認
プログラムを実行すると下記のように二酸化炭素濃度と総揮発性有機化合物の測定値がログに出力されていきます。
2018-06-28 08:58:00,383 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1 2018-06-28 08:58:02,435 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1 2018-06-28 08:58:04,483 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1 2018-06-28 08:58:06,531 - AirConditionMonitor - INFO - CO2: 411ppm, TVOC: 1 2018-06-28 08:58:08,579 - AirConditionMonitor - INFO - CO2: 411ppm, TVOC: 1 2018-06-28 08:58:10,627 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1 2018-06-28 08:58:12,675 - AirConditionMonitor - INFO - CO2: 405ppm, TVOC: 0 2018-06-28 08:58:14,722 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1 2018-06-28 08:58:16,770 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
二酸化炭素濃度が閾値を越えると下記のように Slack に通知されます。下記の例では閾値はテスト用に低めに設定しています。
ちなみに CCS811 では初回使用時に48時間のエージングが推奨されています。また、起動時には20分間のコンディショニングが推奨されていて、起動直後には正しい値が測定されません。
まとめ
ひとまず測定と通知が行えることは確認できましたが、オフィス内のどこに置くかによって実際の有効性には違いが出てくるかと思います。また、今回は温度を考慮していないので、今後温度も使用して測定したり、二酸化炭素濃度の推移をクラウド上に保存して可視化したりできると、より役に立ちそうかなと思っています。測定した結果どうやって空気品質を改善するかというのはまた課題ではありますが、まずは現状をモニタできるようにした上で、その後の改善を考えられると良いかと思います。