皆様こんにちは。ユニファの赤沼です。ユニファでは様々な職種で採用を続けていまして、今年に入ってさらにメンバーが増えています。今のオフィスには昨年11月に移転したばかりですが、早くもスペースが埋まって来ました。人数が増えて困ることの一つがトイレ事情ですね。特に今のオフィスでは男性トイレの個室が一つしかなく、行ってみたけど使用中で戻ってくるのを繰り返す、ということも発生するようになって来ました。そこで多少でも状況を改善できないかと、手元にあった機材でトイレセンサーを作ってみました。
サービス構成
サービスの全体像としては、個室の使用状況をセンサーで検知し、ステータスが変わった時に Slack に通知するというシンプルなものです。今のオフィスのトイレはドアの開閉は使用状況に連動しない(空室でもドアが閉めておける)のでどうやって使用状況を判定しようか迷ったのですが、電源を取れる場所の都合などもあり、ひとまず距離センサーで近くに何かを検知した場合は人が入って来たということで使用中のステータスとするようにしました。ステータスは空室(Vacant)か使用中(Occupied)のいずれかです。ネットワークへの接続はオフィスの AP が利用できる範囲だったので、 Wi-Fi で接続しています。
回路図
まずは回路図です。下記の図では作図素材の都合上 Raspberry Pi 3 を使っていますが、実際は Raspberry Pi Zero W を使用しています。
距離センサーとしては今回は超音波センサーを使用しています。
また、見た目で稼働状況がわかるように LED を配置しています。それぞれ下記のようにGPIOピンを接続します。
【距離センサー】
- VCC - 2番ピン(5V Power)
- TRIG - 11番ピン(GPIO17)
- ECHO - 13番ピン(GPIO27)
- GND - 9番ピン(GROUND)
【LED】
- アノード - 12番ピン(GPIO18)
- カソード - 9番ピン(GROUND)
スクリプト実装
それでは上記回路をしようするためのコーディングです。今回は Python で実装してみました。まずは距離センサーを扱うクラスを下記のように実装しました。コンストラクタでは使用するピンやロガーの設定をしています。超音波センサーの仕組みとしては、TRIGピンから出したパルスが対象物に反射してECHOピンで受け取れるまでの時間から距離を計算します。詳細については先ほどの製品ページのサンプルなどもご覧いただくとしてここでは割愛しますが、かかった時間を計算する calc_duration() メソッドと、それをベースに距離を計算する distance() メソッドを用意しています。
#!/usr/bin/env python import RPi.GPIO as GPIO import time from logging import getLogger, FileHandler, Formatter, DEBUG from timeout_decorator import timeout, TimeoutError class DistanceSensor: TRIG_PIN = 17 ECHO_PIN = 27 DEBUG_LOG_FILE = 'logs/distance_sensor_debug.log' TIMEOUT_SEC = 3 def __init__(self): GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(self.TRIG_PIN, GPIO.OUT) GPIO.setup(self.ECHO_PIN, GPIO.IN) self.debug_logger = getLogger(__name__ + '_Debug') debug_file_handler = FileHandler(self.DEBUG_LOG_FILE) formatter = Formatter('%(asctime)s:%(levelname)s:%(message)s') debug_file_handler.setFormatter(formatter) self.debug_logger.addHandler(debug_file_handler) self.debug_logger.setLevel(DEBUG) @timeout(TIMEOUT_SEC) def calc_duration(self): GPIO.output(self.TRIG_PIN, GPIO.LOW) time.sleep(0.3) GPIO.output(self.TRIG_PIN, GPIO.HIGH) time.sleep(0.00001) GPIO.output(self.TRIG_PIN, GPIO.LOW) pulse_off = None while GPIO.input(self.ECHO_PIN) == 0: pulse_off = time.time() if pulse_off is None: self.debug_logger.debug('pulse_off is None.') return None pulse_on = None while GPIO.input(self.ECHO_PIN) == 1: pulse_on = time.time() if pulse_on is None: self.debug_logger.debug('pulse_on is None.') return None duration = pulse_on - pulse_off return duration def distance(self): try: duration = self.calc_duration() if duration is None: return None distance = duration * 17000 return distance except TimeoutError: self.debug_logger.debug('TimeoutError while calcurating duration.') return None
また、 slack への通知処理も下記のようにクラスを分けて実装しています。APIのコールには Requests モジュールを使用しています。
#!/usr/bin/env python import json import requests class SlackNotifier: def __init__(self, webhook_url): self.webhook_url = webhook_url def notify(self, fallback, text, username, icon_emoji, channel, color): headers = { 'Content-Type': 'application/json' } payload = { 'username': username, 'channel': channel, 'icon_emoji': icon_emoji, 'attachments': [ { 'fallback': fallback, 'text': text, 'color': color } ] } response = requests.post(self.webhook_url, json = payload, headers = headers) return { 'status_code': response.status_code, 'text': response.text }
そしてこれらのクラスを使用して処理を行うメインのクラスを下記のように実装しています。1秒おきに距離を取得していますが、距離センサーの測定値にはばらつきがあり、温度や対象物の素材によっても音波を反射しやすいかどうかが変わってくるので、単発の測定値だけで判定するのではなく、直近10回分の測定結果から判定するようにしています。10回のうち8回以上閾値より近くに何かがあるという結果になったら使用中のステータスに変更し、2回以下になったら空室ステータスに変更します。3回〜7回の場合はステータスを変更しません。入室時や退室時は閾値を越えたり越えなかったりという感じになるので、その揺らぎを吸収するという意図もあります。閾値より近くに物を検知した場合は LED を点灯させます。ステータスが切り替わった場合には slack に通知します。
#!/usr/bin/env python import distance_sensor import slack_notifier import time import RPi.GPIO as GPIO from logging import getLogger, FileHandler, Formatter, DEBUG class RestroomMonitor: SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX' SLACK_USERNAME = 'RESTROOM_TOKYO_MENS' SLACK_EMOJI_ICON = ':restroom:' SLACK_CHANNEL = '#tokyo_restroom_mens' LOG_FILE = 'logs/restroom_monitor.log' THRESHOLD_CM = 108 OCCUPIED_THRESHOLD = 0.8 VACANT_THRESHOLD = 0.2 LED_PIN = 18 def __init__(self): self.logger = getLogger(__name__) file_handler = FileHandler(self.LOG_FILE) formatter = Formatter('%(asctime)s:%(levelname)s:%(message)s') file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.setLevel(DEBUG) self.distance_sensor = distance_sensor.DistanceSensor() self.slack_notifier = slack_notifier.SlackNotifier(self.SLACK_WEBHOOK_URL) GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.setup(self.LED_PIN, GPIO.OUT) self.threshold_queue = [0] * 10 self.occupied = False def notify_to_slack(self, occupied): fallback = 'Occupied' if occupied else 'Vacant' text = 'Occupied' if occupied else 'Vacant' color = 'warning' if occupied else 'good' res = self.slack_notifier.notify( fallback, text, self.SLACK_USERNAME, self.SLACK_EMOJI_ICON, self.SLACK_CHANNEL, color ) self.logger.debug( 'Notified to Slack. STATUS_CODE - {status_code}, MSG - {msg}'.format( status_code = res['status_code'], msg = res['text'] ) ) def judge_occupation(self, distance): threshold_queue_value = 1 if distance <= self.THRESHOLD_CM else 0 GPIO.output(self.LED_PIN, GPIO.HIGH if threshold_queue_value == 1 else GPIO.LOW) self.threshold_queue.append(threshold_queue_value) del self.threshold_queue[0] average = sum(self.threshold_queue) / len(self.threshold_queue) if average >= self.OCCUPIED_THRESHOLD: occupied = True elif average <= self.VACANT_THRESHOLD: occupied = False else: occupied = self.occupied self.logger.debug( 'QUEUE - {queue} {distance} {average} {occupied}'.format( queue = self.threshold_queue, distance = distance, average = average, occupied = occupied ) ) return occupied def execute(self): while True: distance = self.distance_sensor.distance() if distance is None: self.logger.debug('Distance is None.') time.sleep(1) continue occupied = self.judge_occupation(distance) if self.occupied != occupied: self.occupied = occupied self.notify_to_slack(self.occupied) time.sleep(1)
設置
実装が終わったら実際にトイレに設置します。個室内にAC電源があるのでそこから micro USB で給電し、トイレットペーパーホルダー上に配置します。ブレッドボードとラズパイの裏には養生テープを丸めて簡単に固定しています。
スクリプト実行
設置したら ssh でログインしてスクリプトを実行します。
$ ./restroom_monitor.py &
ログには下記のように出力されていきます。
2018-04-25 09:53:35,610:DEBUG:QUEUE - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] 109.69352722167969 0.0 False 2018-04-25 09:53:36,924:DEBUG:QUEUE - [0, 0, 0, 0, 0, 0, 0, 0, 0, 1] 106.19568824768066 0.1 False 2018-04-25 09:53:38,236:DEBUG:QUEUE - [0, 0, 0, 0, 0, 0, 0, 0, 1, 1] 81.27307891845703 0.2 False 2018-04-25 09:53:39,551:DEBUG:QUEUE - [0, 0, 0, 0, 0, 0, 0, 1, 1, 1] 80.18684387207031 0.3 False 2018-04-25 09:53:40,862:DEBUG:QUEUE - [0, 0, 0, 0, 0, 0, 1, 1, 1, 1] 75.01912117004395 0.4 False 2018-04-25 09:53:42,174:DEBUG:QUEUE - [0, 0, 0, 0, 0, 1, 1, 1, 1, 1] 80.32464981079102 0.5 False 2018-04-25 09:53:43,486:DEBUG:QUEUE - [0, 0, 0, 0, 1, 1, 1, 1, 1, 1] 77.09026336669922 0.6 False 2018-04-25 09:53:44,798:DEBUG:QUEUE - [0, 0, 0, 1, 1, 1, 1, 1, 1, 1] 73.83155822753906 0.7 False 2018-04-25 09:53:46,110:DEBUG:QUEUE - [0, 0, 1, 1, 1, 1, 1, 1, 1, 1] 76.49850845336914 0.8 True 2018-04-25 09:53:51,961:DEBUG:Notified to Slack. STATUS_CODE - 200, MSG - ok
ステータスが切り替わると下記のように slack に通知されます。
課題
現状それなりに稼働させていますが、下記のような課題があります。
【精度】
空室状態の時に誤検知して使用中と判定することはほぼないのですが、使用中なのに空室と判定したり、空室と使用中の切り替えを繰り返してしまったりということが多く、もっと確実にステータスを判定できるセンシングに変更したいところです。
【ネットワーク】
オフィスの AP に届いてはいるものの、やはりそんなに電波は強くないので、状況によってはネットワークに接続できずにエラーになってしまうので、ネットワーク接続の安定性は改善したいと思っています。
【死活監視】
いつの間にかプロセスが落ちて通知が止まっていることがあるので、プロセス監視を入れて落ちたら自動的に起動するなどの対応が必要です。
まとめ
まだまだ課題はあるものの、センサー設置前と比べると便利になっているということで、メンバーにはそれなりに好評のようです。今まで色々センサーを試してみたりはしていましたが、やっぱりちゃんと役に立つものを作れると面白いですね。課題点については暇を見て改善していこうと思います。