新しい twitter シンクの開発ガイドの追加

コンテンツ :

イントロダクション

cygnus-twitter は、Flume シンクによって、特定のストレージ内の twitterのコンテキスト・データの永続性を可能にします。現在のシンクのコレクションが目的に合わせて制限されていれば、選択した永続化テクノロジに関する独自のシンクを追加して公式の cygnus-twitter コントリビュータになることができます。

このドキュメントでは、シンクコードの記述方法に関するガイドラインと、異なるクラスの呼び出し方法、使用できるバックエンドなどを提供することで、このような代替シンクの開発についてご案内します。

トップ

ベース TwitterSink クラス

TwitterSinkは、 cygnus-twitter 拡張内のすべてのシンクの基本クラスです。cygnus-commonCygnusSink クラスから継承する抽象クラスです。Flume のネイティブ AbstractSink を拡張しています。

TwitterSink は、 Twitterライクのシンクで必要とされるほとんどのロジックを提供します :

  • すべてのシンクに共通するパラメータの設定
  • シンクの開始と停止
  • Flume イベントは、Flume トランザクションのオープン、コミット、クローズを含む、バッチのようなアプローチでイベントを消費します。
  • 統計のカウンタ。実際には、この機能は CygnusSink によって与えられます。

このクラスは、次のパスにあります :

fiware-cygnus/cygnus-twitter/src/main/java/com/telefonica/iot/cygnus/sinks/TwitterSink.java

トップ

継承された構成

TwitterSink を拡張するすべてのシンクは、次の構成パラメータを継承します :

パラメータ 必須 デフォルト値 コメント
batch_size no 1 永続性の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま持続される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用します。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
enable_lowercase no false 受け入れられる値 : true または false

これらのパラメータは、configure(Context) メソッドで読み込まれ。必要に応じてデフォルトになります。

トップ

継承された開始と停止

未定

トップ

継承されたイベントの消費

TwitterSink の最も重要な部分は、イベントがバッチのようなアプローチで消費される場所です。これは、AbstractSinkから継承された process() メソッドで行われ、上書きされます。

このようなイベント処理は、Flume トランザクションを開き、batch_size パラメータで指定されたイベントを読み取ることによって行われます(十分なイベントがない場合は、batch_timeout に達すると蓄積が終了します。読み取られたイベントごとに、トランザクションがコミットされます。累積が終了すると、トランザクションは終了します。

process() メソッドは、例外をキャッチして Flume トランザクション中に発生する可能性のあるすべてのエラーを処理することに注意してください。cygnus-common には、使用が必須であるCygnus 関連の例外のコレクションが存在します :

fiware-cygnus/cygnus-common/src/main/java/com/telefonica/iot/cygnus/errors/

特定の永続性ロジックは、TwitterSink 内の唯一の抽象メソッドを上書きすることによって実装されます。すなわち persistBatch(TwitterBatch) :

abstract void persistBatch(TwitterBatch) throws Exception;

トップ

継承されたカウンタ

TwitterSinkCygnusSink を拡張しているので、TwitterSink を拡張するすべてのシンクの統計情報を取得するために、次のカウンタが既に利用可能です:

  • 処理されたイベントの数 : つまりチャネルから取得され、永続性のためにバッチに蓄積されたイベントの数
  • 保存されたイベントの数 : つまり最終的なストレージに最後に書き込まれた/挿入された/追加されたバッチ内のイベントの数

トップ

新しいシンククラス

特定の構成

TwitterSinkconfigure(Context) メソッドは、特定の構成パラメータの読取り値および必要に応じてデフォルト値で拡張することができます。

トップ

特定のデータ構造に合わせる

ツイートを HDFS に保存するために、agent_ <id> .conf 設定ファイルに hdfs_folder プロパティと hdfs_files プロパティを追加してストレージの場所を指定しました。 他の種類のシンクでは、必要なプロパティを設定ファイルに追加する必要があり、シンクのコードはこの情報を適切な方法で処理する必要があります。

トップ

バックエンドのコンビニエンス・クラス

通知されたコンテキスト・データを永続化するために必要なすべてのロジックを、持続抽象メソッドでコーディングすることはできません。この場合、バックエンド・クラスまたは最終的なバックエンドと詳細なやりとりをラップするクラスのセットを作成することができます。それにもかかわらず、これらのクラスは、cygnus-twitter ではなく、cygnus-common にあるべきです。

トップ

新しいクラスの命名と配置

新しいシンククラスは、永続性バックエンドの名前の technologyである Twitter <technology>Sink と呼ばれる必要があります。例としては、既に存在するシンクのTwitterHDFSSink があります。

新しいシンク・クラスの場所については、次のようにする必要があります :

fiware-cygnus/cygnus-twitter/src/main/java/com/telefonica/iot/cygnus/sinks/

すでに説明したように、バックエンドは次の場所に配置する必要があります :

fiware-cygnus/cygnus-twitter/src/main/java/com/telefonica/iot/cygnus/backends/<technology>/

トップ