オープンソース

一覧へ

13. パイプライン処理によるログの加工方法(Processing pipelines)

Graylogには、streamを流れるメッセージを、extractorよりも柔軟に操作する仕組みが備わっています。それがpipelineです。

extractorでは、Graylogのインタフェースを使って、ログの分解などの設定を行うことができます。一方でpipelineには、そのようなインタフェースはありません。代わりに、Droolsと呼ばれるルールエンジンのルールを記述して、ログを操作します。

つまり、pipelineとは、プログラムを書いてログを操作するための機能です。 例えば以下のようなことができます。

  • 特定の値を持つログは不要とみなして破棄する
  • フィールドの値を結合・整形して新しいフィールドを作成する
  • 不要なフィールドを削除する

13.1. pipelineの概要

パイプラインは、メッセージに適用されるルールのまとまりのことを指す言葉です。

イメージは次の通りです。

_images/pipe.png

パイプラインは、1つ以上のstreamに接続できるため、各streamを流れるログをパイプラインに通して、ルールを適用することができます。

ルールの構造を単純化すると、以下のように条件とアクションがあるだけです。:

rule "function howto"
when
  // 条件
  // secret_idというフィールドが存在した場合
  has_field("secret_id")
then
  // アクション
  // secret_idフィールドを削除する
  remove_field("secret_id");
end

またルールはステージというまとまりで動きます。 ステージを移行する条件は2津あります。

  • All rules on this stage match the message

    • すべてのルールの条件がマッチした場合のみ、次のステージに進む
  • At least one of the rules on this stage matches the message

    • どれかひとつのルールにマッチした場合、次のステージに進む

上記の図では、便宜的にステージ1、ステージ2という番号を振っていますが、Graylogでは、ステージの番号は、優先順位を意味します。より若い番号が優先順位が高いことを意味します。

13.2. 設定方法

13.2.1. pipelineの作成

pipelineの設定は、メニューの Pipelines リンクから移動した画面で行います。

_images/pipe1.png

新しいpipelineを作るためには、 Add new pipe line をクリックします。

_images/pipe2.png

パイプラインの title (タイトル) と Description (説明)を入力して新しいpipelineを作成します。

13.2.2. ルールの作成

pipelineの設定は、メニューの Manage rules リンクから移動した画面で行います。

_images/pipe3.png

Create Rule ボタンをクリックするとルールの作成画面に移動します。

_images/pipe4.png

ルールの作成画面では、画面の左側が入力フォーム、右側が利用できる関数のリファレンスとルールの例になっています。

Description にはルールの説明を入力します。 Rule source については、以下で詳しく解説します。

13.2.3. ルールについて

ルールは、パイプライン処理の基礎です。ルールを使うことで、ログメッセージの変更や値の追加・削除、streamの切り替えを行うことができます。

ルールの中で行う、条件の一致や値の操作などは関数で行われます。Graylogには、文字列変換やJSONや日付形式の解析、正規表現やGrokパターンによる値の比較を行うための関数が付属しています。

さらに関数はプラグインで追加可能です。

13.2.4. ルールの定義

ルールの定義方法を次の例を使って解説します。 ルール1

rule "has firewall fields"
when
    has_field("src_ip") && has_field("dst_ip")
then
end

ルール2

rule "from firewall subnet"
when
    cidr_match("10.10.10.0/24", to_ip($message.gl2_remote_ip))
then
end

ルール3

rule "drop_message"
when
    true
then
    drop_message();
end

まず、 ルール1を見ていきます。 rule の行で、ルールの名前を定義しています。この名前はルール全体で一意である必要があります。また次の when-then の間では、ルールを適用する条件を定義しています。この条件は、 has_field 関数を使い、src_ipフィールドとdst_ipフィールドが存在するか検査しています。

続く then-end には何も記載がありませんが、本来であれば、この間には条件にマッチしたログに対するアクションが記載できます。条件のみを定義した場合、そのルールは、パイプラインの次のステージに進むかどうかの検査を行う役割になります。このように条件を細分化しておくことで、後々同じようなルールのときに再利用できるメリットがあります。

