ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

AWS Lambda から Google Cloud Pub/Sub へ投入してみる(Python)

おはようございます。
こんにちは。
こんばんは。

最近の日差しの強さで肌が焼けるなと感じる地黒のインフラ担当すずきです。

本当に暑くなってきましたね。でも冷房で寒くてつらい…。
皆様、温度差で体を壊さないようにしていただけたらと思います。

さて本題、ユニファのシステムは基本的にAWSを利用しているのですが、BigQueryとかも利用したいのでGCPを利用しようという流れが来てます。
S3に溜まってるログをGCS経由で送ればいいじゃんってのもあるのですが、どうせならCloud Pub/Subとか使ってみようって事でLambdaからCloud Pub/Subを触ってみました。

GCPのAPIキー取得

クイックスタート: gcloud コマンドライン ツールの使用  |  Cloud Pub/Sub のドキュメント  |  Google Cloud Platform ここの始める前にをやって gcloud を使えるようにします。 その後以下のコマンドで認証します。

gcloud auth application-default login

コマンド実行後ブラウザで認証を通すと以下のような出力がでるので。
Credentials saved to file:
のパスを控えておきます。

$ gcloud auth application-default login
Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?redirect_uri=~~hogefuga~~~~

Credentials saved to file: [$HOME/.config/gcloud/application_default_credentials.json]

These credentials will be used by any library that requests
Application Default Credentials.


Updates are available for some Cloud SDK components.  To install them,
please run:
  $ gcloud components update

Cloud Pub/Subのコンソールからトピックとサブスクリプションを作成

Pub/Subの画面からトピックの作成します。
お試しなので「lambda_pubsub_test」って名前にしてみます。 f:id:mominosin:20170531191352p:plain

トピックはメッセージを投げ込むエンドポイントのようなものって言えばいいのかな?
つづいてサブスクリプションを作ります。
サブスクリプションはトピックに投げ込まれたメッセージを受け取る場所とでも言えばいいのでしょうかまだよくわかっていませんが作ります。
とりあえず同じく「lambda_pubsub_test」って名前にしてみます。 f:id:mominosin:20170531192722p:plain

Pub/Subのライブラリをローカルに準備

ひとまずLambdaで利用できるライブラリを調べてみました。

Lambda 実行環境と利用できるライブラリ - AWS Lambda

AWS SDK for Python (Boto 3) バージョン 1.4.4、Botocore バージョン 1.5.13

はい、google-cloud-pubsubなんてあるわけ無いですよね… なのでlambdaに投げ込むコードと一緒に入れ込むことにします。

$ mkdir lambda_pubsub  # 以後このディレクトリで作業していると思ってください
$ cd lambda_pubsub
$ pip install --upgrade google-cloud-pubsub -t .

pip install 時に -t . オプションを使うことでその場に展開してくれます。

これだけだと、Pythonを実行してもgoogle-cloud-pubsubのライブラリを読み込んでくれないので以下のコマンドを実行しておきます。

$ touch google/__init__.py
$ touch google/cloud/__init__.py

次にGCPのAPIキー取得 で取得したAPIキーをコピーしてきます。

$ cp $HOME/.config/gcloud/application_default_credentials.json .

これでライブラリの準備は完了です。

サンプルコードの準備

パブリッシャー ガイド  |  Cloud Pub/Sub  |  Google Cloud Platform
こちらのパブリッシャーガイドの トピックへのメッセージのパブリッシュ のPythonコードを参考に以下のコードを準備します。
ファイル名は lambda_function.py にします。
置き場所は先程のライブラリと同じ位置になります。

import json,os
from google.cloud import pubsub

def lambda_handler(event, context):
  data = json.dumps(event)

  pubsub_client = pubsub.Client()
  topic = pubsub_client.topic(os.environ['TOPIC_NAME'])

  data = data.encode('utf-8')

  message_id = topic.publish(data)

  print('Message {} published.'.format(message_id))

コードを準備したらlambdaにアップロードするためにS3に配置します。
直接ブラウザからアップロードすると容量が多いためエラーになるのでS3を経由します。

$ pwd
$HOME/lambda_pubsub_test
$ zip -r ../lambda_pubsub_test.zip *
$ aws s3 cp ../lambda_pubsub_test.zip s3://lambda_pubsub_test/lambda_pubsub_test.zip

これでコードの準備も完了です。

LambdaにアップロードしFunctionを作成

AWSのWebコンソールポチポチしながら作っていきます。
LambdaのコンソールでBlank Functionを作ります。
f:id:mominosin:20170601110414p:plain

Blank Function をクリックした次のページもそのまま次へを押して進んでください。

f:id:mominosin:20170601111442p:plain s3にアップロードしたファイルのアドレスや、 GCPのライブラリを使うために必要な環境変数などを設定します。
GCLOUD_PROJECTはWebコンソールのダッシュボードの左上にあるプロジェクト情報のプロジェクトIDを利用してください。 f:id:mominosin:20170601114937p:plain

Lambda Functionが作れたらテストを実行してみます。

f:id:mominosin:20170601111903p:plain 簡単なJSONを送ってみます。

f:id:mominosin:20170601112231p:plain 結果は null なんですが、右下の枠内に Message 127608900050889 published. って表示されているので転送成功です。

Pub/Subに入っているデータを取り出してみる

サブスクライバー ガイド  |  Cloud Pub/Sub  |  Google Cloud Platform
このサブスクライバーガイドのpull メッセージの受信のPythonコードを参考に以下のコードを準備します。
これはローカルで実行します。
ファイル名は適当で良いです。
とりあえずsub_sample.pyとかにしておきます。

from google.cloud import pubsub

pubsub_client = pubsub.Client()
# Webコンソールから作成したトピック名を引数に
topic = pubsub_client.topic('lambda_pubsub_test')
# Webコンソールから作成したサブスクリプション名を引数に
subscription = topic.subscription('lambda_pubsub_test')

results = subscription.pull(return_immediately=True)

print('Received {} messages.'.format(len(results)))

for ack_id, message in results:
    print('* {}: {}, {}'.format(
        message.message_id, message.data, message.attributes))

if results:
    subscription.acknowledge([ack_id for ack_id, message in results])

保存したら下記のようにコマンド実行します。

$ export GCLOUD_PROJECT="GCPのプロジェクトID"
# gcloudの設定ができていれば不要だと思うので必要なら設定してください。
$ export GOOGLE_APPLICATION_CREDENTIALS=$HOME/.config/gcloud/application_default_credentials.json
$ python sub_sample.py
Received 1 messages.
* 127608670635219: {"key3": "value3", "key2": "value2", "key1": "value1"}, {}

実行したところ、先程のLambdaテストで登録したJSONが表示されたことが確認できます。

おわり

以上でLambdaからPub/Subへのデータ投入挑戦は終了です。
これができることで、既存のAWSで作成したシステムのデータをLambda経由で加工してPub/Subへ送りDataflow経由でBigQueryに送るなどできるようになるかなと思います。

直接Pub/Subへ送れよとかあるかもしれませんがそこは目をつぶってください。 また、所々ツッコミどころあると思いますのでご意見くださるとありがたいです。

以上です、最後まで読んでいただきありがとうございました。