ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

DynamoDBで集計とJavaScriptのPromise

こんにちは、プロダクト開発部のチョウです。最近一部古いJavaScriptをcallbakからPromise/async/awaitのスタイルに書き換えるようにしてみました。その中で印象に残ったコードを紹介します。

やりたいことが簡単です。あるセンサーのデータ個数を日ごとに集計します。テーブルの構造はこういう感じです。

SensorId: Number(Partition Key)
Date: String(Sort Key)
DataCount: Number

Partition KeyとSort KeyはDyanmoDBでテーブルを作るとき必要な情報ですが、とくに本文に関係ありません。

普通にRelation Databaseで考えると、最初にレコードを作るのは要注意です。複数クライアントでレコードを作ろうとすると、一個だけが実際のDBに残り、集計が正しくない状態になります。 普通のDBならunique indexなどを使います。SensorIdとDateを一つのunique indexにして、同じindexで作ろうと一つだけが成功し、ほかはすべて失敗です。 DynamoDBにunique indexがないですが、ConditionExpressがあります。レコードを追加したり更新したりするとき条件に満たさないと失敗になります。一個のセンサーに一日に一個のデータを作るには

    this.ddb.putItem({
      TableName: 'SensorDigest,
      Item: {
        DeviceId: {N: String(sensorId)},
        Date: {S: date},
        DataCount: {N: String(1)}
      },
      ConditionExpression: 'attribute_not_exists(Date)'
    }, (error, data) => {
      if(error) {
        if(error.code === 'ConditionalCheckFailedException') {
          // 重複
        } else {
          // 他のエラー
        }
      } else {
        // 先生
      }
    });

attribute_not_existsを使えば、レコードがない前提で追加できます。パラメータのDateは実際なんでもいいですが、必ずあるKeyにしたほうをおすすめです。

次にDataCountを増やすのも簡単ではありません。複数クライアントで一斉に更新すると、一部クライアントのデータがロスするかもしれません。 この問題に対し、普通のDBはversionカラムなどの方法があります。 DynamoDBになると、UpdateExpressionに SET DataCount = DataCount + increment や、1を増やす場合、ADD関数を使えます。 更新のコードはこうなります。

    this.ddb.updateItem({
      TableName: 'SensorDigest',
      Key: {
        SensorId: {N: String(sensorId)},
        Date: {S: date}
      },
      UpdateExpression: 'SET DataCount = DataCount + :increment',
      ExpressionAttributeValues: {
        ':increment': {N: String(increment)}
      }
    }, (error, data) => {
      if(error) {
        // 失敗
      } else {
        // 成功
      }
    });

レコードの追加と更新をあわせて考えると、ごく普通で、日ごとにセンサーのデータを集計することが複雑になりました。 あるセンサーのデータが来ると、まず追加ですか。それとも更新ですか。

ここで、私の考えは、

  1. 更新
  2. 更新失敗でしたら、追加
  3. 追加失敗ししたら、更新

最初更新にするのは、99%以上が更新だからです。追加は1回のみで、毎回やる必要がありません。 一日最初の更新は失敗するので、そこでレコードを追加します。 もし運悪く、複数クライアントで1の更新失敗し、2の追加をすることになったら、一つのクライアントだけが成功し、失敗したクライアントは更新モードに戻ります。今度こそ失敗しないです。

流れが完璧ですが、コードは少々複雑です。Promise/async/awaitなしのバージョン

  // pattern 1
  increaseDataCount(sensorId, date, increment, moveToPattern2) {
      this.ddb.updateItem({
      TableName: 'SensorDigest',
      Key: {
        SensorId: {N: String(sensorId)},
        Date: {S: date}
      },
      ConditionExpression: 'attribute_exists(DateUsed)',
      UpdateExpression: 'SET DataCount = DataCount + :increment',
      ExpressionAttributeValues: {
        ':increment': {N: String(increment)}
      }
    }, (error, data) => {
      if(error) {
        if(error.code === 'ConditionalCheckFailedException' && 
           moveToPattern2) {
            // レコードは存在しない、go to pattern 2
            save(sensorId, date, increment)
        } else {
            // 失敗
        }
      } else {
        // 成功
      }
    });
  }

  save(sensorId, date, increment) {
    this.ddb.putItem({
        TableName: 'SensorDigest,
        Item: {
            DeviceId: {N: String(sensorId)},
            Date: {S: date},
            DataCount: {N: String(increment)}
        },
        ConditionExpression: 'attribute_not_exists(Date)'
        }, (error, data) => {
        if(error) {
            if(error.code === 'ConditionalCheckFailedException') {
                // 重複、go to pattern 3
                increaseDataCount(sensorId, date, increment, false);
            } else {
                // 他のエラー
            }
        } else {
            // 先生
        }
    });
  }

pattern 1のときに更新と同時にConditionExpressionを追加し、レコードがないとすぐ失敗になります。 callbackの中でもし条件に満たさないエラー(レコードが存在しない)なら、pattern 2に移します。

pattern 2でレコードを追加します。ここもConditionExpressionがあり、複数クライアントの場合ひとつのクライアントだけ成功になります。失敗したほかのクライアントは条件満たさないエラーで、pattern 3に移します。

pattern 3はpattern 1のコードそのまま利用し、パラメータのmoveToPattern2をfalseに設定しただけです。パラメータを用意するのは無限ループにならないためです(基本ならないと思いますが)。確率からすると、pattern 3になるクライアントはほとんどいないはずです。

ようやくcallbackスタイルのコードをPromise/async/awaitに変わる部分に入れるようになりました。 上のコードを見ると、同期スタイルのコードの変えるのは難しそうですね。

try {
    await increaseDataCount(sensorId, date, increment)
} catch(e) {
    if(e.code == 'ConditionalCheckFailedException') {
        try {
            await save(sensorId, date, increment)
        } catch(e) {
            if(e.code == 'ConditionalCheckFailedException) {
                await increaseDataCount(sensorId, date, increment);
                return;
            }
            throw e;
        }
        return;
    }
    throw e;
}

(ここで、increaseDataCountのcallbackからsaveをコールしなくなるので、パラメータmoveToPattern2を削除しました。) しかも、同期スタイルに変えても読みにくそうです。 ではPromiseスタイルはどうでしょう。

await increaseDataCount(sensorId, date, increment)
    .catch(e -> 
        if(e.code == 'ConditionalCheckFailedException') {
            return save(sensorId, date, increment)
        }
        return Promise.error(e)
    )
    .catch(e -> 
        if(e.code == 'ConditionalCheckFailedException') {
            return increaseDataCount(sensorId, date, increment)
        }
        return Promise.error(e)
    );

同期コードよりよさそうです。わかりやすいです。 increaseDataCountとsaveをPromiseスタイルに変えるのはとくに難しくないのでここで割愛します。

私が古いコードをPromise/async/awaitに変える中でこれは一番難しかったです。フローはそうだし、エラーハンドリングもすべてasync/awaitに寄せるではなく、Promiseも使うのもいいでしょう。 実際、こういうPromiseのchainのような書き方は関数型プログラミング言語HaskellでMonadといいます。Monadを用いたReactiveX(RxJava、RxSwiftなど)もエラーハンドリングなどに優れています。

いかがでしょうか。DynamoDBで集計テーブルを操作するのも、callbackスタイルのコードをPromise/async/awaitも難しかったんでしょうか。普段こういうコードあまり見れないと思いますので、参考になると幸いです。