NGSIMySQLSink

コンテンツ :

機能性

com.iot.telefonica.cygnus.sinks.NGSIMySQLSink、または単なる NGSIMySQLSinkは、MySQL サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。

データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の MySQLデータ構造にマップする必要があります。

次のセクションでこれについて詳しく説明します。

トップ

NGSIEvent オブジェクトへの NGSI イベントのマッピング

コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent オブジェクト に変換されます。NGSIEvent は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement オブジェクトを組み合わせたものです。

これは、NGSIRestHandler のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。

トップ

MySQL データ構造への NGSIEvents のマッピング

MySQL は、データ行のテーブルを含むデータベースにデータを編成します。そのような構成は、NGSIEvent が永続化されるたびに NGSIMySQLSink によって利用されます。

トップ

MySQL データベースの命名規則

通知された fiware-service ヘッダ値 または、このようなヘッダがない場合は、FIWARE service のデフォルト値 という名前のデータベースが作成されます (存在しない場合)。

MySQL は、英数字, $, _ のみを受け入れます。これにより、enable_encoding 構成パラメーターに応じて、特定のエンコードが適用されます。

MySQL データベース名の長さは、64文字に制限されています。

トップ

MySQL テーブルの命名規則

これらのテーブルの名前は、設定されたデータモデルによって異なります。詳細については、構成セクションを参照してください :

  • サービスパスによるデータモデル (data_model=dm-by-service-path)。データモデル名が示すように、通知された FIWARE service path または NGSIRestHandler のデフォルトの設定されたサービスパスがテーブルの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルに格納されます。このデータモデルに関する唯一の制約は、FIWAREサービスパスをルート (/) にすることができないことです
  • 実体によるデータモデル (data_model=dm-by-entity)。各エンティティについて、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプに連結され、テーブル名を構成します。連結文字は _ (アンダースコア)です。FIWARE service path がルート (/)である場合、エンティティ ID とタイプのみが連結されます

MySQL は、英数字, $, _ のみを受け入れます。これにより、enable_encoding 構成パラメータに応じて、特定のエンコードが適用されます。

MySQL テーブル名の長さは 64文字に制限されています。

次の表は、古いエンコードで、テーブル名の構成をまとめたものです :

FIWARE service path dm-by-service-path dm-by-entity
/ N/A <entityId>_<entityType>
/<svcPath> <svcPath> <svcPath>_<entityId>_<entityType>

次の表は、新しいエンコードで、テーブル名の構成をまとめたものです :

FIWARE service path dm-by-service-path dm-by-entity
/ x002f x002fxffff<entityId>xffff<entityType>
/<svcPath> x002f<svcPath> x002f<svcPath>xffff<entityId>xffff<entityType>

エンティティ ID とタイプの連結が、NGSIEvent 内の notified_entities/grouped_entities ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。

トップ

行ライクなストア

上記のコレクションに格納されている特定のデータに関して、attr_persistence パラメータが row (デフォルトの格納モード)に設定されている場合、通知されたデータは属性ごとに格納され、それぞれのインサートを構成します。各インサートには次のフィールドがあります :

  • recvTimeTsUTC : タイムスタンプ(ミリ秒単位)
  • recvTime : 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)
  • fiwareServicePath : 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパス
  • entityId : 通知されたエンティティ ID
  • entityType : 通知されたエンティティタイプ
  • attrName : 通知された属性名
  • attrType : 通知された属性タイプ
  • attrValue : 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性があります
  • attrMd : これには、Json の属性のメタデータ配列の文字列のシリアル化が含まれます。属性にメタデータがない場合、空の配列 []が挿入されます

トップ

列ライクなストア

上記のコレクションに格納されている特定のデータに関して、attr_persistence パラメータが column に設定されている場合、通知されたエンティティ全体に対して、次のフィールドを含むシングル・ラインが作成されます :

  • recvTime : 人間が判読可能な形式の UTC タイムスタンプ。ISO 8601 に似ていますが、すべての MySQL タイムスタンプは UTC 形式になっているはずなので、UTC を示す Z 文字は使用しないでください
  • fiwareServicePath 通知された パス、または デフォルトのパス
  • entityId : 通知されたエンティティ ID
  • entityType : 通知されたエンティティタイプ。
  • 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
  • 通知された属性ごとに、属性名と _md の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます

トップ

NGSIEvent

以下の NGSIEvent が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :

ngsi-event={
    headers={
         content-type=application/json,
         timestamp=1429535775,
         transactionId=1429535775-308-0000000000,
         correlationId=1429535775-308-0000000000,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         <grouping_rules_interceptor_headers>,
         <name_mappings_interceptor_headers>
    },
    body={
        entityId=car1,
        entityType=car,
        attributes=[
            {
                attrName=speed,
                attrType=float,
                attrValue=112.9
            },
            {
                attrName=oil_level,
                attrType=float,
                attrValue=74.6
            }
        ]
    }
}

トップ

データベース名とテーブル名

MySQL のデータベース名は常に vehicles です。

MySQL のテーブル名は、古いエンコーディングを使用すると、設定されたデータモデルに応じて、次のものになります :

