Cygnus を使用して Orion Context Broker と Spark Streaming を接続

コンテンツ :

イントロダクション

このドキュメントの背後にあるアイデアは、Orion Context Broker から Spark Streaming へのリアルタイムのコンテキスト情報の変更を送信する方法を、段階的に説明することです。

このような接続は、Cygnusのおかげで、具体的には次のコンポーネントを使用して行われます :

  • Flume のネイティブ HTTP ソースで、TCP/5050ポートでリッスン
  • CygnusからのカスタムNGSI REST ハンドラ
  • Flumeのネイティブ・メモリチャネル
  • Flumeのネイティブ Avro シンク、Avroバイナリ・イベントをTCP/5051ポートに送信

最終的なアーキテクチャは次のようになります :

このドキュメントの目的に従って、すべてのコンポーネントのインストール、構成、および実行について説明します。簡単にするために、すべてのコンポーネントが単一のローカルマシンにデプロイされます。Apache Sparkは、特にクラスタの上で動作するように設計されていますが、ローカルモードでも動作可能です。

トップ

Orion Context Broker のセットアップ

Orion Context Broker インストール・ガイドはこちらからご覧いただけます。少なくとも、リリース1.6.0 を使用することをお勧めします。

このインテグレーション例では、デフォルトの設定でかまいません。シェルで入力するだけで簡単に実行できます:

$ sudo service contextBroker start
Starting contextBroker...                                    [  OK  ]

いつものように、Orion Context Broker が特定のコンテキスト・エンティティの更新に関する通知を Cygnus に送信するためには、サブスクリプションを行う必要があります。この例では、典型的な room1/room entity が使用されています :

$ curl -X POST "http://localhost:1026/v2/subscriptions" -s -S -H "Content-Type: application/json" -d @- <<EOF
{
  "subject": {
    "entities": [
      {
        "id": "room1",
        "type": "room"
      }
    ]
  },
  "notification": {
    "http": {
      "url": "http://localhost:5050/notify"
    },
    "attrsFormat": "legacy"
  }
}
EOF

他のコマンドを使用して、コンテキスト・エンティティが Orion に追加されます。この例では、典型的な room1/room entity エンティティが使用されています :

curl -X POST "http://localhost:1026/v2/entities" -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
  "id": "room1",
  "type": "room",
  "temperature": {
    "value": 26.5,
    "type": "Float"
  }
}
EOF

他のコマンドを使って更新しました :

curl -X POST "http://localhost:1026/v2/entities/room1/attrs" -s -S -H 'Content-Type: application/json' -d @- <<EOF
{
  "temperature": {
    "value": 27.5,
    "type": "Float"
  }
}
EOF

Orion Context Broker の詳細については、Orion Context Broker の公式ドキュメントを参照してください。

トップ

Cygnus のセットアップ

インストールに関しては、FIWARE yum repository から行います。Cygnus エージェントを最新のバージョンにインストールしたら、次のように設定する必要があります :

$ cat /usr/cygnus/conf/agent_spark.conf
cygnusagent.sources = http-source
cygnusagent.sinks = avro-sink
cygnusagent.channels = avro-channel

cygnusagent.sources.http-source.type = http
cygnusagent.sources.http-source.channels = avro-channel
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.default_service = default
cygnusagent.sources.http-source.handler.default_service_path = /

cygnusagent.sinks.avro-sink.type = avro
cygnusagent.sinks.avro-sink.channel = avro-channel
cygnusagent.sinks.avro-sink.hostname = 0.0.0.0
cygnusagent.sinks.avro-sink.port = 5051

cygnusagent.channels.avro-channel.type = memory
cygnusagent.channels.avro-channel.capacity = 1000
cygnusagent.channels.avro-channel.transactionCapacity = 100

Cygnus インスタンスの設定ファイルも追加する必要があります。次のようにします :

$ cat /usr/cygnus/conf/cygnus_instance_spark.conf
CYGNUS_USER=cygnus
CONFIG_FOLDER=/usr/cygnus/conf
CONFIG_FILE=/usr/cygnus/conf/agent_spark.conf
AGENT_NAME=cygnus-ngsi
LOGFILE_NAME=cygnus.log
ADMIN_PORT=8081
POLLING_INTERVAL=30

そして、サービスとして起動できます :

$ sudo service cygnus start
Starting Cygnus spark...                                    [  OK  ]

