NGSISTHSink¶
コンテンツ:
機能性¶
com.iot.telefonica.cygnus.sinks.NGSISTHSink
、または単なる NGSISTHSink
は、MongoDB サーバ内の NGSI ライクのコンテキストデータイベントを集約して保持するように設計されたシンクです。具体的には、これらの測定値は次のように計算されます :
- 数値属性値の場合:
- すべてのサンプルの合計
- すべてのサンプルの二乗値の和
- すべてのサンプルの中で最大値
- すべてのサンプルの中の最小値
- 文字列属性値のオカレンス数
STH Comet と サポートされている集計については、STH Comet Github でさらに詳しく知ることができます。
通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の HDFSデータ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
MongoDB データ構造への NGSIEvent
s のマッピング¶
MongoDB は、Json ドキュメントのコレクションを含むデータベースにデータを編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSISTHSink
によって利用されます。
MongoDB データベースの命名規則¶
イベント内の fiware-service
ヘッダ値と呼ばれるデータベースが作成されます(存在しない場合)。設定済みのプレフィックスがデフォルト sth_
でに追加されます。
MongoDBは、データベース名に /
, \
,.
,"
, $
を受け入れません。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。
MongoDB コレクションの命名規則¶
これらのコレクションの名前は、設定されたデータモデルと分析モードによって異なります。詳細については、構成セクションを参照してください :
- サービスパスによるデータモデル (
data_model=dm-by-service-path
)。データモデル名が示すように、通知された FIWARE service、またはNGSIRestHandler
でデフォルトで設定されたものがコレクションの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルにストアされます。設定されたプレフィックスは、コレクション名の前に付加され、.aggr
サフィックスが追加されます - 実体によるデータモデル (
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトの FIWARE service path は、コレクション名を構成するために、通知されたエンティティ ID とタイプに連結されます。FIWARE service path がルート (/
) である場合、エンティティ ID とタイプのみが連結されます。設定されたプレフィックスは、コレクション名の前に付加され、.aggr
サフィックスが追加されます - 属性によるデータモデル(
data_model=dm-by-attribute
)。各エンティティの属性について、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプ、および通知された属性名に連結され、コレクション名を構成します。FIWARE service path がルート (/
) である場合、エンティティ ID とタイプ、および属性名とタイプのみが連結されます。設定されたプレフィックスがコレクション名の先頭に追加され、.aggr
サフィックスが追加されます
MongoDBはコレクション名に $
を受け入れないので、アンダースコア _
で置き換えられます。これにより、enable_encoding
構成パラメータに応じて、特定のエンコードが適用されます。
MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。
次の表は、テーブル名の構成をまとめたものです。古いエンコーディングである、デフォルトの sth_
プレフィックスを前提としています :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_/.aggr |
sth_/<entityId>_<entityType>.aggr |
sth_/<entityId>_<entityType>_<attrName>.aggr |
/<svcPath> |
sth_/<svcPath>.aggr |
sth_/<svcPath>_<entityId>_<entityType>.aggr |
sth_/<svcPath>_<entityId>_<entityType>_<attrName>.aggr |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_x002f.aggr |
sth_x002fxffff<entityId>xffff<entityType>.aggr |
sth_x002fxffff<entityId>xffff<entityType>xffff<attrName>.aggr |
/<svcPath> |
sth_x002fxffff<svcPath>.aggr |
sth_x002fxffff<svcPath>xffff<entityId>xffff<entityType>.aggr |
sth_x002fxffff<svcPath>xffff<entityId>xffff<entityType>xffff<attrName>.aggr |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entities
ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。
ストア¶
前述のように、NGSISTHSink
は、エンティティとその属性に関する特定の統計情報を事前集計するために設計されています :
- 数値属性値の場合:
- すべてのサンプルの合計
- すべてのサンプルの二乗値の和
- すべてのサンプルの中で最大値
- すべてのサンプルの中の最小値
- 文字列属性値のオカレンス数
これは、NGSI ライクのシンクの通常の動作を変更することによって行われます。情報要素を追加する代わりに、一連の情報要素(この場合はコレクション内のJsonドキュメント)が一度作成され、多く更新されます。
NGSISTHSink
が扱うそれぞれの解像度(month, day, hour, minute, second)については、少なくとも Json ドキュメントが存在します。origin については以下を参照してください。これらのドキュメントのそれぞれについて、解像度が示すもと同じ数のoffsetフィールドが存在します。例えば、"day"の解像度の場合は24のオフセット、"second"の解像度の場合は60のオフセットがあります。最初に、数値の集計の場合これらのオフセットは0、文字列の集計の場合は {}
に設定され、長い通知が到着すると、通知が受信された解像度内での解像度およびオフセットに応じて更新されます。
Jsonの各ドキュメントには、各解像度ごとに1つずつ、集約された情報が適用される時間の起点もあります。たとえば、 "minute" の解像度の場合、有効な起点は 2015-03-01T13:00:00.000Z
で、2015年3月1日の13時を意味します。同じ解像度の原点が異なる別の Json ドキュメントがあるかもしれませんが、たとえば 2015-03-01T14:00:00.000Z
は、2015年3月1日の14時を意味します。起点は、ロケールの問題を回避するために UTC 時間を使用して格納されます。
最後に、各ドキュメントは更新に使用されたサンプル数を保存します。これは、平均などの値を取得する場合に便利です。これは、合計をサンプル数で割ったものです。
例¶
NGSIEvent
¶
以下の NGSIEvent
が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
データベース名とコレクション名¶
プレフィックスと通知された FIWARE service path を連結した名前の MongoDB データベースが作成されます。例えば sth_vehicles
です。
コレクション名に関して、MongoDB コレクション名は、構成されたデータモデルに応じて、以下の古いエンコーディングとなります :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_/.aggr |
sth_/car1_car.aggr |
sth_/car1_car_speed.aggr sth_/car1_car_oil_level.aggr |
/4wheels |
sth_/4wheels.aggr |
sth_/4wheels_car1_car.aggr |
sth_/4wheels_car1_car_speed.aggr sth_/4wheels_car1_car_oil_level.aggr |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_x002f.aggr |
sth_x002fxffffcar1xffffcar.aggr |
sth_x002fxffffcar1xffffcarxffffspeed.aggr sth_x002fxffffcar1xffffcarxffffoil_level.aggr |
/4wheels |
sth_x002f4wheels.aggr |
sth_x002f4wheelsxffffcar1xffffcar.aggr |
sth_x002f4wheelsxffffcar1xffffcarxffffspeed.aggr sth_x002f4wheelsxffffcar1xffffcarxffffoil_level.aggr |
ストア¶
data_model=dm-by-entity
とすべての解決策を構成パラメータと想定します (詳細は構成セクションを参照してください)。NGSISTHSink
は本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels_car1_car.aggr
system.indexes
> db['sth_/4wheels_car1_car.aggr'].find()
{
"_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T00:00:00Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 12, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "speed", "origin" : ISODate("2015-01-01T00:00:00Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
...,
{ "offset" : 3, "samples" : 0, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:00Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 22, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T12:00:00Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 13, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-01T00:00:00Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
"points" : [
{ "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 20, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
...,
{ "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T00:00:00Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 12, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-01-01T00:00:00Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
...,
{ "offset" : 3, "samples" : 0, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:00Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 22, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:00:00Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
"points" : [
{ "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 13, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
{
"_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-01T00:00:00Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
"points" : [
{ "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
...,
{ "offset" : 20, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
...,
{ "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
]
}
管理ガイド¶
構成¶
NGSISTHSink
は、 次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSISTHSink |
channel | yes | N/A | |
enable_encoding | no | false | true または false。true は新しいエンコーディングを適用し、false は古いエンコーディングを適用します |
enable_grouping | no | false | 常に false です |
enable_name_mappings | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true または false |
data_model | no | dm-by-entity | dm-by-service-path、dm-by-entity。dm-by-service は現在サポートされていません |
mongo_hosts | no | localhost:27017 | FQDN/IP : MongoDB サーバが動作するポート(スタンドアロンの場合)、または MongoDB レプリカセットのメンバが実行される FQDN/IP:port ペアのカンマで区切られたリスト |
mongo_username | no | empty | 空の場合、認証は行われません |
mongo_password | no | empty | 空の場合、認証は行われません |
db_prefix | no | sth_ | |
collection_prefix | no | sth_ | system. は受け入れられません |
resolutions | no | month,day,hour,minute,second | データを集約するための分解能。受け入れられる値は、カンマで区切られた、月、日、時、分、秒です |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
data_expiration | no | 0 | コレクションは、秒単位で指定された値より古い場合は削除されます。時間の参照は、recvTime プロパティに格納されているものです。このポリシーが必要ない場合は、0 に設定します |
ignore_white_spaces | no | true | 排他的に空白ベースの属性値を無視する必要がある場合は true、そうでない場合は false |
構成例は、次のとおりです :
cygnus-ngsi.sinks = sth-sink
cygnus-ngsi.channels = sth-channel
...
cygnus-ngsi.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.NGSISTHSink
cygnus-ngsi.sinks.sth-sink.channel = sth-channel
cygnus-ngsi.sinks.sth-sink.enable_encoding = false
cygnus-ngsi.sinks.sth-sink.enable_grouping = false
cygnus-ngsi.sinks.sth-sink.enable_lowercase = false
cygnus-ngsi.sinks.sth-sink.enable_name_mappings = false
cygnus-ngsi.sinks.sth-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.sth-sink.mongo_hosts = 192.168.80.34:27017
cygnus-ngsi.sinks.sth-sink.mongo_username = myuser
cygnus-ngsi.sinks.sth-sink.mongo_password = mypassword
cygnus-ngsi.sinks.sth-sink.db_prefix = cygnus_
cygnus-ngsi.sinks.sth-sink.collection_prefix = cygnus_
cygnus-ngsi.sinks.sth-sink.resolutions = month,day
cygnus-ngsi.sinks.sth-sink.batch_size = 100
cygnus-ngsi.sinks.sth-sink.batch_timeout = 30
cygnus-ngsi.sinks.sth-sink.batch_ttl = 10
cygnus-ngsi.sinks.sth-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.sth-sink.data_expiration = 0
cygnus-ngsi.sinks.sth-sink.ignore_white_spaces = true
ユースケース¶
中期的にはそれほど成長していない集約されたデータに関する Json ベースのドキュメントストレージを探している場合に、NGSISTHSink
を使用します。
重要なメモ¶
バッチ処理¶
プログラマ・ガイドで説明したように、NGSISTHSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ MongoDB コレクションに保持されます。イベントを1つずつ処理する場合は、MongoDB に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 MongoDB コレクションごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の MongoDB コレクション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSISTHSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
recvTime
と TimeInstant
メタデータ¶
デフォルトで、NGSISTHSink
は、通知受信タイムスタンプが保存されます。それにもかかわらず、row
モードで作業中であって、TimeInstant
というメタデータが通知された場合には、そのようなメタデータ値が受信タイムスタンプの代わりに使用されます。これは、受信時間の代わりに、TimeInstant
メタデータとして通知される測定生成時間を持続させたい場合に便利です。
エンコーディング¶
NGSIMongoSink
は、MongoDB の命名規則に従います。一言で言えば :
バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:
- データベース名は、
\
,/
,.
, '$,
"および
_としてエンコード された
` 文字 を持ちます - コレクション名には、
_
としてエンコードされた$
文字が含まれます
バージョン1.3.0(含む)から、Cygnus は MongoDB データ構造に合わせたこの特定のエンコーディングを適用します :
- 等号文字 (
=
)は、xffff
としてエンコードされます - すべての禁止された文字は、文字の Unicode の後に
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding
パラメータを使用してエンコーディングタイプを切り替えることは可能です。
サポートされている MongoDB バージョン¶
このシンクは Mongo の次のバージョンでテストされています :
- 3.2.6
- 3.4
プログラマ・ガイド¶
NGSISTHSink
クラス¶
他の NGSI ライクのシンクが ベース NGSISink
を拡張したように、NGSISTHSink
は、NGSIMongoBaseSink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent
オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される MongoDB コレクションを指定します。したがって、MongoBackend
実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。
public void start();
MongoBackend
の実装が作成されます。起動するシーケンスは NGSISTHSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
MongoBackend
クラス¶
これは MongoDB の便利なバックエンドクラスであり、コンテキストデータを集約されたフォーマットのままで永続化するメソッドを提供します。未加工フォーマットに関する関連する方法は、次のとおりです :
public void createDatabase(String dbName) throws Exception;
データベースが存在しない場合は、その名前でデータベースを作成します。
public void createCollection(String dbName, String collectionName) throws Exception;
指定されたデータベースにコレクションが存在しない場合は、名前を指定してコレクションを作成します。
public void insertContextDataRaw(String dbName, String collectionName, long recvTimeTs, String recvTime, String entityId, String entityType, String attrName, String attrType, String attrValue, String attrMd) throws Exception;
指定されたデータベース内の指定されたコレクション内のドキュメントのセットを更新または挿入します(ドキュメントがすでに存在するかどうかに応じて異なります)。そのようなドキュメントのセットは、単一の属性の現在および過去の通知(履歴)に関するすべての情報を含みます。履歴データはいくつかの解像度と範囲の組み合わせ(秒と分、分と時、時と日、日と月、月と年)を使用して保存されるため、ドキュメントのセットが管理されます。詳細は、Github の STH Comet を参照してください。
エンコーディングに関しては何も特別なことはありません。Cygnus は一般的に UTF-8 文字セットで動作するので、これがデータがコレクションに書き込まれる方法です。したがって、MongoDB クライアントが UTF-8 に読み込んだバイトを変換する責任を負います。
認証と認可¶
NGSIMongoSink
の現在の実装は、MongoDB エンドポイントで作成されたユーザ名とパスワードの資格情報に依存しています。