NGSITestSink

コンテンツ:

機能性

com.iot.telefonica.cygnus.sinks.NGSITestSink、または単なる NGSITestSinkNGSI は、NGSI ライクのコンテキストデータ・イベントを受信したときにCygnusをテストするように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。

データジェネレータとは無関係に、NGSI コンテキストデータは常にCygnusソースの内部 NGSIEvent オブジェクトに変換されます。最終的に、これらのイベント内の情報は、実際のストレージに保存されるのではなく、単にログに記録されるようになっています。log4j の設定に応じて、ログはコンソールやファイル等に出力されます。

次のセクションでこれについて詳しく説明します。

トップ

NGSIEvent オブジェクトへの NGSI イベントのマッピング

コンテキストデータを含む、通知された NGSI イベントは、NGSI データジェネレータまたは持続される最終的なバックエンドとは無関係に、NGSIEvent オブジェクトに変換されます。NGSIEvent は、各コンテキスト要素に対して作成されます。このようなイベントは特定のヘッダとContextElement オブジェクトが組み合わせたものです。

これは、NGSIRestHandler のおかげで、cygnus-ngsi Http のリスナー(Flume jergonのソース)で行われています。変換されると、データ(NGSIEvent オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。

トップ

ログへの NGSIEvents の マッピング

マッピングは、直接、コンテキストデータを文字列に変換してコンソールに書き込む、またはファイルです。

トップ

NGSIEvent

通知されたNGSIコンテキストデータから、以下の NGSIEvent が作成されたとします。以下のコードは、実際のデータフォーマットではなくオブジェクト表現です :

ngsi-event={
    headers={
         content-type=application/json,
         timestamp=1429535775,
         transactionId=1429535775-308-0000000000,
         correlationId=1429535775-308-0000000000,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         <grouping_rules_interceptor_headers>,
         <name_mappings_interceptor_headers>
    },
    body={
        entityId=car1,
        entityType=car,
        attributes=[
            {
                attrName=speed,
                attrType=float,
                attrValue=112.9
            },
            {
                attrName=oil_level,
                attrType=float,
                attrValue=74.6
            }
        ]
    }
}

ログ・アペンダがコンソールであると仮定すると、NGSITestSink は、本体内のデータを次のようにログします :

time=2015-12-10T14:31:49.389CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[150] : Starting transaction (1429535775-308-0000000000)
time=2015-12-10T14:31:49.392CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[232] : Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "speed",            "type" : "float",            "value" : "112.9"          },          {            "name" : "oil_level",            "type" : "float",            "value" : "74.6"          }        ],        "type" : "car",        "isPattern" : "false",        "id" : "car1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
time=2015-12-10T14:31:49.394CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=getEvents | comp=Cygnus | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[255] : Event put in the channel (id=1491400742, ttl=10)
time=2015-12-10T14:31:49.485CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=persistAggregation | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSITestSink[176] : [test-sink] Persisting data at NGSITestSink. Data (Processing event={Processing headers={recvTimeTs= 1429535775, fiwareService=vehicles, fiwareServicePath=4wheels, destinations=car1_car}, Processing context element={id=car1, type=car}, Processing attribute={name=speed, type=float, value="112.9", metadata=[]}, Processing attribute={name=oil_level, type=float, value="74.6", metadata=[]}})
time=2015-12-10T14:31:49.486CET | lvl=INFO | trans=1429535775-308-0000000000 | srv=vehicles | subsrv=4wheels | function=process | comp=Cygnus | msg=com.telefonica.iot.cygnus.sinks.NGSISink[178] : Finishing transaction (1429535775-308-0000000000)

トップ

管理ガイド

構成

NGSITestSink は、次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.NGSITestSinkである必要があります
channel yes N/A
enable_grouping no false true または false
enable_lowercase no false true または false
data_model no dm-by-entity 構成されていなくても、常に dm-by-entity
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください

構成例は、次のとおりです :

cygnus-ngsi.sinks = test-sink
cygnus-ngsi.channels = test-channel
...
cygnus-ngsi.sinks.test-sink.type = com.telefonica.iot.cygnus.sinks.NGSITestSink
cygnus-ngsi.sinks.test-sink.channel = ckan-channel
cygnus-ngsi.sinks.test-sink.enable_grouping = false
cygnus-ngsi.sinks.test-sink.enable_lowercase = false
cygnus-ngsi.sinks.test-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.test-sink.batch_size = 100
cygnus-ngsi.sinks.test-sink.batch_timeout = 30
cygnus-ngsi.sinks.test-sink.batch_ttl = 10

トップ

ユースケース

このシンクを使用して、Cygnus デプロイメントが Orion Context Broker からの通知を適切に受信しているかどうかをテストします。

トップ

重要なメモ

バッチ処理

NGSITestSink は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。

バッチ・メカニズムに関して重要なのは、書き込みの回数が大幅に減るため、シンクのパフォーマンスが大幅に向上することです。特に、このテストシンクでは重要ではありませんが、他のシンクではこの機能が大きく役立ちます。詳細については、特定のシンクのドキュメントを確認してください。

トップ