VerticaにおけるApache Kafka連携とは?
Verticaへのデータロードは、COPYコマンドを用いたCSVファイルのバルクロードで実施する方法が一般的です。INSERT文も使用できますが、Verticaのような列指向データベースは1件1件のINSERT処理よりも、ある程度まとまった件数をロードするバルクロードに向いています。
そのため、これまでのバージョンでは、IoT端末が生成する大量のログデータやTwitterのつぶやきデータ等の逐次発生するデータをリアルタイムで取り込むことは難しいというのが現実でした。
(ETLツール等を用いて、少量のデータを逐次COPYコマンドでロードするような仕組みを作れば実現できますが、そういった仕組みはデータ同期等のことを考えると複雑になりがちです)
Vertica 7.2では、オープンソース分散型メッセージングシステムの「Apache Kafka」(以下Kafka)のネイティブサポートを提供することで、これらのデータをニア・リアルタイムで取り込むことが可能になりました。
例えばTwitterのつぶやきデータを分析したい場合は、図のような構成で実現することができます。TwitterのつぶやきデータをApache Flumeやfluentdなどを使って収集し、Kafkaに送信します。Kafkaには送られてきたデータがキューとしてKafka Cluster内に次々と溜まっていきます。このキューに溜まっているデータの中から、分析に使用するデータをVerticaが一定間隔(図の場合は10秒間隔)で受け取りに行き、内部的にCOPYコマンドを使ってVerticaにロードします。
このように、INSERTではなくVerticaが得意とするCOPYコマンドを使用したバルクロードとして処理することで、ログデータ等の逐次発生するようなデータをVerticaにリアルタイムに取り込むことが可能です。ロードしたデータについては通常のテーブルに格納されるので、すぐに分析を行うことができます。
設定手順
以下にKafka連携機能を利用するための具体的な手順を記載します。
0.事前準備
事前に任意のサーバにKafkaをインストールし、起動しておいてください。
1.Kafka スケジューラの定義
vkconfig schedulerコマンドを使用して、接続先のKafkaサーバの情報や、Kafkaにデータを取得しに行く間隔等を指定します。
設定例)
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --add \
--config-schema kafka_config \
--username dbadmin \
--password XXXXX \
--dbhost 172.16.XX.XX \
--dbport 5433 \
--frame-duration 00:00:10 \
--brokers localhost:9092各パラメータの詳細は以下をご確認ください。
| 列名 | 内容 |
|---|---|
| add | 新しくスケジューラを定義する場合に使用します |
| config-schema | Kafka連携でスケジューラごとにVerticaが使用するスキーマを指定します。スキーマは自動で作成されるため事前に作成しておく必要はありません。 |
| username | Verticaに接続する際に使用するユーザ名を指定します |
| password | usernameに指定したユーザのログインパスワードを指定します |
| dbhost | 接続するVerticaのホスト名(IPアドレス)を指定します |
| dbport | 接続するVerticaのポート番号を指定します |
| frame-duration | VerticaがKafkaにデータを取得しにいく間隔(例の場合は10秒)を指定します |
| brokers | 接続するKafka brokerのホスト名、ポートを指定します |
2.データロード用のターゲットテーブルの作成
Kafkaから取得したデータをロードするためのテーブルを作成します。
通常のCREATE TABLE文を使用してテーブルを作成することができます。
例)
dbadmin=> CREATE TABLE public.messages(col1 VARCHAR(300));なお、ロードされるデータの内容が不明(半構造)な場合は、通常のテーブルではなく、Flex tableとして作成してください。
例)
dbadmin=> CREATE FLEX TABLE public.messages;3.Kafka Topicとターゲットテーブルの関連付け
KafkaではデータをTopicという単位で分類して格納しています。vkconfig topicコマンドを使用して取り込みたいKafkaのTopicとロード先のVerticaのテーブルとの関連付けを行います。
設定例)
$ /opt/vertica/packages/kafka/bin/vkconfig topic --add \
--config-schema kafka_config \
--username dbadmin \
--password XXXX \
--dbhost 172.16.XX.XX \
--dbport 5433 \
--target public.messages \
--topic test \
--parser KafkaParser \
--rejection-table public.kafka_rej \
--start-offset START各パラメータの詳細は以下をご確認ください。
| 列名 | 内容 |
|---|---|
| add | 新しくtopicとターゲットテーブルを関連付ける場合に指定します |
| config-schema | 「1.Kafka スケジューラの定義」のconfig-schemaで指定したスキーマと同名のスキーマを指定します |
| username | Verticaに接続する際に使用するユーザ名を指定します |
| password | usernameに指定したユーザのログインパスワードを指定します |
| dbhost | 接続するVerticaのホスト名(IPアドレス)を指定します |
| dbport | 接続するVerticaのポート番号を指定します |
| target | Verticaのターゲットテーブル(データのロード先)を指定します |
| topic | データを取得するKafkaのTopicを指定します |
| rejection-table | ロードに失敗したデータの情報を格納するテーブルを指定します |
4.Kafkaスケジューラの実行
設定した内容でKafka連携を開始するためにvkconfig launchコマンドを実行します。
本コマンドを実行すると、Kafka連携がスタートします。
設定例)
$ /opt/vertica/packages/kafka/bin/vkconfig launch \
--config-schema kafka_config \
--username dbadmin \
--password XXXX \
--dbhost 172.16.XX.XX \
--dbport 5433各パラメータの詳細は以下をご確認ください。
| 列名 | 内容 |
|---|---|
| config-schema | 「1.Kafka スケジューラの定義」のconfig-schemaで指定したスキーマと同名のスキーマを指定します |
| username | Verticaに接続する際に使用するユーザ名を指定します |
| password | usernameに指定したユーザのログインパスワードを指定します |
| dbhost | 接続するVerticaのホスト名(IPアドレス)を指定します |
| dbport | 接続するVerticaのポート番号を指定します |
なお、定義の中にはパスワードが含まれているため、実際に運用する際は設定内容をプロパティファイルに記述し、ファイルを指定して実行する方法が推奨されます。
例えば、vkconfig launchの場合は以下のように記述します。
$ /opt/vertica/packages/kafka/bin/vkconfig launch --conf configFile.propertiesconfigFile.propertiesファイルの記述内容
#config.properties:
config-schema kafka_config
username dbadmin
password test
dbhost 172.16.XX.XX
dbport 5433また、今回使用していないその他のオプションについては、マニュアルの以下のページをご参考ください。
Kafka Utility Options
動作確認
今回はKafkaに付属している「kafka-console-producer.sh」を使用してKafkaにデータを送信した場合の動作を確認します。
1.Kafkaへのデータ送信
「kafka-console-producer.sh」から以下のように「アシスト」「Vertica」「データベース」と、3つのメッセージを入力します。
# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
アシスト
Vertica
データベース2.Verticaのテーブルを検索
上記「2.データロード用のターゲットテーブルの作成」で作成したテーブルを検索すると、Kafkaに送信したデータがVerticaにロードされていることが確認できます。
dbadmin=> SELECT * FROM messages ;
col1
------------
アシスト
Vertica
データベース
(3 rows)なお、Kafka連携を終了する場合は以下のコマンドを実行します。
$ /opt/vertica/packages/kafka/bin/vkconfig shutdown検証バージョンについて
この記事の内容はVertica 7.2、Kafka 0.9で確認しています。
