NGSICKANSink¶
コンテンツ :
機能性¶
com.iot.telefonica.cygnus.sinks.NGSICKANSink
、または単なる NGSICKANSink
は、CKAN サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI 言語 を話す他のシステムである可能性があります。
データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent
に変換されます。最終的には、これらのイベント内の情報を特定の CKAN データ構造にマップする必要があります。
次のセクションでこれについて詳しく説明します。
NGSIEvent
オブジェクトへの NGSI イベントのマッピング¶
コンテキストデータを含む、通知された NGSI イベントは、NGSI データジェネレータまたは英ぞかされる最終的なバックエンドとは無関係に、NGSIEvent
オブジェクトに変換されます。NGSIEvent
は、各コンテキスト要素に対して作成され、このようなイベントは特定のヘッダと ContextElement
オブジェクトを組み合わせたものです。
これは、NGSIRestHandler
のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent
オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。
CKAN データ構造への NGSIEvent
s のマッピング¶
CKAN は、パッケージまたはデータセットを含む組織内のデータを編成します。これらのパッケージ/データセットには、最終的に PostgreSQL データベース (CKAN Datastore) またはプレーン・ファイル (CKAN Filestore) にデータがストアされるいくつかのリソースが含まれています。そのような組織は、NGSIEvent
が永続化されるたびにNGSICKANSink
によって利用されます。
組織の命名規則¶
通知された fiware-service
ヘッダ値、または、そのようなヘッダがない場合、FIWARE service の既定値を名前とした組織が作成されます(存在しない場合)。
PostgreSQLをベースにしている文字のみのは受け入れられるので、英数字とアンダースコア(_
)のみが受け入れられます。ハイフン(-
)も使用できます。 これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
それにもかかわらず、PostgreSQL とは異なり、組織の長さは最大100文字、最小2文字です。
パッケージ/データセット命名規則¶
通知された fiware-service
と fiware-servicePath
のヘッダ値またはそのようなヘッダーがない場合、fiware-service とfiware-servicePathの既定値が連結されたパッケージ/データセットが上記の組織で作成されます(まだ存在しない場合)。
PostgreSQLをベースにしている文字のみのは受け入れられるので、英数字とアンダースコア(_
)のみが受け入れられます。ハイフン(-
)も使用できます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
それにもかかわらず、PostgreSQL とは異なり、データセットの長さは最大100文字、最小2文字です。
リソース命名規則¶
CKAN リソースは、単一のデータモデル(詳細については構成セクションを参照してください)に従います。すなわちエンティティごとです。したがって、リソース名は常にエンティティ ID とタイプの連結です。このような名前は、 NGSIEvent
内のnotified_entities
/grouped_entities
ヘッダ値(グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)ですでに指定されています。
CKAN データストアおよびビューアも作成され、上記のリソースに関連付けられていることに注意する必要があります。このデータストアは、最終的に PostgreSQL のテーブルであり、永続化されたデータを保持します。
PostgreSQL に基づいているので、英数字とアンダースコア(_
)のみが受け入れられます。ハイフン(-
)も使用できます。これにより、enable_encoding
構成パラメーターに応じて、特定のエンコードが適用されます。
リソース名には実質的な制限はありませんが、組織名とパッケージ名の処理に合わせて、Cygnus は、最小2文字から最大100文字までの長さを制限します。
行ライクのストア (Row-like storing)¶
リソースに関連付けられたデータストア内にストアされている特定のデータに関して、attr_persistence
パラメータが row
(デフォルトのストアモード)に設定されている場合、通知されたデータは属性ごとにストアされ、それぞれのインサートを構成します。各インサートには次のフィールドがあります :
recvTimeTsUTC
: タイムスタンプ(ミリ秒単位)recvTime
: 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)fiwareServicePath
: 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプattrName
: 通知された属性名attrType
: 通知された属性タイプattrValue
: 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性がありますattrMd
: これには、Json の属性のメタデータ配列の文字列のシリアル化が含まれます。属性にメタデータがない場合、空の配列[]
が挿入されます
列ライクのストア (Column-like storing)¶
リソースに関連付けられたデータストア内にストアされている特定のデータに関して、attr_persistence
パラメータが column
に設定されている場合は、通知されたエンティティ全体に対して、次のフィールドを含む単一のラインが作成されます :
recvTime
: 人間が判読可能な形式のUTCタイムスタンプ (ISO 8601)fiwareServicePath
通知されたパス、またはデフォルト・パスentityId
: 通知されたエンティティ IDentityType
: 通知されたエンティティタイプ- 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
- 通知された属性ごとに、属性名と
_md
の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます
例¶
NGSIEvent
¶
以下の NGSIEvent
が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :
ngsi-event={
headers={
content-type=application/json,
timestamp=1429535775,
transactionId=1429535775-308-0000000000,
correlationId=1429535775-308-0000000000,
fiware-service=vehicles,
fiware-servicepath=/4wheels,
<grouping_rules_interceptor_headers>,
<name_mappings_interceptor_headers>
},
body={
entityId=car1,
entityType=car,
attributes=[
{
attrName=speed,
attrType=float,
attrValue=112.9
},
{
attrName=oil_level,
attrType=float,
attrValue=74.6
}
]
}
}
組織名、データセット名、リソース名¶
上記の例と古いエンコーディングを使用すると、これらは作成された CKAN 要素です
- 組織 :
vehicles
- パッケージ :
vehicles_4wheels
- リソース :
car1_car
新しいエンコーディングを使用すると :
- 組織 :
vehicles
- パッケージ :
vehiclesxffffx002f4wheels
- リソース :
car1xffffcar
行ライクのストア (Row-like storing)¶
構成パラメータが attr_persistence=row
の場合、NGSICKANSink
は、本体内のデータは次のように永続化します :
$ curl -s -S -H "Authorization: myapikey" "http://192.168.80.34:80/api/3/action/datastore_search?resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7"
{
"help": "Search a DataStore resource...",
"success": true,
"result": {
"resource_id": "3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7",
"fields": [
{
"type": "int4",
"id": "_id"
},
{
"type": "int4",
"id": "recvTimeTs"
},
{
"type": "timestamp",
"id": "recvTime"
},
{
"type": "text",
"id": "fiwareServicePath"
},
{
"id": "entityId",
"type": "text"
},
{
"id": "entityType",
"type": "text"
},
{
"type": "text",
"id": "attrName"
},
{
"type": "text",
"id": "attrType"
},
{
"type": "json",
"id": "attrValue"
},
{
"type": "json",
"id": "attrMd"
}
],
"records": [
{
"entityId": "car1",
"entityType": "car",
"fiwareServicePath": "4wheels",
"attrType": "float",
"recvTime": "2015-04-20T12:13:22.41.124Z",
"recvTimeTs": 1429535775,
"attrMd": null,
"attrValue": "112.9",
"attrName": "speed",
"_id": 1
},
{
"entityId": "car1",
"entityType": "car",
"fiwareServicePath": "4wheels",
"attrType": "float",
"recvTime": "2015-04-20T12:13:22.41.124Z",
"recvTimeTs": 1429535775,
"attrMd": null,
"attrValue": "74.6",
"attrName": "oil_level",
"_id": 2
}
],
"_links": {
"start": "/api/3/action/datastore_search?resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7",
"next": "/api/3/action/datastore_search?offset=100&resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7"
},
"total": 2
}
}
列ライクのストア (Column-like storing)¶
attr_persistence=colum
の場合は、NGSICKANSink` は、本体内のデータは次のように永続化します :
$ curl -s -S -H "Authorization: myapikey" "http://130.206.83.8:80/api/3/action/datastore_search?resource_id=611417a4-8196-4faf-83bc-663c173f6986"
{
"help": "Search a DataStore resource...",
"success": true,
"result": {
"resource_id": "611417a4-8196-4faf-83bc-663c173f6986",
"fields": [
{
"type": "int4",
"id": "_id"
},
{
"type": "timestamp",
"id": "recvTime"
},
{
"type": "text",
"fiwareServicePath": "4wheels"
},
{
"type": "text",
"entityId": "car1"
},
{
"type": "text",
"entityType": "car"
},
{
"type": "json",
"id": "speed"
},
{
"type": "json",
"id": "speed_md"
},
{
"type": "json",
"id": "oil_level"
},
{
"type": "json",
"id": "oil_level_md"
}
],
"records": [
{
"recvTime": "2015-04-20T12:13:22.41.124Z",
"fiwareServicePath": "4wheels",
"entityId": "car1",
"entityType": "car",
"speed": "112.9",
"speed_md": null,
"oil_level": "74.6",
"oil_level_md": null,
"_id": 1
}
],
"_links": {
"start": "/api/3/action/datastore_search?resource_id=611417a4-8196-4faf-83bc-663c173f6986",
"next": "/api/3/action/datastore_search?offset=100&resource_id=611417a4-8196-4faf-83bc-663c173f6986"
},
"total": 1
}
}
注 : curl
は、CKANに公開されているような REST APIs との対話を可能にする Unix コマンドです。
管理ガイド¶
構成¶
NGSICKANSink
は、 次のパラメータによって構成されます :
パラメータ | 必須 | デフォルト値 | コメント |
---|---|---|---|
type | yes | N/A | com.telefonica.iot.cygnus.sinks.NGSICKinkink である必要があります |
channel | yes | N/A | |
enable_encoding | no | false | true または false。true は新しいエンコーディングを適用し、false は古いエンコーディングを適用します |
enable_grouping | no | false | true または false。詳細については、このリンクをチェックしてください |
enable_name_mappings | no | false | true または false。詳細については、このリンクをチェックしてください |
data_model | no | dm-by-entity | 構成されていなくても、常に dm-by-entity です |
attr_persistence | no | row | row または colum |
ckan_host | no | localhost | 実行している CKAN サーバの FQDN/IPアドレス |
ckan_port | no | 80 | |
ckan_viewer | no | recline_grid_view | [利用可能な](http://docs.ckan.org/en/latest/maintaining/data-viewer.ht |
ml)ビューアをCKANのドキュメントで確認してください | |||
ssl | no | false | |
api_key | yes | N/A | |
orion_url | no | http://localhost:1026 | filestore の URL として置かれる |
batch_size | no | 1 | 永続化の前に蓄積されたイベントの数 |
batch_timeout | no | 30 | バッチがそのまま永続化される前に構築される秒数 |
batch_ttl | no | 10 | バッチを永続化できない場合のリトライ回数。リトライしない場合は 0 、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください |
batch_retry_intervals | no | 5000 | 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます |
backend.max_conns | no | 500 | Http ベースの HDFS バックエンドに許可される最大接続数 |
backend.max_conns_per_route | no | 100 | Http ベースの HDFS バックエンドに許可されているルートごとの最大接続数 |
persistence_policy.max_records | no | -1 | リソースに上限が設定される前に許可されるレコードの最大数。-1 はこのポリシーを無効にします |
persistence_policy.expiration_time | no | -1 | 有効期限が切れる前にリソース内でレコードが維持される最大秒数。-1 はこのポリシーを無効にします |
persistence_policy.checking_time | no | 3600 | シンクがレコードの有効期限をチェックする頻度 (秒単位) |
構成例は、次のとおりです :
cygnus-ngsi.sinks = ckan-sink
cygnus-ngsi.channels = ckan-channel
...
cygnus-ngsi.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.NGSICKANSink
cygnus-ngsi.sinks.ckan-sink.channel = ckan-channel
cygnus-ngsi.sinks.ckan-sink.enable_encoding = false
cygnus-ngsi.sinks.ckan-sink.enable_grouping = false
cygnus-ngsi.sinks.ckan-sink.enable_name_mappings = false
cygnus-ngsi.sinks.ckan-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.ckan-sink.attr_persistence = column
cygnus-ngsi.sinks.ckan-sink.ckan_host = 192.168.80.34
cygnus-ngsi.sinks.ckan-sink.ckan_port = 80
cygnus-ngsi.sinks.ckan-sink.ckan_viewer = recline_grid_view
cygnus-ngsi.sinks.ckan-sink.ssl = false
cygnus-ngsi.sinks.ckan-sink.api_key = myapikey
cygnus-ngsi.sinks.ckan-sink.orion_url = http://localhost:1026
cygnus-ngsi.sinks.ckan-sink.batch_size = 100
cygnus-ngsi.sinks.ckan-sink.batch_timeout = 30
cygnus-ngsi.sinks.ckan-sink.batch_ttl = 10
cygnus-ngsi.sinks.ckan-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.ckan-sink.backend.max_conns = 500
cygnus-ngsi.sinks.ckan-sink.backend.max_conns_per_route = 100
cygnus-ngsi.sinks.ckan-sink.persistence_policy.max_records = 5
cygnus-ngsi.sinks.ckan-sink.persistence_policy.expiration_time = 86400
cygnus-ngsi.sinks.ckan-sink.persistence_policy.checking_time = 600
ユースケース¶
中期的に大きく伸びていないデータベースストレージを探している場合は、NGSICKANSink
を使用してください。
重要なメモ¶
永続化モード¶
同じ数の属性が通知されるとは限りません。これは、NGSI のような送信者へのサブスクリプションに依存します。 通知された属性ごとに固定の8フィールドの行(fixed 8-fields rows)がアップデートされるため、row
の永続化モードでは問題ありません。 それにもかかわらず、column
モードはフィールドの長さが異なる複数行の影響を受ける可能性があります。 したがって、column
モードは、サブスクリプションが最後の通知以降に更新されなかった場合、常に同じ属性、イベントを送信するように設計されている場合にのみ推奨されます。
さらに、column
モードで実行している場合、通知される属性の数(したがってデータストア内に書き込まれるフィールドの数)が Cygnus によって認識されないため、Datastore を自動的に作成することはできず、Cygnusの 実行前にプロビジョニングする必要があります。これは、通知される属性の数に関係なく、書き込まれるフィールドの数が常に一定であるため、row
モードの場合ではありません。
列モードのリソースをプロビジョニングする方法を知るために、付録を確認してください :
バッチ処理¶
プログラマ・ガイドで説明したように、NGSICKANSink
は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink
を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。
バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent
を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ CKAN リソース に保持されます。イベントを1つずつ処理する場合は、 CKAN に100個のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 CKAN リソースごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の CKAN リソース)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。
バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。
永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。
デフォルトで、NGSICKANSink
は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。
エンコーディング¶
バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:
- 英数字以外の文字はすべてアンダースコア(
_
)で置き換えられました - アンダースコアは連結文字としても使用されていました
- FIWARE service paths内のスラッシュ(
/
)は無視されます
バージョン1.3.0(含む)から、Cygnus は CKAN データ構造に合わせたこの特定のエンコーディングを適用します :
- 小文字の英数字はエンコードされません
- 大文字の英数字がエンコードされます
- 数字はエンコードされません
- アンダースコア文字(
_
)はエンコードされません。 - ハイフン文字(
-
)はエンコードされません。 - 等号文字(
=
)は、xffff
としてエンコードされます。 - FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の
x
文字としてエンコードされます x
文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後にxx
としてエンコードされますxffff
は、連結文字として使用されます
将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding
パラメータを使用してエンコーディングタイプを切り替えることは可能です。
ジオ・ロケーション属性¶
CKAN はいくつかのビューアをサポートしています。その中に recline_map_viewer
があります。これは、ジオロケーション・データをレンダリングできる典型的な 2D マップです。
CKAN のジオロケーションデータは、次の2つの方法で追加できます :
- 経度には DataStore の列を使用し、緯度には DataStore の別の列を使用し、どのリソースフィールド(データストア内の列)にジオロケーション情報が含まれているかをビューアで設定します。フィールド/列の名前が
longitude
とlatitude
に直接指定されている場合は、ビューアによって自動的に選択されます。このオプションは 2D 座標でのみ有効です。 Json
タイプのgeojson
という単一の列を使用します。実際には、文字列geojson
の大文字と小文字の任意の組み合わせが有効です。例えば、GeoJson
、geoJSON
、等。デフォルトでは、このような列はジオロケーションデータのレンダリングに直接使用されます。geojson
という名前が付けられていない場合、GeoJson の値を含むフィールドまたは列を選択することができます。GeoJson は任意のジオメトリ(点、線、ポリゴン)をレンダリングできることに注意してください。
したがって、これらのジオロケーション機能とNGSIエンティティモデルをかなり簡単にマッピングするようです :
- それぞれ座標の経度と緯度の部分を含む
longitude
とlatitude
の属性のペアを追加するだけです - または、有効な GeoJson 値を含む単一の
geojson
属性を追加します
最後に、ジオロケーションの情報をCKANデータ構造にマッピングするこの方法は、実際にはジオロケーション目的の特別なタイプを定義するNGSIモデルには従っていません(geo:point
, geo:line
, geo:box
および geo:json
)。つまり、それが機能していても、geojson
または longitude
と latitude
を使うことは NGSI では標準ではありません。したがって、このシンクの将来のリリースでは、NGSI ジオロケーションタイプがサポートされ、CKANが理解できる上記のジオロケーションのフィールド/列に自動的に変換されます。
リソースの上限/レコードの有効期限¶
キャッピングと有効期限は、デフォルトでは無効になっています。それにもかかわらず、必要に応じてこれを有効にすることができます :
- レコード数で上限を設定する。これにより、構成済みの最大レコード数(
persistence_policy.max_records
)に達するまでリソースが成長し、そのような一定数のレコードが維持されます - 時間でレコードを期限切れにする。 これにより、レコードが古くなるまで、すなわち設定された特定の有効期限(
persistence_policy.expiration_time
)を超えるまで、リソースが大きくなることが可能です。
プログラマ・ガイド¶
NGSICKANSink
クラス¶
他の NGSI ライクのシンク NGSICKANSink
と同様に、NGSISink
ベースを拡張します。拡張されるメソッドは次のとおりです :
void persistBatch(NGSIBatch batch) throws Exception;
NGSIBatch
には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent
オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最終的に宛先はデータが保存される CKAN リソースを指定します。したがって、各宛先は、どの CKANBackend
実装でも永続化される宛先データ文字列を構成するために反復されます。
void capRecords(NGSIBatch batch, long maxRecords) throws EventDeliveryException;
このメソッドは常に persistBacth()
の直後に呼び出されます。upsert された同じデスティネーションリソースは、レコード数でチェックされます。設定された最大値(persistence_policy.max_records
)が更新されたリソースのいずれかで克服された場合、レコードの最大数に達するまで、必要な数の最も古いレコードが削除されます。
void expirateRecords(long expirationTime);
このメソッドは、persistence_policy.checking_time
に基づいて定期的に呼び出されます。設定された有効期限(persistence_policy.expiration_time)がいずれかのリソース内のいずれかのレコードで超えた場合、それは削除されます。
public void start();
CKANBackend
の実装が作成されます。起動するシーケンスは NGSICKANSink()
(コンストラクタ)、configure()
および、start()
であるため、start()
メソッドではなくコンストラクタで行う必要があります。
public void configure(Context);
上記のような完全な構成が、与えられた Context
インスタンスから読み取られます。
付属書類¶
列モードの CKAN リソースのプロビジョニング¶
このセクションは、CKAN API に精通していることを前提にしています。そうでない場合は、これを参照してください。
まず、データを永続化するために、リソースと関連するデータストアを作成する前に、CKAN 組織とパッケージ/データセットが必要です。
組織を作って、デモのために demo.ckan.org
で言いますが、あなたの CKAN を展開する必要があります。私たちのエンティティがその FIWARE service に入るため、組織名は service
です :
$ curl -X POST "http://demo.ckan.org/api/3/action/organization_create" -d '{"name":"service"}' -H "Authorization: xxxxxxxx"
上記の組織内にパッケージ/データセットを作成します。エンティティが、FIWARE service service
と FIWARE service path service_test なので、パッケージ名は service_test
です :
$ curl -X POST "http://demo.ckan.org/api/3/action/package_create" -d '{"name":"service_test","owner_org":"service"}' -H "Authorization: xxxxxxxx"
上記のパッケージ/データセット内のリソースを作成します。パッケージ IDは上記のパッケージ作成リクエストに対するレスポンスで与えられます。エンティティ IDが room1
でそのタイプが room
になるため、リソースの名前は room1_room
です :
$ curl -X POST "http://demo.ckan.org/api/3/action/resource_create" -d '{"name":"room1_room","url":"none","format":"","package_id":"d35fca28-732f-4096-8376-944563f175ba"}' -H "Authorization: xxxxxxxx"
最後に、上記のリソースに関連付けられたデータストアを作成し、列モードで Cygnus データを受信するのに適しています。リソース IDは、上記のリソース作成リクエストに対するレスポンスで与えられます :
$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_create" -d '{"fields":[{"id":"recvTime","type":"text"}, {"id":"fiwareServicePath","type":"text"}, {"id":"entityId","type":"text"}, {"id":"entityType","type":"text"}, {"id":"temperature","type":"float"}, {"id":"temperature_md","type":"json"}],"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7","force":"true"}' -H "Authorization: xxxxxxxx"
さて、Cygnusは、サービスとしてservice
、サービスパスとしてservice_test' の中に、
roomタイプの
room1` ID を持つエンティティのデータを永続化することができます :
time=2016-04-26T15:54:45.753CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=getEvents | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[240] : Starting internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9)
time=2016-04-26T15:54:45.754CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=getEvents | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[256] : Received data ({ "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "attributes" : [ { "name" : "temperature", "type" : "centigrade", "value" : "26.5" } ], "type" : "room", "isPattern" : "false", "id" : "room1" }, "statusCode" : { "code" : "200", "reasonPhrase" : "OK" } } ]})
time=2016-04-26T15:55:07.843CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[342] : Batch accumulation time reached, the batch will be processed as it is
time=2016-04-26T15:55:07.844CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[396] : Batch completed, persisting it
time=2016-04-26T15:55:07.846CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=persistAggregation | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSICKANSink[419] : [ckan-sink] Persisting data at OrionCKANSink (orgName=service, pkgName=service_test, resName=room1_room, data={"recvTime": "2016-04-26T13:54:45.756Z","fiwareServicePath": "/service_test","entityId": "room1","entityType": "room","temperature": "26.5"})
time=2016-04-26T15:55:08.948CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[400] : Finishing internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9)
インサートは CKAN API でも確認できます :
$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_search" -d '{"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7"}' -H "Authorization: xxxxxxxx"