NGSIMySQLSink¶
コンテンツ :
機能性¶
com.iot.telefonica.cygnus.sinks.NGSIMySQLSink
、または単なる NGSIMySQLSink
は、MySQL サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の MySQLデータ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
MySQL データ構造への NGSIEvent
s のマッピング¶
MySQL は、データ行のテーブルを含むデータベースにデータを編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSIMySQLSink
によって利用されます。
MySQL データベースの命名規則¶
通知された fiware-service
ヘッダ値 または、このようなヘッダがない場合は、FIWARE service のデフォルト値 という名前のデータベースが作成されます (存在しない場合)。
MySQL は、英数字, $
, _
のみを受け入れます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
MySQL データベース名の長さは、64文字に制限されています。
MySQL テーブルの命名規則¶
これらのテーブルの名前は、設定されたデータモデルによって異なります。詳細については、構成セクションを参照してください :
- サービスパスによるデータモデル (
data_model=dm-by-service-path
)。データモデル名が示すように、通知された FIWARE service path またはNGSIRestHandler
のデフォルトの設定されたサービスパスがテーブルの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルに格納されます。このデータモデルに関する唯一の制約は、FIWAREサービスパスをルート (/
) にすることができないことです - 実体によるデータモデル (
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプに連結され、テーブル名を構成します。連結文字は_
(アンダースコア)です。FIWARE service path がルート (/
)である場合、エンティティ ID とタイプのみが連結されます
MySQL は、英数字, $
, _
のみを受け入れます。これにより、enable_encoding
構成パラメータに応じて、特定のエンコードが適用されます。
MySQL テーブル名の長さは 64文字に制限されています。
次の表は、古いエンコードで、テーブル名の構成をまとめたものです :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
N/A | <entityId>_<entityType> |
/<svcPath> |
<svcPath> |
<svcPath>_<entityId>_<entityType> |
次の表は、新しいエンコードで、テーブル名の構成をまとめたものです :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f |
x002fxffff<entityId>xffff<entityType> |
/<svcPath> |
x002f<svcPath> |
x002f<svcPath>xffff<entityId>xffff<entityType> |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entities
ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。
行ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが row
(デフォルトの格納モード)に設定されている場合、通知されたデータは属性ごとに格納され、それぞれのインサートを構成します。各インサートには次のフィールドがあります :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)fiwareServicePath
: 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプattrName
: 通知された属性名attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMd
: これには、Json の属性のメタデータ配列の文字列のシリアル化が含まれます。属性にメタデータがない場合、空の配列[]
が挿入されます
列ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが column
に設定されている場合、通知されたエンティティ全体に対して、次のフィールドを含むシングル・ラインが作成されます :
recvTime
: 人間が判読可能な形式の UTC タイムスタンプ。ISO 8601 に似ていますが、すべての MySQL タイムスタンプは UTC 形式になっているはずなので、UTC を示すZ
文字は使用しないでくださいfiwareServicePath
通知された パス、または デフォルトのパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプ。- 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
- 通知された属性ごとに、属性名と
_md
の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます
例¶
NGSIEvent
¶
以下の NGSIEvent
が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
データベース名とテーブル名¶
MySQL のデータベース名は常に vehicles
です。
MySQL のテーブル名は、古いエンコーディングを使用すると、設定されたデータモデルに応じて、次のものになります :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
N/A | car1_car |
/4wheels |
4wheels |
4wheels_car1_car |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f |
car1xffffcar |
/wheels |
x002f4wheels |
x002f4wheelsxffffcar1xffffcar |
行ライクなストア¶
設定パラメータとして、attr_persistence=row
を想定すると、NGSIMySQLSink
は、本体内のデータは次のように保持します :
mysql> select * from 4wheels_car1_car;
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| recvTimeTs | recvTime | fiwareServicePath | entityId | entityType | attrName | attrType | attrValue | attrMd |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels | car1 | car | speed | float | 112.9 | [] |
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels | car1 | car | oil_level | float | 74.6 | [] |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
2 row in set (0.00 sec)
列ライクなストア¶
attr_persistence=colum
の場合は、NGSIMySQLSink
は、本体内のデータを次のように保持します :
If attr_persistence=colum
then NGSIMySQLSink
will persist the data within the body as:
mysql> select * from 4wheels_car1_car;
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
| recvTime | fiwareServicePath | entityId | entityType | speed | speed_md | oil_level | oil_level_md |
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
| 2015-04-20T12:13:22.41.124 | 4wheels | car1 | car | 112.9 | [] | 74.6 | [] |
+----------------------------+-------------------+----------+------------+-------+----------+-----------+--------------+
1 row in set (0.00 sec)
管理ガイド¶
構成¶
NGSIMySQLSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSIMySQLSink である必要があります |
channel | yes | N/A | |
enable_encoding | no | false | true または false。true は新しいエンコーディングを適用し、false は古いエンコーディングを適用します |
enable_grouping | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_name_mappings | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true または false |
data_model | no | dm-by-entity | dm-by-service-path、dm-by-entity。dm-by-service は現在サポートされていません |
mysql_host | no | localhost | MySQL サーバが動作する FQDN/IP アドレス |
mysql_port | no | 3306 | |
mysql_username | no | root | root は自動的に作成されるデフォルトのユーザ名です |
mysql_password | no | N/A | デフォルト値として空の値。パスワードは自動的に作成されません |
attr_persistence | no | row | row または column |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
persistence_policy.max_records | no | -1 | テーブルにキャッピングされる前に許可されるレコードの最大数。-1 は、このポリシーを無効にします |
persistence_policy.expiration_time | no | -1 | 有効期限が切れる前にテーブル内でレコードが維持される最大秒数。-1 は、このポリシーを無効にします |
persistence_policy.checking_time | no | 3600 | シンクがレコードの有効期限をチェックする頻度(秒単位) |
設定例は次のとおりです :
cygnus-ngsi.sinks = mysql-sink
cygnus-ngsi.channels = mysql-channel
...
cygnus-ngsi.sinks.mysql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIMySQLSink
cygnus-ngsi.sinks.mysql-sink.channel = mysql-channel
cygnus-ngsi.sinks.mysql-sink.enable_encoding = false
cygnus-ngsi.sinks.mysql-sink.enable_grouping = false
cygnus-ngsi.sinks.mysql-sink.enable_lowercase = false
cygnus-ngsi.sinks.mysql-sink.enable_name_mappings = false
cygnus-ngsi.sinks.mysql-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.mysql-sink.mysql_host = 192.168.80.34
cygnus-ngsi.sinks.mysql-sink.mysql_port = 3306
cygnus-ngsi.sinks.mysql-sink.mysql_username = myuser
cygnus-ngsi.sinks.mysql-sink.mysql_password = mypassword
cygnus-ngsi.sinks.mysql-sink.attr_persistence = row
cygnus-ngsi.sinks.mysql-sink.batch_size = 100
cygnus-ngsi.sinks.mysql-sink.batch_timeout = 30
cygnus-ngsi.sinks.mysql-sink.batch_ttl = 10
cygnus-ngsi.sinks.mysql-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.mysql-sink.persistence_policy.max_records = 5
cygnus-ngsi.sinks.mysql-sink.persistence_policy.expiration_time = 86400
cygnus-ngsi.sinks.mysql-sink.persistence_policy.checking_time = 600
ユースケース¶
中期的に大きく成長しないデータベースストレージを探している場合に、NGSIMySQLSink
に使用します。
重要なメモ¶
テーブルタイプとそのグループ化ルールとの関係¶
ご覧のように、テーブルタイプ構成パラメータは、データの直接集計のためのメソッドです。 : デフォルトの宛先によって、同じエンティティに関するすべての通知は、同じ MySQL テーブル内に格納されます。または、デフォルトの service-path によって、同じ service-path に関するすべての通知は、同じ MySQL テーブルに格納されます。
グループ化機能は別の集約メカニズムですが、間接的な機能です。つまり、グループ化機能は実際にはデータを単一のテーブルに集約するわけではありません。これは、設定されたテーブルタイプ (上記参照) に基づいてシンクが実行され、デフォルトの宛先またはサービスパスが変更され、 テーブルの種類に依存しません。
たとえば、選択されたテーブルタイプが宛先であり、グループ化機能が有効になっていない場合、car
タイプの car1
とcar2
の2つの異なるエンティティ・データが、デフォルトの宛先に従って、2つの異なる MySQL テーブルに永続化されます。すなわち、それぞれ car1_car
および car2_car
です。ただし、"car
タイプのすべての車は、cars
という変更された宛先がある" というグループルールが有効になっていると、両方のエンティティデータは、cars
という単一のテーブルに永続化されます。 この例では、直接集計は宛先別にテーブルタイプによって決定されますが、間接的にグループ化ルールによって集約も決定しています。
永続化モード¶
同じ数の属性が通知されるとは限りません。これは、NGSI のような送信者へのサブスクリプションに依存します。 通知された属性ごとに固定の8フィールドの行(fixed 8-fields rows)がアップデートされるため、row
の永続化モードでは問題ありません。 それにもかかわらず、column
モードはフィールドの長さが異なる複数行の影響を受ける可能性があります。 したがって、column
モードは、サブスクリプションが最後の通知以降に更新されなかった場合、常に同じ属性、イベントを送信するように設計されている場合にのみ推奨されます。
さらに、column
モードで実行している場合、通知される属性の数(したがってデータストア内に書き込まれるフィールドの数)が Cygnus によって認識されないため、Datastore を自動的に作成することはできず、Cygnusの 実行前にプロビジョニングする必要があります。これは、通知される属性の数に関係なく、書き込まれるフィールドの数が常に一定であるため、row
モードの場合ではありません。
バッチ処理¶
プログラマ・ガイドで説明したように、NGSIMySQLSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ MySQL テーブルに保持されます。イベントを1つずつ処理する場合は、MySQL に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 MySQL テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の MySQL テーブル)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSIMySQLSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
タイムゾーン情報¶
MySQL はタイムゾーン情報を環境変数として保存するため、タイムゾーン情報は MySQL タイムスタンプに追加されません。MySQL のタイムスタンプは UTC 時刻で保存されます。
エンコーディング¶
バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:
- 英数字以外の文字はすべてアンダースコア (
_
) で置き換えられます - アンダースコア (
_
) は連結文字としても使用されています - FIWARE service paths内のスラッシュ (
/
) は無視されます
バージョン1.3.0(含む)から、Cygnus は MySQL データ構造に合わせたこの特定のエンコーディングを適用します :
- 英数字はエンコードされません
- 数字はエンコードされません
- アンダースコア文字 (
_
) はエンコードされません。 - 等号文字 (
=
)は、xffff
としてエンコードされます - FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされます- その他の文字はすべてエンコードされません
xffff
は、連結文字として使用されます
将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding
パラメータを使用してエンコーディングタイプを切り替えることは可能です。
リソースの上限/レコードの有効期限¶
キャッピング (capping)と有効期限は、デフォルトでは無効になっています。それにもかかわらず、必要に応じてこれを有効にすることができます :
- レコード数で上限を設定する。これにより、構成済みの最大レコード数(
persistence_policy.max_records
)に達するまでリソースが成長し、そのような一定数のレコードが維持されます - 時間でレコードを期限切れにする。 これにより、レコードが古くなるまで、すなわち設定された特定の有効期限(
persistence_policy.expiration_time
)を超えるまで、リソースが大きくなることが可能です
プログラマ・ガイド¶
NGSIMySQLSink
クラス¶
他の NGSI ライクのシンクと同様に、NGSIMySQLSink
、ベース NGSISink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent
オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される MySQL テーブル を指定します。したがって、MySQLBackend
実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。
void capRecords(NGSIBatch batch, long maxRecords) throws EventDeliveryException;
このメソッドは、常に persistBacth()
の直後に呼び出されます。upsert された同じデスティネーションテーブルはレコード数の点でチェックされます。更新されたテーブルのいずれかが、設定された最大値 (persistence_policy.max_records
) を超えた場合、レコードの最大数に達するまで、必要な数の最も古いレコードが削除されます。
void expirateRecords(long expirationTime);
このメソッドは、persistence_policy.checking_time
に基づいて定期的に呼び出されます。いずれかの表内のいずれかのレコードが、構成済みの有効期限 (persistence_policy.expiration_time
) を超えた場合、そのレコードは削除されます。
public void start();
MySQLBackend
の実装が作成されます。起動するシーケンスは NGSIMySQLSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
認証と認可¶
NGSIMySQLSink
の現在の実装は、MySQL エンドポイントで作成されたユーザー名とパスワードの資格情報に依存しています。