NGSIKafkaSink¶
コンテンツ :
機能性¶
com.iot.telefonica.cygnus.sinks.NGSIKafkaSink
、または単なる NGSIKafkaSink
は、Apache Kafka デプロイメント内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語 を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の Kafka データ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです
これは、コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください
Kafka データ構造への NGSIEvent
s のマッピング¶
Apache Kafka organizes は、トピックのデータ(メッセージが公開されるカテゴリまたはフィード名)を編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSIKafkaSink
によって利用されます。
トピックの命名規則¶
構成されたデータモデルに応じて、Kafka のトピックがまだ作成されていない場合は作成されます(パーティション数 1) :
- サービスによるデータモデル (
data_model=dm-by-service
)。データモデル名が示すとおり、通知された FIWARE service、またはNGSIRestHandler
でデフォルトで設定されたものがトピックの名前として使用されます。これにより、同じサービスに属するすべての NGSI エンティティに関するデータがこのユニークなトピックにストアされます - サービスパスによるデータモデル (
data_model=dm-by-service-path
)。データモデル名が示すとおり、通知された FIWARE service path、またはNGSIRestHandler
デフォルトで設定されたものがトピックの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの固有のトピックにストアされます。このデータモデルに関する唯一の制約は、FIWARE service path をルート(/
)にすることができないことです - 実体によるデータモデル (
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID およびタイプに連結され、トピック名を構成します。FIWARE service path がルート(/
)である場合、エンティティ ID とタイプのみが連結されます - 属性によるデータモデル (
data_model=dm-by-attribute
)。各エンティティの属性について、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID、タイプ、および通知された属性名に連結され、トピック名を構成します。FIWARE service path がルート(/
)である場合、エンティティ ID とタイプ、および属性名とタイプのみが連結されます
Kafka に受け入れられているか禁止されている文字セットは知られていません。とにかく、特定のエンコーディングが適用されます。
次の表は、トピック名 の構成をまとめたものです :
FIWARE service path | dm-by-service |
dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|---|
/ |
<svc> |
<svc>xffffx002f |
<svc>xffffx002fxffff<entityId>xffff<entityType> |
<svc>xffffx002fxffff<entityId>xffff<entityType>xffff<attrName> |
/<svcPath> |
<svc> |
<svc>xffffx002f<svcPath> |
<svc>xffffx002f<svcPath>xffff<entityId>xffff<entityType> |
<svc>xffffx002f<svcPath>xffff<entityId>xffff<entityType>xffff<attrName> |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entities
ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。
ストア¶
NGSIEvent
構造体は、NGSI ライクのソースによって通知されるため、ヘッダの配列を含む Json オブジェクトと Json データを含む別のオブジェクトとして、文字列化されます。
例¶
NGSIEvent
¶
通知された NGSI コンテキストデータから、以下の NGSIEvent
が作成されたと仮定します。以下のコードは、実際のデータフォーマットではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
トピック名¶
トピック名は、構成されたデータ・モデルに応じて、以下のものになります :
FIWARE service path | dm-by-service |
dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|---|
/ |
vehicles |
vehiclesxffffx002f |
vehiclesxffffx002fxffffcar1_car |
vehiclesxffffx002fxffffcar1xffffcarxffffspeed vehiclesxffffx002fxffffcar1xffffcarxffffoil_level |
/4wheels |
vehicles |
vehiclesxffffx002f4wheels |
vehiclesxffffx002f4wheelsxffffcar1xffffcar |
vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffoil_level |
ストア¶
トピック名 vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed
(属性、非ルート・サービスパスによるデータモデル) を想定しましょう。このトピックにストアされるデータは次のとおりです :
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed --from-beginning
{"headers":[{"fiware-service":"vehicles"},{"fiware-servicePath":"/4wheels"},{"timestamp":1429535775}],"body":{"contextElement":{"attributes":[{"name":"speed","type":"float","value":"112.9"}],"type":"car","isPattern":"false","id":"car1"},"statusCode":{"code":"200","reasonPhrase":"OK"}}}
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic vehicles_4wheels_car1_car_oil_level --from-beginning
{"headers":[{"fiware-service":"vehicles"},{"fiware-servicePath":"4wheels"},{"timestamp":1429535775}],"body":{"contextElement":{"attributes":[{"name":"oil_level","type":"float","value":"74.6"}],"type":"car","isPattern":"false","id":"car1"},"statusCode":{"code":"200","reasonPhrase":"OK"}}}
管理ガイド¶
構成¶
NGSIKafkaSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSIKafkaSink でなければなりません |
channel | yes | N/A | |
enable_grouping | no | false | true か *false。詳細については、このリンクをチェックしてください |
enable_name_mappings | no | false | true か *false。詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true か false |
data_model | no | dm-by-entity | dm-by-service, dm-by-service-path, dm-by-entity または dm-by-attribute |
broker_list | no | localhost:9092 | Kafka ブローカのカンマ区切りリスト (ブローカは host:port として定義されます) |
zookeeper_endpoint | no | localhost:2181 | Zookeeper エンドポイントは、host:port の形式で Kafka トピックを作成するために必要です |
partitions | no | 1 | トピックのパーティション数 |
replication_factor | no | 1 | レプリケーション・ファクタ N のトピックでは、Kafka はログにコミットされたメッセージを失うことなく N-1 サーバの障害を許容します。レプリケーション・ファクタは、作成されたブローカの数以下でなければなりません |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
構成例は次のとおりです :
cygnus-ngsi.sinks = kafka-sink
cygnus-ngsi.channels = kafka-channel
...
cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.enable_lowercase = false
cygnus-ngsi.sinks.kafka-sink.enable_name_mappings = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = localhost:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = localhost:2181
cygnus-ngsi.sinks.kafka-sink.partitions = 5
cygnus-ngsi.sinks.kafka-sink.replication_factor = 1
cygnus-ngsi.sinks.kafka-sink.batch_size = 100
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 30
cygnus-ngsi.sinks.kafka-sink.batch_ttl = 10
cygnus-ngsi.sinks.kafka-sink.batch_retry_intervals = 5000
ユースケース¶
Storm リアルタイム・アプリケーションとして、Kafka ベースのコンシューマと OrionContextBrokerを統合する場合に NGSIKafkaSink
を使用します。
重要なメモ¶
バッチ処理¶
プログラマ・ガイドで説明したように、NGSIKafkaSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続性の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が劇的に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう.100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ Carto テーブルに保持されます。イベントを1つずつ処理する場合は、Carto に100個のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 Carto テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の Carto デスティネーション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSIKafkaSin
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、予想されるサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
エンコーディング¶
Cygnus は、 Kafka のデータ構造に合わせたこの特定のエンコーディングを適用します :
- 英数字はエンコードされません
- 数字はエンコードされません
- アンダースコア文字 (
_
) はエンコードされません - ハイフン文字 (`-') はエンコードされません
- ドット文字 (`.') はエンコードされません
- 等号文字 (
=
) は、xffff
としてエンコードされます - FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
プログラマ・ガイド¶
NGSIKafkaSink
クラス¶
他の NGSI ライクのシンクと同様に、NGSIKafkaSink
は、ベース NGSISink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には、通知されたコンテキストデータイベントを解析した結果である一連の NGSIEvent
オブジェクトが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される Kafka トピックを指定します。したがって、各宛先は、KafkaProducer
のおかげで永続化される宛先データ文字列を構成するために反復します。
public void start();
KafkaProducer
が作成されます。起動するシーケンスは NGSIKafkaSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。