TwitterHDFSSink

コンテンツ :

機能性

com.telefonica.iot.cygnus.sinks.TwitterHDFSSink、または単純に TwitterHDFSSinkは、 HDFS 展開内のツイートのデータ・イベントを永続化するように設計されたシンクです。データは Twitter によって提供されています。

ツイートは、常に twitter エージェント・ソースの内部 Flume イベントに変換されます。最後に、これらの Flume イベント内の情報は、Twitter エージェント・シンクの特定の HDFS データ構造にマップする必要があります。

次のセクションでこれについて詳しく説明します。

トップ

Twitter イベントを Flume イベントにマップ

受信したTwitterイベントは、持続される最後のバックエンドとは独立して、Flume イベント(具体的には、TwitterEvent)に変換されます。

flume TwitterEvent の本体は、JSON 形式のツイートを表します。変換されると、データ (現在、Flume イベントとして) は、今後の消費のために内部チャネルに入れられます。次のセクションを参照してください。

トップ

HDFS データ構造への Flume イベントのマッピング

HDFS は大きなデータファイルを含むフォルダ内のデータを整理します。このような機構は、Flumeイベントが永続化されるたびに TwitterHDFSSink によって活用されます。

/user/<hdfs_folder>/<hdfs_file> という名前のファイルが、まだ存在しない場合に作成され、そこの <hdfs_folder><hdfs_file> が設定パラメータです。

このファイルでは、各ツイートごとに JSON 行が作成されます。ツイートには、Twitter で指定されたすべての標準フィールドが含まれています。ファイルでは、ツイートは \n で区切られています。混乱を避け、HDFS ファイルを信頼できるものにするため、ツイートの \n がすべて削除されました(セマンティック情報を提供しないため)。このように、ファイルに表示される唯一の \n 文字は、ツイートを複数の行に分割します。

トップ

Hive

現在、このバージョンの TwitterHDFSSinkでは、Hiveはサポートされていません。

トップ

管理ガイド

構成

TwitterHDFSSink は、次のパラメータによって構成されます :

パラメータ 必須 デフォルト値 コメント
type yes N/A com.telefonica.iot.cygnus.sinks.TwitterHDFSSink である必要があります
channel yes N/A
enable_lowercase no false truefalse
backend.impl no rest restは、WebHDFS/HttpFS ベースの実装を使用して HDFS と通信する場合、または、binaryは、HDFSと相互作用する場合HadoopのAPIベースの実装が使用される場合は
backend.max_conns no 500 Http ベースの HDFS バックエンドに許可される最大接続数。バイナリバックエンド実装を使用している場合は無視されます
backend.max_conns_per_route no 100 Http ベースの HDFS バックエンドに許可されているルートごとの最大接続数。バイナリバックエンド実装を使用している場合は無視されます
hdfs_host no localhost HDFS ネームノードが実行される FQDN/IP アドレス、または HDFS HA Namenodes が実行される FQDN/IP アドレスのコンマ区切りリスト
hdfs_port no 14000 HttpFS(rest)を使用する場合は 14000、WebHDFS(rest)を使用する場合は 50070、Hadoop API(binary)を使用する場合は 8020
hdfs_username yes N/A HDFS には既に存在するユーザー
hdfs_password yes N/A 上記 hdfs_username のパスワード。これは Hive 認証にのみ必要です
oauth2_token yes N/A HDFS 認証に必要な OAuth2 トークン
batch_size no 1 永続性の前に蓄積されたイベントの数
batch_timeout no 30 バッチがそのまま持続される前に構築される秒数
batch_ttl no 10 バッチを永続化できない場合の再試行回数。無限に再0試行する場合は、再試行しないでください-1。無限のTTL(非常に大きいものでさえ)がすべてのシンクのチャネル容量を非常に素早く消費するかもしれないと考えてください
krb5_auth no false truefalse
krb5_user yes empty krb5_auth=false なら無視。そうでなければ、必須
krb5_password yes empty krb5_auth=false なら無視。そうでなければ、必須
krb5_login_conf_file no /usr/cygnus/conf/krb5_login.conf krb5_auth=false なら無視
krb5_conf_file no /usr/cygnus/conf/krb5.conf krb5_auth=false なら無視
hdfs_folder yes N/A ツイート付きのファイルが保存されるフォルダ
hdfs_file yes N/A ツイートが格納されるファイルの名前。このファイルは、hdfs_folder の中に作成されます

設定例は次のとおりです :

