Splitter

概要

システムに流通するメッセージには複数の要素が含まれることがありますが、要素ごとに処理できるように、個々の要素に分割します。

例えば、メッセージがList<User>Userごとに処理を行いたい場合や、

A, 100, 東京都, ...       // 追加
D, 101, 神奈川県, ...     // 削除
U, 102, 埼玉県, ...       // 更新

のようなCSVで、先頭カラムの値によりそれぞれの処理を行いたい場合は、1行ずつ分割してから先頭のカラムの種別ごとに処理を行うケースも考えられます。


Splitterイメージ

基本

“User”クラスのリストであるList<User>を持つメッセージを分割し、各Userごとに処理する例を考えます。このときのSpring XMLで定義したルートは次の通りです。データがどのように流れるか分かるようにログを入れてあります。

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <!-- (1) 分割する前 -->
            <log message="(1) ${body}." />
            <split>
                <simple>${body}</simple>
                <!-- (2) 分割した各データ -->
                <log message="(2) ${body}." />
            </split>    
            <!-- (3) 分割終了後 -->
            <log message="(3) ${body}." />
            <to uri="mock:result"/>
        </route>
    </camelContext>
  

<simple>${body}</simple>のように${}を使用していますが、<simple>body</simple>のように省略することもできます。ただし、<simple>${body} == 'something'</simple>のように、オペレータなどを含む場合は省略できません。

このルートに対して、List形式のデータを送信します。

    public void testBodyIsList() throws Exception {  
        // 入力データはList
        List<User> users = new ArrayList<User>();
        users.add(new User("001", "User01", "Tokyo"));
        users.add(new User("002", "User02", "Chiba"));
        users.add(new User("003", "User03", "Kanagawa"));

        // 入力データ送信
        template.sendBody("direct:start", users);
    }

ログは次のように出力されます。

入力前(1)はList<User>ですが、Splitterにより分割後(2)は各User ごとにログが出力されていることが分かります。また、Splitterが終了する(3)と、分割前の入力データがメッセージに設定されていることがわかります。

(1) [User{id=001, name=User01, address=Tokyo}, User{id=002, name=User02, address=Chiba}, 
      User{id=003, name=User03, address=Kanagawa}].
(2) User{id=001, name=User01, address=Tokyo}.
(2) User{id=002, name=User02, address=Chiba}.
(2) User{id=003, name=User03, address=Kanagawa}.
(3) [User{id=001, name=User01, address=Tokyo}, User{id=002, name=User02, address=Chiba},
      User{id=003, name=User03, address=Kanagawa}].

Splitterは、

  1. Expressionを評価し、分割対象を取得
  2. 分割対象からIteratorを作成
  3. オリジナルのメッセージをコピー
  4. 作成したIteratorから分割したデータを作成し、分割後のサブメッセージのBodyに設定(無くなるまで繰り返す)
    1. サブメッセージの処理を実行
    2. サブメッセージをもとに、メッセージを集約する
  5. 集約したメッセージを返す

のような流れで処理を行います。

上記の例では、<simple>${body}</simple>で分割対象をメッセージのBodyとしています。Camelは、Bodyに設定されているList<User>からIteratorを作成します。

List以外にも、Collection, Iterator, Array, org.w3c.dom.NodeListも使用可能です。

Splitterは、分割されたサブメッセージを組み立てることで、後続の処理にメッセージを流通します。デフォルトでは、入力のメッセージをそのまま後続の処理に流しますが、strategyRef属性にAggregationStrategyインタフェースを実装したクラスを指定することで、カスタマイズすることができます。詳細はAggregatorを参照してください。

応用

Tokenize

CSVなどのテキストデータを1行ごとに分割するには、<simple>の代わりに、<tokenize>を使用しtoken属性に区切り文字\nを指定します。

        <route>
            <from uri="direct:tokenize"/>
            <!-- (1) 分割する前 -->
            <log message="(1) ${body}" />
            <split>
                <tokenize token="\n" />
                <!-- (2) 分割した各データ -->
                <log message="(2) ${body}" />
            </split>    
            <!-- (3) 分割終了後 -->
            <log message="(3) ${body}" />
            <to uri="mock:result"/>
        </route>

このSpringXMLに、CSVを3行流通すると、

    public void testTokenize() throws Exception {  
        // 入力データはCSV
        StringBuilder csv = new StringBuilder();
        csv.append("A, 001, User001,Tokyo").append("\n");
        csv.append("U, 002, User002,Chiba").append("\n");
        csv.append("D, 003, User003,Kanagawa");

        // 入力データ送信
        template.sendBody("direct:tokenize", csv.toString());
    }

以下のログのように、(1)で3行あったCSVが、(2)のように1行ずつ3回流通することがわかります。

[main] (1) A, 001, User001,Tokyo
U, 002, User002,Chiba
D, 003, User003,Kanagawa

[main] (2) A, 001, User001,Tokyo
[main] (2) U, 002, User002,Chiba
[main] (2) D, 003, User003,Kanagawa