ルール2もルール1と基本的な構造は変わりません。 ここでの重要な要素は以下のとおりです。

  • $message 変数を利用してフィールドの値を参照している
    • $message.gl2_remote_ip はgl2_remote_ipフィールドを参照する意味となる
  • to_ip 関数を使い文字列のIPアドレスをネットワークバイトオーダに変換している
  • cidr_match 関数を行い、上記で変換したIPアドレスのデータが10.10.10.0/24の中であるか検査している

ルール3は、アクションだけを実行するルールです。 when-then の間で true を記載することで、必ず条件に一致してアクションが実行されるようになります。アクションで行っていることは、 drop_message 関数を使い、メッセージを破棄しています。

13.2.5. ルールとステージの紐付け

上記で解説したルールを作成すると次のように一覧が表示されます。

_images/pipe5.png

この画面から Manage pipelines ボタンでパイプラインの管理画面に戻ります。 次に Add new stage のボタンをクリックして、新しいステージを追加します。

_images/pipe6.png

入力項目は次の通りです。

  • Stage : ステージの優先順位を入力します。

  • All rules on this stage match the message

    • すべてのルールの条件がマッチした場合のみ、次のステージに進む
  • At least one of the rules on this stage matches the message

    • どれかひとつのルールにマッチした場合、次のステージに進む
  • Stage rules : 作成したルールを追加します。複数のルールを追加できます。

作成したルールをそれぞれステージ別に追加すると次のようになります。

_images/pipe7.png

このpipelineの動きは以下の通りです。

  • ステージ0で、src_ipなどのフィールドの存在検査をする
    • 存在する場合だけステージ1に進む
  • ステージ2で、サブネットの範囲内であるかを検査する
    • 範囲内の場合だけステージ2に進む
  • ステージ3で、メッセージを破棄する

つまり、ルールの組み合わせで値の検査を行い、条件にマッチしたログだけを破棄するというブラックリストの設定が行われました。

ただし、これだけではまだpipelineは動作しません。 pipelineとstreamの紐付けを行う必要があります。

13.2.6. pipelineとstreamの紐付け

_images/pipe7.png

Edit connections をクリックしてstreamを選択します。

_images/pipe8.png

この設定で、pipelineの処理が開始されます。

13.3. シミュレーター

pipelineの動作を確認するために、Simulator を使うことができます。 次は、上記までで設定したpipelineにマッチするデータをGELFの形式で送信するシミュレーションを行った例です。

_images/pipe9.png

結果は以下のように表示されます。

_images/pipe10.png

13.4. ルールの詳細

13.4.1. データ型

関数に値を渡す場合には、関数の引数に適合する型のデータを渡す必要があります。 Graylogのルール言語のパーサは、無効な型の使用を拒否します Graylogのルール言語の組み込み型は以下の通りです。

  • str : utf-8文字列
  • double : 浮動小数点数(Javaのdoubleと同等)
  • long : 数値型(JavaのLongと同等)
  • boolean : ブール値
  • void : 返り値なしを示す型
  • ip : IPアドレス

型はプラグインにより追加可能です。 また慣例的に型を変換する関数のプレフィックスは to_ となっています。

13.4.2. 条件

Graylogの規則では、 when-then の間は、ブール式で評価されます。 式は AND (または && )、 OR または || )、 < , <= , >= , > , == , != をサポートしています。

when-then の間で、関数を使うこともできますが、最終的には、ブール値を返す必要があります。条件に必ず一致させたい場合は、ブールリテラル true を使うことができます。

13.4.3. アクション

then-end の間には、アクションを定義します。アクションには2種類あります。

  • 関数呼び出し
  • 変数の割当

変数の代入は次のような形式になります。:

let name = value;

変数は、データを保持して、再計算するために役立ちます。またルールの可読性を上げるためにも役立ちます。

13.4.4. 関数

関数には、それぞれ必要なパラメータと戻り値が定義されています。 関数のパラメータには、必須なパラメータと、必須ではないオプショナルなパラメータが存在しています。

関数のパラメータをすべて渡す必要がない場合、次のようにパラメータの名前を指定して渡すことができます。:

