ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

空気品質センサでオフィスの二酸化炭素濃度をモニターする

 皆様こんにちは。ユニファの赤沼です。ユニファでは着々とメンバーが増え、できることも増えてきて嬉しい限りなのですが、反面オフィスが手狭になってきて、換気が不十分で空気の悪さが気になるようにもなってきました。二酸化炭素濃度が上がると生産性にも悪影響があるという事でチームラボさんも測定と改善に取り組まれているようです。

ch.nicovideo.jp

 そこで試しに空気品質センサを使って空気品質モニターを自作してみました。今回使ったセンサはスイッチサイエンスさんで販売されている、 CSS811 というセンサモジュールを使った SparkFun 製のブレークアウトボードです。

www.switch-science.com

Hookup

 ブレークアウトボードのチュートリアルは SparkFun の下記サイトで公開されていますので、これを参考に回路を組んでいきます。

CCS811 Air Quality Breakout Hookup Guide - learn.sparkfun.com

 正確な測定を行うには温度センサで測定した温度を使用する必要があるのですが、今回はひとまず単体で使用してみます。

 チュートリアルでは Arduino の互換ボードを使用していますが、 Raspberry Pi の方が色々と融通が聞かせやすいので、 Raspberry Pi Zero W を使用してみます。また、CCS811 ではインタラプトモードを使用する事で、データが発生した時だけセンサモジュールを稼働させて省電力で使用することができますが、今回は常時電源に接続して使用する想定なので、インタラプトモードは使わずにシンプルな構成にしています。回路図は下記の通りです。

f:id:akanuma-hiroaki:20180628084323p:plain

 CCS811 は I2C による接続なので、電源と GND 以外は SDA(Serial Data), SCL(Serial Clock) の2本を使用しています。 CCS811 の動作電圧は 3.3V なので、電源は Raspberry Pi の 3.3V ピンから供給します。実際に組んでみた回路は下記のようになりました。

f:id:akanuma-hiroaki:20180628084529j:plain

I2C の使用設定

 Raspberry Pi ではデフォルトでは I2C が使える設定になっていませんので、 raspi-config から設定を行います。

$ sudo raspi-config

 メニュー画面が表示されたら Interfacing Options を選択します。

f:id:akanuma-hiroaki:20180628084936p:plain

 I2C の項目を有効にすることで I2C が使用できるようになります。

f:id:akanuma-hiroaki:20180628085001p:plain

 次に設定ファイル /boot/config.txt に下記の1行を追記します。

dtparam=i2c_baudrate=10000

 ここまでの設定ができたら一度 Raspberry Pi を再起動します。再起動後に i2cdetect コマンドで下記のようにセンサが認識されていれば使用可能な状態になっています。このブレークアウトボードの I2C アドレスは 0X5B です。

$ sudo i2cdetect -y 1
     0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f
00:          -- -- -- -- -- -- -- -- -- -- -- -- -- 
10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
20: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
30: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
50: -- -- -- -- -- -- -- -- -- -- -- 5b -- -- -- -- 
60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 
70: -- -- -- -- -- -- -- --                        

 プログラムの実装には Python 3 を使います。

$ python -V
Python 3.6.5

 Python 3 系で I2C を使用するには smbus2 をインストールする必要がありますので、 pip でインストールします。

$ pip install smbus2

プログラム実装

 それではプログラムを実装していきたいと思います。 SparkFun では Arduino 用のライブラリは提供していますが、 Python 用のライブラリやサンプルは提供されていないようです。ですが CCS811 を使用したブレークアウトボードは Adafruit からも販売されていて、そちらでは Python 向けのライブラリやサンプルが Github の下記リポジトリで提供されていましたので、そちらから必要な部分を抜き出す形で実装してみました。

github.com

github.com

 まずは CCS811 を直接扱うクラスの実装です。基本的な操作方法としては、該当の I2C デバイスのアドレスを指定し、操作に応じたレジスタに対してバイトデータやブロックデータの読み書きをしています。初期化時に self.writeList(CCS811_BOOTLOADER_APP_START, []) でセンサモジュールの動作を開始させ、 self.disableInterrupt() でインタラプトモードをOFFにしています。また、 self.setDriveMode(mode) でデータの取得間隔を指定しています。

