ibm mq...
Post on 19-Aug-2020
4 Views
Preview:
TRANSCRIPT
第4章
2016.11⽇本アイ・ビー・エム(株)
⽇本アイ・ビー・エム システムズ・エンジニアリング(株)
IBM Message Hub の紹介
IBM MQ V9.0アップデート・セミナー
©2016 IBM Corporation2
内容p IBM Message Hub for Bluemix
p 概要p 構成p 基本動作p メッセージの配信パターンp Message Hubサービスの作成p 制約事項p アプリケーション・インターフェース
Ø Kafka APIØ Kafka REST APIØ MQ Light API
p 他サービスとの連携例
p IBM Message Connect for Bluemixp 概要p 構成⼿順
-- 前提 --
当資料は下記バージョンのBluemixサービスを対象としています。・IBM Message Hub for Bluemix
公開⽇:2016/06/23・IBM Message Connect for Bluemix(試験版)
公開⽇:2016/02/10
当資料の内容は、2016年11⽉時点の情報を基に記載しています。
©2016 IBM Corporation3
IBM Message Hubfor Bluemix
©2016 IBM Corporation4
IBM Message Hub for Bluemix 概要p IBM Bluemix上で提供されるApache Kafkaベースの分散メッセージング・サービス
p 複数のアプリケーションやサービスを⾮同期に接続するハブとして利⽤p KafkaプロトコルやRESTなどのオープン・プロトコルを利⽤したアクセスが可能p 最新のMessage Hubでは、MQ Light APIもサポートし、MQ Lightのサーバーとしての機能も提供p Streaming AnalyticsサービスやApache Sparkサービスなどの分析基盤との連携も可能
p 提供機能p 分散メッセージング・プロバイダー機能
Ø Kafka API、Kafka REST API、MQ Light APIを使⽤したメッセージの送受信p トピック管理機能
Ø トピックの作成/削除、パーティション数の設定ü Bluemixダッシュボード/管理REST APIの提供
p モニタリングおよびロギング機能Ø 使⽤状況のサマリーØ 操作ログの表⽰
p セキュリティーØ Kafka API: SASL認証Ø Kafka REST API: APIキーによる認証Ø TLS 1.2 での接続が必須
Message Hub
REST
Streaming Analytics
Apache Spark
アプリケーション
Bluemix
アプリケーション
2016年11⽉現在Message Hubは、Kafka 0.10.0.1 をベースに構成されています。Kafkaクライアントは、0.9.0.0以降をサポートしています。
©2016 IBM Corporation5
参考:Apache Kafkap オープンソースの分散メッセージング・システム
p LinkedInにより開発され、2011年にApacheから公開Ø https://kafka.apache.org/
p ⼤量のログを⾼スループット/低レイテンシに収集/配信することを⽬的に開発された
p Apache Kafkaの4つの特徴p Fast: ⾼スループット/低レイテンシのメッセージング処理p Distributed by Design: 複数ブローカーからなるKafkaクラスターを構成p Scalable: Kafkaクラスター構成によるスケールアウトp Durable: データの損失を防ぐため、メッセージを複製し、Kafkaクラスター内で永続化
p データの送受信⽅式p プロデューサーとコンシューマーがトピックを介して
M対Nで通信するパブリッシュ/サブスクライブ・モデルp 特定のコンシューマーに対するP2P通信やメッセージの順序性
の保証も可能
KafkaCluster
Producer Producer Producer
Consumer Consumer Consumer
ServerServerBrokerTopic
©2016 IBM Corporation6
参考:Apache Kafkaの代表的なユースケースp Messaging
p ⼀般的なメッセージング⽤のブローカーとして使⽤p 送信・受信アプリケーションの⾮同期化、メッセージのバッファリングや⼀時保存 等
p Website Activity Trackingp Webサイト・アクティビティのトラッキングp リアルタイム・プロセッシング、リアルタイム・モニタリング、
Hadoopなどのオフラインのデータウェアハウスへのデータロードp Metrics
p 運⽤ログや統計情報などの集約p Log Aggregation
p ログファイルの集約p Stream Processing
p Kafka Streamによるストリーム・プロセッシング
KafkaCluster
サーバーAP#1
サーバーAP#2
サーバーAP#3
Broker#1 Broker#2 Broker#3 Broker#n・・・Topic
サーバーAP#n
分析基盤、データウェアハウスなど
・・・スケーラビリティが求められる
大量トランザクションの負荷分散処理を得意とする
©2016 IBM Corporation7
構成p Message Hubサービス
p Kafkaの実装としてメッセージの保管/配信、トピックの管理等を⾏うサービスp Bluemix上でユーザーがサービス・インスタンスを作成
Ø 内部的には、5つのブローカーによってKafkaクラスターが構成されている
p トピックp メッセージの送受信の対象となるオブジェクト
Ø ユーザーがサービス内に任意に作成
p プロデューサーとコンシューマーp メッセージの送受信を⾏うユーザー・アプリケーション
Ø プロデューサーはトピックに対してメッセージを送信Ø コンシューマーはトピックをサブスクライブし、メッセージを受信
p プロデューサー/コンシューマーは、Kafka API、Kafka REST APIで接続
BluemixMessage Hubサービス
KafkaCluster
Producer Producer
Consumer Consumer
BrokerTopic
最新のMessage HubではMQ Light APIのサーバーとしての機能も提供していますが、ここでは、Kafkaサーバーとしての構成、機能について説明しています。
©2016 IBM Corporation8
構成p トピックの構造
p トピックはそれぞれ独⽴した存在Ø JMSやMQTTのPub/Subトピックのように階層構造ではない
p トピックは1つ以上のパーティションで構成されるØ トピック作成時にパーティション数を設定
p パーティションp メッセージが保管される領域p メッセージには格納された順番にID(オフセット)が付与される
Ø オフセットはパーティションの中でユニークØ コンシューマーはオフセット順にメッセージを受信できる
p パブリッシュされた全てのメッセージは、パーティションに⼀定期間保存されるØ トピック作成時には存続時間も指定Ø 存続時間を過ぎたメッセージは⾃動的に消去される
ü メッセージがコンシューマーに受信されたかどうかは関係しないTopic A
Partition 0格納順にオフセットが付与される
Producer
Message A
Topic A
A
CB
D E
Kafka のトピック体系
JMSやMQTTのトピック体系
Topic B Topic C
A/B
A/C/D
下位のトピックを指定する場合は上位から順に辿って指定
Message A (Offset 0)Message B (Offset 1)
Message C (Offset 2)
Message BMessage C
Consumer
Partiton Partiton Partiton
©2016 IBM Corporation9
構成p 複数パーティション構成
p トピック作成時にパーティション数を指定p プロデューサーから送信されたメッセージは複数のパーティションに分散して格納される(ラウンドロビンで振り分け)
Ø プロデューサーはパーティションを明⽰的に指定して送信することも可能p コンシューマーは特定のパーティションからメッセージを受信
p パーティション毎にコンシューマーを⽤意することで分散並列処理を実現
Topic A
Partition 0
Producer
Message A
Message A (Offset 0)
Consumer
Partition 1 Partition 2Message B (Offset 0) Message C (Offset 0)
Message D (Offset 1) Message E (Offset 1)パーティション毎に格納順にオフセットが付与される
Consumer Consumer
Producer
Message BMessage C
Message DMessage E
©2016 IBM Corporation10
参考:Apache Kafkaのクラスター構成p Kafkaクラスター
p 複数のブローカーでクラスターを構成Ø 各ブローカーにトピックのパーティションが分散して配置される
ü トピック作成時、配置先が構成に応じて⾃動的に決められるØ パーティションは耐障害性のために複製される
p ブローカーの役割Ø ブローカーはリーダー/フォロワーの役割に分けられる
ü パーティションごとにリーダー/フォロワーが決められるü リーダーとして1つのブローカー、フォロワーとして0以上のブローカーが割り当てられる
Ø リーダーがメッセージの送信/受信のリクエストを受け付けるØ リーダーがダウンした場合、いずれかひとつのフォロワーが⾃動的にリーダーに昇格
KafkaCluster
Broker1 Broker2 Broker3
Topic
Partition0
Partition2
replica
Partition0
replica
Partition1
replica
Partition1 Partition2
Broker3がリーダーBroker1がフォロワー
複製 複製 複製
©2016 IBM Corporation11
基本動作p プロデューサー
p トピックに対し、キーとデータで構成されるメッセージを送信(キーは省略可)Ø パーティション番号を明⽰的に指定して送信することもできる
p キー、パーティションの指定の有無によって、メッセージの送信先が変わる① キーとパーティションの指定がない場合、メッセージは各パーティションにラウンドロビンで振り分けられる② パーティションは指定されず、キーが指定された場合、キーのハッシュ値に基いて⾃動的に決定されたパーティションに送信される
※同じキーを指定したメッセージは同じパーティションに送信される③ パーティションが指定された場合、(キーの指定有無に関係なく)指定されたパーティションに送信される
Topic A
Partition 0
Producer A
Partition 1 Partition 2
Producer B Producer C
① key: 指定なしpartition: 指定なし
② key: key_xpartition: 指定なし
③ key: 指定なし/key_ypartition: 2
キーのハッシュ値から送信先のパーティション番号が決定される同じキーを指定したメッセージは常に同じパーティションに送信される
©2016 IBM Corporation12
基本動作p コンシューマー
p Message Hubに接続し、メッセージを受信するアプリケーションの1つ1つをコンシューマー・インスタンスと呼ぶØ それぞれのインスタンスがトピックのサブスクライブ指定やメッセージの受信を⾏う
p コンシューマー・インスタンスは、接続時に指定したグループIDによって、コンシューマー・グループを形成するØ 同じグループIDを指定したインスタンス同⼠は、同じグループに属するØ グループはMessage Hubサービス側で認識されるため、異なるマシン上で稼働するインスタンス同⼠でも同じグループを所属できるØ グループIDを明⽰的に指定しないインスタンスにもグループIDは付与され、1つのグループに属することになる
ConsumerInst. A1 ConsumerInst. A2 ConsumerInst. A3ConsumerGroup A GroupID:A GroupID:A GroupID:A
CI B1 CI B2CG BGroupID:B GroupID:B
CI X1CG XGroupID:なし
CI Y1
GroupID:なし
Message Hubサービス
Topic A
©2016 IBM Corporation13
基本動作p コンシューマーがトピックを指定してサブスクライブを⾏うと、⾃⾝が受け持つパーティションがアサインされる
Ø グループ内では、トピックを構成する全てのパーティションがそれぞれのインスタンスにアサインされるü 1つのインスタンスが複数のパーティションを受け持つことはあるが、複数のインスタンスが1つのパーティションを受け持つことはない
Ø グループ間では、1つのパーティションが複数の(グループの異なる)インスタンスにアサインされるØ インスタンスのサブスクライブ/アンサブスクライブ、接続/切断によってアサインは動的に変わる(リバランス)
p 各インスタンスは、⾃⾝にアサインされたパーティションからメッセージを受信
p グループ内のインスタンス同⼠はそれぞれ⾃分がアサインされたパーティションからしかメッセージを受信しないため、トピックに送信されたメッセージがグループ内のインスタンスに分散されることになる(負荷分散処理)
p 異なるグループのインスタンス同⼠は同じパーティションから同じメッセージを受信することになる(同報配信型メッセージング)
ConsumerInst. A1 ConsumerInst. A2 ConsumerInst. A3ConsumerGroup A
CI B1 CI B2CG BGroupID:A GroupID:A GroupID:A GroupID:B GroupID:B
Message Hubサービス
Topic APartition 0 Partition 1 Partition 2
©2016 IBM Corporation14
メッセージの配信パターンp パーティションやコンシューマーの構成によって、メッセージの配信パターンを変えることができる
p パターン1 負荷分散型の配信p パターン2 同報配信p パターン3 メッセージの順序性を保証した配信
©2016 IBM Corporation15
メッセージの配信パターンp パターン1 負荷分散型の配信
p ⼤量のメッセージを複数のコンシューマーで分散して処理させたいケースØ トピックには、複数のパーティションを構成Ø コンシューマーは、同じコンシューマー・グループに複数のインスタンスを構成Ø インスタンス数とパーティション数を同じにすることで各インスタンスに均等にメッセージを分散できる
Topic A
Partition 0
Producer B
Message A (Offset 0)Partition 1 Partition 2
Message B (Offset 0) Message C (Offset 0)Message D (Offset 1) Message E (Offset 1)
Consumer Inst. A Consumer Inst. B Consumer Inst. CConsumerGroupA
Producer A
Message F (Offset 1)
A BED
CF
©2016 IBM Corporation16
メッセージの配信パターンp パターン2 同報配信
p 複数のコンシューマーに同じメッセージを配信したいケースØ トピックは、1つ、もしくは複数のパーティションで構成Ø コンシューマーは、インスタンス毎に異なるグループIDを指定し、複数のグループを構成Ø 各インスタンス(グループ)に同じメッセージが配信される
Producer B
Consumer Inst. A Consumer Inst. BConsumerGroupA
Producer A
ConsumerGroupB Consumer Inst. CConsumer
GroupC
Topic A
Partition 0Message A (Offset 0)Message B (Offset 1)
AB
AB
AB
©2016 IBM Corporation17
メッセージの配信パターンp パターン3 メッセージの順序性を保証した配信
p 全てのメッセージの順序性を保証したいケースØ トピックは、1つのパーティションで構成Ø コンシューマーは、1つのインスタンスを構成
Topic A
Partition 0
Producer
Message A (Offset 0)Message B (Offset 1)
Message C (Offset 2)
Consumer
p 特定の複数のメッセージの順序性を保証したいケースØ 複数のメッセージを論理的にグルーピングし、グループ内では
メッセージの順序性は保証するが、グループ間では順序性を保証しなくてよいケース
Ø プロデューサーがキーを利⽤してメッセージをグルーピングすることでパーティションやコンシューマー・インスタンスを複数構成することも可
Topic A
Partition 0key:A msg01 (Offset 0)key:A msg02 (Offset 1)
key:A msg03 (Offset 2)
Partition 1key:B msg01 (Offset 0)
key:B msg02 (Offset 2)
Consumer Inst. A Consumer Inst. BConsumerGroup A
Producer同じキーを指定したメッセージは同じパーティションに送信される
key:C msg01 (Offset 1)
key:C msg02 (Offset 3)
©2016 IBM Corporation18
Message Hubサービスの作成p ユーザーは、各Bluemixスペースにつき1つのMessage Hubサービスのインスタンスを作成可能
p IBM Bluemixにログインし、カタログからMessage Hubサービスを選択
「アプリケーション・サービス」カテゴリー
©2016 IBM Corporation19
Message Hubサービスの作成p 任意の名前を指定し、作成(Create)を押す
デフォルトで⼀意の名前がセットされる
©2016 IBM Corporation20
Message Hubサービスの作成p 作成されたMessage Hub サービスが表⽰される
©2016 IBM Corporation21
Message Hubサービスの作成p トピックの作成
+ボタンを押してトピックを作成
トピック名を指定(最⼤100⽂字)
パーティション数(区分)と存続時間(Retention)も指定
保存(Save)を押して完了
©2016 IBM Corporation22
Message Hubサービスの作成p トピックの作成
作成したトピックが表⽰される
©2016 IBM Corporation23
Message Hubサービスの作成p VCAP_SERVICES(接続情報、認証情報)の確認
Ø アプリケーションが利⽤(後述)
「サービス資格情報」をクリック
「資格情報の表⽰」をクリック
©2016 IBM Corporation24
Message Hubサービスの作成p VCAP_SERVICES(接続情報、認証情報)の確認
APIからアクセスするための接続情報、認証情報が表⽰される
©2016 IBM Corporation25
Message Hubサービスの作成p VCAP_SERVICES(接続情報、認証情報)の確認
Ø サンプル
{"mqlight_lookup_url": "https://mqlight-lookup-prod02.messagehub.services.eu-
gb.bluemix.net/Lookup?serviceId=fb67ef9c-b361-437e-a143-a30cf8b00f01","api_key": "OquOVUuXMGujHmsy8mtpJte51MtgYkOtRi1a2OoT2mJUCaD0","kafka_admin_url": "https://kafka-admin-prod02.messagehub.services.eu-gb.bluemix.net:443","kafka_rest_url": "https://kafka-rest-prod02.messagehub.services.eu-gb.bluemix.net:443","kafka_brokers_sasl": ["kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093","kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093","kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093","kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093","kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093”
],"user": "OquOVUuXMGujHmsy","password": "8mtpJte51MtgYkOtRi1a2OoT2mJUCaD0”
}
Kafka APIの接続情報として利⽤(5つのブローカーが構成されていることが分かる)
Kafka API、MQLight APIの認証情報として利⽤
Kafka REST APIの接続情報として利⽤
Kafka REST APIの認証情報として利⽤
MQLight APIの接続情報として利⽤
©2016 IBM Corporation26
制約事項p Bluemix PublicのMessage Hubサービスの制約事項
項⽬ 制約サービス・インスタンス数 各Bluemixスペースに作成できるMessage Hubサービスのインスタンスは1つトピック、パーティション トピック名は最⼤100⽂字
各Bluemixスペースに構成できるパーティションは合計で100個(1トピックに1つのパーティションを設定した場合、100トピックまで作成可)
メッセージの保存 メッセージはデフォルト24 時間保存され、各パーティションは1GB が上限1GBに達したら、最も古いメッセージから破棄される保管期限は1時間〜720時間(30⽇)まで設定可能
©2016 IBM Corporation27
アプリケーション・インターフェースp Message Hubサービスでは、メッセージを送受信するための3種類のAPIをサポート
p Kafka APIp Kafka REST APIp MQ Light API
©2016 IBM Corporation28
Kafka APIp Apache Kafkaが提供するクライアント・アプリケーション⽤API
p Kafka REST APIより⾼機能で、⾼レスポンス、⾼スループットp Message Hubでの使⽤前提
p Apache Kafka Client 0.9.0.0 〜 0.10.0.1Ø 最新版のKafka 0.10.0.1 Javaライブラリは以下から⼊⼿可能
ü https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz
p 接続、認証p VCAP_SERVICESのkafka_brokers_sasl、user、passwordを使⽤p クライアントの認証⽅式は、SASL PLAINを前提
Ø Kafkaでは、0.10.0.0からSASL PLAINをサポートü 0.9.0.0のクライアントは、Message Hubが提供するログイン・モジュール(Java)を使⽤する必要があった(つまり、Javaクライアントのみ接続可)
• https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-login-libraryü 0.10.0.0以降のクライアントは、ライブラリがSASL PLAINをサポートしていれば、Java以外でもMessage Hubに接続可能
• https://github.com/edenhill/librdkafka/• https://github.com/confluentinc/confluent-kafka-python
p サンプルp Kafka APIを使⽤してメッセージ送受信を⾏うサンプルをGitHubにて提供
Øhttps://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl
©2016 IBM Corporation29
Kafka APIp Kafka API の提供パッケージ
p 主に以下のパッケージを利⽤
p 詳細はJavadocを参照Ø http://kafka.apache.org/0100/javadoc/index.html
パッケージ 内容
org.apache.kafka.clients.consumer Consumer関連のクラスを提供
org.apache.kafka.clients.producer Producer関連クラスを提供
org.apache.kafka.common 共通モジュールの提供
org.apache.kafka.common.errors Kafka関連のExceptionクラスを提供
org.apache.kafka.common.serialization 各種シリアライザーのクラスを提供
©2016 IBM Corporation30
Kafka APIp アプリケーションの実装(0.10.0.0以降のJavaクライアントの例)
p 開発/実⾏環境のクラスパスにApache Kafka APIライブラリを追加
p JAAS構成プロパティの設定Ø ログイン・モジュールの指定、VCAP_SERVICESから参照したユーザー名、パスワードの認証情報を⼊⼒
Ø 以下はjaas.conf ファイルに指定した場合
public static void main(String args[]) {userDir = System.getProperty("user.dir");resourceDir = userDir + File.separator + "resources";
// Set JAAS configuration property.System.setProperty("java.security.auth.login.config", resourceDir + File.separator + "jaas.conf");・・・
JAAS構成プロパティの設定
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredserviceName="kafka"username=“xxxx”password=“xxxx";
};
jaas.conf
jaas.confファイルの読み込み
Message HubサービスのVCAP_SERVICESからuserとpasswordの値をセット
©2016 IBM Corporation31
Kafka APIp プロデューサー⽤プロパティの設定
Ø producer.propertiesファイルに設定した場合
Ø 最低限必要な設定項⽬
Ø 詳細は、下記参照ü http://kafka.apache.org/documentation.html#producerconfigs
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializervalue.serializer=org.apache.kafka.common.serialization.ByteArraySerializerbootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9093security.protocol=SASL_SSLssl.protocol=TLSv1.2sasl.mechanism=PLAIN
項⽬ 設定内容key.serializer キーの値をシリアライズするためのSerializerクラスを指定value.serializer バリュー(データ)の値をシリアライズするためのSerializerクラスを指定
bootstrap.serversMessage Hubサービス(Kafkaブローカー)の接続先情報(ホスト名とポート番号)を指定カンマ区切りで複数の接続先を指定可能(hostname1:port1,hostname2:port2,...)Message HubサービスのVCAP_SERVICESのkafka_brokers_saslの値をセット
security.protocol SASL_SSLを指定ssl.protocol TLSv1.2を指定sasl.mechanism PLAIN を指定
Message HubサービスのVCAP_SERVICESからkafka_brokers_saslの値をセット
©2016 IBM Corporation32
Kafka APIp propertiesファイルの読み込み例
public static final Properties getConfiguration(String fileName) {Properties props = new Properties();InputStream propsStream;
try {propsStream = new FileInputStream(resourceDir + File.separator + fileName);props.load(propsStream);propsStream.close();
} catch (IOException e) {return props;
}return props;
}
Propertiesの読み込み
©2016 IBM Corporation33
Kafka APIp 基本的なプロデューサーの実装例
public static void produceMessage(){KafkaProducer<byte[], byte[]> kafkaProducer =
new KafkaProducer<byte[], byte[]>(KafkaSample.getConfiguration(“producer.properties”));
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(“topicname”,“key”.getBytes(“UTF-8”),“test message”.getBytes(“UTF-8”));
RecordMetadata m = kafkaProducer.send(record).get();
System.out.println("Message produced, offset: " + m.offset());kafkaProducer.close();
}
KafkaProducerを⽣成プロパティ設定の内容を基にMessage Hubサービスに接続
ProducerRecordの⽣成送信するトピックとキー、メッセージをセット
メタデータからメッセージのオフセットを取得
メッセージを送信し、結果のメタデータを取得
©2016 IBM Corporation34
Kafka APIp コンシューマー⽤プロパティの設定
Ø consumer.propertiesファイルに設定した場合key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializervalue.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializerbootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9093enable.auto.commit=falsegroup.id=testgroupauto.offset.reset=earliestsecurity.protocol=SASL_SSLssl.protocol=TLSv1.2sasl.mechanism=PLAIN
Message HubサービスのVCAP_SERVICESからkafka_brokers_saslの値をセット
©2016 IBM Corporation35
Kafka APIØ 最低限必要な設定項⽬
Ø 詳細は下記参照ü http://kafka.apache.org/documentation.html#consumerconfigs
項⽬ 設定内容key.deserializer バイト列からキーに変換する際に⽤いられるDeserializerクラスを指定value.deserializer バイト列からバリュー(データ)に変換する際に⽤いられるDeserializerクラスを指定
bootstrap.servers Kafkaブローカーのホスト名とポート番号をカンマ区切りで指定クラスター内のいずれかひとつのブローカーを指定
enable.auto.commit 定期的にConsume済みのメッセージをコミットするかどうかを指定
group.id コンシューマーが属するコンシューマー・グループを指定する(指定なしの場合空⽂字)group.id が同じプロセス(スレッド)が、コンシューマー・グループのインスタンスになる
security.protocol SASL_SSLを指定ssl.protocol TLSv1.2を指定auto.offset.reset earliest/latestを指定(サーバー側にコミット済みのOffsetが存在しない場合の動作を指定)sasl.mechanism PLAIN を指定
©2016 IBM Corporation36
Kafka APIp 基本的なコンシューマーの実装例
public static void consumeMessage(String topic){KafkaConsumer<byte[], byte[]> kafkaConsumer
= new KafkaConsumer<byte[], byte[]>(KafkaSample.getConfiguration("consumer.properties"));
ArrayList<String> topicList = new ArrayList<String>();topicList.add(topic);kafkaConsumer.subscribe(topicList);
final int WAIT_TIME = 10000;Iterator<ConsumerRecord<byte[], byte[]>> it = kafkaConsumer.poll(WAIT_TIME).iterator();while (it.hasNext()) {
ConsumerRecord<byte[], byte[]> record = it.next();final String message = new String(record.value(),Charset.forName("UTF-8"));System.out.println("Message: " + message);
}kafkaConsumer.commitSync();
}
トピックのリストを指定し、サブスクライブ
取得したデータの表⽰
タイムアウトを指定し、メッセージをポーリング
kafkaConsumerのコミット
KafkaConsumerを⽣成プロパティ設定の内容を基にMessage Hubサービスに接続
©2016 IBM Corporation37
Kafka REST APIp RESTインフェース経由でメッセージの送受信を⾏うAPI
p Webベースの技術が使⽤可能であるため、Kafka APIに⽐べ実装の敷居は低いp スループットが重要視されないユース・ケースで有効
Ø デバッグおよび障害調査時などp 要求と応答ではバイナリー埋め込み形式のみがサポート
Ø JSON/Avro 埋め込み形式はサポートされない
p 接続、認証p VCAP_SERVICESのkafka_rest_url、api_keyを使⽤
Ø 要求のX-Auth-Tokenヘッダーにapi_keyを指定
p サンプルp メッセージを作成およびコンシュームするNode.jsアプリケーションをGitHubにて提供
Ø https://github.com/ibm-messaging/message-hub-samples/tree/master/nodejs/bluemix-chat-sample
©2016 IBM Corporation38
Kafka REST APIp Kafka REST APIにおける基本操作 ⼀覧
p 詳細は下記参照Ø http://docs.confluent.io/2.0.0/kafka-rest/docs/api.html
URL メソッド 説明/topics GET トピック・リストの取得/topics/(string: topic_name) GET 特定のトピック情報の取得/topics/(string: topic_name) POST メッセージの送信/topics/(string: topic_name)/partitions GET 特定のトピックに作成されたパーティションのリストを取得/topics/(string: topic_name)/partitions/(int: partition_id) GET 特定パーティションの情報を取得GET /topics/(string: topic_name)/partitions/(int: partition_id)/messages?offset=(int)[&count=(int)]
GET 特定パーティションからメッセージを受信
/topics/(string: topic_name)/partitions/(int: partition_id) POST 特定パーティションにメッセージを送信/consumers/(string: group_name) POST コンシューマー・グループにインスタンスを作成/consumers/(string: group_name)/instances/(string: instance)/offsets POST オフセットのコミット/consumers/(string: group_name)/instances/(string: instance) DELETE コンシューマー・インスタンスの削除/consumers/(string: group_name)/instances/(string: instance)/topics/(string: topic_name)
GET トピックからメッセージを受信
/brokers GET ブローカーのリストを取得
©2016 IBM Corporation39
Kafka REST APIp プロデューサーによるメッセージ送信
Ø curl コマンドによる送信例(2件のメッセージを送信)
$ curl -X POST -H 'X-Auth-Token:018yshSjKbA3nkHJzuVZAZH9tWNie9VmOyAHVhwsa8cUO' -H 'Content-Type:application/vnd.kafka.binary.v1+json' --data '{"records":[{"value":"dGVzdCBtZXNzYWdlMQo="},{"value":"dGVzdCBtZXNzYWdlMgo=","partition":0}]}' https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/topics/T1
(レスポンス){"offsets":[{"partition":0,"offset":16,"error_code":null,"error":null},{"partition":0,"offset":17,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}
設定項⽬ 説明メソッド POSTURL <Kafka REST URL>/topics/<Topic Name>ヘッダー X-Auth-Token: <API Key>
Content-Type: application/vnd.kafka.binary.v1+json送信JSONデータ records: トピックに送信するJSONオブジェクトのリスト。以下の項⽬を指定する
records[i].key: (Optional)メッセージのキー⽂字列 String(base64 エンコード)records[i].value:メッセージデータ String(base64 エンコード)records[i].partition:(Optional) パーティション番号 Integer
©2016 IBM Corporation40
Kafka REST APIp コンシューマーによるメッセージの受信
p コンシューマー・インスタンスの作成
Ø curlコマンドによる実⾏例$ curl -X POST -H 'X-Auth-Token:018yshSjKbA3nkHJzuVZAZH9tWNie9VmOyAHVhwsa8cUO' -H 'Content-Type:application/vnd.kafka.binary.v1+json' --data '{"name":"my_consumer","format":"binary","auto.offset.reset":"smallest","auto.commit.enable":true}' https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/group1
(レスポンス){"instance_id":"my_consumer","base_uri":"https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net/consumers/group1/instances/my_consumer"}
設定項⽬ 説明メソッド POSTURL <Kafka REST URL>/consumers/<Consumer Group Name>ヘッダー X-Auth-Token: <API Key>
Content-Type: application/vnd.kafka.binary.v1+json送信JSONデータ name: ユニークなコンシューマー・インスタンス名 string
省略時は⾃動的にネーミングされるformat: binary指定のみauto.offset.reset: “smallest” or “largest” stringauto.commit.enable: true or false boolean
©2016 IBM Corporation41
Kafka REST APIp メッセージ受信
Ø curlコマンドによる実⾏例$ curl -X GET -H 'X-Auth-Token:018yshSjKbA3nkHJzuVZAZH9tWNie9VmOyAHVhwsa8cUO' -H 'Accept:application/vnd.kafka.binary.v1+json' https://kafka-rest-prod01.messagehub.services.us-south.bluemix.net:443/consumers/group1/instances/my_consumer/topics/T1
(レスポンス)[{"key":null,"value":"dGVzdCBtZXNzYWdlMQo=","partition":0,"offset":181},{"key":null,"value":"dGVzdCBtZXNzYWdlMgo=","partition":0,"offset":182}]
URL 説明メソッド GETURL <Kafka REST URL>/consumers/<Consumer Group Name>/instances/<Instance
Name>/topics/<Topic Name>ヘッダー X-Auth-Token: <API Key>
Accept: application/vnd.kafka.binary.v1+json
©2016 IBM Corporation42
MQ Light APIp AMQPベースのMQ Light API からMessage Hub サービスに接続可能
p MQ Light APIでは、Java、Node.js、Python、Rubyをサポートp 前提
p Message Hub サービスに”MQLight”トピックが作成済みであることØ パーティションは1つであることが必須Ø MQ Lightアプリケーションは任意の階層型のトピック・ストリングを指定してメッセージをPub/Subできる
ü サブスクライブでは、ワイルド・カードの使⽤も可能Ø Kafka API/Kafka REST APIとは”MQLight”トピックを介してメッセージの送受信を⾏うことができる
p 接続、認証p VCAP_SERVICES の mqlight_lookup_url、user、passwordを使⽤p TLS v1.2の利⽤が必須
p 詳細は下記リンクを参照p https://console.ng.bluemix.net/docs/services/MessageHub/messagehub075.html#messagehub075
Ø 現時点では、⽇本語表⽰ではMQ Light API の記載は表⽰されない(英語で参照)
p サンプルp Webのフロントエンド・アプリと受信したメッセージを⼤⽂字にして返すバックエンド・アプリを提供
Ø https://github.com/ibm-messaging/message-hub-samples/tree/master/mqlight
©2016 IBM Corporation43
MQ Light APIp MQ Light APIからMessage Hubサービスへの接続⽅法(Java Scriptの例)
VCAP_SERVICES{ "credentials": {
"mqlight_lookup_url": "https://mqlight-lookup.messagehub.services.us-south.bluemix.net/Lookup?serviceId=584e8436-e7f5-43db-96ac-2864fccae5ae",(省略)"user": "d9JSx1SYsmLzNRbb","password": "gUFneDm2DtkedlVeViObYJIvrPAf2kJA”
}}
var mqlight = require('mqlight');
var opts = {service: 'https://mqlight-lookup.messagehub.services.us-south.bluemix.net/Lookup?serviceId=584e8436-e7f5-43db-96ac-2864fccae5ae',user: 'd9JSx1SYsmLzNRbb',password: 'gUFneDm2DtkedlVeViObYJIvrPAf2kJA',id: ʻclient01ʼ
};
var client = mqlight.createClient(opts);(以下、省略)
MQ LightアプリケーションVCAP_SERVICESの情報をmqlight.createClientのオプションに指定Serviceにmqlight_lookup_rulの値を指定
Idの指定はオプション指定しない場合はユニークなIDが⾃動的に付与される指定した場合は、IDが重複していると先に接続していた⽅が切断されるため、必ずユニークとするよう注意が必要
以降は通常のMQLightAPIと同様のコーディングが可能
©2016 IBM Corporation44
MQ Light APIp MQ Light APIを利⽤する際の制約
p メッセージの保管期間は24時間p 1つのメッセージの最⼤サイズは1MB(ヘッダー部分は除く)p 同時に接続できるクライアントは25までp 同時にアクティブにできる宛先は25
Ø アクティブな宛先とは以下のことを指すØ TimeToLiveが0より⼤きい宛先(クライアントが接続している、接続していないに関わらず)Ø TimeToLiveが0でクライアントが接続している宛先
ü 複数のクライアントで共有されている宛先は1つの宛先と考える
©2016 IBM Corporation45
他サービスとの連携例1: 分析データをアプリに反映p Apache Sparkによるマイクロバッチ処理でTwitterデータを収集&Tone Analyzerで感情分析し、
結果をメッセージング・エンジンを経由してWebアプリに反映
Message Hub(Apache Kafka)
Watson ToneAnalyzer
Twitter APIでつぶやきを収集
Apahce Sparkでマイクロバッチ処理
subscribe
Publish(REST API)
分析基盤
Web基盤
©2016 IBM Corporation46
他サービスとの連携例1 - 組み合わせ詳細p ランタイム
p [1] Spark-Twitter-Watson-Dashboard (IBM SDK for Node.js)Ø [役割] MessageHubに格納されたTwitter情報を収集し、定期的に画⾯表⽰Ø [リンク] https://github.com/ibm-cds-labs/Spark-Twitter-Watson-Dashboard
p サービスp [A] MessageHub
Ø [役割] Apache Sparkからの分析結果を収集p [B] Watson Tone Analyzer
Ø [役割] Twitterのつぶやきデータのセンチメント分析を実施p [C] Apache Spark
Ø [役割] Twitterデータを収集し、マイクロ・バッチ処理で分析を実施p [D] Object Storage
Ø [役割] Apache Sparkの処理結果の⼀時保管⽤に⽤意
p その他p Twitter APIアカウント
Ø Twitter社から発⾏
©2016 IBM Corporation47
他サービスとの連携例2: 収集したデータを分析p モバイル・デバイスのデータをNode.jsアプリケーション経由でIoT Platformに収集、そのデータを定期的に
Javaバッチ経由でMessageHubに転送し、Apache Sparkでデータの分析をマイクロバッチ経由で実施
モバイル・デバイス iot-html5-phone
デバイスデータの収集
認証情報の格納
Cloudant NoSQL DB
Connector-MessageHubIoT Platform MessageHub Apache Spark
MQTTでデータ送信
収集 2秒ごとに収集トピックに格納
収集したデータをもとにバッチ処理でデータを加⼯
©2016 IBM Corporation48
他サービスとの連携例2 - 組み合わせ詳細p ランタイム
p [1] iot-html5-phone (IBM SDK for Node.js)Ø [役割] デバイスからの情報収集を⾏い、IoT Platformに格納Ø [リンク] https://github.com/ibm-messaging/iot-html5-phone
p [2] Watson IoT MessageHub Connector (Java)Ø [役割] IoT Platformからデータを受信し、MessageHubへデータを送信するØ [リンク] https://github.com/ibm-watson-iot/connector-messagehub
p サービスp [A] IoT Platform
Ø [役割] モバイル・デバイスからセンサー情報を収集する ([1]で利⽤)p [B] Cloudant NoSQL DB
Ø [役割] モバイルからのアクセス時の認証情報を格納する ([1]で利⽤)p [C] MessageHub
Ø [役割] 定期的に処理するモバイル・デバイスのデータを⼀時的に格納する ([2]で利⽤)p [D] Apache Spark
Ø [役割] MessageHubから受信したデータをもとに、分析処理をマイクロバッチで⾏う
©2016 IBM Corporation49
参考資料p [dW]
p IBM Message HubØ https://developer.ibm.com/messaging/message-hub/
p [Bluemix]p Message Hubの概要 ※最新の情報は英語表⽰で確認
Ø https://new-console.ng.bluemix.net/docs/services/MessageHub/index.html#messagehub
p [InterConnect]p HHM-3499: Introducing IBM Message Hub: Cloud-Scale Messaging Based on Apache Kafka
Ø https://www-304.ibm.com/events/tools/interconnect/2016ems/REST/presentations/PDF/InterConnect2016_3499.pdf
p [GitHub]p ibm-messaging/message-hub-samples
Ø https://github.com/ibm-messaging/message-hub-samplesp ibm-messaging/iot-messgehub-spark-samples
Ø https://github.com/ibm-messaging/iot-messgehub-spark-samples
p [MQ Light]p https://developer.ibm.com/messaging/mq-light/
p [Apache Kafka]p http://kafka.apache.org/0100/javadoc/index.htmlp http://www.confluent.io/blog/a-comprehensive-open-source-rest-proxy-for-kafka/p http://kafka.apache.org/documentation.html
©2016 IBM Corporation50
IBM Message Connectfor Bluemix
©2016 IBM Corporation51
IBM Message Connect for Bluemix 概要p Message HubサービスとオンプレミスのMQキューマネージャーを連携するためのサービス
p 現時点では、試験版として⽶国南部のサービスでのみ提供p Secure Gatewayサービスを利⽤してオンプレミスのMQとセキュアな通信を実現p Message Connectで作成するストリームを介して、MQのトピックとMessage Hubのトピックを紐付ける
Ø ストリームは、MQとはAMQPプロトコルで接続し、Message HubとはKafkaプロトコルで接続Ø 現時点では、MQからMessage Hubへの⽅向のメッセージ送信のみサポート
Bluemix
Message Hub Message Connect Secure Gateway
On-Premise
Secure GatewayClient
Secure Tunnel
TopicREST
Kafka API
Kafka REST API
MQ Light
MQ
JMS
MQLight
StreamTopic
MQ
©2016 IBM Corporation52
参考:IBM Secure Gateway for Bluemixp 他クラウド/オンプレミス上のリソースとセキュアに通信するための連携サービス
p セキュア・トンネル(websocketトンネル)で安全かつ容易に拠点間を接続できるp 接続したい拠点にSecure Gatewayクライアントを導⼊/構成し、Bluemixと接続
Ø 提供タイプは3種類 (Docker, DataPower, ネイティブ・クライアント)p ネットワーク・レイテンシーや通信の暗号化にかかるコストにより、アプリのパフォーマンスに多少の影響が出る点に注
意が必要
他クラウド or オンプレミスBluemix
appセキュア・トンネル 既存リソース
SGゲートウェイ SGクライアント
ランタイム
セキュアな通信を保証
Client TLS設定で暗号化プロトコル指定
で暗号化
関係ないアプリの通信の遮断が可能
Port 4439000で通信
©2016 IBM Corporation53
Message Connectの構成⼿順p Message Connectの構成⼿順
p 以下の構成、設定を前提とした⼿順を⽰す
Bluemix
Message Hub Message ConnectSecure Gateway
On-Premise
Secure GatewayClient
Secure Tunnel
REST
Kafka API
Kafka REST API
MQ Light
MQ
JMS
MQLight
トピック
MQ
① ゲートウェイの構成
② ゲートウェイ・クライアントの構成
④ ストリームの構成
トピックAMQP
チャネル
SG
クライアント
ゲートウェイ
宛先ストリーム
MQサーバー・アドレス:192.168.0.1AMQPチャネル・ポート:5672トピック名:topic01
ゲートウェイ名:mygateway宛先名:mydestination
ストリーム名:mystream
③ 宛先の構成
※MQサーバー、AMQPチャネルは事前に定義済みとする
※Message Hubサービスは事前に定義済みとする
©2016 IBM Corporation54
①ゲートウェイの構成(Bluemix:Secure Gateway)p Bluemix上で、Message Connectの前提となるSecure Gatewayを構成
「統合」カテゴリのSecure Gatewayを選択
©2016 IBM Corporation55
①ゲートウェイの構成(Bluemix:Secure Gateway)p Secure Gatewayのサービスを作成
Secure Gatewayのサービスを作成
©2016 IBM Corporation56
①ゲートウェイの構成(Bluemix:Secure Gateway)p ゲートウェイを作成
ゲートウェイの名前(mygateway)を指定
©2016 IBM Corporation57
①ゲートウェイの構成(Bluemix:Secure Gateway)p 作成したゲートウェイの情報を確認
作成したゲートウェイの設定ボタンをクリック
セキュリティ・トークンとゲートウェイIDを確認(これらは、ゲートウェイ・クライアントを構成する際に使⽤)
©2016 IBM Corporation58
②ゲートウェイ・クライアントの構成p 作成したゲートウェイから、ゲートウェイ・クライアントの追加ウィザードを開く
クライアントの追加ボタンをクリック
©2016 IBM Corporation59
②ゲートウェイ・クライアントの構成p ゲートウェイ・クライアントの選択
環境に応じてゲートウェイ・クライアントのタイプを選択・ネイティブ・クライアント・docker・IBM DataPower
©2016 IBM Corporation60
②ゲートウェイ・クライアントの構成p ゲートウェイ・クライアントの構成⼿順はクライアントのタイプによって異なるため、下記リンクを参照
p https://console.ng.bluemix.net/docs/services/SecureGateway/secure_gateway.html?pos=2
p 例:Linux(RedHat)環境のネイティブ・クライアントの場合の構成⼿順概要p 前提の確認
Ø 上記リンクにて確認p 前画⾯より該当のインストーラーをダウンロードp クライアントをインストール
Ø rpm -ivhf ibm-securegateway-client-1.4.1+client_amd64.rpmp クライアントの構成
Ø ⾃動開始の選択Ø ゲートウェイID、セキュリティー・トークンの設定 ← ゲートウェイ構成時に確認したIDとトークンを指定Ø アクセス制御リストの設定 ← AMQPチャネルのポート(5672)へのアクセスを許可するØ など
p クライアントの開始Ø ⼿動で起動するか、⾃動開始機能で起動 → ここでBluemix上のゲートウェイとオンプレ上のクライアントが接続
©2016 IBM Corporation61
②ゲートウェイ・クライアントの構成p ゲートウェイ側でクライアントからの接続を確認
ゲートウェイ・クライアントを適切に構成し、起動すると、クライアントはBluemix側のゲートウェイと接続し、ここに接続済みのマークが表⽰される
©2016 IBM Corporation62
③宛先の構成p 前の画⾯で「宛先の追加」を選択し、宛先追加ウィザードを開く
宛先(MQサーバー)がどこで稼働しているか選択
©2016 IBM Corporation63
③宛先の構成p 宛先への接続情報を設定
MQサーバーが稼働するホスト名/IPアドレスとAMQPチャネルのポート番号を指定※ゲートウェイ・クライアントからアクセスできる接続情報をセットする
©2016 IBM Corporation64
③宛先の構成p アプリケーションが宛先に接続するプロトコルを指定
ここでのアプリケーションはMessage Connectを指し、宛先はMQのAMQPチャネルを指すプロトコルはTCPを指定
©2016 IBM Corporation65
③宛先の構成p 認証⽅法の指定
ここでは「なし」を選択
©2016 IBM Corporation66
③宛先の構成p IPテーブル・ルールの追加
ここでは、ルールは追加しない
©2016 IBM Corporation67
③宛先の構成p 宛先の名前を指定
宛先の名前(mydestination)を指定する
©2016 IBM Corporation68
③宛先の構成p 宛先が作成されたことを確認
©2016 IBM Corporation69
④ストリームの構成p Message Connectのサービスを作成
Message Connectは試験版のため、カタログ・ページからExperimental Servicesのサイトに移動する
©2016 IBM Corporation70
④ストリームの構成p Experimental Serviceのカタログで「統合」カテゴリーのMessage Connectを選択
©2016 IBM Corporation71
④ストリームの構成p Message Connectのサービスを作成
©2016 IBM Corporation72
④ストリームの構成p ストリームを作成
ここをクリック
©2016 IBM Corporation73
④ストリームの構成p ストリーム名を指定
ストリーム名(mystream)を指定※ここで指定した名前がMessage Hub側のトピック名となる
©2016 IBM Corporation74
④ストリームの構成p ストリームのタイプで「MQ Light」を選択し、ゲートウェイと宛先を指定
MQ Lightをクリック
先に作成したゲートウェイ(mygateway)と宛先(mydestination)を選択し、IMPORTボタンをクリック
©2016 IBM Corporation75
④ストリームの構成p Message HubとAMQPチャネル/MQを連携するための情報を指定
IMPORTボタンを押すと、⾃動的に値がセットされる
AMQPチャネルに接続するユーザー/パスワードを指定※AMQPチャネルおよびMQ側のセキュリティ設定に依存
MQ Light/MQ側のトピック名(topic01)を指定
©2016 IBM Corporation76
④ストリームの構成p 作成したストリームのステータスを確認
ステータスがRunningとなることを確認
©2016 IBM Corporation77
④ストリームの構成p Message Hubサービスにて、トピックが作成されていることを確認
ストリーム名のトピックができていることを確認
©2016 IBM Corporation78
Message Connectの構成p 以上の⼿順によって、Message ConnectのストリームがMQ(AMQPチャネル)とMessage Hubサービスと
の間でメッセージを仲介することができる
Bluemix
Message Hub Message ConnectSecure Gateway
On-Premise
Secure GatewayClient
Secure Tunnel
REST
Kafka API
Kafka REST API
MQ Light
MQ
JMS
MQLight
トピック
mystream
MQ
トピック
topic01AMQP
チャネル
SG
クライアント
ゲートウェイ
宛先
MQサーバー・アドレス:192.168.0.1AMQPチャネル・ポート:5672トピック名:topic01
ストリーム名:mystream
ストリーム
MQ側のtopic01をサブスクライブし、受信したメッセージをMessage Hub側のmystreamトピックに配信する
©2016 IBM Corporation79
参考資料p [dW]
p IBM Message ConnectØ https://developer.ibm.com/messaging/2016/03/16/message_connect_hybrid_messaging/Ø https://developer.ibm.com/messaging/2016/02/15/introducing-ibm-message-connect-the-way-to-connect-bluemix-
messaging-with-on-prem-mq/
p [Bluemix]p Message Connectの概要
Ø https://console.ng.bluemix.net/docs/services/MessageConnect/index.html#gettingstartedeventhub
top related