NGSICKANSink

コンテンツ :

機能性

com.iot.telefonica.cygnus.sinks.NGSICKANSink、または単なる NGSICKANSinkは、CKAN サーバ内で NGSI ライクのコンテキストデータ・イベントを維持するように設計されたシンクです。通常、このようなコンテキストデータは、Orion Context Broker インスタンスによって通知されますが、NGSI 言語 を話す他のシステムである可能性があります。

データジェネレータとは無関係に、NGSI コンテキストデータは常に Cygnus ソースの内部オブジェクト NGSIEvent に変換されます。最終的には、これらのイベント内の情報を特定の CKAN データ構造にマップする必要があります。

次のセクションでこれについて詳しく説明します。

トップ

NGSIEvent オブジェクトへの NGSI イベントのマッピング

コンテキストデータを含む、通知された NGSI イベントは、NGSI データジェネレータまたは英ぞかされる最終的なバックエンドとは無関係に、NGSIEvent オブジェクトに変換されます。NGSIEvent は、各コンテキスト要素に対して作成され、このようなイベントは特定のヘッダと ContextElement オブジェクトを組み合わせたものです。

これは、NGSIRestHandler のおかげで、cygnus-ngsi Http のリスナー(Flume jergon のソース)で行われています。変換されると、データ (NGSIEvent オブジェクトとして)は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。

トップ

CKAN データ構造への NGSIEvents のマッピング

CKAN は、パッケージまたはデータセットを含む組織内のデータを編成します。これらのパッケージ/データセットには、最終的に PostgreSQL データベース (CKAN Datastore) またはプレーン・ファイル (CKAN Filestore) にデータがストアされるいくつかのリソースが含まれています。そのような組織は、NGSIEvent が永続化されるたびにNGSICKANSink によって利用されます。

トップ

組織の命名規則

通知された fiware-service ヘッダ値、または、そのようなヘッダがない場合、FIWARE service の既定値を名前とした組織が作成されます(存在しない場合)。

PostgreSQLをベースにしている文字のみのは受け入れられるので、英数字とアンダースコア(_)のみが受け入れられます。ハイフン(-)も使用できます。 これにより、enable_encoding 構成パラメーターに応じて、特定のエンコードが適用されます。

それにもかかわらず、PostgreSQL とは異なり、組織の長さは最大100文字、最小2文字です。

トップ

パッケージ/データセット命名規則

通知された fiware-servicefiware-servicePath のヘッダ値またはそのようなヘッダーがない場合、fiware-service とfiware-servicePathの既定値が連結されたパッケージ/データセットが上記の組織で作成されます(まだ存在しない場合)。

PostgreSQLをベースにしている文字のみのは受け入れられるので、英数字とアンダースコア(_)のみが受け入れられます。ハイフン(-)も使用できます。これにより、enable_encoding 構成パラメーターに応じて、特定のエンコードが適用されます。

それにもかかわらず、PostgreSQL とは異なり、データセットの長さは最大100文字、最小2文字です。

トップ

リソース命名規則

CKAN リソースは、単一のデータモデル(詳細については構成セクションを参照してください)に従います。すなわちエンティティごとです。したがって、リソース名は常にエンティティ ID とタイプの連結です。このような名前は、 NGSIEvent内のnotified_entities/grouped_entities ヘッダ値(グループ化ルールを使用するかどうかに応じて。詳細については構成セクションを参照してください)ですでに指定されています。

CKAN データストアおよびビューアも作成され、上記のリソースに関連付けられていることに注意する必要があります。このデータストアは、最終的に PostgreSQL のテーブルであり、永続化されたデータを保持します。

PostgreSQL に基づいているので、英数字とアンダースコア(_)のみが受け入れられます。ハイフン(-)も使用できます。これにより、enable_encoding 構成パラメーターに応じて、特定のエンコードが適用されます。

リソース名には実質的な制限はありませんが、組織名とパッケージ名の処理に合わせて、Cygnus は、最小2文字から最大100文字までの長さを制限します。

