新しい NGSI シンクを追加する開発ガイド

コンテンツ :

イントロダクション

cygnus-ngsi は、Flume シンクを使用して特定のストレージ内の NGSI コンテキストデータの永続性を可能にします。現在のシンクのコレクションが目的に合わせて制限されていれば、あなたが選択した永続化テクノロジに関する独自のシンクを追加して公式の cygnus-ngsi コントリビュータになることができます。

このドキュメントでは、シンク・コードの記述方法に関するガイドラインと、異なるクラスの呼び出し方法、使用できるバックエンドなどを提供することで、このような代替シンクの開発についてご案内します。

トップ

ベース NGSISink クラス

NGSISink は、cygnus-ngsi 拡張内のすべてのシンクの基本クラスです。cygnus-commonCygnusSink クラスから継承する抽象クラスです。Flume のネイティブ AbstractSink を拡張しています。

NGSISink は、 NGSIライクのシンクで必要とされるほとんどのロジックを提供します :

  • すべてのシンクに共通するパラメータの設定
  • シンクの開始と停止
  • Flumeトランザクションのオープン、コミット、クローズを含む、バッチのようなアプローチで、Flume イベント の消費
  • 統計情報のカウンタ。実際には、この機能は CygnusSink によって提供されます。

次のパスに、このクラスがあります :

fiware-cygnus/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java

トップ

継承された構成

NGSISink を拡張するすべてのシンクは、次の構成パラメータを継承します :

Parameter Mandatory Default value Comments
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用します。、無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
data_model no dm-by-entity 受け入れられる値 : dm-by-service, dm-by-service-path, dm-by-entity および dm-by-attribute
enable_grouping no false 受け入れられる値 : true または false
enable_lowercase no false 受け入れられる値 : true または false

これらのパラメータは、configure(Context) メソッドで読み込まれ、必要に応じてデフォルトになります。

トップ

継承された開始と停止

未定

トップ

継承されたイベントの消費

NGSISink の最も重要な部分は、イベントがバッチのようなアプローチで消費される場所です。これは、AbstractSink から継承された process() メソッドで行われます。これは上書きされます。

このようなイベント処理は、Flume トランザクションを開き、batch_size パラメータで指定されたイベントを読み取ることによって行われます。十分なイベントがない場合、batch_timeout に達すると蓄積が終了します。読み取られたイベントごとに、トランザクションがコミットされます。 累積が終了すると、トランザクションは終了します。

process() メソッドは、例外をキャッチして Flume トランザクション中に発生する可能性のあるすべてのエラーを処理することに注意してください。cygnus-common には、使用が必須である、Cygnus 関連の例外のコレクションが存在します :

fiware-cygnus/cygnus-common/src/main/java/com/telefonica/iot/cygnus/errors/

終了すると、通知された/グループ化された宛先ごとにサブバッチを内部的に保持するNGSIBatchオブジェクトが蓄積されます。Flume イベント・オブジェクトの notified-destinations および grouped-destinations ヘッダは、設定された enable_grouping 値に応じて、サブバッチを作成するために検査されます。この NGSIBatch オブジェクト内の情報は、新しいシンクの特定の実装が永続化しなければならないものです。

特定の永続性ロジックは、NGSISink 内の唯一の抽象メソッドを上書きすることによって実装されます。すなわち、persistBatch(NGSIBatch) :

abstract void persistBatch(NGSIBatch) throws Exception;

トップ

継承されたカウンタ

NGSISinkCygnusSink を拡張しているため、NGSISink を拡張しているシンクの統計情報を取得するために、次のカウンタを利用できます :

  • 処理されたイベントの数 : チャネルから取得され、永続性のためにバッチに蓄積されたイベントの数
  • 保存されたイベントの数 : 最終的なストレージに最後に書き込まれた/挿入された/追加されたバッチ内のイベントの数

トップ

新しいシンククラス

特定の構成

NGSISinkconfigure(Context) メソッドは、特定の設定パラメータの読み込んで拡張できます(必要に応じて、デフォルト設定)。

トップ

永続化される情報の種類

通常、Cygnus シンクに保持されるフィールドのリストが含まれています :

  • 通知の受信時間(ミリ秒単位)
  • 人が判読可能な形式での通知の受信時刻
  • 通知された/グループ化された FIWARE service path
  • エンティティID
  • エンティティタイプ
  • 属性と属性のメタデータ
  • 属性とそのメタデータに関しては、2つのオプションのどちらかを、または両方ともを、切り替え構成パラメータを使用して、選択できます
  • row フォーマット : 属性およびメタデータごとに write/insertion/upsert
  • column フォーマット : すべての属性とそのメタデータを含む単一の write/insertion/upsert

トップ

特定のデータ構造に合わせる

特定のデータ構造がどのように作成されるべきかを簡単にコメントする価値はあります。

通常、クライアント/テナントを定義する通知されたサービスは、ユーザごとの名前空間を定義するストレージ要素にマップする必要があります。たとえば、MySQL, PostgreSQL, MongoDB, および STH では、サービスはユーザ・レベルで権限を定義できる特定のデータベースにマップされます。CKANでは、このサービスは organization にマップされます。他のケースでは、HDFS のように、サービスが hdfs://user/ の下のフォルダにマップされるように、マッピングはそれほど明白ではありません。また、DynamoDBやKafkaの場合のように、パーシスタンス要素名 (テーブルとトピック) の一部としてのみサービスを追加することもできます。

通知されたサービスパスに関しては、通常、データが実際に書き込まれる宛先名(ファイル、テーブル、リソース、コレクション、トピック)のプレフィックスとして含まれます。これは、HDFS と CKAN を除くすべてのシンクのケースです。HDFS はサービスパスを hdfs://user/service 下のサブフォルダとしてマップし、CKAN はサービスパスをパッケージとしてマップします。

特に重要なのは、ルートサービスパス(/)です。この場合、宛先名にプレフィックスを付けるときにサービスパスを考慮しないでください。それは禁止された文字であるためです。

最後に、グループ化ルールを使用してこのデフォルトの動作を上書きしない限り、すべてのエンティティを区別するために、エンティティ ID とタイプの連結をデフォルトの宛先名として使用する必要があります

トップ

バックエンドのコンビニエンス・クラス

通知されたコンテキストデータを永続化するために必要なすべてのロジックを、持続抽象メソッドでコーディングすることはできません。この場合、最終的なバックエンドとの詳細なやり取りをラップするバックエンドクラスまたはクラスセットを作成することができます。 それにもかかわらず、これらのクラスは、 cygnus-ngsi ではなく、cygnus-common に配置すべきです。

トップ

新しいシンクの命名と配置

新しいシンククラスは、NGSI<technology>Sink と呼ばれる必要があります。technology とは永続性バックエンドの名前です。例として、既存のシンク NGSIHDFSSink, NGSICKANSink, NGSIMySQLSink などがあります。

新しいシンククラスの場所については、次のようにする必要があります :

fiware-cygnus/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/

すでに説明したように、バックエンドは次の場所に配置する必要があります :

fiware-cygnus/cygnus-common/src/main/java/com/telefonica/iot/cygnus/backends/<technology>/

トップ