NGSICartoDBSink¶
コンテンツ :
機能性¶
com.iot.telefonica.cygnus.sinks.NGSICartoDBSink
、または単なる NGSICartoDBSSink
は、Carto 内で NGSI ライクのコンテキストデータ・イベントを維持するように設計された、cygnus-ngsi シンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI言語 を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に cygnus-ngsi ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を、Cygnus シンクの特定の Carto データ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知されたNGSI イベントは、NGSI データジェネレータまたはそれが永続化される最終的なバックエンドとは無関係に、NGSIEvent
オブジェクト に変換されます。NGSIEvent
は、各コンテキスト要素に対して作成されます。このようなイベントは、特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
Carto データ構造への NGSIEvent
のマッピング¶
Carto は PostgreSQL と PostGIS 拡張に基づいています。データベース(組織ごとに1つ)、スキーマ(組織内のユーザに1つ)、および表(スキーマには1つ以上の表があることがあります)のデータを編成します。そのような構成は、NGSIEvent
が永続化されるたびに NGSICartoDBSink
によって活用されます。
PostgreSQL データベースとスキーマの命名規則¶
PostgreSQL のデータベースとスキーマは、それぞれ組織とユーザ名の要求時に Carto によって作成されています。したがって、これらの要素の命名規則を定義するのは Carto の責任です。具体的には :
- 組織には小文字のみを入力する必要があります
- ユーザ名には、小文字、数字、およびダッシュ記号(
-
)のみを含める必要があります
ここでは、通知された/デフォルトの FIWARE service が PostgreSQL のスキーマ/ユーザ名をマップし、マルチテナンシーとデータ分離性を保証していると仮定しています。このマルチテナンシーのアプローチは、FIWARE service/Carto ユーザ名とAPIキの間のマッピングを保持する構成ファイルの使用によって補完されます。構成セクションを確認してください。
PostgreSQL のテーブル命名規則¶
これらのテーブルの名前は、設定されたデータモデルと分析モードによって異なります。詳細については、構成セクションを参照してください :
- サービスパスによるデータモデル(
data_model=dm-by-service-path
)。データモデル名が示すように、通知されたFIWARE service path (またはNGSIRestHandler
で デフォルトで設定されたもの)がテーブルの名前として使用されます。これにより、同じサービスパスに属するすべての NGSI エンティティに関するデータがこの一意のテーブルにストアされます - 実体によるデータモデル(
data_model=dm-by-entity
)。各エンティティについて、通知された/デフォルトのFIWARE service path は、通知されたエンティティ ID とタイプに連結され、テーブル名を構成します。IWARE service path がルートディレクトリ(/
)である場合、エンティティ ID とタイプのみが連結されます
上記は、有効化された分析モード(enable_raw
、enable_distance
および enable_raw_snapshot
)とは無関係に適用されます。それにもかかわらず :
- 距離分析モードでは、テーブル名に、サフィックス
xffffdistance
が追加されます - raw スナップショット解析モードでは、テーブル名に、サフィックス
xffffrawsnapshot
が追加されます
PostgreSQL に基づいているので、英数字とアンダースコア(_
)のみが受け入れられると言わなければなりません。これにより、特定のエンコーディングが適用されます。
PostgreSQLのテーブル名の長さは64文字に制限されています。
次の表は、テーブル名の構成をまとめたものです :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f |
x002fxffff<entityId>xffff<entityType>[xffffdistance] |
/<svcPath> |
x002fxffff<svcPath>[xffffdistance|xffffrawsnapshot] |
x002fxffff<svcPath>xffff<entityId>xffff<entityType>[xffffdistance] |
エンティティ ID とタイプの連結が、NGSIEvent
内の notified_entities
/grouped_entitie
ヘッダー値 (グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください) で既に指定されていることを確認してください。
Raw ベースのストア¶
テーブル内にストアされている特定のデータに関して、enable_raw
パラメータが true
(デフォルトのストアモード)に設定されている場合、通知されたデータは、処理または変更なしでそのままストアされます。これはジオロケーションデータをストアする最も簡単な方法です。
通知されたエンティティごとに、以下のフィールドを挿入した単一のインサートが作成されます :
recvTime
: 人間が判読可能な形式のUTCタイムスタンプ (ISO 8601)fiwareServicePath
: 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパスentityId
: 通知されたエンティティ識別子entityType
: 通知されたエンティティタイプthe_geom
: 現在のジオロケーションポイント。つまり経度と緯度が含まれています。PostGIS のジオメトリタイプでなければなりませんが、ポイントを含む必要があります。例えば、ST_Point()
を使って作成することができます。PostGIS ジオメトリを構成するために使用されるデータは、特別な通知属性から取得されます- いずれかの
geo:point
タイプ (ポイント) - いずれかの
geo:json
タイプ (ポイントを表す GeoJson) - タイプ
string
のlocation
メタデータとWGS84
値を関連付けるかどうか。このオプションは推奨されていません
- いずれかの
- それぞれの非ジオロケーション属性の場合、インサートには2つの追加フィールドがあり、1つは値のためのフィールド '
'、もう1つはメタデータのためのフィールド <attrName>_md
です
Cygnus は、Raw ベースのストアで Carto テーブルを作成しません。その理由は、Cygnus は、エンティティが持つ属性の完全なセットを通知から推論することができません。、つまり列を推論することができないからです。したがって、事前に、テーブルをプリ・プロビジョニングしておく必要があります。付録1で Carto の特定のクエリを確認してください。
距離に基づくストア¶
enable_distance
パラメータが true
に設定されている場合(デフォルトでは、この種の保存は実行されません)、通知されたデータは距離分析に基づいて処理されます。前述のように、エンティティの以前のジオロケーションに対する線形距離および経過時間が取得され、この情報は、距離の合計量、時間の総量および他の多くの集計を更新するために使用されます。速度は、距離を時間で除算した結果と同様に得られ、このような速度計算は、特定の集約を更新するためにも使用されます。
最終的な目標は、クエリ時に計算を実行することなく、エンティティのジオロケーションの関数として距離ベースの測定値のセットを事前に計算し、"このエンティティがこのポイントに到達するまでに要した総時間" または "この点を通過するときのこのエンティティの平均速度" などをクエリすることができます。
通知されたエンティティごとに1つのインサートが作成され、次のフィールドを挿入します :
recvTime
: 人間が判読可能な形式のUTCタイムスタンプ (ISO 8601)fiwareServicePath
: 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパスentityId
: 通知されたエンティティ識別子entityType
: 通知されたエンティティタイプthe_geom
: 現在のジオロケーションポイント。つまり経度と緯度が含まれています。PostGIS のジオメトリタイプでなければなりませんが、ポイントを含む必要があります。例えば、ST_Point()
を使って作成することができます。PostGIS ジオメトリを構成するために使用されるデータは、特別な通知属性から取得されます- いずれかの
geo:point
タイプ (ポイント) - いずれかの
geo:json
タイプ (ポイントを表す GeoJson) - タイプ
string
のlocation
メタデータとWGS84
値を関連付けるかどうか。このオプションは推奨されていません
- いずれかの
stageDistance
: 現在のジオポイントと以前のジオポイントの直線距離stageTime
: 前のジオポイントから現在のジオポイントに移動するときの経過時間stageSpeed
: ステージ距離をステージ時間で割った結果sumDistance
: ステージ距離の合計。この値をステージ数で割ると、平均ステージ距離になりますsumTime
: 経過時間の合計。この値をステージ数で割ると、平均ステージ時間になりますsumSpeed
: ステージスピードの合計。この値をステージ数で割ると、平均ステージ速度になりますsumDistance2
: ステージ距離の平方根の和。この値をステージ数で除算すると、ステージ距離の分散が生じますsumTime2
: ステージ時間の平方根の和。この値をステージ数で除算すると、ステージ時間の分散が生じますsumSpeed2
: ステージ速度の平方根の和。この値をステージ数で除算すると、ステージ速度の分散が生じますmaxDistance
: 最大ステージ距離minDistance
: 最小ステージ距離maxTime
: 最大ステージ時間minTime
: 最低ステージ時間maxSpeed
: 最大ステージ速度minSpeed
: 最小ステージ速度numStages
: ステージ数
Raw ベースのストアとは異なり、Cygnus は距離ベースのストアで使用されるテーブルを単独で作成することができます。理由は、テーブルの列が事前によく知られているからです。
Raw スナップショット・ベースのストア¶
この分析モードは、次の場合を除き、raw ベースのストアモードと同じです :
- エンティティごとのテーブルはありませんが、FIWARE service path ごとのテーブルはあります。これらの意味では、この分析モードは常に
dm-by-service-path
に設定されたdata_model
パラメータを使用して作業すると見なすことができます - 通知されたデータは新しいレコードとしてテーブルに追加されませんが、すでに存在するレコードを更新するために使用されます。もちろん、FIWARE service path、エンティティID、エンティティタイプの前のレコードがない場合、レコードはテーブルに追加されます
例¶
NGSIEvent
¶
通知された NGSI
コンテキストデータから、以下の NGSIEvent
が作成されたと仮定します。以下のコードは、実際のデータフォーマットではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
},
{
attrName=location,
attrType=geo:point,
attrValue="41.102, -3.008"
}
]
}
}
テーブル名¶
PostgreSQL のテーブル名は、設定されたデータモデルと解析モードに応じて次のようになります :
FIWARE service path | dm-by-service-path |
dm-by-entity |
---|---|---|
/ |
x002f |
x002fcar1xffffcar[xffffdistance] |
/4wheels |
x002f4wheels[xffffdistance] |
x002f4wheelsxffffcar1xffffcar[xffffdistance] |
Raw ベースのストア¶
テーブル名 x002f4wheelsxffffcar1xffffcar
(エンティティ別データモデル、非ルートサービスパス、Raw 履歴分析モードのみ) を想定しましょう。このテーブルにストアされるデータは次のとおりです :
curl "https://myusername.cartodb.com/api/v2/sql?q=select * from x002f4wheelsxffffcar1xffffcar&api_key=abcdef0123456789"
{
"rows": [
{
"cartodb_id": 1,
"the_geom": "0101000020E61000007B3D8FFE54EF0EC04E0BE7FD50B94540",
"the_geom_webmercator": "0101000020110F00002DF05823E4451AC101051DDC46865441",
"oil_level_md": "[]",
"oil_level": "74.6",
"speed_md": "[]",
"speed": "112.9",
"entitytype": "car",
"entityid": "car1",
"fiwareservicepath": "/4wheels",
"recvtime": "2016-04-21T10:34:23.423Z"
}
],
"time": 0.001,
"fields": {
"cartodb_id": {
"type": "number"
},
"the_geom": {
"type": "geometry"
},
"the_geom_webmercator": {
"type": "geometry"
},
"oil_level_md": {
"type": "string"
},
"oil_level": {
"type": "string"
},
"speed_md": {
"type": "string"
},
"speed": {
"type": "string"
},
"entitytype": {
"type": "string"
},
"entityid": {
"type": "string"
},
"fiwareservicepath": {
"type": "string"
},
"recvtime": {
"type": "string"
}
},
"total_rows": 1
}
距離に基づく履歴ベースのストア¶
前回のインサート(逆にこれが最初のインサートで、集計されたほとんどの値は0に設定されます)で、x002f4wheelsxffffcar1xffffcarxffffdistance
テーブル名(エンティティ別データモデル、ルート以外のサービスパス、距離履歴の解析モードのみ)を想定しましょう。このテーブルにストアされるデータは次のとおりです :
curl "https://myusername.cartodb.com/api/v2/sql?q=select * from x002f4wheelsxffffcar1xffffcarxffffdistance&api_key=abcdef0123456789"
{
"rows": [
{
"cartodb_id": 1,
"the_geom": "0101000020E61000001886EA3B22530EC0CADCE89377BD4540",
"the_geom_webmercator": "0101000020110F0000D1F6B55E3BC119C1EBDCC03D228B5441",
"numsamples": 1,
"minspeed": Infinity,
"mintime": Infinity,
"mindistance": Infinity,
"maxspeed": -Infinity,
"maxtime": -Infinity,
"maxdistance": -Infinity,
"sumspeed2": 0,
"sumtime2": 0,
"sumdistance2": 0,
"sumspeed": 0,
"sumtime": 0,
"sumdistance": 0,
"stagespeed": 0,
"stagetime": 0,
"stagedistance": 0,
"entitytype": "car",
"entityid": "car1",
"fiwareservicepath": "/4wheels",
"recvtime": "2016-04-21T10:34:23.423Z"
},
{
"cartodb_id": 2,
"the_geom": "0101000020E61000001886EA3B22530EC0CADCE89377BD4540",
"the_geom_webmercator": "0101000020110F0000D1F6B55E3BC119C1EBDCC03D228B5441",
"numsamples": 2,
"minspeed": 0.00503757972391,
"mintime": 15,
"mindistance": 0.0755636958586634,
"maxspeed": 0.00503757972391,
"maxtime": 15,
"maxdistance": 0.0755636958586634,
"sumspeed2": 0.00002537720947,
"sumtime2": 225,
"sumdistance2": 0.00570987213182,
"sumspeed": 0.00503757972391,
"sumtime": 15,
"sumdistance": 0.0755636958586634,
"stagespeed": 0.00503757972391,
"stagetime": 15,
"stagedistance": 0.0755636958586634,
"entitytype": "car",
"entityid": "car1",
"fiwareservicepath": "/4wheels",
"recvtime": "2016-04-21T10:34:23.448Z"
}
],
"time": 0.001,
"fields": {
"cartodb_id": {
"type": "number"
},
"the_geom": {
"type": "geometry"
},
"the_geom_webmercator": {
"type": "geometry"
},
"numsamples": {
"type": "number"
},
"minspeed": {
"type": "number"
},
"mintime": {
"type": "number"
},
"mindistance": {
"type": "number"
},
"maxspeed": {
"type": "number"
},
"maxtime": {
"type": "number"
},
"maxdistance": {
"type": "number"
},
"sumspeed2": {
"type": "number"
},
"sumtime2": {
"type": "number"
},
"sumdistance2": {
"type": "number"
},
"sumspeed": {
"type": "number"
},
"sumtime": {
"type": "number"
},
"sumdistance": {
"type": "number"
},
"stagespeed": {
"type": "number"
},
"stagetime": {
"type": "number"
},
"stagedistance": {
"type": "number"
},
"entitytype": {
"type": "string"
},
"entityid": {
"type": "string"
},
"fiwareservicepath": {
"type": "string"
},
"recvtime": {
"type": "string"
}
},
"total_rows": 2
}
Raw スナップショットベースのストア¶
すべてが Raw 履歴ベースのストアに等しいですが :
- テーブル名は
x002f4wheelsxffffcar1xffffcarxffffrawsnapshot
です - 与えられたFIWARE service path、エンティティ ID および、エンティティタイプがテーブルに存在しない場合、データは挿入されます。それ以外の場合は更新に使用されます
管理ガイド¶
構成¶
NGSICartoDBSink
は、次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSICartoDBSink である必要があります |
channel | yes | N/A | |
enable_grouping | no | false | true or false. 詳細については、このリンクをチェックしてください |
enable_name_mappings | no | false | true or false. 詳細については、このリンクをチェックしてください |
enable_lowercase | no | false | true or false. |
data_model | no | dm-by-entity | dm-by-service-path or dm-by-entity. |
keys_conf_file | yes | N/A | FIWARE service/Carto ユーザ名、エンドポイント、API キー、およびアカウントタイプ間のマッピングを含む CartoDB ファイルへの絶対パス |
swap_coordinates | no | false | true or false. true の場合は、緯度と経度の値が交換されます |
flip_coordinates | no | false | true or false. true の場合は、緯度と経度の値が交換されます。swap_coordinates の代わりに、リリース1.6.0から非推奨になりました。 |
enable_raw_historic | no | true | true or false. true の場合は、Raw 履歴ベースのストレージが行われます |
enable_raw | no | true | true or false. true の場合は、Raw 履歴ベースのストレージが行われます。enable_raw_historic の代わりに、リース1.8.0から非推奨になりました。 |
enable_distance_historic | no | false | true or false. true の場合は、距離ベースのストレージが行われます |
enable_distance | no | false | true or false. true の場合は、距離ベースのストレージが行われます。enable_distance_historic の代わりに、リース1.8.0から非推奨になりました |
enable_raw_snapshot | no | false | true or false. true の場合は、Raw スナップショット・ベースのストレージが行われます |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
backend.max_conns | no | 500 | Http ベースの HDFS バックエンドに許可される最大接続数 |
backend.max_conns_per_route | no | 100 | Http ベースの HDFS バックエンドに許可されているルートごとの最大接続数 |
構成例は次のとおりです :
cygnus-ngsi.sinks = cartodb-sink
cygnus-ngsi.channels = cartodb-channel
...
cygnus-ngsi.sinks.cartodb-sink.channel = cartodb-channel
cygnus-ngsi.sinks.cartodb-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.cartodb-sink.enable_grouping = false
cygnus-ngsi.sinks.cartodb-sink.enable_name_mappings = false
cygnus-ngsi.sinks.cartodb-sink.enable_lowercase = false
cygnus-ngsi.sinks.cartodb-sink.keys_conf_file = /usr/cygnus/conf/cartodb_keys.conf
cygnus-ngsi.sinks.cartodb-sink.swap_coordinates = true
cygnus-ngsi.sinks.cartodb-sink.enable_raw_historic = true
cygnus-ngsi.sinks.cartodb-sink.enable_distance_historic = false
cygnus-ngsi.sinks.cartodb-sink.enable_raw_snapshot = false
cygnus-ngsi.sinks.cartodb-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.cartodb-sink.batch_size = 10
cygnus-ngsi.sinks.cartodb-sink.batch_timeout = 5
cygnus-ngsi.sinks.cartodb-sink.batch_ttl = 0
cygnus-ngsi.sinks.cartodb-sink.batch_retries_intervals = 5000
cygnus-ngsi.sinks.cartodb-sink.backend.max_conns = 500
cygnus-ngsi.sinks.cartodb-sink.backend.max_conns_per_route = 100
Carto キーの設定ファイルの例があります。これは Cygnus で配布されている設定テンプレートから生成できます :
$ cat /usr/cygnus/conf/cartodb_keys.conf
{
"cartodb_keys": [
{
"username": "user1",
"endpoint": "https://user1.cartodb.com",
"key": "1234567890abcdef",
"type": "personal"
},
{
"username": "user2",
"endpoint": "https://user2.cartodb.com",
"key": "abcdef1234567890",
"type": "enterprise"
}
]
}
ユースケース¶
Raw ベースのストアは、単に特定の時刻またはジオロケーションでのエンティティの属性値であったものを保存したいというユースケースに対して処理されます。もちろん、計算時間の遅延が問題にならない場合は、より複雑な分析が可能です。クエリ時にデータを処理する必要があります。
上記のことは、特定の時刻または地理的位置に関する事前計算された集約を提供する距離ベースの記憶によって回避されます。これらの集計を事前に計算すると、クエリの応答時間が大幅に向上します。これは、次のようなクエリに適しています :
- このエンティティがこの時点に到着した総時間はどれですか?
- この点を通過するときのこのエンティティの平均速度はどれですか?
- この時点で最高速度のエンティティはどれですか?
- エンティティが移動した最大のステージはどれですか?
- など
最後に、Raw スナップショットのストアは、履歴を気にすることなく、時間の経過とともにエンティティをジオロケーションに簡単に配置します :
重要なメモ¶
NGSICartoDBSink
および非ジオ・ロケートのエンティティ¶
このシンクによって処理されることを目的とするエンティティは、geo:point
タイプの属性として、string
タイプと WGS84
値の location
メタデータを保持する属性のいずれかとして、geolocated
属性を持つことが必須です。
マルチテナンシー・サポート¶
単一の許可されたユーザが他のすべてのユーザ(データのみを読み取ることができるユーザ)に代わってユーザ空間を作成し、データを書き込むことができる他のNGSIシンクとは異なり、このシンクは、事前に作成されたユーザ空間およびユーザ空間の書き込み資格を必要とします。その理由は、Carto が、ユーザアカウント作成時にデータベースとスキーマを強制するためです。通常は、FIWARE service または FIWARE テナント ID に関連しています。また、Cygnus が作成する必要がある唯一の永続性要素は、すでにプロビジョニングされているデータベースとスキーマ内のテーブルです。推測されるように、これらのデータベースとスキーマにアクセスするには、特定のユーザ資格情報が必要です。
カスタム認証情報は、keys_conf_file
設定パラメータを介して Carto シンクが指す特別なファイルに追加する必要があります。特に興味のあるのは、personal
または enterprise
のアカウントタイプです。 このような区別は、API へのクエリが互いに異なるため重要です。
バッチ処理¶
プログラマ・ガイドで説明したように、NGSICartoDBSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ Carto テーブルに保持されます。イベントを1つずつ処理する場合は、Carto に100個のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 Carto テーブルごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の Carto デスティネーション)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSICartoDBSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、予想されるサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
最後に、現在のバッチ処理は Raw ライクのストアでのみ機能すると言わざるを得ません。以前のイベントに基づくデータ集約を含む距離的なストアのため、シンクは現在、この種の計算を実装するための最後のイベントによるクエリの PostgreSQL 機能に依存しています。もちろん、パフォーマンスのために、これは一時的にメモリー内の前のイベントを指すことによってシンク内で行われることが期待されます。これはデータベースにアクセスするよりも常に高速です。
エンコーディング¶
Cygnus は、Carto のデータ構造に合わせたこの特定のエンコーディングを適用します :
- 小文字の英数字はエンコードされません
- 大文字の英数字がエンコードされます
- 数字はエンコードされません
- アンダースコア文字 (
_
) はエンコードされません - 等号文字 (
=
) は、xffff
としてエンコードされます - FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
表の自動作成¶
これは既にコメントされていますが、Cygnus は Raw ベースのスナップショット・ベースのモードと Raw スナップショットベースのモードで必要なテーブルを自動的に作成しません。これは、エンティティに関する最初の通知がそのようなエンティティの属性の完全なリストを含むことができないためです。すなわち、更新された属性のみが通知される可能性があります。
逆に、距離ベースモードでは、表の列の数と意味が常に同じであり、エンティティの属性とは独立しているため、表が自動的に作成されます。
必要に応じて、付録1は他の興味深い操作の中で Carto のためにテーブルをプロビジョニングする方法を示します。
サポートされている Orion
のジオメトリ¶
NGSICartoDBSink
の現在のバージョンでは、次の NGSIv2 ジオメトリをサポートしています :
geo:point
: この場合、geolocated 属性は単一の点についてですgeo:json
: GeoJson は単純な点から複雑なポリゴンまでの任意のジオメトリを記述できますが、単一の点を表す必要がありますlocation
メタデータ : この場合、geolocated 属性は単一の点についてです
NGSIv2仕様("エンティティの地理空間プロパティ"のセクション)で詳細を得ることができます。
プログラマ・ガイド¶
NGSICartoDBSSink
クラス¶
他の NGSI ライクのシンクと同様に、NGSICartoDBSink
は、ベース NGSISink
を拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(Batch batch) throws Exception;
Batch
には、通知されたコンテキストデータイベントを解析した結果である一連の NGSIEvent
オブジェクトが含まれます。バッチ内のデータは宛先別に分類され、最後に宛先はデータが永続化される CartoDB テーブルを指定します。したがって、各宛先は、CartoDBBackend
実装のおかげで永続化される宛先データ文字列を構成するために反復します。
public void start();
CartoDBBackend
の実装が作成されます。起動するシーケンスは NGSICartoDBSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
認証と認可¶
認証は、ユーザ名に関連する API キーを使用して行われます。一度認証されると、クライアントは、組織内(PostgreSQL データベース)のユーザ空間(PostgreSQL スキーマ)内の PostgreSQL テーブルの作成、読み取り、更新、および削除のみが許可されます。
付録¶
付録1 : Carto でのテーブルのプロビジョニング¶
その後、Carto でテーブルをプロビジョニングするために必要なクエリを見つけることができます。まず、テーブルを作成します :
$ curl -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<api_key>" --data-urlencode "q=CREATE TABLE <table_name> (recvTime text, fiwareServicePath text, entityId text, entityType text, <attr_1> <type_1>, <attr_1>_md text, ..., <attr_n> <type_n>, <attr_n>_md text, the_geom geometry(POINT,4326))"
Carto の Web ベースのダッシュボードに表示したい場合は、Carto のすべてのテーブルを煩雑にする必要があります :
$ curl -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<api_key>" --data-urlencode "q=SELECT CDB_CartodbfyTable('<my_user_or_schema>', '<table_name>')"
さて、いくつかのデータを挿入することができるはずです。ちょうどテスト目的のために、Cygnus はこの部分を担当するでしょう :
$ curl -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<api_key>" --data-urlencode "q=INSERT INTO <table_name> (recvTime, fiwareServicePath, entityId, entityType, <attr_1>, <attr_1>_md, ..., <attr_n>, <attr_n>_md, the_geom) VALUES ('2016-04-19T07:09:53.116Z', '<service_path>', '<entity_id>', '<entity_type>', '<attr_1_value>', '<attr_1_metadata>', ..., '<attr_n_value>', '<attr_n_metadata>', 'ST_SetSRID(ST_MakePoint(<lat>, <lon>), 4326))"
次のようにデータをクエリすることができます :
$ curl -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<api_key>" --data-urlencode "q=SELECT * FROM <table_name>"
完全にするために、テーブルを削除する方法を見てみましょう :
$ curl -G "https://<my_user>.cartodb.com/api/v2/sql?api_key=<api_key>" --data-urlencode "q=DROP TABLE <table_name>"