ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

Swift for TensorFlowで頭部姿勢推定をやってみる

こんにちは、iOSエンジニアのしだです。
前回、Swift-Jupyter について書いたのですが、本当はSwift for TensorFlowを使って頭部姿勢推定(Head pose estimation)をやりたかったのでこちらをやってみたいと思います。

tech.unifa-e.com

頭部姿勢推定(Head pose estimation)

頭部姿勢推定は、カメラからみた頭の向きの推定を行うことです。 そのままですが、画像から頭の向きを pitch、 yaw、 roll の3つの値を予測することになります。

データセットの準備

データセットは tensorflow_datasets に含まれている300W-LP というものを使います。 このデータセットは、Face Alignment用のデータセットになっており、顔の向きだけではなく顔のランドマークデータも含んでいます。 今回は、その中の pitch、yaw、rollのデータを使います。

しかし、Swift for TensorFlow から直接 tensorflow_datasets を使うことができなかったため、 一度、Pythonからnumpyを使って、 .npy形式へ変換して使うことにします。 変換方法を以下のGistにおいておきましたので参考にしてください。

generate-dataset-from-the300w-lp-public.ipynb · GitHub

また、今回の実装は以下の論文がベースになっています。

arxiv.org

準備

基本的に Swift-Jupyter 上で実装を進めていますが、swift-modelsというリポジトリの ResNetの実装をそのまま使わせてもらいます。 そのために、直接 ResNet50.swift をダウンロードしておきます。

$ curl -O https://raw.githubusercontent.com/tensorflow/swift-models/master/Models/ImageClassification/ResNet50.swift

実装

インポート

最初に必要なソースコードを Swift-Jupyter 上で参照できるように、%include を使って読み込んでおきます。

%include "EnableIPythonDisplay.swift"
%include "ResNet50.swift"
IPythonDisplay.shell.enable_matplotlib("inline")
import TensorFlow
import Python

let np = Python.import("numpy")
let plt = Python.import("matplotlib.pyplot")
let PIL = Python.import("PIL")

データセットの読み込み

作っておいたデータセットをnumpy.loadを使って読み込みます。

let data = np.load("data_20000.npy")
let label = np.load("label_20000.npy")
print("data.npy", data.shape, data.dtype)
print("label.npy", label.shape, label.dtype)
data.npy (20000, 128, 128, 3) uint8
label.npy (20000, 3) float32

いくつか前処理を行います。ピクセル値を0 ~ 1にしたり、pitch、yaw、roll の値をone-hotベクトル化してます。 回転値を予測したいのですが、ここでは -90° ~ 90° の値を 3° 刻みにラベルを分割します。

let data2 = (data / 255.0).astype(np.float32)
print(data2.shape, data2.dtype)

let bins = np.arange(-90, 93, 3, dtype: np.int32)
let bin_size = Int(bins.shape[0])!
let bin_labels = np.digitize(label, bins).astype(np.int32)
let label_onehot = np.eye(bin_size)[bin_labels].astype(np.int32)
print(bin_size, bin_labels.shape, bin_labels.dtype)
print(label_onehot.shape, label_onehot.dtype)
(20000, 128, 128, 3) float32
61 (20000, 3) int32
(20000, 3, 61) int32

Numpyの PythonObject をTensorに変換しておきます。以降はこの data、axes、labels のデータを使って学習します。

let data = Tensor<Float>(numpy: data2)!
let axes = Tensor<Float>(numpy: label)!
let labels = Tensor<Int32>(numpy: bin_labels)!
print(data.shape, axes.shape, labels.shape)
print(axes[0], labels[0], labels.shape)
[20000, 128, 128, 3] [20000, 3] [20000, 3]
[ 11.812507,  30.915585, -11.911684] [34, 41, 27] [20000, 3]