トップ

行ライクのストア (Row-like storing)

リソースに関連付けられたデータストア内にストアされている特定のデータに関して、attr_persistence パラメータが row (デフォルトのストアモード)に設定されている場合、通知されたデータは属性ごとにストアされ、それぞれのインサートを構成します。各インサートには次のフィールドがあります :

  • recvTimeTsUTC : タイムスタンプ(ミリ秒単位)
  • recvTime : 人間が判読可能な形式のUTCタイムスタンプ(ISO 8601)
  • fiwareServicePath : 通知された fiware-servicePath、または通知されない場合はデフォルトの設定済みパス
  • entityId : 通知されたエンティティ ID
  • entityType : 通知されたエンティティタイプ
  • attrName : 通知された属性名
  • attrType : 通知された属性タイプ
  • attrValue : 最も単純な形式では、この値は単なる文字列ですが、Orion 0.11.0 以降、Json オブジェクトまたは Json 配列になる可能性があります
  • attrMd : これには、Json の属性のメタデータ配列の文字列のシリアル化が含まれます。属性にメタデータがない場合、空の配列 []が挿入されます

トップ

列ライクのストア (Column-like storing)

リソースに関連付けられたデータストア内にストアされている特定のデータに関して、attr_persistence パラメータが column に設定されている場合は、通知されたエンティティ全体に対して、次のフィールドを含む単一のラインが作成されます :

  • recvTime : 人間が判読可能な形式のUTCタイムスタンプ (ISO 8601)
  • fiwareServicePath 通知されたパス、またはデフォルト・パス
  • entityId : 通知されたエンティティ ID
  • entityType : 通知されたエンティティタイプ
  • 通知された属性ごとに、属性として指定されたフィールドが考慮されます。このフィールドには、時間に沿って属性値が格納されます
  • 通知された属性ごとに、属性名と _md の連結として指定されたフィールドが考慮されます。このフィールドには、属性のメタデータ値が時間とともにストアされます

トップ

NGSIEvent

以下の NGSIEvent が通知された NGSI コンテキストデータから作成されたと仮定します。以下のコードは実体データ形式ではなくオブジェクト表現です :

ngsi-event={
    headers={
         content-type=application/json,
         timestamp=1429535775,
         transactionId=1429535775-308-0000000000,
         correlationId=1429535775-308-0000000000,
         fiware-service=vehicles,
         fiware-servicepath=/4wheels,
         <grouping_rules_interceptor_headers>,
         <name_mappings_interceptor_headers>
    },
    body={
        entityId=car1,
        entityType=car,
        attributes=[
            {
                attrName=speed,
                attrType=float,
                attrValue=112.9
            },
            {
                attrName=oil_level,
                attrType=float,
                attrValue=74.6
            }
        ]
    }
}

トップ

組織名、データセット名、リソース名

上記の例と古いエンコーディングを使用すると、これらは作成された CKAN 要素です

  • 組織 : vehicles
  • パッケージ : vehicles_4wheels
  • リソース : car1_car

新しいエンコーディングを使用すると :

  • 組織 : vehicles
  • パッケージ : vehiclesxffffx002f4wheels
  • リソース : car1xffffcar

トップ

行ライクのストア (Row-like storing)

構成パラメータが attr_persistence=row の場合、NGSICKANSink は、本体内のデータは次のように永続化します :

