NGSISTHSink

コンテンツ:

機能性

com.iot.telefonica.cygnus.sinks.NGSISTHSink、または単なる NGSISTHSink は、MongoDB サーバ内の NGSI ライクのコンテキストデータイベントを集約して保持するように設計されたシンクです。具体的には、これらの測定値は次のように計算されます :

  • 数値属性値の場合:
    • すべてのサンプルの合計
    • すべてのサンプルの二乗値の和
    • すべてのサンプルの中で最大値
    • すべてのサンプルの中の最小値
  • 文字列属性値のオカレンス数

STH Comet と サポートされている集計については、STH Comet Github でさらに詳しく知ることができます。

通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。

データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の HDFSデータ構造にマップする必要があります。

次のセクションでこれについて詳しく説明します。

トップ

NGSIEvent オブジェクトへの NGSI イベントのマッピング

コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent オブジェクト に変換されます。NGSIEvent は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement オブジェクトを組み合わせたものです。

これは、NGSIRestHandler のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。

トップ

MongoDB データ構造への NGSIEvents のマッピング

MongoDB は、Json ドキュメントのコレクションを含むデータベースにデータを編成します。そのような構成は、NGSIEvent が永続化されるたびに NGSISTHSink によって利用されます。

トップ

MongoDB データベースの命名規則

イベント内の fiware-service ヘッダ値と呼ばれるデータベースが作成されます(存在しない場合)。設定済みのプレフィックスがデフォルト sth_ でに追加されます。

MongoDBは、データベース名に /, \,.,", $受け入れません。これにより、enable_encoding 構成パラメーターに応じて、特定のエンコードが適用されます。

MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。

トップ

MongoDB コレクションの命名規則

これらのコレクションの名前は、設定されたデータモデルと分析モードによって異なります。詳細については、構成セクションを参照してください :

  • サービスパスによるデータモデル (data_model=dm-by-service-path)。データモデル名が示すように、通知された FIWARE service、または NGSIRestHandler でデフォルトで設定されたものがコレクションの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルにストアされます。設定されたプレフィックスは、コレクション名の前に付加され、.aggr サフィックスが追加されます
  • 実体によるデータモデル (data_model=dm-by-entity)。各エンティティについて、通知された/デフォルトの FIWARE service path は、コレクション名を構成するために、通知されたエンティティ ID とタイプに連結されます。FIWARE service path がルート (/) である場合、エンティティ ID とタイプのみが連結されます。設定されたプレフィックスは、コレクション名の前に付加され、.aggr サフィックスが追加されます
  • 属性によるデータモデル(data_model=dm-by-attribute)。各エンティティの属性について、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプ、および通知された属性名に連結され、コレクション名を構成します。FIWARE service path がルート (/) である場合、エンティティ ID とタイプ、および属性名とタイプのみが連結されます。設定されたプレフィックスがコレクション名の先頭に追加され、.aggr サフィックスが追加されます

MongoDBはコレクション名に $受け入れないので、アンダースコア _ で置き換えられます。これにより、enable_encoding 構成パラメータに応じて、特定のエンコードが適用されます。

MongoDB ネームスペース(データベース+コレクション)の名前の長さは、113バイトに制限されています。

次の表は、テーブル名の構成をまとめたものです。古いエンコーディングである、デフォルトの sth_ プレフィックスを前提としています :

FIWARE service path dm-by-service-path dm-by-entity dm-by-attribute
/ sth_/.aggr sth_/<entityId>_<entityType>.aggr sth_/<entityId>_<entityType>_<attrName>.aggr
/<svcPath> sth_/<svcPath>.aggr sth_/<svcPath>_<entityId>_<entityType>.aggr sth_/<svcPath>_<entityId>_<entityType>_<attrName>.aggr

新しいエンコーディングを使用すると :

FIWARE service path dm-by-service-path dm-by-entity dm-by-attribute
/ sth_x002f.aggr sth_x002fxffff<entityId>xffff<entityType>.aggr sth_x002fxffff<entityId>xffff<entityType>xffff<attrName>.aggr
/<svcPath> sth_x002fxffff<svcPath>.aggr sth_x002fxffff<svcPath>xffff<entityId>xffff<entityType>.aggr sth_x002fxffff<svcPath>xffff<entityId>xffff<entityType>xffff<attrName>.aggr

エンティティ ID とタイプの連結が、NGSIEvent 内の notified_entities/grouped_entities ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。

トップ