TensorGroupを継承したクラスを用意します。これは、入力データのパイプラインを作るときに必要になるクラスで、データの読み出しを定義します。 あとで Datasetと一緒につかうことで、入力データをバッチサイズごとに取り出したり、シャッフルしたりパイプラインを構築するのが容易になります。

public struct LabeledExample: TensorGroup {
    public var label: Tensor<Int32>
    public var axes: Tensor<Float>
    public var data: Tensor<Float>
    
    public init(label: Tensor<Int32>, axes: Tensor<Float>, data: Tensor<Float>){
        self.label = label
        self.axes = axes
        self.data = data
    }
    
    public init<C: RandomAccessCollection>(_handles: C) where C.Element: _AnyTensorHandle {
        precondition(_handles.count == 3)
        let labelIndex = _handles.startIndex
        let axesIndex = _handles.index(labelIndex, offsetBy: 1)
        let dataIndex = _handles.index(labelIndex, offsetBy: 1)
        label = Tensor<Int32>(handle: TensorHandle<Int32>(handle: _handles[labelIndex]))
        axes = Tensor<Float>(handle: TensorHandle<Float>(handle: _handles[axesIndex]))
        data = Tensor<Float>(handle: TensorHandle<Float>(handle: _handles[dataIndex]))
    }
}

モデルの定義

モデルを構築します。HeadPoseというLayerをベースモデルとして定義します。こちらは ResNet50.swift の実装を参考にしています。
あと、pitch、yaw、roll の3つの軸を出力するために Axis というLayerを用意します。 Layer プロトコルは、callAsFunction を提供しておりこの関数にフォワード処理を記述します。@differentiable と付けてあげればバックワード処理を記述することはありません。

public struct Axis: Layer {
    public var layer: HeadPose
    public var fc: Dense<Float>
    
    public init(inputSize: Int, outputSize: Int, backbone: HeadPose) {
        layer = backbone
        fc = Dense(inputSize: inputSize, outputSize: outputSize)
    }
    
    @differentiable
    public func callAsFunction(_ input: Tensor<Float>) -> Tensor<Float> {
        return softmax(fc(layer(input)))
    }
}

public struct HeadPose: Layer {
    public var l1: ConvBN
    public var maxPool = MaxPool2D<Float>(poolSize: (3, 3), strides: (2, 2))

    public var l2a = ResidualBasicBlock(featureCounts: (64, 64, 64, 64))
    public var l2b: ResidualBasicBlockStack

    public var l3a = ResidualBasicBlockShortcut(featureCounts: (64, 128, 128, 128))
    public var l3b: ResidualBasicBlockStack

    public var l4a = ResidualBasicBlockShortcut(featureCounts: (128, 256, 256, 256))
    public var l4b: ResidualBasicBlockStack

    public var l5a = ResidualBasicBlockShortcut(featureCounts: (256, 512, 512, 512))
    public var l5b: ResidualBasicBlockStack
    
    public var avgPool = GlobalAvgPool2D<Float>()

    public init(numClass: Int, layerBlockCounts: (Int, Int, Int, Int)=(3, 4, 6, 3)) {
        l1 = ConvBN(filterShape: (7, 7, 3, 64), strides: (2, 2), padding: .same)
        maxPool = MaxPool2D(poolSize: (3, 3), strides: (2, 2))

        l2b = ResidualBasicBlockStack(
            featureCounts: (64, 64, 64, 64), blockCount: layerBlockCounts.0)
        l3b = ResidualBasicBlockStack(
            featureCounts: (128, 128, 128, 128), blockCount: layerBlockCounts.1)
        l4b = ResidualBasicBlockStack(
            featureCounts: (256, 256, 256, 256), blockCount: layerBlockCounts.2)
        l5b = ResidualBasicBlockStack(
            featureCounts: (512, 512, 512, 512), blockCount: layerBlockCounts.3)
    }

