こんにちは、iOSエンジニアのしだです。
前回、Swift-Jupyter について書いたのですが、本当はSwift for TensorFlowを使って頭部姿勢推定(Head pose estimation)をやりたかったのでこちらをやってみたいと思います。
頭部姿勢推定(Head pose estimation)
頭部姿勢推定は、カメラからみた頭の向きの推定を行うことです。 そのままですが、画像から頭の向きを pitch、 yaw、 roll の3つの値を予測することになります。
データセットの準備
データセットは tensorflow_datasets に含まれている300W-LP というものを使います。 このデータセットは、Face Alignment用のデータセットになっており、顔の向きだけではなく顔のランドマークデータも含んでいます。 今回は、その中の pitch、yaw、rollのデータを使います。
しかし、Swift for TensorFlow から直接 tensorflow_datasets を使うことができなかったため、 一度、Pythonからnumpyを使って、 .npy形式へ変換して使うことにします。 変換方法を以下のGistにおいておきましたので参考にしてください。
generate-dataset-from-the300w-lp-public.ipynb · GitHub
また、今回の実装は以下の論文がベースになっています。
準備
基本的に Swift-Jupyter 上で実装を進めていますが、swift-modelsというリポジトリの ResNetの実装をそのまま使わせてもらいます。
そのために、直接 ResNet50.swift
をダウンロードしておきます。
$ curl -O https://raw.githubusercontent.com/tensorflow/swift-models/master/Models/ImageClassification/ResNet50.swift
実装
インポート
最初に必要なソースコードを Swift-Jupyter 上で参照できるように、%include
を使って読み込んでおきます。
%include "EnableIPythonDisplay.swift" %include "ResNet50.swift" IPythonDisplay.shell.enable_matplotlib("inline")
import TensorFlow import Python let np = Python.import("numpy") let plt = Python.import("matplotlib.pyplot") let PIL = Python.import("PIL")
データセットの読み込み
作っておいたデータセットをnumpy.loadを使って読み込みます。
let data = np.load("data_20000.npy") let label = np.load("label_20000.npy") print("data.npy", data.shape, data.dtype) print("label.npy", label.shape, label.dtype)
data.npy (20000, 128, 128, 3) uint8
label.npy (20000, 3) float32
いくつか前処理を行います。ピクセル値を0 ~ 1にしたり、pitch、yaw、roll の値をone-hotベクトル化してます。 回転値を予測したいのですが、ここでは -90° ~ 90° の値を 3° 刻みにラベルを分割します。
let data2 = (data / 255.0).astype(np.float32) print(data2.shape, data2.dtype) let bins = np.arange(-90, 93, 3, dtype: np.int32) let bin_size = Int(bins.shape[0])! let bin_labels = np.digitize(label, bins).astype(np.int32) let label_onehot = np.eye(bin_size)[bin_labels].astype(np.int32) print(bin_size, bin_labels.shape, bin_labels.dtype) print(label_onehot.shape, label_onehot.dtype)
(20000, 128, 128, 3) float32
61 (20000, 3) int32
(20000, 3, 61) int32
Numpyの PythonObject をTensorに変換しておきます。以降はこの data、axes、labels のデータを使って学習します。
let data = Tensor<Float>(numpy: data2)! let axes = Tensor<Float>(numpy: label)! let labels = Tensor<Int32>(numpy: bin_labels)! print(data.shape, axes.shape, labels.shape) print(axes[0], labels[0], labels.shape)
[20000, 128, 128, 3] [20000, 3] [20000, 3]
[ 11.812507, 30.915585, -11.911684] [34, 41, 27] [20000, 3]
TensorGroupを継承したクラスを用意します。これは、入力データのパイプラインを作るときに必要になるクラスで、データの読み出しを定義します。 あとで Datasetと一緒につかうことで、入力データをバッチサイズごとに取り出したり、シャッフルしたりパイプラインを構築するのが容易になります。
public struct LabeledExample: TensorGroup { public var label: Tensor<Int32> public var axes: Tensor<Float> public var data: Tensor<Float> public init(label: Tensor<Int32>, axes: Tensor<Float>, data: Tensor<Float>){ self.label = label self.axes = axes self.data = data } public init<C: RandomAccessCollection>(_handles: C) where C.Element: _AnyTensorHandle { precondition(_handles.count == 3) let labelIndex = _handles.startIndex let axesIndex = _handles.index(labelIndex, offsetBy: 1) let dataIndex = _handles.index(labelIndex, offsetBy: 1) label = Tensor<Int32>(handle: TensorHandle<Int32>(handle: _handles[labelIndex])) axes = Tensor<Float>(handle: TensorHandle<Float>(handle: _handles[axesIndex])) data = Tensor<Float>(handle: TensorHandle<Float>(handle: _handles[dataIndex])) } }
モデルの定義
モデルを構築します。HeadPoseというLayerをベースモデルとして定義します。こちらは ResNet50.swift
の実装を参考にしています。
あと、pitch、yaw、roll の3つの軸を出力するために Axis というLayerを用意します。
Layer プロトコルは、callAsFunction を提供しておりこの関数にフォワード処理を記述します。@differentiable
と付けてあげればバックワード処理を記述することはありません。
public struct Axis: Layer { public var layer: HeadPose public var fc: Dense<Float> public init(inputSize: Int, outputSize: Int, backbone: HeadPose) { layer = backbone fc = Dense(inputSize: inputSize, outputSize: outputSize) } @differentiable public func callAsFunction(_ input: Tensor<Float>) -> Tensor<Float> { return softmax(fc(layer(input))) } } public struct HeadPose: Layer { public var l1: ConvBN public var maxPool = MaxPool2D<Float>(poolSize: (3, 3), strides: (2, 2)) public var l2a = ResidualBasicBlock(featureCounts: (64, 64, 64, 64)) public var l2b: ResidualBasicBlockStack public var l3a = ResidualBasicBlockShortcut(featureCounts: (64, 128, 128, 128)) public var l3b: ResidualBasicBlockStack public var l4a = ResidualBasicBlockShortcut(featureCounts: (128, 256, 256, 256)) public var l4b: ResidualBasicBlockStack public var l5a = ResidualBasicBlockShortcut(featureCounts: (256, 512, 512, 512)) public var l5b: ResidualBasicBlockStack public var avgPool = GlobalAvgPool2D<Float>() public init(numClass: Int, layerBlockCounts: (Int, Int, Int, Int)=(3, 4, 6, 3)) { l1 = ConvBN(filterShape: (7, 7, 3, 64), strides: (2, 2), padding: .same) maxPool = MaxPool2D(poolSize: (3, 3), strides: (2, 2)) l2b = ResidualBasicBlockStack( featureCounts: (64, 64, 64, 64), blockCount: layerBlockCounts.0) l3b = ResidualBasicBlockStack( featureCounts: (128, 128, 128, 128), blockCount: layerBlockCounts.1) l4b = ResidualBasicBlockStack( featureCounts: (256, 256, 256, 256), blockCount: layerBlockCounts.2) l5b = ResidualBasicBlockStack( featureCounts: (512, 512, 512, 512), blockCount: layerBlockCounts.3) } @differentiable public func callAsFunction(_ input: Tensor<Float>) -> Tensor<Float> { let inputLayer = maxPool(relu(l1(input))) let level2 = inputLayer.sequenced(through: l2a, l2b) let level3 = level2.sequenced(through: l3a, l3b) let level4 = level3.sequenced(through: l4a, l4b) let level5 = level4.sequenced(through: l5a, l5b, avgPool) return level5 } }
Lossの値をスタックするための変数を定義しています。エポック毎にLossの値を保持して、学習終了後にグラフとして表示するために用います。
var state = [ "pitch": [ "total": [Float](), "softmax": [Float](), "mse": [Float](), ], "yaw": [ "total": [Float](), "softmax": [Float](), "mse": [Float](), ], "roll": [ "total": [Float](), "softmax": [Float](), "mse": [Float](), ], ]
学習
モデル、データセットやオプティマイザを初期化していよいよ学習します。
// データセット let dataset = Dataset<LabeledExample>(elements: LabeledExample(label: labels, axes: axes, data: data)) let binTensor = Tensor<Int32>(numpy: bins)! let binSize = binTensor.shape[0] let bin = Tensor<Float>(binTensor) // ハイパーパラメータ let epochs = 40 let batchSize = 128 let alpha: Float = 2.0 // モデル var model = HeadPose(numClass: binSize, layerBlockCounts: (2, 2, 2, 2)) var pitch = Axis(inputSize: 512, outputSize: binSize, backbone: model) var yaw = Axis(inputSize: 512, outputSize: binSize, backbone: model) var roll = Axis(inputSize: 512, outputSize: binSize, backbone: model) let optimizerPitch = Adam(for: pitch, learningRate: 1e-4) let optimizerYaw = Adam(for: yaw, learningRate: 1e-4) let optimizerRoll = Adam(for: roll, learningRate: 1e-4)
学習は epoch回数分ループを回し、datasetからバッチサイズ分ロードすることでステップごとの学習ができます。 以下は冗長なのですが、3つの回転軸をそれぞれわかるように実装しています。
for epoch in 0..<epochs { Context.local.learningPhase = .training let ds = dataset.shuffled(sampleCount: 10000, randomSeed: Int64(1234)) for batch in ds.batched(batchSize) { let (images, axes, labels) = (batch.data, batch.axes, batch.label) let (pLoss, 𝛁pitch) = valueWithGradient(at: pitch) { pitch -> Tensor<Float> in let ŷ = pitch(images) let y = labels[TensorRange.ellipsis, 0] let loss = softmaxCrossEntropy(logits: ŷ, labels: y) let yAxis = axes[TensorRange.ellipsis, 0] let ŷAxis = (ŷ * bin).sum(squeezingAxes: 1) let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis) let totalLoss = loss + alpha * reg state["pitch"]?["total"]?.append(totalLoss.scalarized()) state["pitch"]?["softmax"]?.append(loss.scalarized()) state["pitch"]?["mse"]?.append(reg.scalarized()) return loss + alpha * reg } let (yLoss, 𝛁yaw) = valueWithGradient(at: yaw) { yaw -> Tensor<Float> in let ŷ = yaw(images) let y = labels[TensorRange.ellipsis, 1] let loss = softmaxCrossEntropy(logits: ŷ, labels: y) let yAxis = axes[TensorRange.ellipsis, 1] let ŷAxis = (ŷ * bin).sum(squeezingAxes: 1) let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis) let totalLoss = loss + alpha * reg state["yaw"]?["total"]?.append(totalLoss.scalarized()) state["yaw"]?["softmax"]?.append(loss.scalarized()) state["yaw"]?["mse"]?.append(reg.scalarized()) return totalLoss } let (rLoss, 𝛁roll) = valueWithGradient(at: roll) { roll -> Tensor<Float> in let ŷ = roll(images) let y = labels[TensorRange.ellipsis, 2] let loss = softmaxCrossEntropy(logits: ŷ, labels: y) let yAxis = axes[TensorRange.ellipsis, 2] let ŷAxis = (ŷ * bin).sum(squeezingAxes: 1) let reg = meanSquaredError(predicted: ŷAxis, expected: yAxis) let totalLoss = loss + alpha * reg state["roll"]?["total"]?.append(totalLoss.scalarized()) state["roll"]?["softmax"]?.append(loss.scalarized()) state["roll"]?["mse"]?.append(reg.scalarized()) return totalLoss } optimizerPitch.update(&pitch, along: 𝛁pitch) optimizerYaw.update(&yaw, along: 𝛁yaw) optimizerRoll.update(&roll, along: 𝛁roll) } print( """ [Epoch \(epoch)] \ Loss: pitch=\(state["pitch"]?["total"]?.last!), \ yaw=\(state["yaw"]?["total"]?.last!), \ roll=\(state["roll"]?["total"]?.last!)) """ ) }
学習にはGPU環境を利用しています。学習に時間がかかるのでご利用の際は気をつけてください。
2019-12-10 12:01:42.863078: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcudnn.so.7
2019-12-10 12:01:43.881228: I tensorflow/stream_executor/platform/default/dso_loader.cc:44] Successfully opened dynamic library libcublas.so.10.0
[Epoch 0] Loss: pitch=Optional(206.77942), yaw=Optional(1058.0618), roll=Optional(156.5832))
[Epoch 1] Loss: pitch=Optional(216.52313), yaw=Optional(729.75433), roll=Optional(132.02693))
[Epoch 2] Loss: pitch=Optional(223.95944), yaw=Optional(531.65314), roll=Optional(74.63479))
:
[Epoch 37] Loss: pitch=Optional(29.166101), yaw=Optional(43.509895), roll=Optional(12.510527))
[Epoch 38] Loss: pitch=Optional(29.051668), yaw=Optional(38.135098), roll=Optional(16.24944))
[Epoch 39] Loss: pitch=Optional(27.234924), yaw=Optional(27.397942), roll=Optional(15.582761))
学習中のLossをグラフとして出力します。
let steps = np.arange(0, step, 1) plt.figure(figsize: [6, 4]) plt.plot(steps, state["pitch"]!["total"]!, label: "pitch") plt.plot(steps, state["yaw"]!["total"]!, label: "yaw") plt.plot(steps, state["roll"]!["total"]!, label: "roll") plt.xlabel("Steps") plt.ylabel("Loass") plt.title("Training losses") plt.legend() plt.show()
推論
最後にレナさんの画像を使って、頭部姿勢推定してみます。
var image = PIL.Image.open("rena_crop.png").convert("RGB") image = np.array(image) var image_np = np.expand_dims(image, 0) image_np = (image_np / 255.0).astype(np.float32) print(image_np.shape) let input = Tensor<Float>(numpy: image_np)! let (p, y, r) = ((pitch(input) * bin).sum(squeezingAxes: 1), (yaw(input) * bin).sum(squeezingAxes: 1), (roll(input) * bin).sum(squeezingAxes: 1)) print(p, y, r)
(1, 128, 128, 3)
[-1.9520531] [0.3463745] [-0.5297375]
出力結果をもとに、pitch、yaw、rollの軸を描画しますと以下のようになります。 (残念ながら軸の描画はPythonで行っています。) 少しずれているような気がしますが、概ね合っているかなと思います。
さいごに
Swift for TensorFlowで頭部姿勢推定やってみました。 まだ現時点では、モデルファイルの読込・保存などができるようになってはいないようなのでその辺りが整うと使っていけるような気がします。 何かの参考になれば幸いです。