[main] (3) A, 001, User001,Tokyo
U, 002, User002,Chiba
D, 003, User003,Kanagawa

Tokenize のグルーピング

上記では、CSVを改行コード単位に1行ずつに分割していましたが、<tokenize token=“\n” group=“10” />のようにgroup属性を指定すると、1行単位ではなく、分割して指定した行数単位でまとめることができます。

以下の例では、CSVを2行ずつに分割します。

        <route>
            <from uri="direct:tokenize_group"/>
            <!-- (1) 分割する前 -->
            <log message="(1) ${body}" />
            <split>
                <!-- 2個ずつまとめる -->
                <tokenize token="\n" group="2" />
                <!-- (2) 分割した各データ -->
                <log message="(2) ${body}" />
            </split>    
            <!-- (3) 分割終了後 -->
            <log message="(3) ${body}" />
            <to uri="mock:result"/>
        </route>

このSpringXMLに3行のCSVを流通すると、

[main] (1) A, 001, User001,Tokyo
U, 002, User002,Chiba
D, 003, User003,Kanagawa

[main] (2) A, 001, User001,Tokyo
U, 002, User002,Chiba
[main] (2) D, 003, User003,Kanagawa

[main] (3) A, 001, User001,Tokyo
U, 002, User002,Chiba
D, 003, User003,Kanagawa

上記のとおり、(1)で3行あったCSVが、1回目の(2)で2行、2回目の(2)で1行に分割されることがわかります。

ヘッダを分割

Tokenizeは、headerName属性にヘッダ名を指定することで、ヘッダの値を分割してBODYに設定することもできます。

ヘッダtypeをカンマで分割してみます。

        <!-- header.type を","でsplit -->
        <route>
            <from uri="direct:tokenize_header"/>
            <!-- (1) 分割する前 -->
            <log message="(1) body: ${body}" />
            <log message="(1) heeader.type: ${header.type}" />
            <split>
                <!-- headerName属性に、ヘッダ名を指定 -->
                <tokenize token="," headerName="type" />
                <!-- (2) 分割した各データ -->
                <log message="(2) body: ${body}" />
                <log message="(2) header.type: ${header.type}" />
            </split>    
            <!-- (3) 分割終了後 -->
            <log message="(3) body: ${body}" />
            <log message="(3) header.type: ${header.type}" />
            <to uri="mock:result"/>
        </route>

このSpringXMLに、ヘッダtypeが”type A,type B,type C”であるメッセージを流通してみます。

    public void testTokenizeHeader() throws Exception {  
        // 入力データ送信
        template.sendBodyAndHeader("direct:tokenize_header", "BODY", "type", "type A,type B,type C");
    }

type Aから順に、BODYに設定されることがわかります。

[main] (1) body: BODY
[main] (1) heeader.type: type A,type B,type C
[main] (2) body: type A
[main] (2) header.type: type A,type B,type C
[main] (2) body: type B
[main] (2) header.type: type A,type B,type C
[main] (2) body: type C
[main] (2) header.type: type A,type B,type C
[main] (3) body: BODY
[main] (3) header.type: type A,type B,type C

大量データ

Tokenizeの場合、対象データをメモリに読み込んでから分割処理を行うため、対象データが大きい場合メモリの消費量が多くなります。これを回避するには、<split>streaming属性をtrueに設定します。

        <route>
            <from uri="direct:streaming"/>
            <split streaming="true">
                <tokenize token="\n"/>
                <!-- (1) 分割した各データ -->
                <log message="(1) ${body}" />
            </split>    
            <to uri="mock:result"/>
        </route>

この場合、java.util.Scannerを使用して、ファイルから少しずつデータを読み込み分割を行います。

    public void testStreaming() throws Exception {  
        // 入力ファイル    
        File csv = new File("src/test/resources/com/buildria/camel/example/eip/splitter/streaming_data.csv");
        // 最後にデータを1つ受け取ること
        resultEndpoint.expectedMessageCount(1);
        // 入力データ送信
        template.sendBody(csv);
        // 検証
        resultEndpoint.assertIsSatisfied();
    }

並行処理

Splitterで分割されたサブメッセージは、通常シーケンシャルに処理されますが、parallelProcessing属性をtrueに設定すると、スレッドプールを使用してサブメッセージの処理を並行して処理します。並行して動作させることで、処理を効率よく行うことができます。

<split parallelProcessing="true">

ログにスレッド名を出力するようにしてみると、別スレッドで動作しているのがわかります(通常は同一スレッドです)。

