新しい NGSI シンクを追加する開発ガイド¶
コンテンツ :
イントロダクション¶
cygnus-ngsi
は、Flume シンクを使用して特定のストレージ内の NGSI コンテキストデータの永続性を可能にします。現在のシンクのコレクションが目的に合わせて制限されていれば、あなたが選択した永続化テクノロジに関する独自のシンクを追加して公式の cygnus-ngsi
コントリビュータになることができます。
このドキュメントでは、シンク・コードの記述方法に関するガイドラインと、異なるクラスの呼び出し方法、使用できるバックエンドなどを提供することで、このような代替シンクの開発についてご案内します。
ベース NGSISink
クラス¶
NGSISink
は、cygnus-ngsi
拡張内のすべてのシンクの基本クラスです。cygnus-common
のCygnusSink
クラスから継承する抽象クラスです。Flume のネイティブ AbstractSink
を拡張しています。
NGSISink
は、 NGSIライクのシンクで必要とされるほとんどのロジックを提供します :
- すべてのシンクに共通するパラメータの設定
- シンクの開始と停止
- Flumeトランザクションのオープン、コミット、クローズを含む、バッチのようなアプローチで、Flume イベント の消費
- 統計情報のカウンタ。実際には、この機能は
CygnusSink
によって提供されます。
次のパスに、このクラスがあります :
fiware-cygnus/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java
継承された構成¶
NGSISink
を拡張するすべてのシンクは、次の構成パラメータを継承します :
Parameter | Mandatory | Default value | Comments |
---|---|---|---|
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用します。、無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
data_model | no | dm-by-entity | 受け入れられる値 : dm-by-service, dm-by-service-path, dm-by-entity および dm-by-attribute |
enable_grouping | no | false | 受け入れられる値 : true または false |
enable_lowercase | no | false | 受け入れられる値 : true または false |
これらのパラメータは、configure(Context)
メソッドで読み込まれ、必要に応じてデフォルトになります。
継承された開始と停止¶
未定
継承されたイベントの消費¶
NGSISink
の最も重要な部分は、イベントがバッチのようなアプローチで消費される場所です。これは、AbstractSink
から継承された process()
メソッドで行われます。これは上書きされます。
このようなイベント処理は、Flume トランザクションを開き、batch_size
パラメータで指定されたイベントを読み取ることによって行われます。十分なイベントがない場合、batch_timeout
に達すると蓄積が終了します。読み取られたイベントごとに、トランザクションがコミットされます。 累積が終了すると、トランザクションは終了します。
process()
メソッドは、例外をキャッチして Flume トランザクション中に発生する可能性のあるすべてのエラーを処理することに注意してください。cygnus-common
には、使用が必須である、Cygnus 関連の例外のコレクションが存在します :
fiware-cygnus/cygnus-common/src/main/java/com/telefonica/iot/cygnus/errors/
終了すると、通知された/グループ化された宛先ごとにサブバッチを内部的に保持するNGSIBatchオブジェクトが蓄積されます。Flume イベント・オブジェクトの notified-destinations
および grouped-destinations
ヘッダは、設定された enable_grouping
値に応じて、サブバッチを作成するために検査されます。この NGSIBatch
オブジェクト内の情報は、新しいシンクの特定の実装が永続化しなければならないものです。
特定の永続性ロジックは、NGSISink
内の唯一の抽象メソッドを上書きすることによって実装されます。すなわち、persistBatch(NGSIBatch)
:
abstract void persistBatch(NGSIBatch) throws Exception;
継承されたカウンタ¶
NGSISink
は CygnusSink
を拡張しているため、NGSISink
を拡張しているシンクの統計情報を取得するために、次のカウンタを利用できます :
- 処理されたイベントの数 : チャネルから取得され、永続性のためにバッチに蓄積されたイベントの数
- 保存されたイベントの数 : 最終的なストレージに最後に書き込まれた/挿入された/追加されたバッチ内のイベントの数
新しいシンククラス¶
特定の構成¶
NGSISink
の configure(Context)
メソッドは、特定の設定パラメータの読み込んで拡張できます(必要に応じて、デフォルト設定)。
永続化される情報の種類¶
通常、Cygnus シンクに保持されるフィールドのリストが含まれています :
- 通知の受信時間(ミリ秒単位)
- 人が判読可能な形式での通知の受信時刻
- 通知された/グループ化された FIWARE service path
- エンティティID
- エンティティタイプ
- 属性と属性のメタデータ
- 属性とそのメタデータに関しては、2つのオプションのどちらかを、または両方ともを、切り替え構成パラメータを使用して、選択できます
- row フォーマット : 属性およびメタデータごとに write/insertion/upsert
- column フォーマット : すべての属性とそのメタデータを含む単一の write/insertion/upsert
特定のデータ構造に合わせる¶
特定のデータ構造がどのように作成されるべきかを簡単にコメントする価値はあります。
通常、クライアント/テナントを定義する通知されたサービスは、ユーザごとの名前空間を定義するストレージ要素にマップする必要があります。たとえば、MySQL, PostgreSQL, MongoDB, および STH では、サービスはユーザ・レベルで権限を定義できる特定のデータベースにマップされます。CKANでは、このサービスは organization にマップされます。他のケースでは、HDFS のように、サービスが hdfs://user/
の下のフォルダにマップされるように、マッピングはそれほど明白ではありません。また、DynamoDBやKafkaの場合のように、パーシスタンス要素名 (テーブルとトピック) の一部としてのみサービスを追加することもできます。
通知されたサービスパスに関しては、通常、データが実際に書き込まれる宛先名(ファイル、テーブル、リソース、コレクション、トピック)のプレフィックスとして含まれます。これは、HDFS と CKAN を除くすべてのシンクのケースです。HDFS はサービスパスを hdfs://user/service
下のサブフォルダとしてマップし、CKAN はサービスパスをパッケージとしてマップします。
特に重要なのは、ルートサービスパス(/
)です。この場合、宛先名にプレフィックスを付けるときにサービスパスを考慮しないでください。それは禁止された文字であるためです。
最後に、グループ化ルールを使用してこのデフォルトの動作を上書きしない限り、すべてのエンティティを区別するために、エンティティ ID とタイプの連結をデフォルトの宛先名として使用する必要があります
バックエンドのコンビニエンス・クラス¶
通知されたコンテキストデータを永続化するために必要なすべてのロジックを、持続抽象メソッドでコーディングすることはできません。この場合、最終的なバックエンドとの詳細なやり取りをラップするバックエンドクラスまたはクラスセットを作成することができます。 それにもかかわらず、これらのクラスは、 cygnus-ngsi
ではなく、cygnus-common
に配置すべきです。
新しいシンクの命名と配置¶
新しいシンククラスは、NGSI<technology>Sink
と呼ばれる必要があります。technology とは永続性バックエンドの名前です。例として、既存のシンク NGSIHDFSSink
, NGSICKANSink
, NGSIMySQLSink
などがあります。
新しいシンククラスの場所については、次のようにする必要があります :
fiware-cygnus/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/
すでに説明したように、バックエンドは次の場所に配置する必要があります :
fiware-cygnus/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/<technology>/