FIWARE service path dm-by-service-path dm-by-entity
/ N/A car1_car
/4wheels 4wheels 4wheels_car1_car

新しいエンコーディングを使用すると :

FIWARE service path dm-by-service-path dm-by-entity
/ x002f car1xffffcar
/wheels x002f4wheels x002f4wheelsxffffcar1xffffcar

トップ

行ライクなストア

設定パラメータとして、attr_persistence=row を想定すると、NGSIMySQLSinkは、本体内のデータは次のように保持します :

mysql> select * from 4wheels_car1_car;
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| recvTimeTs | recvTime                   | fiwareServicePath | entityId | entityType | attrName    | attrType  | attrValue | attrMd |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels           | car1     | car        |  speed      | float     | 112.9     | []     |
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels           | car1     | car        |  oil_level  | float     | 74.6      | []     |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
2 row in set (0.00 sec)

トップ

列ライクなストア

attr_persistence=colum の場合は、NGSIMySQLSink は、本体内のデータを次のように保持します : If attr_persistence=colum then NGSIMySQLSink will persist the data within the body as:

mysql> select * from 4wheels_car1_car;
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
| recvTime                   | fiwareServicePath | entityId | entityType | speed | speed_md | oil_level | oil_level_md |
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
| 2015-04-20T12:13:22.41.124 | 4wheels           | car1     | car        | 112.9 | []       |  74.6     | []           |
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
1 row in set (0.00 sec)

トップ

管理ガイド

構成

NGSIMySQLSink は、次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.NGSIMySQLSink である必要があります
channel yes N/A
enable_encoding  no false true または falsetrue は新しいエンコーディングを適用し、false は古いエンコーディングを適用します
enable_grouping no false true または false。詳細については、このリンクをチェックしてください
enable_name_mappings no false true または false。詳細については、このリンクをチェックしてください
enable_lowercase no false true または false
data_model no dm-by-entity dm-by-service-pathdm-by-entitydm-by-service は現在サポートされていません
mysql_host no localhost MySQL サーバが動作する FQDN/IP アドレス
mysql_port no 3306
mysql_username no root root は自動的に作成されるデフォルトのユーザ名です
mysql_password no N/A デフォルト値として空の値。パスワードは自動的に作成されません
attr_persistence no row row または column
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
batch_retry_intervals no 5000 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます
persistence_policy.max_records no -1 テーブルにキャッピングされる前に許可されるレコードの最大数。-1 は、このポリシーを無効にします
persistence_policy.expiration_time no -1 有効期限が切れる前にテーブル内でレコードが維持される最大秒数。-1 は、このポリシーを無効にします
persistence_policy.checking_time no 3600 シンクがレコードの有効期限をチェックする頻度(秒単位)

設定例は次のとおりです :

cygnus-ngsi.sinks = mysql-sink
cygnus-ngsi.channels = mysql-channel
...
cygnus-ngsi.sinks.mysql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIMySQLSink
cygnus-ngsi.sinks.mysql-sink.channel = mysql-channel
cygnus-ngsi.sinks.mysql-sink.enable_encoding = false
cygnus-ngsi.sinks.mysql-sink.enable_grouping = false
cygnus-ngsi.sinks.mysql-sink.enable_lowercase = false
cygnus-ngsi.sinks.mysql-sink.enable_name_mappings = false
cygnus-ngsi.sinks.mysql-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.mysql-sink.mysql_host = 192.168.80.34
cygnus-ngsi.sinks.mysql-sink.mysql_port = 3306
cygnus-ngsi.sinks.mysql-sink.mysql_username = myuser
cygnus-ngsi.sinks.mysql-sink.mysql_password = mypassword
cygnus-ngsi.sinks.mysql-sink.attr_persistence = row
cygnus-ngsi.sinks.mysql-sink.batch_size = 100
cygnus-ngsi.sinks.mysql-sink.batch_timeout = 30
cygnus-ngsi.sinks.mysql-sink.batch_ttl = 10
cygnus-ngsi.sinks.mysql-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.mysql-sink.persistence_policy.max_records = 5
cygnus-ngsi.sinks.mysql-sink.persistence_policy.expiration_time = 86400
cygnus-ngsi.sinks.mysql-sink.persistence_policy.checking_time = 600

トップ

ユースケース

中期的に大きく成長しないデータベースストレージを探している場合に、NGSIMySQLSink に使用します。

トップ

重要なメモ

テーブルタイプとそのグループ化ルールとの関係

ご覧のように、テーブルタイプ構成パラメータは、データの直接集計のためのメソッドです。 : デフォルトの宛先によって、同じエンティティに関するすべての通知は、同じ MySQL テーブル内に格納されます。または、デフォルトの service-path によって、同じ service-path に関するすべての通知は、同じ MySQL テーブルに格納されます。

グループ化機能は別の集約メカニズムですが、間接的な機能です。つまり、グループ化機能は実際にはデータを単一のテーブルに集約するわけではありません。これは、設定されたテーブルタイプ (上記参照) に基づいてシンクが実行され、デフォルトの宛先またはサービスパスが変更され、 テーブルの種類に依存しません。