Orion Context Broker と Cygnusとの統合をテストするには、単にコンテキスト・エンティティを更新し、Cygnus ログ内の関連する通知受信をチェックします :

$ tail -f /var/log/cygnus/cygnus.log
...
time=2017-01-27T13:33:28.236UTC | lvl=INFO | corr=a72db6ff-ce15-4d38-a3a3-95af2ecc73b8 | trans=a72db6ff-ce15-4d38-a3a3-95af2ecc73b8 | srv=sc_valencia | subsrv=/gardens | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[304] : [NGSIRestHandler] Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "temperature",            "type" : "centigrade",            "value" : 26.5          }        ],        "type" : "room",        "isPattern" : "false",        "id" : "room1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
...

このインテグレーションの瞬間には、Spark に接続できないという Java の例外がいくつか見受けられます。TCP/ 5051 ポート上の Spark リスナーはまだ実行されていないので、これは正常です。

トップ

Spark Streamingのセットアップ

Apache Sparkは、Spark Download ページで入力した URL からインストールされ、以下を選択します :

  • Spark リリース : 1.6.3
  • パッケージタイプ : Hadoop 2.3 用に事前ビルド済み
  • ダウンロードタイプ : 直接ダウンロード

"download Spark" リンクが指し示す URL をコピーして、例えば、自分のマシンのホームディレクトリにダウンロードする必要があります。 あなたのケースでは、d3kbcqa49mib13.cloudfront.net を実際のベース URL に置き換えてください :

$ cd ~
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.3-bin-hadoop2.3.tgz
--2017-01-18 14:40:31--  http://d3kbcqa49mib13.cloudfront.net/spark-1.6.3-bin-hadoop2.3.tgz
Resolviendo d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)... 54.240.186.114, 54.240.186.149, 54.240.186.159, ...
Conectando con d3kbcqa49mib13.cloudfront.net (d3kbcqa49mib13.cloudfront.net)[54.240.186.114]:80... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 273211089 (261M) [application/x-tar]
Grabando a: “spark-1.6.3-bin-hadoop2.3.tgz”

100%[===========================================================================================================================================>] 273.211.089 30,8MB/s   en 9,6s   

2017-01-18 14:40:40 (27,3 MB/s) - “spark-1.6.3-bin-hadoop2.3.tgz” guardado [273211089/273211089]

ダウンロードしたファイルを解凍して削除する必要があります :

$ tar xzf spark-1.6.3-bin-hadoop2.3.tgz
$ rm spark-1.6.3-bin-hadoop2.3.tgz

さて、Spark は、ホームディレクトリ下で利用可能になるはずですので、ディレクトリを変更しましょう :

$ cd spark-1.6.3-bin-hadoop2.3

トップ

総合テスト

リアルタイム・コンテキスト情報解析のアーキテクチャ全体をテストするために、Sparkが JavaFlumeEventCount というサンプル内で提供する、すでに開発された解析アプリケーションを使用します :

$ ls examples/src/main/java/org/apache/spark/examples/streaming/ | grep JavaFlumeEventCount.java
JavaFlumeEventCount.java

JavaFlumeEventCount は、設定されたポートで受信した Avro イベントの数を単純に数えます。このようなアプリケーションは Spark の run-example スクリプトを使って実行します :

$ ./bin/run-example streaming.JavaFlumeEventCount 0.0.0.0 5051
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/01/18 14:46:26 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
17/01/18 14:46:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1484750792000 ms
-------------------------------------------
Received 0 flume events.

ご覧のように、Avro イベントは読み取られません。それにもかかわらず、サブスクライブしているものの中からあるコンテキスト・エンティティを更新すると、アプリケーションがそれを受け取ることがわかります。初回は数秒かかるはずです:

-------------------------------------------
Time: 1484750800000 ms
-------------------------------------------
Received 1 flume events.

トップ

詳細情報

この後、ユーザは、NGSI コンテキスト情報を含むバイナリ Avro イベントに基づいて、Spark Streaming のための独自の分析を作成する必要があります。

FIWARE C osmos repository のライブラリとしてバンドルされた Spark 用の NGSI 固有の関数を使用することを強くお勧めします。このようなライブラリには、バイナリ Avro イベントをデシリアライズするためのプリミティブ、コンテキスト・エンティティモデルに適合したタプル(tuples)の使用などがあります。また、FIWARE 開発者を確実にサポートし、Spark の上に NGSI ライクのアナリティクスの設計を簡素化します。

トップ