Apache NiFi 各種 Processor ざっくりまとめのまとめ

この記事は?

Qrunch と呼ばれるサービスにて書き溜めた NiFi Processor に関するメモを一度まとめようということで作成しました。
Qrunch に関しては以下を参照のこと。
もっと気軽にアウトプットできる技術ブログサービス「Qrunch(クランチ)」をリリースした【個人開発】
最近アップデートがされよりブログ色が濃くなりました。
まだ β ということで未完成の機能も多く存在しますが、扱いやすいと個人的には感じています。
特にログ機能はアウトプットを行うハードルを下げてくれる秀逸な機能ですので初学者ほどおすすめします。

注意事項

リンク先について

先述しましたが Qrunch のログ機能に書き溜めたものをまとめようということでこの記事を作成しました。
そのためほとんどは Qrunch へのリンクとなっていますので留意の上でお願いします。

各種 Processor について

元々記事として投稿するために書いたものではないので、読み手側に優しくない書き方になっています。
少し読みづらさはあるかと思いますが、こちらも留意の上でお願いします。

内容について

内容については公式ドキュメントを元に記載してはいますが、そのまま書いても意味がないのと、私自身ドキュメントを Chrome 先生に翻訳したものを読み込んでいることから、実際の意図とは違っている可能性があります。