# ============================================
# TwitterHDFSSink configuration
# channel name from where to read notification events
cygnus-twitter.sinks.hdfs-sink.channel = hdfs-channel
# sink class, must not be changed
cygnus-twitter.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.TwitterHDFSSink
# true if lower case is wanted to forced in all the element names, false otherwise
# cygnus-twitter.sinks.hdfs-sink.enable_lowercase = false
# rest if the interaction with HDFS will be WebHDFS/HttpFS-based, binary if based on the Hadoop API
# cygnus-twitter.sinks.hdfs-sink.backend.impl = rest
# maximum number of Http connections to HDFS backend
# cygnus-twitter.sinks.hdfs-sink.backend.max_conns = 500
# maximum number of Http connections per route to HDFS backend
# cygnus-twitter.sinks.hdfs-sink.backend.max_conns_per_route = 100
# Comma-separated list of FQDN/IP address regarding the HDFS Namenode endpoints
# If you are using Kerberos authentication, then the usage of FQDNs instead of IP addresses is mandatory
# cygnus-twitter.sinks.hdfs-sink.hdfs_host = localhost
# port of the HDFS service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs
# cygnus-twitter.sinks.hdfs-sink.hdfs_port = 14000
# username allowed to write in HDFS
cygnus-twitter.sinks.hdfs-sink.hdfs_username = hdfs_username
# password for the above username; this is only required for Hive authentication
cygnus-twitter.sinks.hdfs-sink.hdfs_password = xxxxxxxx
# OAuth2 token for HDFS authentication
cygnus-twitter.sinks.hdfs-sink.oauth2_token = xxxxxxxx
# timeout for batch accumulation
# cygnus-twitter.sinks.hdfs-sink.batch_timeout = 30
# number of retries upon persistence error
# cygnus-twitter.sinks.hdfs-sink.batch_ttl = 10
# Hive enabling
# cygnus-twitter.sinks.hdfs-sink.hive = false
# Hive server version, 1 or 2 (ignored if hive is false)
# cygnus-twitter.sinks.hdfs-sink.hive.server_version = 2
# Hive FQDN/IP address of the Hive server (ignored if hive is false)
# cygnus-twitter.sinks.hdfs-sink.hive.host = localhost
# Hive port for Hive external table provisioning (ignored if hive is false)
# cygnus-twitter.sinks.hdfs-sink.hive.port = 10000
# Hive database type, available types are default-db and namespace-db
# cygnus-twitter.sinks.hdfs-sink.hive.db_type = default-db
# Kerberos-based authentication enabling
# cygnus-twitter.sinks.hdfs-sink.krb5_auth = false
# Kerberos username (ignored if krb5_auth is false)
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_user = krb5_username
# Kerberos password (ignored if krb5_auth is false)
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_password = xxxxxxxxxxxxx
# Kerberos login file (ignored if krb5_auth is false)
# cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_login_conf_file = /usr/cygnus/conf/krb5_login.conf
# Kerberos configuration file (ignored if krb5_auth is false)
# cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.conf
# Set folder and file to store tweets
cygnus-twitter.sinks.hdfs-sink.hdfs_folder = olympic_games_2016
cygnus-twitter.sinks.hdfs-sink.hdfs_file = tweets.txt

トップ

重要なメモ

バイナリ・バックエンド

HDFS バイナリ・バックエンドの現在の実装は、認証メカニズムをサポートしていません。

望ましい認証方法は、FIWAREの標準である、OAuth2 ですが、バイナリ・バックエンドがアクセスするリモート RPC サーバでは現在サポートされていません。

有効な認証機構は、Kerberos と Hadoop Delegation Token ですが、それでも誰も使用されておらず、cygnus ユーザ (Cygnusを実行しているユーザ) がそれを偽装するためにはバックエンドは単純にユーザ名 (hdfs_usernameで設定されたもの) を必要とします。

したがって、マルチユーザ環境でこのバックエンドを使用することは推奨されません。少なくとも、ユーザがユーザ名を指定するだけで他のユーザを偽装する可能性があります。

fiware-cosmos プロジェクトのコンテキストで、Hadoop RPC メカニズムに OAuth2 サポートを追加する際に問題があります。

トップ

バッチ処理

プログラマ・ガイドで説明したように、、TwitterHDFSSinkTwitterSink を拡張し、内部のFlumeチャネルからイベントを収集するための組み込みのメカニズムを提供します。 このメカニズムは、退出しているクラスが、最終的なバックエンドにおけるそのようなイベントのバッチの永続性の詳細を処理するだけでよいです。

バッチ・メカニズムに関して重要なのは、書き込みの回数が大幅に減るため、シンクのパフォーマンスが大幅に向上することです。例を見てみましょう.100 Flume イベントのバッチを想定しましょう。最良の場合、これらのイベントはすべて同じエンティティに関係します。つまり、その中のすべてのデータが同じ HDFS ファイルに保持されます。イベントを1つずつ処理する場合、HDFS への書き込みは 100回必要です。それにもかかわらず、この例では、書き込みは 1回だけ必要です。明らかに、すべてのイベントが常に同じユニークなエンティティを考慮するとは限らず、多くのエンティティがバッチ内に関与する可能性があります。しかし、バッチ内にいくつかのサブバッチのイベントが作成され、最終的な HDFS ファイルごとに 1つのサブバッチが作成されるため、問題はありません。最悪の場合、100のエンティティは 100種類のエンティティ(100種類の HDFS 送り先)になりますが、これは通常のシナリオではありません。したがって、バッチあたり 10〜15のサブバッチが現実的であると仮定すると、10〜15回の書き込みしかないイベントアプローチによって、イベントの 100回の書き込みを置き換えます。