#!/usr/bin/env python

import os
import smbus2 as smbus
from collections import OrderedDict
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from time import sleep

CCS811_ADDRESS  =  0x5B

CCS811_STATUS = 0x00
CCS811_MEAS_MODE = 0x01
CCS811_ALG_RESULT_DATA = 0x02
CCS811_HW_ID = 0x20

CCS811_DRIVE_MODE_IDLE = 0x00
CCS811_DRIVE_MODE_1SEC = 0x01
CCS811_DRIVE_MODE_10SEC = 0x02
CCS811_DRIVE_MODE_60SEC = 0x03
CCS811_DRIVE_MODE_250MS = 0x04

CCS811_BOOTLOADER_APP_START = 0xF4

CCS811_HW_ID_CODE = 0x81

class CCS811:
    LOG_FILE = '{script_dir}/logs/ccs811.log'.format(
        script_dir = os.path.dirname(os.path.abspath(__file__))
    )

    def __init__(self, mode = CCS811_DRIVE_MODE_1SEC, address = CCS811_ADDRESS):
        self.init_logger()

        if mode not in [CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC, CCS811_DRIVE_MODE_250MS]:
            raise ValueError('Unexpected mode value {0}.  Set mode to one of CCS811_DRIVE_MODE_IDLE, CCS811_DRIVE_MODE_1SEC, CCS811_DRIVE_MODE_10SEC, CCS811_DRIVE_MODE_60SEC or CCS811_DRIVE_MODE_250MS'.format(mode))

        self._address = address
        self._bus = smbus.SMBus(1)

        self._status = Bitfield([('ERROR' , 1), ('unused', 2), ('DATA_READY' , 1), ('APP_VALID', 1), ('unused2' , 2), ('FW_MODE' , 1)])
        
        self._meas_mode = Bitfield([('unused', 2), ('INT_THRESH', 1), ('INT_DATARDY', 1), ('DRIVE_MODE', 3)])

        self._error_id = Bitfield([('WRITE_REG_INVALID', 1), ('READ_REG_INVALID', 1), ('MEASMODE_INVALID', 1), ('MAX_RESISTANCE', 1), ('HEATER_FAULT', 1), ('HEATER_SUPPLY', 1)])

        self._TVOC = 0
        self._eCO2 = 0

        if self.readU8(CCS811_HW_ID) != CCS811_HW_ID_CODE:
            raise Exception("Device ID returned is not correct! Please check your wiring.")

        self.writeList(CCS811_BOOTLOADER_APP_START, [])
        sleep(0.1)

        if self.checkError():
            raise Exception("Device returned an Error! Try removing and reapplying power to the device and running the code again.")
        if not self._status.FW_MODE:
            raise Exception("Device did not enter application mode! If you got here, there may be a problem with the firmware on your sensor.")

        self.disableInterrupt()

        self.setDriveMode(mode)

    def init_logger(self):
        self._logger = getLogger(__class__.__name__)
        file_handler = FileHandler(self.LOG_FILE)
        formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
        self._logger.setLevel(DEBUG)

    def disableInterrupt(self):
        self._meas_mode.INT_DATARDY = 1
        self.write8(CCS811_MEAS_MODE, self._meas_mode.get())

    def setDriveMode(self, mode):
        self._meas_mode.DRIVE_MODE = mode
        self.write8(CCS811_MEAS_MODE, self._meas_mode.get())

    def available(self):
        self._status.set(self.readU8(CCS811_STATUS))
        if not self._status.DATA_READY:
            return False
        else:
            return True

    def readData(self):
        if not self.available():
            return False
        else:
            buf = self.readList(CCS811_ALG_RESULT_DATA, 8)
            self._eCO2 = (buf[0] << 8) | (buf[1])
            self._TVOC = (buf[2] << 8) | (buf[3])
            if self._status.ERROR:
                return buf[5]
            else:
                return 0

    def getTVOC(self):
        return self._TVOC

    def geteCO2(self):
        return self._eCO2

    def checkError(self):
        self._status.set(self.readU8(CCS811_STATUS))
        return self._status.ERROR

    def readU8(self, register):
        result = self._bus.read_byte_data(self._address, register) & 0xFF
        self._logger.debug("Read 0x%02X from register 0x%02X", result, register)
        return result

    def write8(self, register, value):
        value = value & 0xFF
        self._bus.write_byte_data(self._address, register, value)
        self._logger.debug("Wrote 0x%02X to register 0x%02X", value, register)

    def readList(self, register, length):
        results = self._bus.read_i2c_block_data(self._address, register, length)
        self._logger.debug("Read the following from register 0x%02X: %s", register, results)
        return results

    def writeList(self, register, data):
        self._bus.write_i2c_block_data(self._address, register, data)
        self._logger.debug("Wrote to register 0x%02X: %s", register, data)