    @differentiable
    public func callAsFunction(_ input: Tensor<Float>) -> Tensor<Float> {
        let inputLayer = maxPool(relu(l1(input)))
        let level2 = inputLayer.sequenced(through: l2a, l2b)
        let level3 = level2.sequenced(through: l3a, l3b)
        let level4 = level3.sequenced(through: l4a, l4b)
        let level5 = level4.sequenced(through: l5a, l5b, avgPool)
        return level5
    }
}

Lossの値をスタックするための変数を定義しています。エポック毎にLossの値を保持して、学習終了後にグラフとして表示するために用います。

var state = [
    "pitch": [
        "total": [Float](),
        "softmax": [Float](),
        "mse": [Float](),
    ], 
    "yaw": [
        "total": [Float](),
        "softmax": [Float](),
        "mse": [Float](),
    ],
    "roll": [
        "total": [Float](),
        "softmax": [Float](),
        "mse": [Float](),
    ], 
]

学習

モデル、データセットやオプティマイザを初期化していよいよ学習します。

// データセット
let dataset = Dataset<LabeledExample>(elements: LabeledExample(label: labels, axes: axes, data: data))

let binTensor = Tensor<Int32>(numpy: bins)!
let binSize = binTensor.shape[0]
let bin = Tensor<Float>(binTensor)

// ハイパーパラメータ
let epochs = 40
let batchSize = 128
let alpha: Float = 2.0

// モデル
var model = HeadPose(numClass: binSize, layerBlockCounts: (2, 2, 2, 2))

var pitch = Axis(inputSize: 512, outputSize: binSize, backbone: model)
var yaw = Axis(inputSize: 512, outputSize: binSize, backbone: model)
var roll = Axis(inputSize: 512, outputSize: binSize, backbone: model)
let optimizerPitch = Adam(for: pitch, learningRate: 1e-4)
let optimizerYaw = Adam(for: yaw, learningRate: 1e-4)
let optimizerRoll = Adam(for: roll, learningRate: 1e-4)

学習は epoch回数分ループを回し、datasetからバッチサイズ分ロードすることでステップごとの学習ができます。 以下は冗長なのですが、3つの回転軸をそれぞれわかるように実装しています。

for epoch in 0..<epochs {
  Context.local.learningPhase = .training
  let ds = dataset.shuffled(sampleCount: 10000, randomSeed: Int64(1234))
  for batch in ds.batched(batchSize) {
    let (images, axes, labels) = (batch.data, batch.axes, batch.label)
    
    let (pLoss, 𝛁pitch) = valueWithGradient(at: pitch) { pitch -> Tensor<Float> in
      let ŷ = pitch(images)
      let y = labels[TensorRange.ellipsis, 0]
      let loss = softmaxCrossEntropy(logits: ŷ, labels: y)
      let yAxis = axes[TensorRange.ellipsis, 0]
      let ŷAxis =* bin).sum(squeezingAxes: 1)
      let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis)
      let totalLoss = loss + alpha * reg
      state["pitch"]?["total"]?.append(totalLoss.scalarized())
      state["pitch"]?["softmax"]?.append(loss.scalarized())
      state["pitch"]?["mse"]?.append(reg.scalarized())
      return loss + alpha * reg
    }
    let (yLoss, 𝛁yaw) = valueWithGradient(at: yaw) { yaw -> Tensor<Float> in
      let ŷ = yaw(images)
      let y = labels[TensorRange.ellipsis, 1]
      let loss = softmaxCrossEntropy(logits: ŷ, labels: y)
      let yAxis = axes[TensorRange.ellipsis, 1]
      let ŷAxis =* bin).sum(squeezingAxes: 1)
      let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis)
      let totalLoss = loss + alpha * reg
      state["yaw"]?["total"]?.append(totalLoss.scalarized())
      state["yaw"]?["softmax"]?.append(loss.scalarized())
      state["yaw"]?["mse"]?.append(reg.scalarized())
      return totalLoss
    }
    let (rLoss, 𝛁roll) = valueWithGradient(at: roll) { roll -> Tensor<Float> in
      let ŷ = roll(images)
      let y = labels[TensorRange.ellipsis, 2]
      let loss = softmaxCrossEntropy(logits: ŷ, labels: y)
      let yAxis = axes[TensorRange.ellipsis, 2]
      let ŷAxis =* bin).sum(squeezingAxes: 1)
      let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis)
      let totalLoss = loss + alpha * reg
      state["roll"]?["total"]?.append(totalLoss.scalarized())
      state["roll"]?["softmax"]?.append(loss.scalarized())
      state["roll"]?["mse"]?.append(reg.scalarized())
      return totalLoss
    }
    optimizerPitch.update(&pitch, along: 𝛁pitch)
    optimizerYaw.update(&yaw, along: 𝛁yaw)
    optimizerRoll.update(&roll, along: 𝛁roll)
  }
  print(
    """
    [Epoch \(epoch)] \
    Loss: pitch=\(state["pitch"]?["total"]?.last!), \
      yaw=\(state["yaw"]?["total"]?.last!), \
      roll=\(state["roll"]?["total"]?.last!))
    """
  )
}