[Camel (camel-1) thread #3 - Split] (2) User{id=2, name=User2, address=Tokyo}.
[Camel (camel-1) thread #2 - Split] (2) User{id=1, name=User1, address=Tokyo}.
[Camel (camel-1) thread #4 - Split] (2) User{id=3, name=User3, address=Tokyo}.
[Camel (camel-1) thread #5 - Split] (2) User{id=4, name=User4, address=Tokyo}.
[Camel (camel-1) thread #7 - Split] (2) User{id=6, name=User6, address=Tokyo}.
[Camel (camel-1) thread #6 - Split] (2) User{id=5, name=User5, address=Tokyo}.

スレッド数を制御したい場合は、executorServiceRef属性にスレッドプールを指定します。この属性を指定すると、自動的にparallelProcessing属性はtrueになるので不要です。

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- Splitterで使用するプールの設定 -->
        <threadPoolProfile id="splitterPool" 
                               poolSize="3" maxPoolSize="3" maxQueueSize="-1"/>
        <route>
            <from uri="direct:start_b"/>
            <!-- (1) 分割する前 -->
            <log message="(1) ${body}." />
            <!-- パラレル -->
            <split executorServiceRef="splitterPool"> // ここ
                <simple>${body}</simple>
                <!-- (2) 分割した各データ -->
                <log message="(2) ${body}." />
            </split>    
            <!-- (3) 分割終了後 -->
            <log message="(3) ${body}." />
            <to uri="mock:result_b"/>
        </route>                
    </camelContext>

エラー処理

分割中にエラーが発生した場合はどうなるでしょうか。

Spring XMLが次のような場合に、3つのデータを含むメッセージを送信してみます。この場合、分割中にログを出力した後、例外をスローします。

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <!-- 例外発生時の処理 -->
        <onException>
            <exception>java.lang.RuntimeException</exception>
            <log message="(5) Error occured while splitting." />
        </onException>
        
        <!-- stopOnException未設定 -->
        <route id="No_stopOnException">
            <from uri="direct:start_a"/>
            <!-- (1) 分割する前 -->
            <log message="(1) ${body}" />
            <split>
                <simple>${body}</simple>
                <!-- (2) 分割した各データ -->
                <log message="(2) ${header.CamelSplitIndex}: ${body}" />
                <!-- (3) 例外をスロー -->
                <log message="(3) 例外をスロー" />
                <throwException ref="runtimeException" />
            </split>    
            <!-- (4) 分割終了後 -->
            <log message="(4) ${body}" />
            <to uri="mock:result_a"/>
        </route>
    </camelContext>
    <!-- Split中に発生する例外 -->
    <bean id="runtimeException" class="java.lang.RuntimeException" />
 

このときのログは以下のようになります。エラーが発生しても次のサブメッセージの処理を行っていることが分かります。

(1) [User{id=001, name=User01, address=Tokyo}, User{id=002, name=User02, address=Chiba}, 
     User{id=003, name=User03, address=Kanagawa}]
(2) 0: User{id=001, name=User01, address=Tokyo}
(3) 例外をスロー
(5) Error occured while splitting.
Failed delivery for (MessageId: ID-emilia-34234-1388548668359-0-1 on ExchangeId: 
ID-emilia-34234-1388548668359-0-3). Exhausted after delivery attempt: 1 caught:
 java.lang.RuntimeException. Processed by failure processor:
 FatalFallbackErrorHandler[Channel[Log(No_stopOnException)[(5) Error occured while splitting.]]]

(snip)

(2) 1: User{id=002, name=User02, address=Chiba}
(3) 例外をスロー
(5) Error occured while splitting.
Failed delivery for (MessageId: ID-emilia-34234-1388548668359-0-1 on ExchangeId: 
ID-emilia-34234-1388548668359-0-4). Exhausted after delivery attempt: 1 caught:
 java.lang.RuntimeException. Processed by failure processor:
 FatalFallbackErrorHandler[Channel[Log(No_stopOnException)[(5) Error occured while splitting.]]]

(snip)

(2) 2: User{id=003, name=User03, address=Kanagawa}
(3) 例外をスロー
(5) Error occured while splitting.
Failed delivery for (MessageId: ID-emilia-34234-1388548668359-0-1 on ExchangeId: 
ID-emilia-34234-1388548668359-0-5). Exhausted after delivery attempt: 1 caught:
 java.lang.RuntimeException. Processed by failure processor:
 FatalFallbackErrorHandler[Channel[Log(No_stopOnException)[(5) Error occured while splitting.]]]

(snip)

エラーが発生した場合に、次のサブメッセージでもエラーが発生する場合は、

            <split>
                <simple>${body}</simple>
を、
            <split stopOnException="true">
                <simple>${body}</simple>

に変えることで、例外が発生した時点で処理を中止することができます。

(1) [User{id=001, name=User01, address=Tokyo}, User{id=002, name=User02, address=Chiba}, 
    User{id=003, name=User03, address=Kanagawa}]
(2) 0: User{id=001, name=User01, address=Tokyo}
(5) Error occured while splitting.
Failed delivery for (MessageId: ID-emilia-55106-1388549454989-0-1 on ExchangeId: 
ID-emilia-55106-1388549454989-0-3). Exhausted after delivery attempt: 1 caught:
 java.lang.RuntimeException. Processed by failure processor:
 FatalFallbackErrorHandler[Channel[Log(stopOnException)[(5) Error occured while splitting.]]]

(snip)

関連項目