ストア

前述のように、NGSISTHSink は、エンティティとその属性に関する特定の統計情報を事前集計するために設計されています :

  • 数値属性値の場合:
    • すべてのサンプルの合計
    • すべてのサンプルの二乗値の和
    • すべてのサンプルの中で最大値
    • すべてのサンプルの中の最小値
  • 文字列属性値のオカレンス数

これは、NGSI ライクのシンクの通常の動作を変更することによって行われます。情報要素を追加する代わりに、一連の情報要素(この場合はコレクション内のJsonドキュメント)が一度作成され、多く更新されます。

NGSISTHSink が扱うそれぞれの解像度(month, day, hour, minute, second)については、少なくとも Json ドキュメントが存在します。origin については以下を参照してください。これらのドキュメントのそれぞれについて、解像度が示すもと同じ数のoffsetフィールドが存在します。例えば、"day"の解像度の場合は24のオフセット、"second"の解像度の場合は60のオフセットがあります。最初に、数値の集計の場合これらのオフセットは0、文字列の集計の場合は {} に設定され、長い通知が到着すると、通知が受信された解像度内での解像度およびオフセットに応じて更新されます。

Jsonの各ドキュメントには、各解像度ごとに1つずつ、集約された情報が適用される時間の起点もあります。たとえば、 "minute" の解像度の場合、有効な起点は 2015-03-01T13:00:00.000Z で、2015年3月1日の13時を意味します。同じ解像度の原点が異なる別の Json ドキュメントがあるかもしれませんが、たとえば 2015-03-01T14:00:00.000Z は、2015年3月1日の14時を意味します。起点は、ロケールの問題を回避するために UTC 時間を使用して格納されます。

最後に、各ドキュメントは更新に使用されたサンプル数を保存します。これは、平均などの値を取得する場合に便利です。これは、合計をサンプル数で割ったものです。

トップ

NGSIEvent

以下の NGSIEvent が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :

ngsi-event={
    headers={
         content-type=application/json,
         timestamp=1429535775,
         transactionId=1429535775-308-0000000000,
         correlationId=1429535775-308-0000000000,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         <grouping_rules_interceptor_headers>,
         <name_mappings_interceptor_headers>
    },
    body={
        entityId=car1,
        entityType=car,
        attributes=[
            {
                attrName=speed,
                attrType=float,
                attrValue=112.9
            },
            {
                attrName=oil_level,
                attrType=float,
                attrValue=74.6
            }
        ]
    }
}

トップ

データベース名とコレクション名

プレフィックスと通知された FIWARE service path を連結した名前の MongoDB データベースが作成されます。例えば sth_vehicles です。

コレクション名に関して、MongoDB コレクション名は、構成されたデータモデルに応じて、以下の古いエンコーディングとなります :

FIWARE service path dm-by-service-path dm-by-entity dm-by-attribute
/ sth_/.aggr sth_/car1_car.aggr sth_/car1_car_speed.aggr
sth_/car1_car_oil_level.aggr
/4wheels sth_/4wheels.aggr sth_/4wheels_car1_car.aggr sth_/4wheels_car1_car_speed.aggr
sth_/4wheels_car1_car_oil_level.aggr

新しいエンコーディングを使用すると :

FIWARE service path dm-by-service-path dm-by-entity dm-by-attribute
/ sth_x002f.aggr sth_x002fxffffcar1xffffcar.aggr sth_x002fxffffcar1xffffcarxffffspeed.aggr
sth_x002fxffffcar1xffffcarxffffoil_level.aggr
/4wheels sth_x002f4wheels.aggr sth_x002f4wheelsxffffcar1xffffcar.aggr sth_x002f4wheelsxffffcar1xffffcarxffffspeed.aggr
sth_x002f4wheelsxffffcar1xffffcarxffffoil_level.aggr

トップ

ストア

data_model=dm-by-entity とすべての解決策を構成パラメータと想定します (詳細は構成セクションを参照してください)。NGSISTHSink は本体内のデータを次のように保持します :