私の見解やなんとなくこういうものに近いか、みたいな感じで独自の視点や偏見が混じっているので、どちらかというと NiFi を急に任されて何をどう実装したらいいかわからないという人向けの内容となっているので、同様に留意の上で(ry

疑問点や不足点に関して

疑問点、修正点に関してはこの記事にコメント頂いてもいいですし、Qrunch のほうでも、勿論 Twitter などでも構いません。じゃんじゃんご意見いただければと思います。

NiFi に関する知識について

ざっくり Qiita の記事合わせいくつかリンクを用意しました。よろしければ参考にしていただければと思います。

データフローオーケストレーションツールApache NiFiとは?

以前別記事でも紹介しましたが NiFi とはなんなのか大体説明してくれている記事です。
NiFi についてはこちらで見ていただければなんとなく把握できるかと。

Apache NiFiのそもそものコンセプトは?

さらに掘り下げたのがこちらの記事。
先程の記事と同じ方が著者です。

NiFi 情報まとめ

現在進行系で NiFi に関してアウトプットを続けている方のまとめ記事です。
私よりよっぽど実践向けの学習を行っている方なので、NiFi に慣れた方はこちらの方を追ってみるのも良いと思います。

NiFi 用語まとめ

こちらは Qrunch へのリンクとなっています私の記事です。
記載していますが、各種 NiFi 用語を私の独自の視点と偏見で簡単に説明しています。
そのため認識が違うことなどあるかと思いますが、こういうツールに慣れていない人や(というか初期の私)初めての人に向けてのものでもあるのである程度多目に見ていただければと…。
勿論コメントで補足等は大歓迎でございます。
こちらの内容を元に本記事やリンク先でも紹介を行っていますので、何にせよ一度目を通していただいたほうが良いかと思います。


それではお待たせしました。
Processor の詳細に合わせ私の独断と偏見で簡単にジャンル分けしましたので、参考にしていただければと思います。

NiFi Processor

通信系

HandleHttpRequest

簡単に言えばリクエストを受け付ける入り口ですね。
これを置いて設定すれば、ローカルで初期値のまま NiFi をインストールしたなら http://localhost:8080/~ に curl など飛ばせばもうリクエストを受けることができます。

HandleHttpResponse

先程のがリクエストを受け付ける入り口ならこちらはレスポンスを返す出口です。
同様にリクエストからこのプロセッサに繋げればレスポンスが帰ってくることを確認できるかと思います。

ログ未記載の Processor から一部抜粋

InvokeHTTP

NiFi 内部で API などを叩きたい時はこれを使います。
メソッド、パスの指定は勿論、プロキシの設定など色々機能がてんこ盛り。
使用経験もあるしなんだったら現場で使用していますが、設定を行っていない項目が多い上に、不明な部分が多すぎるのでログへの記載を断念しています。もう少し知識がついたら一通り見直してみようと思います。

InvokeScriptedProcessor

使用経験はないのですが公式ドキュメントを見ていたら気になったので。
見る感じ script で外部連携を行えるって感じみたいですね。
前述の InvokeHTTP で十分だとは思いますが、script だからこそできることがあるんですかね?
あとはややこしい処理をさせてから連携を行うとか…それこそ Processor にやらせるべきだとは思いますが、正直全ての Processor を把握するのも中々厳しいものがあるので、最終手段としてこういったものがあるのは良いことなんでしょうね。

ExecuteSQL

これは通信系と言っていいのか…? となりましたがまあとりあえず。
名前通り SQL を用いて DB 操作を行うプロセッサです。
こちらも使用しているのですが Controller Service という機能に関して知識が薄く、また SQL をどこに書くなどはわかっているものの、他の設定は上司が行う等把握していない部分が多いためログの作成は行っていません。
使用されることの多いプロセッサだと思うので近い内にしっかり確認したいですね。

ログ系

LogMessage

ログを出力してくれるプロセッサです。
ただこのプロセッサは簡易的と言うか、名前通りあるメッセージを出力するのみの機能しか持っていません。
NiFi 開始時などは後述の LogAttribute を使用すると良さげ…なのかなぁ…(経験が短いのでログ出力の普通だとかがまだよくわからない…)

ログ未記載の Processor から一部抜粋

LogAttribute

ログ書いてるかと思ったら書いていませんでした…時間のあるときにまとめます。
こちらは前述の LogMessage と違い、Attribute なども出力を行ってくれたりと、詳細な情報も出力してくれます。そのため NiFi の始めや終わりなどはこちらのが良い?

分岐系

RouteOnAttribute

Attribute 用いた条件を記載し、複数の分岐を作れるプロセッサです。
If 文で複数分岐を作ったり、AND,OR 条件で一致不一致を判別したりと割となんでもござれ。
基本的に分岐はこれで問題ないです。

RouteOnContent

前述の RouteOnAttribute と似たような名前ですが、使い方はほぼそのままです。
ただしこちらは対象が Attribute ではなく Content です。
そのため If 文はなく、AND,OR 条件のみとなっています。

ValidateXml

Content に格納されている xml を XML Schema という手法を用いてバリデーションを行うプロセッサです。
ただ後述する EvaluateXPath というプロセッサがあるのですが、取得を行う機能に加え、DTD で検証を行う機能を持ち合わせているため、手間を省くという意味ではあちらのほうが使われる…のかなぁ(どちらも該当機能を使ったことがない)

取得系

EvaluateJsonPath

Content に格納されている Json 形式のデータから各種値を JsonPath を用いて取得を行うプロセッサです。

EvaluateXPath

EvaluateJsonPath と同様かつ、対象が xml になり、こちらは XPath を用いて値の取得を行うプロセッサです。
ValidateXml で前述しましたが取得する前に DTD を検証するかなどもすることができます。

変換系

AttributeToJson

これは名前通りですね… Attribute から Json データを生成するプロセッサです。割と使う。

JoltTransformJSON

Jolt と呼ばれる Java のライブラリを用いて Json 形式のデータを元に新たに Json 形式のデータを生み出すプロセッサです。これが中々面白い。
階層のない Json を階層があるように変換したり、デフォルトの初期値を入れたりと、複雑な操作が難しい NiFi の救世主ですね。先程の AttributeToJson もそのまま Json データを作るだけで階層は作れないっぽいので。

ログ未記載の Processor から一部抜粋

ConvertAvroToJSON

Avro 形式から Json 形式に変換を行うプロセッサです。
前述の ExecuteSQL から扱える値にするために変換を行うときなどに使われます。
これも使用してはいるのですが Avro Schema というのがググってもよくわからず…断念しています。

ReplaceText

Content の中身を書き換えるプロセッサです(変換系…?)
正規表現に当てはまるものを指定した値で書き換えます。
使用経験ありですが、全てを丸々書き換えるような処理しかしていないため…。
というか書き換えるどころか Content を空にするのに使っているので理解できているとは言えず…
まあでも内容は理解できそうなので落ち着いたら書こうかと思ってます。

Script 系

ExecuteGroovyScript

前述した InvokeScriptedProcessor と似たもので、処理を script などで行いたいときに使用されるプロセッサです。
名称から分かる通り、このプロセッサに関しては groovy 限定のプロセッサとなっています。

ログ未記載の Processor から一部抜粋

ExecuteScript

前述の ExecuteGroovyScript と役割は同じです。処理を script で処理したいときに使用されるプロセッサです。
あちらと違うのはこちらは groovy 限定でないこと。rubypython などが用意されています。
何故 groovy だけ別でプロセッサが用意されているのかは謎です。
java と親和性があるから…とか? でもこのプロセッサ groovy も使えるんですよね…あとから追加されたプロセッサだからとかでしょうか…。

NiFi 系

UpdateAttribute

簡単に言えば Attribute を新たに生成するプロセッサです。
なんだかんだ多用することになるプロセッサですが、実は値の保持を行える機能を持っていたりと侮れないプロセッサです。

以上です!

勿論前述の通り質問やご意見はしっかり受けとめ、お答えできる範囲で回答しますのでじゃんじゃんご連絡いただければと思います。

それでは!