とある業務でとても悩んでいたことがあります。。。
簡単に説明すると、AWSのサービスEMRを使用してDynamoDBのテーブル、レコードへ更新処理を行っていました。 大量の更新があったので、DynamoDBの書込キャパシティ(WCU)を上げて、処理に耐えられるように設定。
いざ更新をかけたときにメトリックスを監視していると、ある一定の値で一定に。
「あれーーーーー....」
秒間ターゲット消費率が20%ほどで一定になって、想定していた時間で終わらない。 結果的には待っていると無事に処理が完了して更新も問題なかった。
ただ、「なんでキャパシティ上がらないねん!!」と、疑問が残って気持ち悪いかんじに。
とりあえず順をおってみていきましょうか。
まずEMRとは?
EMR (Amazon Elastic MapReduce)
大量のデータを迅速、容易に、かつコスト効果よく処理するためのウェブサービスです。 ログの分析、ウェブインデックス、データウェアハウジング、機械学習、財務分析、科学シミュレーション、 生物情報科学研究を含む、ビッグデータを確実かつ安全に処理します。
仕組み
Hadoopクラスターを実行するために使用できるサービスです。 Hadoopクラスターとはサーバーセットであり、このサーバーセットは連動して、サーバー間で処理およびデータを分散することにより計算タスクを実行する。 タスクの内容は、データの分析、データの格納、またはデータの移動と変換などが考えられる。 また、起動するとき、EC2によって提供される仮想サーバーでクラスターを実行する。
そもそもHadoopとは
巨大なデータをバッチ処理するための並列分散処理基盤である。 並列分散処理とは、その名の通り、1つの処理を分割して同時に処理を行うことである(数千台のサーバーを利用することが可能)。
そして、Hadoopとかググってると絶対でてくるこれ。
HDFS (Hadoop Distributed File System)
分散ファイルシステムのことである(複数のサーバーにファイルを分割して保持することで、巨大なファイルも扱える)。
"NameNode(マスターノード)"と"DataNode(スレーブノード)"の二つの構成要素から成る(= マスター・スレイブ構成)。 NameNodeは分散ファイルシステムのメタ情報を管理し、DataNodeはデータの実体を保存する。 ファイルを格納する場合、そのファイルを一定サイズで分割したデータを"ブロック"としてDataNodeで保存します。 DataNodeは故障に備え、ブロックは、複数のDataNodeにレプリカ(defaultで3つ)を保存する。 また、HDFSは保存しているファイルに対して追記のみ利用することができます。ファイルを更新して保存したい場合、 ローカル環境でファイルを更新したあと、更新したファイルをHDFSに保存して上書きすることになります。
MapReduce
分散閭里フレームワークである(= マスター・スレイブ構成)。 マスターノードである"JobTracker"とスレーブノードである"TaskTracker"で構成されている。 JobTrackerは、MapReduceジョブの管理やTaskTrackerへのタスクの割り当て、 TaskTrackerのリソース管理を役割としている。 TaskTrackerは、タスクの実行を役割としている。 (mapとreduceという処理を組み合わせて処理を行う) ローカリティと呼ばれ、JobTrackerがTaskTrackerにタスクを割り当てる際には、 そのTaskTrackerが動作しているサーバと同居しているDataNodeが管理しているデータを極力利用するように割り当てる(= サーバ間の通信量を極力抑える)。
Hive
Hadoop上にDWH(データウェアハウス)を構築するための基盤ソフトである。 HiveQLというSQL風の言語でMapReduceジョブに変換しHadoop上のデータを操作できる。
EMRへSSHして、$ hive
って打つとhiveプロンプトが立ち上がる。
非常に簡単です!
EMRで作業する前、ローカル環境にHadoop環境をつくってみましたが、(EMRクラスターは結構ちゃんとやろうとすると高額...) 「面倒くさい!」です。
いろんなツールやらなんやらをインストールしなければならないからです。
わたしの場合は、CentOS+Vagrantで仮想環境内で作りました。 Dockerで簡単に環境を作成できるものもありましたが、サポートがもう終了していました。
ただ、仕組みがほんの少しわかってよかったなー程度でした。
そして、HadoopやHiveの記事が非常に少ないです。。 日本語の記事はもっと少ない。
仮想環境でググった記事をもとに色々試してみましたが、まずコマンドがサポート終了していたりしました。
まあ、そんなかんじで序盤は手こずりながら試行錯誤で業務遂行のため計画を練りました。
そしていざ更新処理へ。
EMR内でDynamoDBの対象テーブルをマッピングさせて、テーブルを作成していました。 DynamoDBに負荷がかかるので、書込キャパシティを上げます。
Hiveプロンプトで以下を設定。
hive > SET hive.execution.engine=tez; hive > SET dynamodb.throughput.write.percent=0.9
これでOKと実行したわけです。
ですが、上記で説明した通り、想定通りのターゲット消費率が....
色々と調査したところ原因を掴みました。
(そんなんきちんとAWSのドキュメント読んだりEMRとかの内部をあらかじめ理解しておけばいいやん!なんて言わないでく.....)
まず以下を設定していました。
DynamoDB
- 最小WCU: 3,000
- 最大WCU: 10,000
- ターゲット消費率: 70%
プロビジョンドスループット
- dynamodb.throughput.read.percent=0.9
- dynamodb.throughput.write.percent=0.9
- hive.execution.engine=tez;
DynamoDBの設定を問題ありません。書込には3,000キャパシティで70%を消費し、 dynamodb.throughput.write.percent=0.9が想定なので、2700が使用可能なWCUです。
問題なのは、ジョブにおける各タスクへのキャパシティ割り当てでした。
EMRはHDFSやMapReduceがコアなコンポーネントとして用意されています。 その他にもさまざまなコンポーネントが存在していて、 例えば、EMRでDynamoDBをマッピングしてテーブル操作を扱うためには、DynamoDB connectorというコンポーネントが必要なようです。
これはEMR内にDynamoDBのテーブルをマッピングさせてCREATE TABLEしたときに、
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
こんなかんじでDynamoDB connectorを使用していました。
タスク自体は想定するMapタスクの数字を算出していましたが、実際に書込を行っていたタスクはその数字よりも低かったのです。
各タスクへのキャパシティ × タスク数 = 毎秒処理するレコード数
なるほどでした。
これらを求めるには、EMRから作成されるログファイルの中身を調査してみるといいでしょう。 実行している時間をメモっておけば心配ありません。 見慣れないログファイルかもしれませんが、よく見ると丁寧に情報がかいてありました。
そして、実行エンジンにはtezではなく、mr(MapReduce)を使用すること。
hive > SET hive.execution.engine=mr;
まとめる
想定通り最大限ターゲット消費率を使用するには、 MapタスクとDynamoDBStorageHandler 内で取得される Map 数を同等にしなければならない。