$ mongo -u myuser -p
MongoDB shell version: 2.6.9
connecting to: test
> show databases
admin              (empty)
local              0.031GB
sth_vehicles       0.031GB
test               0.031GB
> use vehicles
switched to db vehicles
> show collections
sth_/4wheels_car1_car.aggr
system.indexes
> db['sth_/4wheels_car1_car.aggr'].find()
{
    "_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T00:00:00Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 12, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
        ...,
        { "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "speed", "origin" : ISODate("2015-01-01T00:00:00Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
        ...,
        { "offset" : 3, "samples" : 0, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
        ...,
        { "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T12:13:00Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 22, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
        ...,
        { "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-20T12:00:00Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 13, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
        ...,
        { "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "speed", "origin" : ISODate("2015-04-01T00:00:00Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
    "points" : [
        { "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 20, "samples" : 1, "sum" : 112.9, "sum2" : 12746.41, "min" : 112.9, "max" : 112.9 },
        ...,
        { "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T00:00:00Z"), "resolution" : "hour", "range" : "day", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 12, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
        ...,
        { "offset" : 23, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-01-01T00:00:00Z"), "resolution" : "month", "range" : "year", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 1, "sum" : 0, "sum2" : 0, "min" : 0, "max" : 0 },
        ...,
        { "offset" : 3, "samples" : 0, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
        ...,
        { "offset" : 11, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:13:00Z"), "resolution" : "second", "range" : "minute", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 22, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
        ...,
        { "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-20T12:00:00Z"), "resolution" : "minute", "range" : "hour", "attrType" : "float" },
    "points" : [
        { "offset" : 0, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 13, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
        ...,
        { "offset" : 59, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}
{
    "_id" : { "attrName" : "oil_level", "origin" : ISODate("2015-04-01T00:00:00Z"), "resolution" : "day", "range" : "month", "attrType" : "float" },
    "points" : [
        { "offset" : 1, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity },
        ...,
        { "offset" : 20, "samples" : 1, "sum" : 74.6, "sum2" : 5565.16, "min" : 74.6, "max" : 74.6 },
        ...,
        { "offset" : 31, "samples" : 0, "sum" : 0, "sum2" : 0, "min" : Infinity, "max" : -Infinity }
    ]
}

トップ

管理ガイド

構成

NGSISTHSink は、 次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.NGSISTHSink
channel yes N/A
enable_encoding  no false true または falsetrue は新しいエンコーディングを適用し、false は古いエンコーディングを適用します
enable_grouping no false 常に false です
enable_name_mappings no false true または false。詳細については、このリンクをチェックしてください
enable_lowercase no false true または false
data_model no dm-by-entity dm-by-service-pathdm-by-entitydm-by-service は現在サポートされていません
mongo_hosts no localhost:27017 FQDN/IP : MongoDB サーバが動作するポート(スタンドアロンの場合)、または MongoDB レプリカセットのメンバが実行される FQDN/IP:port ペアのカンマで区切られたリスト
mongo_username no empty 空の場合、認証は行われません
mongo_password no empty 空の場合、認証は行われません
db_prefix no sth_
collection_prefix no sth_ system. は受け入れられません
resolutions  no  month,day,hour,minute,second データを集約するための分解能。受け入れられる値は、カンマで区切られた、月、日、時、分、秒です
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
batch_retry_intervals no 5000 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます
data_expiration no 0 コレクションは、秒単位で指定された値より古い場合は削除されます。時間の参照は、recvTimeプロパティに格納されているものです。このポリシーが必要ない場合は、0 に設定します
ignore_white_spaces no  true 排他的に空白ベースの属性値を無視する必要がある場合は true、そうでない場合は false

構成例は、次のとおりです :

cygnus-ngsi.sinks = sth-sink
cygnus-ngsi.channels = sth-channel
...
cygnus-ngsi.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.NGSISTHSink
cygnus-ngsi.sinks.sth-sink.channel = sth-channel
cygnus-ngsi.sinks.sth-sink.enable_encoding = false
cygnus-ngsi.sinks.sth-sink.enable_grouping = false
cygnus-ngsi.sinks.sth-sink.enable_lowercase = false
cygnus-ngsi.sinks.sth-sink.enable_name_mappings = false
cygnus-ngsi.sinks.sth-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.sth-sink.mongo_hosts = 192.168.80.34:27017
cygnus-ngsi.sinks.sth-sink.mongo_username = myuser
cygnus-ngsi.sinks.sth-sink.mongo_password = mypassword
cygnus-ngsi.sinks.sth-sink.db_prefix = cygnus_
cygnus-ngsi.sinks.sth-sink.collection_prefix = cygnus_
cygnus-ngsi.sinks.sth-sink.resolutions = month,day
cygnus-ngsi.sinks.sth-sink.batch_size = 100
cygnus-ngsi.sinks.sth-sink.batch_timeout = 30
cygnus-ngsi.sinks.sth-sink.batch_ttl = 10
cygnus-ngsi.sinks.sth-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.sth-sink.data_expiration = 0
cygnus-ngsi.sinks.sth-sink.ignore_white_spaces = true

トップ

ユースケース

中期的にはそれほど成長していない集約されたデータに関する Json ベースのドキュメントストレージを探している場合に、NGSISTHSink を使用します。

トップ

重要なメモ

バッチ処理

プログラマ・ガイドで説明したように、NGSISTHSink は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。

バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ MongoDB コレクションに保持されます。イベントを1つずつ処理する場合は、MongoDB に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 MongoDB コレクションごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の MongoDB コレクション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。

バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。

永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。

デフォルトで、NGSISTHSink は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。

トップ

recvTimeTimeInstant メタデータ

デフォルトで、NGSISTHSinkは、通知受信タイムスタンプが保存されます。それにもかかわらず、row モードで作業中であって、TimeInstant というメタデータが通知された場合には、そのようなメタデータ値が受信タイムスタンプの代わりに使用されます。これは、受信時間の代わりに、TimeInstant メタデータとして通知される測定生成時間を持続させたい場合に便利です。

トップ

エンコーディング

NGSIMongoSink は、MongoDB の命名規則に従います。一言で言えば :

バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:

  • データベース名は、\, /, ., '$,"および_としてエンコード された ` 文字 を持ちます
  • コレクション名には、_ としてエンコードされた $ 文字が含まれます

バージョン1.3.0(含む)から、Cygnus は MongoDB データ構造に合わせたこの特定のエンコーディングを適用します :

  • 等号文字 (=)は、xffff としてエンコードされます
  • すべての禁止された文字は、文字の Unicode の後に x 文字としてエンコードされます
  • x 文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後に xx としてエンコードされます
  • xffff は、連結文字として使用されます

将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding パラメータを使用してエンコーディングタイプを切り替えることは可能です。

トップ

サポートされている MongoDB バージョン

このシンクは Mongo の次のバージョンでテストされています :

  • 3.2.6
  • 3.4

トップ

プログラマ・ガイド

NGSISTHSink クラス

他の NGSI ライクのシンクが ベース NGSISink を拡張したように、NGSISTHSink は、NGSIMongoBaseSink を拡張します。拡張されるメソッドは次のとおりです :

void persistBatch(Batch batch) throws Exception;

Batch には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される MongoDB コレクションを指定します。したがって、MongoBackend 実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。

public void start();

MongoBackend の実装が作成されます。起動するシーケンスは NGSISTHSink() (コンストラクタ)、configure() および、start() であるため、start() メソッドではなくコンストラクタで行う必要があります。

public void configure(Context);

上記のような完全な構成が、与えられた Context インスタンスから読み取られます。

トップ

MongoBackend クラス

これは MongoDB の便利なバックエンドクラスであり、コンテキストデータを集約されたフォーマットのままで永続化するメソッドを提供します。未加工フォーマットに関する関連する方法は、次のとおりです :

public void createDatabase(String dbName) throws Exception;

データベースが存在しない場合は、その名前でデータベースを作成します。

public void createCollection(String dbName, String collectionName) throws Exception;

指定されたデータベースにコレクションが存在しない場合は、名前を指定してコレクションを作成します。

public void insertContextDataRaw(String dbName, String collectionName, long recvTimeTs, String recvTime, String entityId, String entityType, String attrName, String attrType, String attrValue, String attrMd) throws Exception;

指定されたデータベース内の指定されたコレクション内のドキュメントのセットを更新または挿入します(ドキュメントがすでに存在するかどうかに応じて異なります)。そのようなドキュメントのセットは、単一の属性の現在および過去の通知(履歴)に関するすべての情報を含みます。履歴データはいくつかの解像度と範囲の組み合わせ(秒と分、分と時、時と日、日と月、月と年)を使用して保存されるため、ドキュメントのセットが管理されます。詳細は、Github の STH Comet を参照してください。

エンコーディングに関しては何も特別なことはありません。Cygnus は一般的に UTF-8 文字セットで動作するので、これがデータがコレクションに書き込まれる方法です。したがって、MongoDB クライアントが UTF-8 に読み込んだバイトを変換する責任を負います。

トップ

認証と認可

NGSIMongoSink の現在の実装は、MongoDB エンドポイントで作成されたユーザ名とパスワードの資格情報に依存しています。

トップ