class Bitfield:
    def __init__(self, _structure):
        self._structure = OrderedDict(_structure)
        for key, value in self._structure.items():
            setattr(self, key, 0)

    def get(self):
        fullreg = 0
        pos = 0
        for key, value in self._structure.items():
            fullreg = fullreg | ( (getattr(self, key) & (2**value - 1)) << pos )
            pos = pos + value

        return fullreg

    def set(self, data):
        pos = 0
        for key, value in self._structure.items():
            setattr(self, key, (data >> pos) & (2**value - 1))
            pos = pos + value

 CO2濃度が閾値を超えた時に Slack に通知するためのクラスも下記のように実装しておきます。ユーザ名やテキストなどを指定して Slack の Webhook にポストします。

#!/usr/bin/env python

import json
import requests

class SlackNotifier:
    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def notify(self, fallback, text, username, icon_emoji, channel, color):
        headers = {
            'Content-Type': 'application/json'
        }

        payload = {
            'username':    username,
            'channel':     channel,
            'icon_emoji':  icon_emoji,
            'attachments': [
                {
                    'fallback': fallback,
                    'text':     text,
                    'color':    color
                }
            ]
        }

        response = requests.post(self.webhook_url, json = payload, headers = headers)

        return {
            'status_code': response.status_code,
            'text':        response.text
        }

 そして上記のクラスをしようするメインのスクリプトを実装します。 CCS811 の初期化後にセンサが有効になったら、 self._ccs811.readData() で現在のCO2濃度などを計算し、計算後のデータを self._ccs811.geteCO2() で取得しています。取得した CO2 濃度から現在のステータスを判別し、ステータスが変わった場合には Slack に通知します。

#!/usr/bin/env python

import os
import sys
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from time import sleep

import slack_notifier
from CCS811 import CCS811