バッチ・メカニズムは、新しいデータが到着しないときにシンクがバッチ・ビルディングの永遠の状態にとどまるのを防ぐために蓄積タイムアウトを追加します。そのようなタイムアウトに達すると、バッチはそのまま維持されます。

デフォルトで、TwitterHDFSSinkでは、設定されたバッチサイズは1とバッチ蓄積タイムアウトは30秒です。それにもかかわらず、上で説明したように、パフォーマンス目的で少なくともバッチサイズを増やすことを強くお勧めします。どのような最適な値ですか?バッチのサイズは、イベントが得られているチャネルのトランザクションサイズに密接に関連しています(最初のものが2番目のものよりも大きいことは意味がありません)。また、予想されるサブバッチの数にも依存します。累積タイムアウトは、最終ストレージに新しいデータを出力する頻度に依存します。

トップ

プログラマ・ガイド

TwitterHDFSSink クラス

TwitterHDFSSink は、ベース TwitterSink を拡張ます。拡張されるメソッドは次のとおりです :

void persistBatch(Batch batch) throws Exception;

Batch は、通知されたコンテキストデータイベントを解析した結果である TwitterEvent オブジェクトのセットを管理します。 バッチ内のデータは宛先別に分類され、最終的に宛先はデータが保存される HDFS ファイルを指定します。 したがって、HDFSBackend 実装 (binary または rest) のおかげで、永続化される宛先データ・ストリングを構成するために、各宛先が反復されます。

public void start();

HDFSBackend の実装が作成されます。 起動するシーケンスは、TTwitterHDFSSink() (コンストラクタ), configure()start() であるため、これは、start() メソッドではなくコンストラクタで行う必要があります。

public void configure(Context);

上記のような完全な構成が所定の Context インスタンスから読み取られます。

トップ

HDFSBackendImpl クラス

これは、HttpBackend 抽象クラス(任意の Http 接続ベースのバックエンドに共通ロジックを提供)を拡張し、HDFSBackend インタフェース(HDFS バックエンドが実装する必要があるメソッドを提供する)を実装する HDFS の便利なバックエンドクラスです。 関連する方法は次のとおりです :

public void createDir(String dirPath) throws Exception;

パスを指定して HDFS ディレクトリを作成します。

public void createFile(String filePath, String data) throws Exception;

パスを指定して HDFS ファイルを作成し、そのファイルに初期データを書き込みます。

public void append(String filePath, String data) throws Exception;

既存の HDFS ファイルに新しいデータを追加します。

public boolean exists(String filePath) throws Exception;

パスが指定された HDFS ファイルが存在するかどうかをチェックします。

public void provisionHiveTable(FileFormat fileFormat, String dirPath, String tag) throws Exception;

トップ

OAuth2 認証

OAuth2 は、認証のためのオープン・スタンダードである OAuth プロトコルの進化です。OAuth を使用すると、クライアント・アプリケーションは、リソース所有者に代わって特定のサーバ・リソースにアクセスし、資格情報をサービスと共有せずに最高の方法でアクセスすることができます。これは、いくつかのセキュリティ情報、すなわちアクセス・トークンの発行を担当する信頼できる認証サービスのために働きます。一度要求されると、アクセストークンはサービス・リクエストに添付され、サーバは、アクセス(認証)を要求するユーザの有効性と、このユーザに対するリソース自体の可用性(認可)を認可サービスに尋ねることができます。

OAuth2 の詳細なアーキテクチャはここにありますが、簡単に言えば、FIWARE は Identity Manager GE(Keyrock の実装)とアクセス制御(AuthZForce の実装)によって上記の概念を実装しています。この2つのイネーブラーの結合は、FIWARE の OAuth2ベースの認可サービスに準拠しています :

  • アクセストークンは Identity Manager に要求されます。Identity Manager は、トークンが受信されると、認証のために最終サービスからリクエストされます。このサービスに依頼して、リクエストの背後にある本当のFIWARE ユーザが誰であるかを発見するだけでなく、そのユーザが自分が誰であるかを完全に確信していることを確認してください
  • 同時に、Identity Manager は認証の目的でアクセス制御に依存します。アクセストークンは、ユーザの実際の身元に加えて、要求されたリソースに応じた役割を与えます。アクセス制御は、ユーザの役割に基づいてすべてのリソースにアクセスできるユーザに関するポリシーのリストを所有しています。

