NGSIMongoSink¶
コンテンツ :
機能性¶
com.iot.telefonica.cygnus.sinks.NGSIMongoSink
、または単なる NGSIMongoSink
は、MongoDB サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の HDFSデータ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
MongoDB データ構造への NGSIEvent
s のマッピング¶
MongoDB は、Json ドキュメントのコレクションを含むデータベースにデータを編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSIMongoSink
によって利用されます。
MongoDB データベースの命名規則¶
イベント内の fiware-service
ヘッダ値と呼ばれるデータベースが作成されます(存在しない場合)。設定済みのプレフィックスがデフォルト sth_
でに追加されます。
MongoDBは、データベース名に /
, \
,.
,"
, $
を受け入れません。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。
MongoDB コレクションの命名規則¶
これらのコレクションの名前は、設定されたデータモデルと分析モードによって異なります。詳細については、構成セクションを参照してください。
- サービスパスによるデータモデル (
data_model=dm-by-service-path
)。データモデル名が示すように、通知された FIWARE service、またはNGSIRestHandler
でデフォルトで設定されたものがコレクションの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルにストアされます。設定されたプレフィックスは、コレクション名の前に付加されます - 実体によるデータモデル (
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトの FIWARE service path は、コレクション名を構成するために、通知されたエンティティ ID とタイプに連結されます。FIWARE service path がルート (/
) である場合、エンティティ ID とタイプのみが連結されます。設定されたプレフィックスは、コレクション名の前に付加されます - 属性によるデータモデル(
data_model=dm-by-attribute
)。各エンティティの属性について、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプ、および通知された属性名に連結され、コレクション名を構成します。FIWARE service path がルート (/
) である場合、エンティティ ID とタイプ、および属性名とタイプのみが連結されます。設定されたプレフィックスがコレクション名の先頭に追加されます
MongoDBはコレクション名に $
を受け入れないので、アンダースコア _
で置き換えられます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。
次の表は、テーブル名の構成をまとめたものです。古いエンコーディングである、デフォルトの sth_
プレフィックスを前提としています :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_/ |
sth_/<entityId>_<entityType> |
sth_/<entityId>_<entityType>_<attrName> |
/<svcPath> |
sth_/<svcPath> |
sth_/<svcPath>_<entityId>_<entityType> |
sth_/<svcPath>_<entityId>_<entityType>_<attrName> |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_x002f |
sth_x002fxffff<entityId>xffff<entityType> |
sth_x002fxffff<entityId>xffff<entityType>xffff<attrName> |
/<svcPath> |
sth_x002f<svcPath> |
sth_x002f<svcPath>xffff<entityId>xffff<entityType> |
sth_x002f<svcPath>xffff<entityId>xffff<entityType>xffff<attrName> |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entities
ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。
行ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが row
(デフォルトの格納モード)に設定されている場合、通知されたデータは属性ごとに格納され、それぞれの Json ドキュメントを構成します。各ドキュメントには、構成された data_model
に応じて可変数のフィールドが含まれています :
- サービスパスによるデータモデル :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)entityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプattrName
: 通知された属性名attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMetadata
: 構成ファイルngsi_agent.conf
のattr_metadata_store
がtrue
に設定されている場合にのみストアされます
- エンティティによるデータモデル :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)attrName
: 通知された属性名。attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMetadata : 構成ファイル
ngsi_agent.confの
attr_metadata_storeが
true` に設定されている場合にのみストアされます。これは Json オブジェクトです
- 属性によるデータモデル :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)attrName
: 通知された属性名。attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMetadata
: 構成ファイルngsi_agent.conf
のattr_metadata_store
がtrue
に設定されている場合にのみストアされます。これは Json オブジェクトです
列ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが column
に設定されている場合、通知されたエンティティ全体に対して単一の Json ドキュメントが作成されます。各ドキュメントには、構成された data_model
に応じて可変数のフィールドが含まれています :
- サービスパスによるデータモデル :
recvTime
: 人間が判読可能な形式の UTC タイムスタンプ。ISO 8601 に似ていますが、すべての MySQL タイムスタンプは UTC 形式になっているはずなので、UTC を示すZ
文字は使用しないでくださいfiwareServicePath
通知された パス、または デフォルトのパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプ- 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
- 通知された属性ごとに、属性名と
_md
の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます
- エンティティによるデータモデル :
recvTime
: 人間が判読可能な形式の UTC タイムスタンプ。ISO 8601 に似ていますが、すべての MySQL タイムスタンプは UTC 形式になっているはずなので、UTC を示すZ
文字は使用しないでくださいfiwareServicePath
通知された パス、または デフォルトのパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプ- 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
- 通知された属性ごとに、属性名と
_md
の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます
- 属性によるデータモデル。この組み合わせには意味がないため無効です
例¶
NGSIEvent
¶
以下の NGSIEvent
が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
データベース名とコレクション名¶
プレフィックスと通知された FIWARE service path を連結した名前の MongoDB データベースが作成されます。例えば sth_vehicles
です。
コレクション名に関して、MongoDB コレクション名は、構成されたデータモデルに応じて、以下の古いエンコーディングとなります :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_/ |
sth_/car1_car |
sth_/car1_car_speed sth_/car1_car_oil_level |
/4wheels |
sth_/4wheels |
sth_/4wheels_car1_car |
sth_/4wheels_car1_car_speed sth_/4wheels_car1_car_oil_level |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
dm-by-attribute |
---|---|---|---|
/ |
sth_x002f |
sth_x002fxffffcar1xffffcar |
sth_x002fxffffcar1xffffcarxffffspeed sth_x002fxffffcar1xffffcarxffffoil_level |
/4wheels |
sth_x002f4wheels |
sth_x002f4wheelsxffffcar1xffffcar |
sth_x002f4wheelsxffffcar1xfffcarxffffspeed sth_x002f4wheelsxffffcar1xffffcarxffffoil_level |
行ライクなストア¶
設定パラメータとして、data_model=dm-by-service-path
と attr_persistence=row
を想定すると、NGSIMongoSink
は、本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels
system.indexes
> db['sth_/4wheels'].find()
{ "_id" : ObjectId("5534d143fa701f0be751db82"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "entityId" : "car1", "entityType" : "car", "attrName" : "speed", "attrType" : "float", "attrValue" : "112.9" }
{ "_id" : ObjectId("5534d143fa701f0be751db83"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "entityId" : "car1", "entityType" : "car", "attrName" : "oil_level", "attrType" : "float", "attrValue" : "74.6" }
data_model=dm-by-entity
と aattr_persistence=row
の場合、NGSIMongoSink
は、本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels_car1_car
system.indexes
> db['sth_/4wheels_car1_car'].find()
{ "_id" : ObjectId("5534d143fa701f0be751db82"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "attrName" : "speed", "attrType" : "float", "attrValue" : "112.9" }
{ "_id" : ObjectId("5534d143fa701f0be751db83"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "attrName" : "oil_level", "attrType" : "float", "attrValue" : "74.6" }
data_model=dm-by-attribute
と attr_persistence=row
の場合、NGSIMongoSink
は、本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels_car1_car_speed
sth_/4wheels_car1_car_oil_level
system.indexes
> db['sth_/4wheels_car1_car_speed'].find()
{ "_id" : ObjectId("5534d143fa701f0be751db87"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "attrType" : "float", "attrValue" : "112.9" }
> db['sth_/4wheels_car1_oil_level'].find()
{ "_id" : ObjectId("5534d143fa701f0be751db87"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "attrType" : "float", "attrValue" : "74.6" }
列ライクなストア¶
data_model=dm-by-service-path
と attr_persistence=column
の場合、NGSIMongoSink
は、本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels
system.indexes
> db['sth_/4wheels'].find()
{ "_id" : ObjectId("5534d143fa701f0be751db86"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "entityId" : "car1", "entityType" : "car", "speed" : "112.9", "oil_level" : "74.6" }
data_model=dm-by-entity
と attr_persistence=column
の場合、NGSIMongoSink
は、本体内のデータを次のように保持します :
$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin (empty)
local 0.031GB
sth_vehicles 0.031GB
test 0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels_car1_car
system.indexes
> db['sth_/4wheels_car1_car'].find()
{"_id" : ObjectId("56337ea4c9e77c1614bfdbb7"), "recvTimeTs": "1402409899391", "recvTime" : "2015-04-20T12:13:22.41.412Z", "speed" : "112.9", "oil_level" : "74.6"}
管理ガイド¶
構成¶
NGSIMongoSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSIMongoSink |
channel | yes | N/A | |
enable_encoding | no | false | true または false。true は新しいエンコーディングを適用し、false は古いエンコーディングを適用します |
enable_grouping | no | false | 常に false |
enable_name_mappings | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true または false |
data_model | no | dm-by-entity | dm-by-service-path、dm-by-entity。dm-by-service は現在サポートされていません |
attr_persistence | no | row | row または column |
attr_metadata_store | no | false | true または false |
mongo_hosts | no | localhost:27017 | MongoDB サーバが動作する FQDN/IP:port(スタンドアロンの場合)、または MongoDB レプリカセットのメンバが実行される FQDN/IP:port ペアのカンマで区切られたリスト |
mongo_username | no | empty | 空の場合、認証は行われません |
mongo_password | no | empty | 空の場合、認証は行われません |
db_prefix | no | sth_ | |
collection_prefix | no | sth_ | system. は受け入れられません |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
data_expiration | no | 0 | コレクションは、秒単位で指定された値より古い場合は削除されます。時間の参照は、recvTime プロパティに格納されているものです。このポリシーが必要ない場合は、0 に設定します |
collections_size | no | 0 | データコレクションのサイズがバイトで指定された値より大きくなると、挿入時間に応じて最も古いデータが削除されます。サイズベースの切り捨てポリシーは時間ベースの切り捨てポリシーよりも優先されることに注意してください。このポリシーが必要ない場合は、0 に設定します。最小値(0以外)は4096バイトです |
max_documents | no | 0 | データ収集内のドキュメント数が指定した値を超えた場合、挿入時間に応じて最も古いデータが削除されます。このポリシーが必要ない場合は、0 に設定します |
ignore_white_spaces | no | true | 排他的に空白ベースの属性値を無視する必要がある場合は true、そうでない場合は false |
構成例は、次のとおりです :
cygnus-ngsi.sinks = mongo-sink
cygnus-ngsi.channels = mongo-channel
...
cygnus-ngsi.sinks.mongo-sink.type = com.telefonica.iot.cygnus.sinks.NGSIMongoSink
cygnus-ngsi.sinks.mongo-sink.channel = mongo-channel
cygnus-ngsi.sinks.mongo-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.mongo-sink.attr_persistence = column
cygnus-ngsi.sinks.mongo-sink.enable_encoding = false
cygnus-ngsi.sinks.mongo-sink.enable_grouping = false
cygnus-ngsi.sinks.mongo-sink.enable_lowercase = false
cygnus-ngsi.sinks.mongo-sink.enable_name_mappings = false
cygnus-ngsi.sinks.mongo-sink.mongo_hosts = 192.168.80.34:27017
cygnus-ngsi.sinks.mongo-sink.mongo_username = myuser
cygnus-ngsi.sinks.mongo-sink.mongo_password = mypassword
cygnus-ngsi.sinks.mongo-sink.db_prefix = cygnus_
cygnus-ngsi.sinks.mongo-sink.collection_prefix = cygnus_
cygnus-ngsi.sinks.mongo-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.mongo-sink.batch_size = 100
cygnus-ngsi.sinks.mongo-sink.batch_timeout = 30
cygnus-ngsi.sinks.mongo-sink.batch_ttl = 10
cygnus-ngsi.sinks.mongo-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.mongo-sink.data_expiration = 0
cygnus-ngsi.sinks.mongo-sink.collections_size = 0
cygnus-ngsi.sinks.mongo-sink.max_documents = 0
cygnus-ngsi.sinks.mongo-sink.ignore_white_spaces = true
ユースケース¶
中期的にはそれほど成長しない集約されたデータに関する Json ベースのドキュメントストレージを探している場合に、NGSIMongoSink
を使用します。
重要なメモ¶
バッチ処理¶
プログラマ・ガイドで説明したように、NGSIMongoSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ MongoDB コレクションに保持されます。イベントを1つずつ処理する場合は、MongoDB に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 MongoDB コレクションごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の MongoDB コレクション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSIMongoSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
recvTime
と TimeInstant
メタデータ¶
デフォルトで、NGSIMongoSink
は、通知受信タイムスタンプが保存されます。それにもかかわらず、row
モードで作業中であって、TimeInstant
というメタデータが通知された場合には、そのようなメタデータ値が受信タイムスタンプの代わりに使用されます。これは、受信時間の代わりに、TimeInstant
メタデータとして通知される測定生成時間を持続させたい場合に便利です。
エンコーディング¶
NGSIMongoSink
は、MongoDB の命名規則に従います。一言で言えば :
バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:
- データベース名は、
\
,/
,.
, '$,
"および
_としてエンコード された
` 文字 を持ちます - コレクション名には、
_
としてエンコードされた$
文字が含まれます
バージョン1.3.0(含む)から、Cygnus は MongoDB データ構造に合わせたこの特定のエンコーディングを適用します :
- 等号文字 (
=
)は、xffff
としてエンコードされます - すべての禁止された文字は、文字の Unicode の後に
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding
パラメータを使用してエンコーディングタイプを切り替えることは可能です。
サポートされている MongoDB バージョン¶
このシンクは Mongo の次のバージョンでテストされています :
- 3.2.6
- 3.4
プログラマ・ガイド¶
NGSIMongoSink
クラス¶
他の NGSI ライクのシンクが ベース NGSISink
を拡張したように、NGSIMongoSink
は、NGSIMongoBaseSink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent
オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される MongoDB コレクションを指定します。したがって、MongoBackend
実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。
public void start();
MongoBackend
の実装が作成されます。起動するシーケンスは NGSIMongoSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
NGSIMongoBackend
クラス¶
これは MongoDB の便利なバックエンドクラスであり、コンテキストデータを集約されたフォーマットのままで永続化するメソッドを提供します。未加工フォーマットに関する関連する方法は、次のとおりです :
public void createDatabase(String dbName) throws Exception;
データベースが存在しない場合は、その名前でデータベースを作成します。
public void createCollection(String dbName, String collectionName) throws Exception;
指定されたデータベースにコレクションが存在しない場合は、名前を指定してコレクションを作成します。
public void insertContextDataRaw(String dbName, String collectionName, long recvTimeTs, String recvTime, String entityId, String entityType, String attrName, String attrType, String attrValue, String attrMd) throws Exception;
指定されたデータベース内の指定されたコレクション内のドキュメントのセットを更新または挿入します(ドキュメントがすでに存在するかどうかに応じて異なります)。そのようなドキュメントのセットは、単一の属性の現在および過去の通知(履歴)に関するすべての情報を含みます。履歴データはいくつかの解像度と範囲の組み合わせ(秒と分、分と時、時と日、日と月、月と年)を使用して保存されるため、ドキュメントのセットが管理されます。詳細は、Github の STH Comet を参照してください。
エンコーディングに関しては何も特別なことはありません。Cygnus は一般的に UTF-8 文字セットで動作するので、これがデータがコレクションに書き込まれる方法です。したがって、MongoDB クライアントが UTF-8 に読み込んだバイトを変換する責任を負います。
認証と認可¶
NGSIMongoSink
の現在の実装は、MongoDB エンドポイントで作成されたユーザ名とパスワードの資格情報に依存しています。