let new_date = parse_date(
                    value: to_str($message.transaction_date),
                    pattern: "yyyy-MM-dd HH:mm:ss",
                    timezone: to_str($message.transaction_timezone)

13.4.5. 関数一覧

Built-in Functions
関数名 説明
debug 渡された値を文字列としてGraylogログに出力します。
to_bool 文字列値を使用して、単一のパラメータをブール値に変換します。
to_double 最初のパラメーターをdouble浮動小数点値に変換します。
to_long 最初のパラメータをlong整数値に変換します。
to_str 最初のパラメータを文字列表現に変換します。
to_url 文字列表現を使用して値を有効なURLに変換します。
is_null 値がnullかどうかをチェックします。
is_not_null 値が nullでないかどうかをチェックします。
abbreviate ellipsesを使用して文字列を省略します。
capitalize 最初の文字を大文字にします。
uncapitalize 最初の文字を小文字にします。
uppercase 大文字に変換します。
lowercase 小文字に変換します。
swapcase 大文字と小文字を入れ替えます。
contains 文字列に別の文字列が含まれているか検査します。
substr 指定された開始オフセットと終了オフセットでvalue部分文字列を返します。
concat 2つの文字列を連結します。
split パターンと一致する文字列を分割します(Java構文)。
regex 正規表現を使って文字列の検査を多ないます。
grok 文字列にGrokパターンを適用します。
key_value 文字列からキー/値のペアを抽出します。
crc32 指定された文字列の16進数で符号化されたCRC32ダイジェストを返します。
crc32c 指定された文字列の16進数で符号化されたCRC32C(RFC 3720、Section 12.1)ダイジェストを返します。
md5 指定された文字列の16進数でエンコードされたMD5ダイジェストを返します。
murmur3_32 指定された文字列の16進数で符号化されたMurmurHash3(32ビット)ダイジェストを返します。
murmur3_128 指定された文字列の16進数で符号化されたMurmurHash3(128ビット)ダイジェストを返します。
sha1 指定された文字列の16進数で符号化されたSHA1ダイジェストを返します。
sha256 指定された文字列の16進数で符号化されたSHA256ダイジェストを返します。
sha512 指定された文字列の16進数で符号化されたSHA512ダイジェストを返します。
parse_json JSONを解析します。
select_jsonpath JSONツリーから1つ以上の名前付きJSON Path式を選択します。
to_ip 指定された文字列をIPオブジェクトに変換します。
cidr_match 指定されたIPがCIDRパターンと一致するかどうかをチェックします。
from_input 現在のメッセージが指定されたinputで受信されたかどうかをチェックします。
route_to_stream 現在のメッセージを指定されたstreamに割り当てます。
remove_from_stream 指定されたstreamから現在のメッセージを削除します。
create_message ※現在不完全 処理パイプライン全体で評価される新しいメッセージを作成します。
clone_message メッセージをクローンします。
drop_message ルールの終了後に処理パイプラインから削除
has_field メッセージに名前付きフィールドが含まれているかどうか検査します。
remove_field メッセージから名前付きフィールドを削除します。
set_field 新しいフィールドをセットします。。
set_fields 新しいフィールドを複数セットします。。
rename_field フィールド名を変更します。
syslog_facility syslogファシリティ番号を文字列表現に変換します。
syslog_level syslogレベル番号を文字列表現に変換します。
expand_syslog_priority syslog優先順位番号をそのレベルと機能に変換します。
expand_syslog_priority_as_str syslogプライオリティ番号をレベル文字列とファシリティ文字列表現に変換します。
now 現在の日付と時刻を返します。
parse_date 指定された文字列から日時を解析します。
flex_parse_date Natty日付パーサーを使用して日付と時刻を解析しようとします。
format_date 指定されたフォーマッタパターンに従って日付と時刻をフォーマットします。
to_date date型に変換します。
years 指定した年数でピリオドを作成します。
months 指定した月数の期間を作成します。
weeks 指定した週数で期間を作成します。
days 指定した日数の期間を作成します。
hours 指定した時間数でピリオドを作成します。
minutes 指定した分数でピリオドを作成します。
seconds 指定した秒数でピリオドを作成します。
millis 指定したミリ秒数でピリオドを作成します。
period 指定された文字列からISO 8601の期間を解析します。
lookup 名前付きルックアップテーブルで複数の値を検索します。
lookup_value 名前付きルックアップテーブルで単一の値を検索します。

13.4.5.1. debug

debug(value: any)

渡された値を文字列としてGraylogログに出力します。 デバッグメッセージは、デバッグしようとしているメッセージを処理しているGraylogノードのログにのみ表示されます。

例:

// Print: "INFO : org.graylog.plugins.pipelineprocessor.ast.functions.Function - PIPELINE DEBUG: Dropped message from <source>"
let debug_message = concat("Dropped message from ", to_str($message.source));
debug(debug_message);

13.4.5.2. to_bool

to_bool(value: any)

文字列値を使用して、単一のパラメータをブール値に変換します。

13.4.5.3. to_double

to_double(value: any, [default: double])

最初のパラメーターをdouble浮動小数点値に変換します。

13.4.5.4. to_long

to_long(value: any, [default: long])

最初のパラメータをlong整数値に変換します。

13.4.5.5. to_str

to_str(value: any, [default: str])

最初のパラメータを文字列表現に変換します。

13.4.5.6. to_url

to_url(url: any, [default: str])

文字列表現を使用して値を有効なURLに変換します。

13.4.5.7. is_null

is_null(value: any)

値がnullかどうかをチェックします。

13.4.5.8. is_not_null

is_not_null(value: any)

値が nullでないかどうかをチェックします。

13.4.5.9. abbreviate

abbreviate(value: str, width: long)

ellipsesを使用して文字列を省略します。

13.4.5.10. capitalize

capitalize(value: str)

最初の文字を大文字にします。

13.4.5.11. uncapitalize

uncapitalize(value: str)

最初の文字を小文字にします。

13.4.5.12. uppercase

uppercase(value: str, [locale: str])

大文字に変換します。 localeのデフォルトはenです。

13.4.5.13. lowercase

lowercase(value: str, [locale: str])

小文字に変換します。 localeのデフォルトはenです。

13.4.5.14. swapcase

swapcase(value: str)

大文字と小文字を入れ替えます。

13.4.5.15. contains

contains(value: str, search: str, [ignore_case: boolean])

valueにsearchが含まれているかどうかをチェックし、オプションで検索パターンの大文字と小文字を無視します。

13.4.5.16. substr

substr(value: str, start: long, [end: long])

startオフセット(ゼロベースのインデックス)から始まるvalue部分文字列を返します。オプションでendオフセットを指定できます。

13.4.5.17. concat

concat(first: str, second: str)

firstとsecondテキストを結合した新しい文字列を返します。

13.4.5.18. split

split(pattern: str, value: str, [limit: int])

pattern文字列で、strを分割します。limitオプションを指定することで、分割する回数を指定できます。 パターンは有効な Java文字列リテラル でなければなりません。正規表現でバックスラッシュをエスケープしてください。

13.4.5.19. regex

regex(pattern: str, value: str, [group_names: array[str])

patternの正規表現とvalueを一致するか検査します。 正規表現でグルーピングしている場合、groupe_namesオプションの配列を使用してグループ名をつけられます。 名前がつけられていない場合、グループ名を0で始まる文字列です。

13.4.5.20. grok

grok(pattern: str, value: str, [only_named_captures: boolean])

Grokパターンpatternをvalueに適用します。 only_named_capturesだけをtrueに設定すると、名前付きキャプチャを使用して一致を返すことができます。

13.4.5.21. key_value

key_value(
  value: str,
  [delimiters: str],
  [kv_delimiters: str],
  [ignore_empty_values: boolean],
  [allow_dup_keys: boolean],
  [handle_dup_keys: str],
  [trim_key_chars: str],
  [trim_value_chars: str]
)

指定されたvalueからキーと値の北亜を抽出し、フィールド名と値のマップとして返します。オプションで以下を指定することができます。
delimiters
ペアの区切り文字。デフォルト値: スペース
kv_delimiters
キーと値を区切るために使用される文字。デフォルト値: =
ignore_empty_values
空の値を許可するかどうか。デフォルト値: true
allow_dup_keys
重複キーを許可するかどうか。デフォルト値: true
handle_dup_keys
take_firstを指定すると重複したキーの最初を採用します。 take_lastを指定すると重複したキーの最後を採用します。 ","などを指定すると、キーの値が,で結合されます。 デフォルト値: take_first
trim_key_chars
キーの文字をトリムする
trim_value_chars
値の文字をトリムする

Tip: key_value関数の実行結果をkey_value引数として渡して、抽出されたフィールドをメッセージに設定することができます。

13.4.5.22. crc32

crc32(value: str)

CRC32ダイジェストを作成します。

13.4.5.23. crc32c

crc32c(value: str)

CRC32C(RFC 3720、セクション12.1)ダイジェストを作成します。

13.4.5.24. md5

md5(value: str)

MD5ダイジェストを作成します。

13.4.5.25. murmur3_32

murmur3_32(value: str)

MurmurHash3(32ビット)ダイジェストを作成します。

13.4.5.26. murmur3_128

murmur3_128(value: str)

MurmurHash3(128ビット)ダイジェストを作成します。

13.4.5.27. sha1

sha1(value: str)

SHA1ダイジェストを作成します。

13.4.5.28. sha256

sha256(value: str)

SHA256ダイジェストを作成します。

13.4.5.29. sha512

sha512(value: str)

SHA512ダイジェストを作成します。

13.4.5.30. parse_json

parse_json(value: str)

JSON文字列をJSONツリーオブジェクトに変換します。

13.4.5.31. select_jsonpath

select_jsonpath(json: JsonNode, paths: Map<str, str>)

指定されたpathsをjsonツリーに対して評価し、結果の値のマップを返します。

13.4.5.32. to_ip

to_ip(ip: str)

指定されたip文字列をIpAddressオブジェクトに変換します。

13.4.5.33. cidr_match

cidr_match(cidr: str, ip: IpAddress)

指定されたipアドレスオブジェクトがcidrパターンと一致するかどうかを確認します。

13.4.5.34. from_input

from_input(id: str | name: str)

現在処理されているメッセージが指定されたinpuで受信されたかどうかを確認します。 inputは、nameまたはidを指定することによって調べることができます。

13.4.5.35. route_to_stream

route_to_stream(id: str | name: str, [message: str], [remove_from_default: boolean])

messageを指定されたstreamにルーティングします 。 streamは、nameまたはidで指定できます。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

remove_from_defaultがtrue場合、メッセージはデフォルトstream "All messages"からも削除されます。

13.4.5.36. remove_from_stream

remove_from_stream(id: str | name: str, [message: str])

messageを指定されたstreamから削除します 。 streamは、nameまたはidで指定できます。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

削除した結果、メッセージがすべてのstreamから外れた場合、デフォルトstream "All messages"に戻されます。 明示的に完全な削除をしたい場合、drop_messages関数を使用します。

13.4.5.37. create_message

create_message([message: str], [source: str], [timestamp: DateTime])

指定されたパラメータから新しいメッセージを作成します。 いずれかが省略された場合、その値は現在処理中のメッセージの対応するフィールドから取得されます。 timestampが省略された場合、作成されたメッセージのタイムスタンプはその時点のタイムスタンプになります。

13.4.5.38. clone_message

clone_message([message: str])

メッセージをクローンします。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.39. drop_message

drop_message(message: str)

処理パイプラインは、ルールの実行が終了した後に、指定されたmessageを削除します。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.40. has_field

has_field(field: str, [message: str])

指定されたmessageにfieldが含まれているかどうかを確認します。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.41. remove_field

remove_field(field: str, [message: str])

指定されたmessageから指定されたfieldを削除します。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.42. set_field

set_field(field: str, value: any, [prefix: str], [suffix: str], [message: str])

指定されたfieldにvalueを設定します。fieldは有効な文字列でなければならず、 .を含むことはできません。 先頭と末尾の空白は削除されます。

オプションのprefixおよびsuffixパラメータは、挿入されたフィールド名に追加する接頭辞または接尾辞を指定します。 messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.43. set_fields

set_fields(fields: Map<str, any>, [prefix: str], [suffix: str], [message: str])

指定されたメッセージのfieldに、指定された名前と値のペアをすべて設定します。 これは、 set_fieldのように機能する便利な関数です 。

オプションのprefixおよびsuffixパラメータは、挿入されたフィールド名に追加する接頭辞または接尾辞を指定します。

messageが省略された場合、この関数は現在処理中のメッセージを使用します。

13.4.5.44. rename_field

rename_field(old_field: str, new_field: str, [message: str])

指定されたメッセージのフィールド名old_fieldをnew_fieldに変更します。 値は変更されません。

13.4.5.45. syslog_facility

syslog_facility(value: any)

syslogファシリティ番号valueを文字列表現に変換します。

13.4.5.46. syslog_level

syslog_level(value: any)

syslogレベルvalueを文字列表現に変換します。

13.4.5.47. expand_syslog_priority

expand_syslog_priority(value: any)

syslog優先順位番号を数値重大度とファシリティ値に変換します。

13.4.5.48. expand_syslog_priority_as_str

expand_syslog_priority_as_str(value: any)

syslog優先度番号を重大度およびファシリティ文字列表現に変換します。

13.4.5.49. now

now([timezone: str])

現在の日付と時刻を返します。 デフォルトのタイムゾーンUTC使用します。

13.4.5.50. parse_date

parse_date(value: str, pattern: str, [locale: str], [timezone: str])

patternを使用して、 valueを日付と時刻のオブジェクトに変換します 。 パターンでタイムゾーンが検出されない場合は、オプションのtimezoneパラメータが想定されるタイムゾーンとして使用されます。 省略すると、タイムゾーンのデフォルトはUTCになります。

記号 意味 形式
G era text AD
C century of era (>=0) number 20
Y year of era (>=0) year 1996
x weekyear year 1996
w week of weekyear number 27
e day of week number 2
E day of week text Tuesday; Tue
y year year 1996
D day of year number 189
M month of year month July; Jul; 07
d day of month number 10
a halfday of day text PM
K hour of halfday (0~11) number 0
h clockhour of halfday (1~12) number 12
H hour of day (0~23) number 0
k clockhour of day (1~24) number 24
m minute of hour number 30
s second of minute number 55
S fraction of second millis 978
z time zone text Pacific Standard Time; PST
Z time zone offset/id zone -0800; -08:00; America/Los_Angeles
' escape for text delimiter  
'' single quote literal '

13.4.5.51. flex_parse_date

flex_parse_date(value: str, [default: DateTime], [timezone: str])

Natty日付パーサを使用して日付と時刻のvalueを解析します。 パターンでタイムゾーンが検出されない場合は、オプションのtimezoneパラメータが想定されるタイムゾーンとして使用されます。 省略すると、タイムゾーンのデフォルトはUTCになります。

13.4.5.52. format_date

format_date(value: DateTime, format: str, [timezone: str])

指定された日付と時刻のvalueフォーマット文字列に従ってフォーマットして返します。 タイムゾーンが指定されていない場合、デフォルトはUTCです。

13.4.5.53. to_date

to_date(value: any, [timezone: str])

valueを日付に変換します。 timezoneが指定されていない場合、デフォルトはUTCです。

13.4.5.54. years

years(value: long)

valueの年数でピリオドを作成します。

13.4.5.55. months

months(value: long)

valueの月数で月を作成します。

13.4.5.56. weeks

weeks(value: long)

valueの週数でピリオドを作成します。

13.4.5.57. days

days(value: long)

valueの日数でピリオドを作成します。

13.4.5.58. hours

hours(value: long)

valueの時間数でピリオドを作成します。

13.4.5.59. minutes

minutes(value: long)

valueの分数のピリオドを作成します。

13.4.5.60. seconds

seconds(value: long)

valueの秒数でピリオドを作成します。

13.4.5.61. millis

millis(value: long)

ミリ秒のvalue持つピリオドを作成します。

13.4.5.62. period

period(value: str)

valueからISO 8601で適宜された期間を解析します。

13.4.5.63. lookup

lookup(lookup_table: str, key: any, [default: any])

名前付きルックアップテーブルで複数の値を検索します。

13.4.5.64. lookup_value

lookup_value(lookup_table: str, key: any, [default: any])

名前付きルックアップテーブルで単一の値を検索します。

一覧へ