NGSIPostgreSQLSink¶
コンテンツ:
Functionality¶
com.iot.telefonica.cygnus.sinks.NGSIPostgreSQLSink
、または単なる NGSIPostgreSQLSink
は、PostgreSQL サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語 を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の PostgreSQL 構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
PostgreSQL データ構造への NGSIEvent
s のマッピング¶
PostgreSQL は、データ行のテーブルを含むデータベースにデータを編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSIPostgreSQLSink
によって利用されます。
PostgreSQL データベースの命名規則¶
PostgreSQL の操作の前には、使用するデータベースを作成する必要があります。
PostgreSQLは、英数字とアンダースコア(_
)のみを受け入れます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
PostgreSQLのデータベース名の長さは、63文字に制限されています。
PostgreSQL スキーマの命名規則¶
通知された fiware-service
ヘッダ値 または、このようなヘッダがない場合は、FIWARE service のデフォルト値 という名前のスキーマが作成されます (存在しない場合)。
PostgreSQLは、英数字とアンダースコア(_
)のみを受け入れます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
PostgreSQLのスキーマ名の長さは、63文字に制限されています。
PostgreSQL テーブルの命名規則¶
これらのテーブルの名前は、設定されたデータモデルによって異なります。詳細については、構成セクションを参照してください :
- サービスパスによるデータモデル (
data_model=dm-by-service-path
)。データモデル名が示すように、通知された FIWARE service path またはNGSIRestHandler
のデフォルトの設定されたサービスパスがテーブルの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルに格納されます。このデータモデルに関する唯一の制約は、FIWAREサービスパスをルート (/
) にすることができないことです - 実体によるデータモデル (
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトの FIWARE service path は、通知されたエンティティ ID とタイプに連結され、テーブル名を構成します。FIWARE service path がルート (/
)である場合、エンティティ ID とタイプのみが連結されます
PostgreSQLは、英数字とアンダースコア(_
)のみを受け入れます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
PostgreSQL のテーブル名の長さは、63文字に制限されています。
次の表は、古いエンコードで、テーブル名の構成をまとめたものです :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
N/A | <entityId>_<entityType> |
/<svcPath> |
<svcPath> |
<svcPath>_<entityId>_<entityType> |
次の表は、新しいエンコードで、テーブル名の構成をまとめたものです :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f |
x002fxffff<entityId>xffff<entityType> |
/<svcPath> |
x002f<svcPath> |
x002f<svcPath>xffff<entityId>xffff<entityType> |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entities
ヘッダ値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)で既に指定されていることを確認してください。
行ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが row
(デフォルトの格納モード)に設定されている場合、通知されたデータは属性ごとに格納され、それぞれのインサートを構成します。各インサートには次のフィールドがあります :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)fiwareServicePath
: 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプattrName
: 通知された属性名attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMd
: これには、Json の属性のメタデータ配列の文字列のシリアル化が含まれます。属性にメタデータがない場合、空の配列[]
が挿入されます
列ライクなストア¶
上記のコレクションに格納されている特定のデータに関して、attr_persistence
パラメータが column
に設定されている場合、通知されたエンティティ全体に対して、次のフィールドを含むシングル・ラインが作成されます :
recvTime
: 人間が判読可能な形式の UTC タイムスタンプ。ISO 8601 に似ていますが、すべての MySQL タイムスタンプは UTC 形式になっているはずなので、UTC を示すZ
文字は使用しないでくださいfiwareServicePath
通知された パス、または デフォルトのパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプ。- 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
- 通知された属性ごとに、属性名と
_md
の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます
例¶
NGSIEvent
¶
以下の NGSIEvent
が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
データベース、スキーマ、およびテーブル名¶
PostgreSQL のデータベース名は、ユーザが選択します。
PostgreSQL のスキーマは常に vehicles
です。
PostgreSQL のテーブル名は、古いエンコーディングを使用すると、設定されたデータモデルに応じて、次のものになります :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
N/A | car1_car |
/4wheels |
4wheels |
4wheels_car1_car |
新しいエンコーディングを使用すると :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f | x002fxffffcar1xffffcar |
/4wheels |
x002f4wheels |
x002f4wheelsxffffcar1xffffcar |
行ライクなストア¶
設定パラメータとして、attr_persistence=row
を想定すると、NGSIPostgreSQLSink
は、本体内のデータは次のように保持します :
$ psql -U myuser
psql (9.5.0)
Type "help" for help.
postgres-# \c my-database
my-database# \dn
List of schemas
Name | Owner
----------+----------
vehicles | postgres
public | postgres
(2 rows)
my-database=# \dt vehicles.*
List of relations
Schema | Name | Type | Owner
----------+-------------------+-------+----------
vehicles | 4wheels_car1_car | table | postgres
(1 row)
postgresql> select * from vehicles.4wheels_car1_car;
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| recvTimeTs | recvTime | fiwareServicePath | entityId | entityType | attrName | attrType | attrValue | attrMd |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels | car1 | car | speed | float | 112.9 | [] |
| 1429535775 | 2015-04-20T12:13:22.41.124 | 4wheels | car1 | car | oil_level | float | 74.6 | [] |
+------------+----------------------------+-------------------+----------+------------+-------------+-----------+-----------+--------+
2 row in set (0.00 sec)
列ライクなストア¶
近日公開
管理ガイド¶
構成¶
NGSIPostgreSQLSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink である必要があります |
channel | yes | N/A | |
enable_encoding | no | false | true または false。true は新しいエンコーディングを適用し、false は古いエンコーディングを適用します |
enable_grouping | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_name_mappings | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true または false |
data_model | no | dm-by-entity | dm-by-service-path、dm-by-entity。dm-by-service は現在サポートされていません |
postgresql_host | no | localhost | PostgreSQL サーバが実行される FQDN/IP アドレス |
postgresql_port | no | 5432 | |
postgresql_database | no | postgres | postgres インストール時に自動的に作成されるデフォルトのデータベース |
postgresql_username | no | postgres | postgres インストール時に自動的に作成されるデフォルトのユーザ名 |
postgresql_password | no | N/A | デフォルトでは空の値。インストール時にパスワードは作成されません |
attr_persistence | no | row | row または column |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
backend.enable_cache | no | false | true または false。true はキャッシュの作成を有効にし、false はキャッシュの作成を無効にします |
設定例は次のとおりです :
cygnus-ngsi.sinks = postgresql-sink
cygnus-ngsi.channels = postgresql-channel
...
cygnus-ngsi.sinks.postgresql-sink.type = com.telefonica.iot.cygnus.sinks.NGSIPostgreSQLSink
cygnus-ngsi.sinks.postgresql-sink.channel = postgresql-channel
cygnus-ngsi.sinks.postgresql-sink.enable_encoding = false
cygnus-ngsi.sinks.postgresql-sink.enable_grouping = false
cygnus-ngsi.sinks.postgresql-sink.enable_lowercase = false
cygnus-ngsi.sinks.postgresql-sink.enable_name_mappings = false
cygnus-ngsi.sinks.postgresql-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.postgresql-sink.postgresql_host = 192.168.80.34
cygnus-ngsi.sinks.postgresql-sink.postgresql_port = 5432
cygnus-ngsi.sinks.postgresql-sink.postgresql_database = mydatabase
cygnus-ngsi.sinks.postgresql-sink.postgresql_username = myuser
cygnus-ngsi.sinks.postgresql-sink.postgresql_password = mypassword
cygnus-ngsi.sinks.postgresql-sink.attr_persistence = row
cygnus-ngsi.sinks.postgresql-sink.batch_size = 100
cygnus-ngsi.sinks.postgresql-sink.batch_timeout = 30
cygnus-ngsi.sinks.postgresql-sink.batch_ttl = 10
cygnus-ngsi.sinks.postgresql-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.postgresql.backend.enable_cache = false
ユースケース¶
複数のテナントを持つ大きなデータベースを探している場合は、NGSIPostgreSQLSink
を使用します。PostgreSQL はいくつかのデータベースを持っているのが悪い点ですが、異なるスキーマを持つことは非常にうまくいきます。
重要なメモ¶
テーブルタイプとそのグループ化ルールとの関係¶
ご覧のように、テーブルタイプ構成パラメータは、データの直接集計のためのメソッドです。 : デフォルトの宛先によって、同じエンティティに関するすべての通知は、同じ PostgreSQL テーブル内に格納されます。または、デフォルトの service-path によって、同じ service-path に関するすべての通知は、同じ PostgreSQL テーブルに格納されます。
グループ化機能は別の集約メカニズムですが、間接的な機能です。つまり、グループ化機能は実際にはデータを単一のテーブルに集約するわけではありません。これは、設定されたテーブルタイプ (上記参照) に基づいてシンクが実行され、デフォルトの宛先またはサービスパスが変更され、 テーブルの種類に依存しません。
たとえば、選択されたテーブルタイプが宛先であり、グループ化機能が有効になっていない場合、car
タイプの car1
とcar2
の2つの異なるエンティティ・データが、デフォルトの宛先に従って、2つの異なる MySQL テーブルに永続化されます。すなわち、それぞれ car1_car
および car2_car
です。ただし、"car
タイプのすべての車は、cars
という変更された宛先がある" というグループルールが有効になっていると、両方のエンティティデータは、cars
という単一のテーブルに永続化されます。 この例では、直接集計は宛先別にテーブルタイプによって決定されますが、間接的にグループ化ルールによって集約も決定しています。
永続化モード¶
同じ数の属性が通知されるとは限りません。これは、NGSI のような送信者へのサブスクリプションに依存します。 通知された属性ごとに固定の8フィールドの行(fixed 8-fields rows)がアップデートされるため、row
の永続化モードでは問題ありません。 それにもかかわらず、column
モードはフィールドの長さが異なる複数行の影響を受ける可能性があります。 したがって、column
モードは、サブスクリプションが最後の通知以降に更新されなかった場合、常に同じ属性、イベントを送信するように設計されている場合にのみ推奨されます。
さらに、column
モードで実行している場合、通知される属性の数(したがってデータストア内に書き込まれるフィールドの数)が Cygnus によって認識されないため、Datastore を自動的に作成することはできず、Cygnusの 実行前にプロビジョニングする必要があります。これは、通知される属性の数に関係なく、書き込まれるフィールドの数が常に一定であるため、row
モードの場合ではありません。
バッチ処理¶
プログラマ・ガイドで説明したように、NGSIPostgreSQLSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ PostgreSQL テーブルに保持されます。イベントを1つずつ処理する場合は、PostgreSQL に100回のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 PostgreSQL テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の PostgreSQL テーブル)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSIPostgreSQLSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
タイムゾーン情報¶
PostgreSQL がタイムゾーン情報を環境変数として保存するため、タイムゾーン情報は、PostgreSQL タイムスタンプに追加されません。PostgreSQL のタイムスタンプは UTC 時間で保存されます。
エンコーディング¶
バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:
- 英数字以外の文字はすべてアンダースコア (
_
) で置き換えられます - アンダースコア (
_
) は連結文字としても使用されています - FIWARE service paths内のスラッシュ (
/
) は無視されます
バージョン1.3.0(含む)から、Cygnus は MySQL データ構造に合わせたこの特定のエンコーディングを適用します :
- 小文字の英数字はエンコードされません
- 大文字の英数字がエンコードされます
- 数字はエンコードされません
- アンダースコア文字 (
_
) はエンコードされません。 - 等号文字 (
=
)は、xffff
としてエンコードされます - FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding
パラメータを使用してエンコーディングタイプを切り替えることは可能です。
プログラマ・ガイド¶
NGSIPostgreSQLSink
クラス¶
他の NGSI ライクのシンクと同様に、NGSIPostgreSQLSink
、ベース NGSISink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent
オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される PostgreSQL テーブル を指定します。したがって、PostgreSQLBackend
実装のおかげで、各宛先は、永続化される宛先データ文字列を構成するために反復されます。
public void start();
PostgreSQLBackend
の実装が作成されます。起動するシーケンスは NGSIPostgreSQLSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
認証と認可¶
NGSIPostgreSQLSink
の現在の実装は、PostgreSQL エンドポイントで作成されたデータベース、ユーザ名、およびパスワード認証情報に依存しています。