NGSIKafkaSink

コンテンツ :

機能性

com.iot.telefonica.cygnus.sinks.NGSIKafkaSink、または単なる NGSIKafkaSink は、Apache Kafka デプロイメント内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語 を話す他のシステムである可能性があります。

データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の Kafka データ構造にマップする必要があります。

次のセクションでこれについて詳しく説明します。

トップ

NGSIEvent オブジェクトへの NGSI イベントのマッピング

コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent オブジェクト に変換されます。NGSIEvent は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement オブジェクトを組み合わせたものです

これは、コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent オブジェクト に変換されます。NGSIEvent は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement オブジェクトを組み合わせたものです のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください

トップ

Kafka データ構造への NGSIEvents のマッピング

Apache Kafka organizes は、トピックのデータ(メッセージが公開されるカテゴリまたはフィード名)を編成します。そのような構成は、NGSIEvent が永続化されるたびに NGSIKafkaSink によって利用されます。

トップ

トピックの命名規則

構成されたデータモデルに応じて、Kafka のトピックがまだ作成されていない場合は作成されます(パーティション数 1) :

  • サービスによるデータモデル (data_model=dm-by-service)。データモデル名が示すとおり、通知された FIWARE service、または NGSIRestHandler でデフォルトで設定されたものがトピックの名前として使用されます。これにより、同じサービスに属するすべての NGSI エンティティに関するデータがこのユニークなトピックにストアされます
  • サービスパスによるデータモデル (data_model=dm-by-service-path)。データモデル名が示すとおり、通知された FIWARE service path、または NGSIRestHandler デフォルトで設定されたものがトピックの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの固有のトピックにストアされます。このデータモデルに関する唯一の制約は、FIWARE service path をルート(/)にすることができないことです
  • 実体によるデータモデル (data_model=dm-by-entity)。各エンティティについて、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID およびタイプに連結され、トピック名を構成します。FIWARE service path がルート(/)である場合、エンティティ ID とタイプのみが連結されます
  • 属性によるデータモデル (data_model=dm-by-attribute)。各エンティティの属性について、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID、タイプ、および通知された属性名に連結され、トピック名を構成します。FIWARE service path がルート(/)である場合、エンティティ ID とタイプ、および属性名とタイプのみが連結されます

Kafka に受け入れられているか禁止されている文字セットは知られていません。とにかく、特定のエンコーディングが適用されます。

次の表は、トピック名 の構成をまとめたものです :

FIWARE service path dm-by-service dm-by-service-path dm-by-entity dm-by-attribute 
/ <svc> <svc>xffffx002f <svc>xffffx002fxffff<entityId>xffff<entityType> <svc>xffffx002fxffff<entityId>xffff<entityType>xffff<attrName>
/<svcPath> <svc> <svc>xffffx002f<svcPath> <svc>xffffx002f<svcPath>xffff<entityId>xffff<entityType> <svc>xffffx002f<svcPath>xffff<entityId>xffff<entityType>xffff<attrName>

エンティティ ID とタイプの連結が、NGSIEvent 内の notified_entities/grouped_entities ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。

トップ

ストア

NGSIEvent 構造体は、NGSI ライクのソースによって通知されるため、ヘッダの配列を含む Json オブジェクトと Json データを含む別のオブジェクトとして、文字列化されます。

トップ

NGSIEvent

通知された NGSI コンテキストデータから、以下の NGSIEvent が作成されたと仮定します。以下のコードは、実際のデータフォーマットではなくオブジェクト表現です :

ngsi-event={
    headers={
         content-type=application/json,
         timestamp=1429535775,
         transactionId=1429535775-308-0000000000,
         correlationId=1429535775-308-0000000000,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         <grouping_rules_interceptor_headers>,
         <name_mappings_interceptor_headers>
    },
    body={
        entityId=car1,
        entityType=car,
        attributes=[
            {
                attrName=speed,
                attrType=float,
                attrValue=112.9
            },
            {
                attrName=oil_level,
                attrType=float,
                attrValue=74.6
            }
        ]
    }
}

トップ

トピック名

トピック名は、構成されたデータ・モデルに応じて、以下のものになります :

FIWARE service path dm-by-service dm-by-service-path dm-by-entity dm-by-attribute 
/ vehicles vehiclesxffffx002f vehiclesxffffx002fxffffcar1_car vehiclesxffffx002fxffffcar1xffffcarxffffspeed
vehiclesxffffx002fxffffcar1xffffcarxffffoil_level
/4wheels vehicles vehiclesxffffx002f4wheels vehiclesxffffx002f4wheelsxffffcar1xffffcar vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed
vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffoil_level

トップ

ストア