$ curl -s -S -H "Authorization: myapikey" "http://192.168.80.34:80/api/3/action/datastore_search?resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7"
{
    "help": "Search a DataStore resource...",
    "success": true,
    "result": {
        "resource_id": "3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7",
        "fields": [
            {
                "type": "int4",
                "id": "_id"
            },
            {
                "type": "int4",
                "id": "recvTimeTs"
            },
            {
                "type": "timestamp",
                "id": "recvTime"
            },
            {
                "type": "text",
                "id": "fiwareServicePath"
            },
            {
                "id": "entityId",
                "type": "text"
            },
            {
                "id": "entityType",
                "type": "text"
            },
            {
                "type": "text",
                "id": "attrName"
            },
            {
                "type": "text",
                "id": "attrType"
            },
            {
                "type": "json",
                "id": "attrValue"
            },
            {
                "type": "json",
                "id": "attrMd"
            }
        ],
        "records": [
            {
                "entityId": "car1",
                "entityType": "car",
                "fiwareServicePath": "4wheels",
                "attrType": "float",
                "recvTime": "2015-04-20T12:13:22.41.124Z",
                "recvTimeTs": 1429535775,
                "attrMd": null,
                "attrValue": "112.9",
                "attrName": "speed",
                "_id": 1
            },
            {
                "entityId": "car1",
                "entityType": "car",
                "fiwareServicePath": "4wheels",
                "attrType": "float",
                "recvTime": "2015-04-20T12:13:22.41.124Z",
                "recvTimeTs": 1429535775,
                "attrMd": null,
                "attrValue": "74.6",
                "attrName": "oil_level",
                "_id": 2
            }
        ],
        "_links": {
            "start": "/api/3/action/datastore_search?resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7",
            "next": "/api/3/action/datastore_search?offset=100&resource_id=3254b3b4-6ffe-4f3f-8eef-c5c98bfff7a7"
        },
        "total": 2
    }
}

トップ

列ライクのストア (Column-like storing)