たとえば、選択されたテーブルタイプが宛先であり、グループ化機能が有効になっていない場合、car タイプの car1car2 の2つの異なるエンティティ・データが、デフォルトの宛先に従って、2つの異なる MySQL テーブルに永続化されます。すなわち、それぞれ car1_car および car2_car です。ただし、"car タイプのすべての車は、cars という変更された宛先がある" というグループルールが有効になっていると、両方のエンティティデータは、cars という単一のテーブルに永続化されます。 この例では、直接集計は宛先別にテーブルタイプによって決定されますが、間接的にグループ化ルールによって集約も決定しています。

トップ

永続化モード

同じ数の属性が通知されるとは限りません。これは、NGSI のような送信者へのサブスクリプションに依存します。 通知された属性ごとに固定の8フィールドの行(fixed 8-fields rows)がアップデートされるため、row の永続化モードでは問題ありません。 それにもかかわらず、column モードはフィールドの長さが異なる複数行の影響を受ける可能性があります。 したがって、column モードは、サブスクリプションが最後の通知以降に更新されなかった場合、常に同じ属性、イベントを送信するように設計されている場合にのみ推奨されます。

さらに、column モードで実行している場合、通知される属性の数(したがってデータストア内に書き込まれるフィールドの数)が Cygnus によって認識されないため、Datastore を自動的に作成することはできず、Cygnusの 実行前にプロビジョニングする必要があります。これは、通知される属性の数に関係なく、書き込まれるフィールドの数が常に一定であるため、row モードの場合ではありません。

トップ

バッチ処理

プログラマ・ガイドで説明したように、NGSIMySQLSink は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。

バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ MySQL テーブルに保持されます。イベントを1つずつ処理する場合は、MySQL に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 MySQL テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の MySQL テーブル)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。

バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。

永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。

デフォルトで、NGSIMySQLSink は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。

トップ

タイムゾーン情報

MySQL はタイムゾーン情報を環境変数として保存するため、タイムゾーン情報は MySQL タイムスタンプに追加されません。MySQL のタイムスタンプは UTC 時刻で保存されます。

トップ

エンコーディング

バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:

  • 英数字以外の文字はすべてアンダースコア (_) で置き換えられます
  • アンダースコア (_) は連結文字としても使用されています
  • FIWARE service paths内のスラッシュ (/) は無視されます

バージョン1.3.0(含む)から、Cygnus は MySQL データ構造に合わせたこの特定のエンコーディングを適用します :

  • 英数字はエンコードされません
  • 数字はエンコードされません
  • アンダースコア文字 (_) はエンコードされません。
  • 等号文字 (=)は、xffff としてエンコードされます
  • FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の x 文字としてエンコードされます
  • x 文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後に xx としてエンコードされます
  • その他の文字はすべてエンコードされません
  • xffff は、連結文字として使用されます

将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding パラメータを使用してエンコーディングタイプを切り替えることは可能です。

トップ

リソースの上限/レコードの有効期限

キャッピング (capping)と有効期限は、デフォルトでは無効になっています。それにもかかわらず、必要に応じてこれを有効にすることができます :

  • レコード数で上限を設定する。これにより、構成済みの最大レコード数(persistence_policy.max_records)に達するまでリソースが成長し、そのような一定数のレコードが維持されます
  • 時間でレコードを期限切れにする。 これにより、レコードが古くなるまで、すなわち設定された特定の有効期限(persistence_policy.expiration_time)を超えるまで、リソースが大きくなることが可能です

トップ

プログラマ・ガイド

NGSIMySQLSink クラス

他の NGSI ライクのシンクと同様に、NGSIMySQLSink、ベース NGSISink を拡張します。拡張されるメソッドは次のとおりです :

void persistBatch(Batch batch) throws Exception;

Batch には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される MySQL テーブル を指定します。したがって、MySQLBackend 実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。

void capRecords(NGSIBatch batch, long maxRecords) throws EventDeliveryException;

このメソッドは、常に persistBacth() の直後に呼び出されます。upsert された同じデスティネーションテーブルはレコード数の点でチェックされます。更新されたテーブルのいずれかが、設定された最大値 (persistence_policy.max_records) を超えた場合、レコードの最大数に達するまで、必要な数の最も古いレコードが削除されます。

void expirateRecords(long expirationTime);

このメソッドは、persistence_policy.checking_time に基づいて定期的に呼び出されます。いずれかの表内のいずれかのレコードが、構成済みの有効期限 (persistence_policy.expiration_time) を超えた場合、そのレコードは削除されます。

public void start();

MySQLBackend の実装が作成されます。起動するシーケンスは NGSIMySQLSink() (コンストラクタ)、configure() および、start() であるため、start() メソッドではなくコンストラクタで行う必要があります。

public void configure(Context);

上記のような完全な構成が、与えられた Context インスタンスから読み取られます。

トップ

認証と認可

NGSIMySQLSink の現在の実装は、MySQL エンドポイントで作成されたユーザー名とパスワードの資格情報に依存しています。

トップ