NGSITestSink¶
コンテンツ:
機能性¶
com.iot.telefonica.cygnus.sinks.NGSITestSink
、または単なる NGSITestSinkNGSI
は、NGSI ライクのコンテキストデータ・イベントを受信したときにCygnusをテストするように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常にCygnusソースの内部 NGSIEvent
オブジェクトに変換されます。最終的に、これらのイベント内の情報は、実際のストレージに保存されるのではなく、単にログに記録されるようになっています。log4j
の設定に応じて、ログはコンソールやファイル等に出力されます。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知された NGSI イベントは、NGSI データジェネレータまたは持続される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクトに変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは特定のヘッダとContextElement
オブジェクトが組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergonのソース)で行われています。変換されると、データ(NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
ログへの NGSIEvent
s の マッピング¶
マッピングは、直接、コンテキストデータを文字列に変換してコンソールに書き込む、またはファイルです。
例¶
NGSIEvent
¶
通知されたNGSIコンテキストデータから、以下の NGSIEvent
が作成されたとします。以下のコードは、実際のデータフォーマットではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
ログ・アペンダがコンソールであると仮定すると、NGSITestSink
は、本体内のデータを次のようにログします :
time=2015-12-10T14:31:49.389CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[150] : Starting transaction (1429535775-308-0000000000)
time=2015-12-10T14:31:49.392CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[232] : Received data ({ "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "attributes" : [ { "name" : "speed", "type" : "float", "value" : "112.9" }, { "name" : "oil_level", "type" : "float", "value" : "74.6" } ], "type" : "car", "isPattern" : "false", "id" : "car1" }, "statusCode" : { "code" : "200", "reasonPhrase" : "OK" } } ]})
time=2015-12-10T14:31:49.394CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[255] : Event put in the channel (id=1491400742, ttl=10)
time=2015-12-10T14:31:49.485CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=persistAggregation | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSITestSink[176] : [test-sink] Persisting data at NGSITestSink. Data (Processing event={Processing headers={recvTimeTs= 1429535775, fiwareService=vehicles, fiwareServicePath=4wheels, destinations=car1_car}, Processing context element={id=car1, type=car}, Processing attribute={name=speed, type=float, value="112.9", metadata=[]}, Processing attribute={name=oil_level, type=float, value="74.6", metadata=[]}})
time=2015-12-10T14:31:49.486CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=process | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSISink[178] : Finishing transaction (1429535775-308-0000000000)
管理ガイド¶
構成¶
NGSITestSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSITestSinkである必要があります |
channel | yes | N/A | |
enable_grouping | no | false | true または false |
enable_lowercase | no | false | true または false |
data_model | no | dm-by-entity | 構成されていなくても、常に dm-by-entity |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
構成例は、次のとおりです :
cygnus-ngsi.sinks = test-sink
cygnus-ngsi.channels = test-channel
...
cygnus-ngsi.sinks.test-sink.type = com.telefonica.iot.cygnus.sinks.NGSITestSink
cygnus-ngsi.sinks.test-sink.channel = ckan-channel
cygnus-ngsi.sinks.test-sink.enable_grouping = false
cygnus-ngsi.sinks.test-sink.enable_lowercase = false
cygnus-ngsi.sinks.test-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.test-sink.batch_size = 100
cygnus-ngsi.sinks.test-sink.batch_timeout = 30
cygnus-ngsi.sinks.test-sink.batch_ttl = 10
ユースケース¶
このシンクを使用して、Cygnus デプロイメントが Orion Context Broker からの通知を適切に受信しているかどうかをテストします。
重要なメモ¶
バッチ処理¶
NGSITestSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチ・メカニズムに関して重要なのは、書き込みの回数が大幅に減るため、シンクのパフォーマンスが大幅に向上することです。特に、このテストシンクでは重要ではありませんが、他のシンクではこの機能が大きく役立ちます。詳細については、特定のシンクのドキュメントを確認してください。