attr_persistence=colum の場合は、NGSICKANSink` は、本体内のデータは次のように永続化します :

$ curl -s -S -H "Authorization: myapikey" "http://130.206.83.8:80/api/3/action/datastore_search?resource_id=611417a4-8196-4faf-83bc-663c173f6986"
{
    "help": "Search a DataStore resource...",
    "success": true,
    "result": {
        "resource_id": "611417a4-8196-4faf-83bc-663c173f6986",
        "fields": [
            {
                "type": "int4",
                "id": "_id"
            },
            {
                "type": "timestamp",
                "id": "recvTime"
            },
            {
                "type": "text",
                "fiwareServicePath": "4wheels"
            },
            {
                "type": "text",
                "entityId": "car1"
            },
            {
                "type": "text",
                "entityType": "car"
            },
            {
                "type": "json",
                "id": "speed"
            },
            {
                "type": "json",
                "id": "speed_md"
            },
            {
                "type": "json",
                "id": "oil_level"
            },
            {
                "type": "json",
                "id": "oil_level_md"
            }
        ],
        "records": [
            {
                "recvTime": "2015-04-20T12:13:22.41.124Z",
                "fiwareServicePath": "4wheels",
                "entityId": "car1",
                "entityType": "car",
                "speed": "112.9",
                "speed_md": null,
                "oil_level": "74.6",
                "oil_level_md": null,
                "_id": 1
            }
        ],
        "_links": {
            "start": "/api/3/action/datastore_search?resource_id=611417a4-8196-4faf-83bc-663c173f6986",
            "next": "/api/3/action/datastore_search?offset=100&resource_id=611417a4-8196-4faf-83bc-663c173f6986"
        },
        "total": 1
    }
}

注 : curl は、CKANに公開されているような REST APIs との対話を可能にする Unix コマンドです。

トップ

管理ガイド

構成

NGSICKANSinkは、 次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.NGSICKinkink である必要があります
channel yes N/A
enable_encoding  no false true または falsetrue は新しいエンコーディングを適用し、false は古いエンコーディングを適用します
enable_grouping no false true または false。詳細については、このリンクをチェックしてください
enable_name_mappings no false true または false。詳細については、このリンクをチェックしてください
data_model no dm-by-entity 構成されていなくても、常に dm-by-entity です
attr_persistence no row row または colum
ckan_host no localhost 実行している CKAN サーバの FQDN/IPアドレス
ckan_port no 80
ckan_viewer no recline_grid_view [利用可能な](http://docs.ckan.org/en/latest/maintaining/data-viewer.ht
ml)ビューアをCKANのドキュメントで確認してください
ssl no false
api_key yes N/A
orion_url no  http://localhost:1026 filestore の URL として置かれる
batch_size no 1 永続化の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま永続化される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合のリトライ回数。リトライしない場合は 0、無限にリトライする場合は -1 を使用してください。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
batch_retry_intervals no 5000 永続化されていないバッチに関するリトライが行われるコンマ区切りの間隔(ミリ秒単位)のリスト。最初のリトライは、最初の値と同じ数ミリ秒後に実行され、2回目の再試行は2番目の値の後に完了します。batch_ttl が間隔の数より大きい場合、最後の間隔が繰り返されます
backend.max_conns no 500 Http ベースの HDFS バックエンドに許可される最大接続数
backend.max_conns_per_route no 100 Http ベースの HDFS バックエンドに許可されているルートごとの最大接続数
persistence_policy.max_records no -1 リソースに上限が設定される前に許可されるレコードの最大数。-1はこのポリシーを無効にします
persistence_policy.expiration_time no -1 有効期限が切れる前にリソース内でレコードが維持される最大秒数。-1はこのポリシーを無効にします
persistence_policy.checking_time no 3600 シンクがレコードの有効期限をチェックする頻度 (秒単位)

構成例は、次のとおりです :

cygnus-ngsi.sinks = ckan-sink
cygnus-ngsi.channels = ckan-channel
...
cygnus-ngsi.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.NGSICKANSink
cygnus-ngsi.sinks.ckan-sink.channel = ckan-channel
cygnus-ngsi.sinks.ckan-sink.enable_encoding = false
cygnus-ngsi.sinks.ckan-sink.enable_grouping = false
cygnus-ngsi.sinks.ckan-sink.enable_name_mappings = false
cygnus-ngsi.sinks.ckan-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.ckan-sink.attr_persistence = column
cygnus-ngsi.sinks.ckan-sink.ckan_host = 192.168.80.34
cygnus-ngsi.sinks.ckan-sink.ckan_port = 80
cygnus-ngsi.sinks.ckan-sink.ckan_viewer = recline_grid_view
cygnus-ngsi.sinks.ckan-sink.ssl = false
cygnus-ngsi.sinks.ckan-sink.api_key = myapikey
cygnus-ngsi.sinks.ckan-sink.orion_url = http://localhost:1026
cygnus-ngsi.sinks.ckan-sink.batch_size = 100
cygnus-ngsi.sinks.ckan-sink.batch_timeout = 30
cygnus-ngsi.sinks.ckan-sink.batch_ttl = 10
cygnus-ngsi.sinks.ckan-sink.batch_retry_intervals = 5000
cygnus-ngsi.sinks.ckan-sink.backend.max_conns = 500
cygnus-ngsi.sinks.ckan-sink.backend.max_conns_per_route = 100
cygnus-ngsi.sinks.ckan-sink.persistence_policy.max_records = 5
cygnus-ngsi.sinks.ckan-sink.persistence_policy.expiration_time = 86400
cygnus-ngsi.sinks.ckan-sink.persistence_policy.checking_time = 600

トップ

ユースケース

中期的に大きく伸びていないデータベースストレージを探している場合は、NGSICKANSink を使用してください。

トップ

重要なメモ

永続化モード

同じ数の属性が通知されるとは限りません。これは、NGSI のような送信者へのサブスクリプションに依存します。 通知された属性ごとに固定の8フィールドの行(fixed 8-fields rows)がアップデートされるため、row の永続化モードでは問題ありません。 それにもかかわらず、column モードはフィールドの長さが異なる複数行の影響を受ける可能性があります。 したがって、column モードは、サブスクリプションが最後の通知以降に更新されなかった場合、常に同じ属性、イベントを送信するように設計されている場合にのみ推奨されます。

さらに、column モードで実行している場合、通知される属性の数(したがってデータストア内に書き込まれるフィールドの数)が Cygnus によって認識されないため、Datastore を自動的に作成することはできず、Cygnusの 実行前にプロビジョニングする必要があります。これは、通知される属性の数に関係なく、書き込まれるフィールドの数が常に一定であるため、row モードの場合ではありません。

列モードのリソースをプロビジョニングする方法を知るために、付録を確認してください :

トップ

バッチ処理

プログラマ・ガイドで説明したように、NGSICKANSink は、内部 Flume チャネルからイベントを収集するための組み込みのメカニズムを提供する NGSISink を拡張します。このメカニズムにより、拡張クラスは、最終的なバックエンドにおけるこのような一連のイベントの永続化の詳細のみを処理することができます。

バッチのメカニズムに関して重要なことは、インサートの数が大幅に減少するため、シンクの性能を大幅に向上させることです。例を見てみましょう。100個の NGSIEvent を一組としましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ CKAN リソース に保持されます。イベントを1つずつ処理する場合は、 CKAN に100個のインサートが必要です。それにもかかわらず、この例では、1つのインサートのみが必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、これは問題ではありません。イベントのいくつかのサブバッチがバッチ内に作成されるため、最終的な出力先 CKAN リソースごとに1つのサブバッチが作成されるためです。 最悪の場合、100のエンティティは100種類のエンティティ(100種類の CKAN リソース)になりますが、これは通常のシナリオではありません。したがって、1バッチあたり10〜15個のサブバッチが現実的であると仮定すると、10〜15個のインサートのみを用いたイベントアプローチによるイベントの100個のインサートを置き換えます。

バッチ処理メカニズムは、新しいデータが到着しないときにシンクがバッチ構築の待ち状態にとどまるのを防ぐために蓄積タイムアウトを追加します。 そのようなタイムアウトに達すると、バッチはそのまま維持されます。

永続化されていないバッチのリトライに関しては、2つのパラメータが使用されます。一方は、Cygnus がイベントを確実にドロップする前にリトライする回数を指定して、Time-To-Live(TTL) を使用します。もう一方は、リトライ間隔のリストを構成することができます。そのようなリストは最初のリトライ間隔を定義し、次に2番目のリトライ間隔を定義します。TTLがリストの長さより大きい場合、最後のリトライ間隔が必要な回数繰り返されます。

デフォルトで、NGSICKANSink は、設定されたバッチサイズは1で、バッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものより大きいことは意味がありません)。また、推定されたサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。イベントのバッチとその適切なサイジングについての深い議論は、パフォーマンスのドキュメントで見つけることができます。

トップ

エンコーディング

バージョン1.2.0(含む)まで、Cygnus は非常に単純なエンコーディングを適用しました:

  • 英数字以外の文字はすべてアンダースコア(_)で置き換えられました
  • アンダースコアは連結文字としても使用されていました
  • FIWARE service paths内のスラッシュ(/)は無視されます

バージョン1.3.0(含む)から、Cygnus は CKAN データ構造に合わせたこの特定のエンコーディングを適用します :

  • 小文字の英数字はエンコードされません
  • 大文字の英数字がエンコードされます
  • 数字はエンコードされません
  • アンダースコア文字(_)はエンコードされません。
  • ハイフン文字(-)はエンコードされません。
  • 等号文字(=)は、xffff としてエンコードされます。
  • FIWARE service paths のスラッシュを含む他のすべての文字は、Unicode 文字の x 文字としてエンコードされます
  • x 文字と Unicode で構成されるユーザ定義の文字列は、Unicodeの後に xx としてエンコードされます
  • xffff は、連結文字として使用されます

将来、古いエンコーディングは非推奨になりますが、構成セクションで説明したように、enable_encoding パラメータを使用してエンコーディングタイプを切り替えることは可能です。

トップ

ジオ・ロケーション属性

CKAN はいくつかのビューアをサポートしています。その中に recline_map_viewer があります。これは、ジオロケーション・データをレンダリングできる典型的な 2D マップです。

CKAN のジオロケーションデータは、次の2つの方法で追加できます :

  • 経度には DataStore の列を使用し、緯度には DataStore の別の列を使用し、どのリソースフィールド(データストア内の列)にジオロケーション情報が含まれているかをビューアで設定します。フィールド/列の名前が longitudelatitude に直接指定されている場合は、ビューアによって自動的に選択されます。このオプションは 2D 座標でのみ有効です。
  • Json タイプの geojson という単一の列を使用します。実際には、文字列 geojson の大文字と小文字の任意の組み合わせが有効です。例えば、GeoJsongeoJSON、等。デフォルトでは、このような列はジオロケーションデータのレンダリングに直接使用されます。geojson という名前が付けられていない場合、GeoJson の値を含むフィールドまたは列を選択することができます。GeoJson は任意のジオメトリ(点、線、ポリゴン)をレンダリングできることに注意してください。

したがって、これらのジオロケーション機能とNGSIエンティティモデルをかなり簡単にマッピングするようです :

  • それぞれ座標の経度と緯度の部分を含む longitudelatitude の属性のペアを追加するだけです
  • または、有効な GeoJson 値を含む単一の geojson 属性を追加します

最後に、ジオロケーションの情報をCKANデータ構造にマッピングするこの方法は、実際にはジオロケーション目的の特別なタイプを定義するNGSIモデルには従っていません(geo:point, geo:line, geo:box および geo:json)。つまり、それが機能していても、geojson または longitudelatitude を使うことは NGSI では標準ではありません。したがって、このシンクの将来のリリースでは、NGSI ジオロケーションタイプがサポートされ、CKANが理解できる上記のジオロケーションのフィールド/列に自動的に変換されます。

トップ

リソースの上限/レコードの有効期限

キャッピングと有効期限は、デフォルトでは無効になっています。それにもかかわらず、必要に応じてこれを有効にすることができます :

  • レコード数で上限を設定する。これにより、構成済みの最大レコード数(persistence_policy.max_records)に達するまでリソースが成長し、そのような一定数のレコードが維持されます
  • 時間でレコードを期限切れにする。 これにより、レコードが古くなるまで、すなわち設定された特定の有効期限(persistence_policy.expiration_time)を超えるまで、リソースが大きくなることが可能です。

トップ

プログラマ・ガイド

NGSICKANSink クラス

他の NGSI ライクのシンク NGSICKANSink と同様に、NGSISink ベースを拡張します。拡張されるメソッドは次のとおりです :

void persistBatch(NGSIBatch batch) throws Exception;

NGSIBatch には 通知されたコンテキストデータイベントを解析した結果である NGSIEvent オブジェクトのセットが含まれます。バッチ内のデータは宛先別に分類され、最終的に宛先はデータが保存される CKAN リソースを指定します。したがって、各宛先は、どの CKANBackend 実装でも永続化される宛先データ文字列を構成するために反復されます。

void capRecords(NGSIBatch batch, long maxRecords) throws EventDeliveryException;

このメソッドは常に persistBacth() の直後に呼び出されます。upsert された同じデスティネーションリソースは、レコード数でチェックされます。設定された最大値(persistence_policy.max_records)が更新されたリソースのいずれかで克服された場合、レコードの最大数に達するまで、必要な数の最も古いレコードが削除されます。

void expirateRecords(long expirationTime);

このメソッドは、persistence_policy.checking_time に基づいて定期的に呼び出されます。設定された有効期限(persistence_policy.expiration_time)がいずれかのリソース内のいずれかのレコードで超えた場合、それは削除されます。

public void start();

CKANBackend の実装が作成されます。起動するシーケンスは NGSICKANSink() (コンストラクタ)、configure() および、start() であるため、start() メソッドではなくコンストラクタで行う必要があります。

public void configure(Context);

上記のような完全な構成が、与えられた Context インスタンスから読み取られます。

トップ

付属書類

列モードの CKAN リソースのプロビジョニング

このセクションは、CKAN API に精通していることを前提にしています。そうでない場合は、これを参照してください。

まず、データを永続化するために、リソースと関連するデータストアを作成する前に、CKAN 組織とパッケージ/データセットが必要です。

組織を作って、デモのために demo.ckan.org で言いますが、あなたの CKAN を展開する必要があります。私たちのエンティティがその FIWARE service に入るため、組織名は service です :

$ curl -X POST "http://demo.ckan.org/api/3/action/organization_create" -d '{"name":"service"}' -H "Authorization: xxxxxxxx"

上記の組織内にパッケージ/データセットを作成します。エンティティが、FIWARE service service と FIWARE service path service_test なので、パッケージ名は service_testです :

$ curl -X POST "http://demo.ckan.org/api/3/action/package_create" -d '{"name":"service_test","owner_org":"service"}' -H "Authorization: xxxxxxxx"

上記のパッケージ/データセット内のリソースを作成します。パッケージ IDは上記のパッケージ作成リクエストに対するレスポンスで与えられます。エンティティ IDが room1 でそのタイプが room になるため、リソースの名前は room1_room です :

$ curl -X POST "http://demo.ckan.org/api/3/action/resource_create" -d '{"name":"room1_room","url":"none","format":"","package_id":"d35fca28-732f-4096-8376-944563f175ba"}' -H "Authorization: xxxxxxxx"

最後に、上記のリソースに関連付けられたデータストアを作成し、列モードで Cygnus データを受信するのに適しています。リソース IDは、上記のリソース作成リクエストに対するレスポンスで与えられます :

$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_create" -d '{"fields":[{"id":"recvTime","type":"text"}, {"id":"fiwareServicePath","type":"text"}, {"id":"entityId","type":"text"}, {"id":"entityType","type":"text"}, {"id":"temperature","type":"float"}, {"id":"temperature_md","type":"json"}],"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7","force":"true"}' -H "Authorization: xxxxxxxx"

さて、Cygnusは、サービスとしてservice、サービスパスとしてservice_test' の中に、roomタイプのroom1` ID を持つエンティティのデータを永続化することができます :