トピック名 vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed (属性、非ルート・サービスパスによるデータモデル) を想定しましょう。このトピックにストアされるデータは次のとおりです :

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic vehiclesxffffx002f4wheelsxffffcar1xffffcarxffffspeed --from-beginning
{"headers":[{"fiware-service":"vehicles"},{"fiware-servicePath":"/4wheels"},{"timestamp":1429535775}],"body":{"contextElement":{"attributes":[{"name":"speed","type":"float","value":"112.9"}],"type":"car","isPattern":"false","id":"car1"},"statusCode":{"code":"200","reasonPhrase":"OK"}}}
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic vehicles_4wheels_car1_car_oil_level --from-beginning
{"headers":[{"fiware-service":"vehicles"},{"fiware-servicePath":"4wheels"},{"timestamp":1429535775}],"body":{"contextElement":{"attributes":[{"name":"oil_level","type":"float","value":"74.6"}],"type":"car","isPattern":"false","id":"car1"},"statusCode":{"code":"200","reasonPhrase":"OK"}}}

トップ

管理ガイド

構成

NGSIKafkaSink は、次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.NGSIKafkaSink でなければなりません
channel yes N/A
enable_grouping no false true か *false。詳細については、このリンクをチェックしてください
enable_name_mappings no false true か *false。詳細については、このリンクをチェックしてください
enable_lowercase no false truefalse
data_model no dm-by-entity dm-by-service, dm-by-service-path, dm-by-entity または dm-by-attribute
broker_list no localhost:9092 Kafka ブローカのカンマ区切りリスト (ブローカは host:port として定義されます)
zookeeper_endpoint no localhost:2181 Zookeeper エンドポイントは、host:port の形式で Kafka トピックを作成するために必要です
partitions no 1 トピックのパーティション数
replication_factor no 1 レプリケーション・ファクタ N のトピックでは、Kafka はログにコミットされたメッセージを失うことなく N-1 サーバの障害を許容します。レプリケーション・ファクタは、作成されたブローカの数以下でなければなりません
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
batch_retry_intervals no 5000 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます

構成例は次のとおりです :

cygnus-ngsi.sinks = kafka-sink
cygnus-ngsi.channels = kafka-channel
...
cygnus-ngsi.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.NGSIKafkaSink
cygnus-ngsi.sinks.kafka-sink.channel = kafka-channel
cygnus-ngsi.sinks.kafka-sink.enable_grouping = false
cygnus-ngsi.sinks.kafka-sink.enable_lowercase = false
cygnus-ngsi.sinks.kafka-sink.enable_name_mappings = false
cygnus-ngsi.sinks.kafka-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.kafka-sink.broker_list = localhost:9092
cygnus-ngsi.sinks.kafka-sink.zookeeper_endpoint = localhost:2181
cygnus-ngsi.sinks.kafka-sink.partitions = 5
cygnus-ngsi.sinks.kafka-sink.replication_factor = 1
cygnus-ngsi.sinks.kafka-sink.batch_size = 100
cygnus-ngsi.sinks.kafka-sink.batch_timeout = 30
cygnus-ngsi.sinks.kafka-sink.batch_ttl = 10
cygnus-ngsi.sinks.kafka-sink.batch_retry_intervals = 5000

トップ

ユースケース

Storm リアルタイム・アプリケーションとして、Kafka ベースのコンシューマと OrionContextBrokerを統合する場合に NGSIKafkaSink を使用します。

トップ

重要なメモ

バッチ処理

プログラマ・ガイドで説明したように、NGSIKafkaSink は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続性の詳細のみを処理することができます。

バッチのメカニズムに関して重要なことは、インサートの数が劇的に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう.100個の NGSIEvent を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ Carto テーブルに保持されます。イベントを1つずつ処理する場合は、Carto に100個のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 Carto テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の Carto デスティネーション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。

バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。

永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。

デフォルトで、NGSIKafkaSinは、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、予想されるサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。

トップ

エンコーディング

Cygnus は、 Kafka のデータ構造に合わせたこの特定のエンコーディングを適用します :

  • 英数字はエンコードされません
  • 数字はエンコードされません
  • アンダースコア文字 (_) はエンコードされません
  • ハイフン文字 (`-') はエンコードされません
  • ドット文字 (`.') はエンコードされません
  • 等号文字 (=) は、xffff としてエンコードされます
  • FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の x 文字としてエンコードされます
  • x 文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後に xx としてエンコードされます
  • xffffは、連結文字として使用されます

トップ

プログラマ・ガイド

NGSIKafkaSink クラス

他の NGSI ライクのシンクと同様に、NGSIKafkaSink は、ベース NGSISink を拡張します。拡張されるメソッドは次のとおりです :

void persistBatch(Batch batch) throws Exception;

Batch には、通知されたコンテキストデータイベントを解析した結果である一連の NGSIEvent オブジェクトが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される Kafka トピックを指定します。したがって、各宛先は、KafkaProducer のおかげで永続化される宛先データ文字列を構成するために反復します。

public void start();

KafkaProducer が作成されます。起動するシーケンスは NGSIKafkaSink() (コンストラクタ)、configure() および、start() であるため、start() メソッドではなくコンストラクタで行う必要があります。

public void configure(Context);

上記のような完全な構成が、与えられた Context インスタンスから読み取られます。

トップ