class AirConditionMonitor:
    CO2_PPM_THRESHOLD_1 = 1000
    CO2_PPM_THRESHOLD_2 = 2000

    CO2_LOWER_LIMIT  =  400
    CO2_HIGHER_LIMIT = 8192

    CO2_STATUS_CONDITIONING = 'CONDITIONING'
    CO2_STATUS_LOW          = 'LOW'
    CO2_STATUS_HIGH         = 'HIGH'
    CO2_STATUS_TOO_HIGH     = 'TOO HIGH'
    CO2_STATUS_ERROR        = 'ERROR'

    SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX'
    SLACK_USERNAME    = 'AIR_CONDITION_MONITOR'
    SLACK_EMOJI_ICON  = ':loudspeaker:'
    SLACK_CHANNEL     = '#air_condition_monitor'

    LOG_FILE = '{script_dir}/logs/air_condition_monitor.log'.format(
        script_dir = os.path.dirname(os.path.abspath(__file__))
    )

    def __init__(self):
        self._ccs811 = CCS811()
        self.slack_notifier = slack_notifier.SlackNotifier(self.SLACK_WEBHOOK_URL)
        self.co2_status = self.CO2_STATUS_LOW
        self.init_logger()

    def init_logger(self):
        self._logger = getLogger(__class__.__name__)
        file_handler = FileHandler(self.LOG_FILE)
        formatter = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        self._logger.addHandler(file_handler)
        self._logger.setLevel(DEBUG)

    def notify_to_slack(self, co2, co2_status):
        if co2_status == self.CO2_STATUS_ERROR:
            fallback = "Something Went Wrong..."
            text = fallback
            color = 'danger'
        else:
            fallback = "CO2 level is {0}: {1:,}ppm".format(co2_status, co2)
            text = fallback

            color = 'good'
            if co2_status == self.CO2_STATUS_HIGH:
                color = 'warning'
            elif co2_status == self.CO2_STATUS_TOO_HIGH:
                color = 'danger'

        res = self.slack_notifier.notify(
            fallback, text, self.SLACK_USERNAME, self.SLACK_EMOJI_ICON, self.SLACK_CHANNEL, color
        )
        self._logger.debug(
            'Notified to Slack. STATUS_CODE - {status_code}, MSG - {msg}'.format(
                status_code = res['status_code'], msg = res['text']
            )
        )

    def status(self, co2):
        if co2 < self.CO2_LOWER_LIMIT or co2 > self.CO2_HIGHER_LIMIT:
            return self.CO2_STATUS_CONDITIONING
        elif co2 < self.CO2_PPM_THRESHOLD_1:
            return self.CO2_STATUS_LOW
        elif co2 < self.CO2_PPM_THRESHOLD_2:
            return self.CO2_STATUS_HIGH
        else:
            return self.CO2_STATUS_TOO_HIGH

    def execute(self):
        while not self._ccs811.available():
            pass

        while True:
            if not self._ccs811.available():
                sleep(1)
                continue

            try:
                if not self._ccs811.readData():
                    co2 = self._ccs811.geteCO2()
                    co2_status = self.status(co2)
                    if co2_status == self.CO2_STATUS_CONDITIONING:
                        self._logger.debug("Under Conditioning...")
                        sleep(2)
                        continue

                    if co2_status != self.co2_status:
                        self.notify_to_slack(co2, co2_status)
                        self.co2_status = co2_status

                    self._logger.info("CO2: {0}ppm, TVOC: {1}".format(co2, self._ccs811.getTVOC()))
                else:
                    self._logger.error('ERROR!')
                    while True:
                        pass
            except:
                self._logger.error(sys.exc_info())
                self.notify_to_slack(-1, self.CO2_STATUS_ERROR)

            sleep(2)

if __name__ == '__main__':
    air_condition_monitor = AirConditionMonitor()
    air_condition_monitor.execute()

動作確認

 プログラムを実行すると下記のように二酸化炭素濃度と総揮発性有機化合物の測定値がログに出力されていきます。

2018-06-28 08:58:00,383 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
2018-06-28 08:58:02,435 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
2018-06-28 08:58:04,483 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
2018-06-28 08:58:06,531 - AirConditionMonitor - INFO - CO2: 411ppm, TVOC: 1
2018-06-28 08:58:08,579 - AirConditionMonitor - INFO - CO2: 411ppm, TVOC: 1
2018-06-28 08:58:10,627 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
2018-06-28 08:58:12,675 - AirConditionMonitor - INFO - CO2: 405ppm, TVOC: 0
2018-06-28 08:58:14,722 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1
2018-06-28 08:58:16,770 - AirConditionMonitor - INFO - CO2: 407ppm, TVOC: 1

 二酸化炭素濃度が閾値を越えると下記のように Slack に通知されます。下記の例では閾値はテスト用に低めに設定しています。

f:id:akanuma-hiroaki:20180628090010p:plain

 ちなみに CCS811 では初回使用時に48時間のエージングが推奨されています。また、起動時には20分間のコンディショニングが推奨されていて、起動直後には正しい値が測定されません。

まとめ

 ひとまず測定と通知が行えることは確認できましたが、オフィス内のどこに置くかによって実際の有効性には違いが出てくるかと思います。また、今回は温度を考慮していないので、今後温度も使用して測定したり、二酸化炭素濃度の推移をクラウド上に保存して可視化したりできると、より役に立ちそうかなと思っています。測定した結果どうやって空気品質を改善するかというのはまた課題ではありますが、まずは現状をモニタできるようにした上で、その後の改善を考えられると良いかと思います。