学習にはGPU環境を利用しています。学習に時間がかかるのでご利用の際は気をつけてください。

2019-12-10 12:01:42.863078: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2019-12-10 12:01:43.881228: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10.0
[Epoch 0] Loss: pitch=Optional(206.77942),   yaw=Optional(1058.0618),   roll=Optional(156.5832))
[Epoch 1] Loss: pitch=Optional(216.52313),   yaw=Optional(729.75433),   roll=Optional(132.02693))
[Epoch 2] Loss: pitch=Optional(223.95944),   yaw=Optional(531.65314),   roll=Optional(74.63479))
:
[Epoch 37] Loss: pitch=Optional(29.166101),   yaw=Optional(43.509895),   roll=Optional(12.510527))
[Epoch 38] Loss: pitch=Optional(29.051668),   yaw=Optional(38.135098),   roll=Optional(16.24944))
[Epoch 39] Loss: pitch=Optional(27.234924),   yaw=Optional(27.397942),   roll=Optional(15.582761))

学習中のLossをグラフとして出力します。

let steps = np.arange(0, step, 1)

plt.figure(figsize: [6, 4])
plt.plot(steps, state["pitch"]!["total"]!, label: "pitch")
plt.plot(steps, state["yaw"]!["total"]!, label: "yaw")
plt.plot(steps, state["roll"]!["total"]!, label: "roll")

plt.xlabel("Steps")
plt.ylabel("Loass")
plt.title("Training losses")
plt.legend()
plt.show()

f:id:unifa_tech:20200116102152p:plain
Total loss

推論

最後にレナさんの画像を使って、頭部姿勢推定してみます。

var image = PIL.Image.open("rena_crop.png").convert("RGB")
image = np.array(image)

var image_np = np.expand_dims(image, 0)
image_np = (image_np / 255.0).astype(np.float32)
print(image_np.shape)

let input = Tensor<Float>(numpy: image_np)!
let (p, y, r) = ((pitch(input) * bin).sum(squeezingAxes: 1), 
                 (yaw(input) * bin).sum(squeezingAxes: 1), 
                 (roll(input) * bin).sum(squeezingAxes: 1))
print(p, y, r)
(1, 128, 128, 3)
[-1.9520531] [0.3463745] [-0.5297375]

出力結果をもとに、pitch、yaw、rollの軸を描画しますと以下のようになります。 (残念ながら軸の描画はPythonで行っています。) 少しずれているような気がしますが、概ね合っているかなと思います。

f:id:unifa_tech:20200116102243p:plain:w200
Rena with axes

さいごに

Swift for TensorFlowで頭部姿勢推定やってみました。 まだ現時点では、モデルファイルの読込・保存などができるようになってはいないようなのでその辺りが整うと使っていけるような気がします。 何かの参考になれば幸いです。

参考