time=2016-04-26T15:54:45.753CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=getEvents | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[240] : Starting internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9)
time=2016-04-26T15:54:45.754CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=getEvents | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[256] : Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "temperature",            "type" : "centigrade",            "value" : "26.5"          }        ],        "type" : "room",        "isPattern" : "false",        "id" : "room1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
time=2016-04-26T15:55:07.843CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[342] : Batch accumulation time reached, the batch will be processed as it is
time=2016-04-26T15:55:07.844CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[396] : Batch completed, persisting it
time=2016-04-26T15:55:07.846CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=persistAggregation | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSICKANSink[419] : [ckan-sink] Persisting data at OrionCKANSink (orgName=service, pkgName=service_test, resName=room1_room, data={"recvTime": "2016-04-26T13:54:45.756Z","fiwareServicePath": "/service_test","entityId": "room1","entityType": "room","temperature": "26.5"})
time=2016-04-26T15:55:08.948CEST | lvl=INFO | corr=b465ffb8-710f-4cd3-9573-dc3799f774f9 | trans=b465ffb8-710f-4cd3-9573-dc3799f774f9 | svc=service | subsvc=/service_test | function=processNewBatches | comp=cygnus-ngsi | msg=com.telefonica.iot.cygnus.sinks.NGSISink[400] : Finishing internal transaction (b465ffb8-710f-4cd3-9573-dc3799f774f9)

インサートは CKAN API でも確認できます :

$ curl -X POST "http://demo.ckan.org/api/3/action/datastore_search" -d '{"resource_id":"48c120df-5bcd-48c7-81fa-8ecf4e4ef9d7"}' -H "Authorization: xxxxxxxx"

トップ