Cygnus にとって重要なのは、HDFS (ビッグ)データにネイティブの WebHDFS RESTful API を介してアクセスできるためです。そして、それは上記のメカニズムで保護されるかもしれません。その場合、単にアクセストークンを要求し、cygnus-twitter.sinks.hdfs-sink.oauth2_token パラメータを使用して設定に追加します。

アクセストークンを取得するには、OAuth2 トークンプロバイダに対して次の要求を行います。FIWARE Lab では、これは cosmos.lab.fi-ware.org:13000 です :

$ curl -X POST "http://cosmos.lab.fi-ware.org:13000/cosmos-auth/v1/token" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=password&username=frb@tid.es&password=xxxxxxxx”
{"access_token": "qjHPUcnW6leYAqr3Xw34DWLQlja0Ix", "token_type": "Bearer", "expires_in": 3600, "refresh_token": “V2Wlk7aFCnElKlW9BOmRzGhBtqgR2z"}

お分かりのように、FIWARE Lab の資格情報は、パスワードベースの認可タイプの形式でペイロードに必要です。これはあなたがそれらを与える必要がある唯一の時間になります。

トップ

Kerberos 認証

Hadoop 分散ファイルシステム (HDFS)は、WebHDFS というREST API を使用してリモート管理できます。この API は、セキュリティなしで使用できます。このユーザの HDFS スペースにアクセスするには有効な HDFS ユーザ名を知っていれば十分です。または、Kerberos インフラストラクチャを使用してユーザを認証できます。

Kerberos は MIT によって作成された認証プロトコルであり、現在のバージョンは5です。これは対称鍵暗号方式と信頼できるサードパーティの Kerberos サーバ自体に基づいています。このプロトコルは、認証サーバ(AS)を認証するのと同じくらい簡単です。これは、最終的な client-to-server チケットを取得するために使用できるチケット許可チケット(TGT)を使用して、ユーザーを Key Distribution Center(KDC)に転送します。このチケットは、双方向でサービスサーバーに対して認証目的で使用できます。

SPNEGO は、セキュリティ技術の選択を交渉するために使用されるメカニズムです。SPNEGO により、クライアントとサーバの両方が認証技術として Kerberos の使用をネゴシエートできます。

Kerberos 5 クライアントがインストールされ、ユーザーが Kerberos インフラストラクチャのプリンシパルとして既に存在する場合、コマンドラインから HDFS の Kerberos 認証を簡単に実行できます。次に、有効なチケットを取得し、curl--negotiate オプションを使用します :

$ kinit <USER>
Password for <USER>@<REALM>:
$ curl -i --negotiate -u:<USER> "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=..."

それにもかかわらず、Cygnus はこのプロセスを自動化する必要があります。設定の仕方を見てみましょう。

トップ

conf/cygnus.conf

このファイルは、配布された conf/cugnus.conf.template から構築できます。NGSIHDFSSink 設定のこの部分を適切に編集します :

# Kerberos-based authentication enabling
cygnus-twitter.sinks.hdfs-sink.krb5_auth = true
# Kerberos username
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_user = krb5_username
# Kerberos password
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_password = xxxxxxxxxxxxx
# Kerberos login file
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_login_file = /usr/cygnus/conf/krb5_login.conf
# Kerberos configuration file
cygnus-twitter.sinks.hdfs-sink.krb5_auth.krb5_conf_file = /usr/cygnus/conf/krb5.conf

つまり、Kerberos 認証を有効にするかどうかを開始します。次に、すでに登録されている Kerberos プリンシパルとそのパスワードを使用してユーザーを構成します。最後に、2つの特別な Kerberos ファイルの場所を指定します。

トップ

conf/krb5_login.conf

変更してはならない次の行が含まれています。したがって、配布されたファイルはテンプレートではなく、最終的なものです。

cygnus_krb5_login {
    com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=false debug=true useTicketCache=false;
};

トップ

conf/krb5.conf

このファイルは、配布された conf/krb5.conf.template から構築できます。EXAMPLE.COM を Kerberos 領域(これはあなたのドメインと同じですが、大文字で、example.com の領域は EXAMPLE.COMです)で置き換え、Kerberos Key Distribution Center(KDC) と Kerberos 管理/認証サーバを構成することで、適切に編集します。それらを知るためにネットワーク管理者に尋ねてください。

[libdefaults]
 default_realm = EXAMPLE.COM
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 24h
 renew_lifetime = 7d
 forwardable = true

[realms]
 EXAMPLE.COM = {
  kdc = kdc.example.com
  admin_server = admin_server.example.com
 }

[domain_realms]
 .example.com = EXAMPLE.COM
 example.com = EXAMPLE.COM

トップ