Aggregator

概要

Aggregatorは、入力の複数メッセージを認識し、関連するメッセージを1つのメッセージにまとめます。Splitterと逆の処理を行います。

Aggregatorを理解する上で必要な要素が3つあります。

  • 集約識別子 - 入力のメッセージを1つにまとめるための識別子です。この識別子を評価することで、メッセージをまとめるグループを判断します。
  • 完了条件 - 入力メッセージのグルーピングを完了する条件です。条件式か時間ベース(タイムアウトなど)の条件で記述します。
  • Aggregation Strategy - 複数のメッセージを1つのメッセージにまとめる方式です。

この3つを設定しないと、Aggregatorは動作しません。


Aggregator

基本

次のようなAggregatorを作成してみます。

集約識別子 ヘッダのtype
完了条件 メッセージが3つ揃うこと
Aggregation StrategyCSV形式

Spring XMLは次の様になります。

<bean>タグでAggregation Strategyを定義し、aggregateタグのstrategyRef属性に設定します。また、完了条件をcompletionSize属性として設定しています。

また、集約識別子をcorrelationExpressionタグの子要素<simple>に設定しています。

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <!-- (1) 集約前 -->
            <log message="(1) before: ${header.type} : ${body}" />
            <!-- Aggregation Strategy と 完了条件(メッセージを3つ) -->
            <aggregate strategyRef="MyAggregationStrategy" 
                       completionSize="3">
                <!-- 集約識別子 -->
                <correlationExpression>
                    <simple>${header.type}</simple>
                </correlationExpression>
                <!-- (2) 集約した各データ -->
                <log message="(2) aggregated: ${body}" />
            </aggregate>    
            <!-- (3) 集約終了後 -->
            <log message="(3) end: ${header.type} - ${body}" />
            <to uri="mock:result"/>
        </route>
    </camelContext>
    <!-- Aggregation Strategy -->
    <bean id="MyAggregationStrategy" 
          class="com.buildria.camel.example.eip.aggregator.MyAggregationStrategy" />

このSpring XMLに対して、次のようにメッセージを送信してみます。 集約識別子であるヘッダのtypeが、A, B, C, A, A, B, Bの順で送信します。

    public void testAggregate() throws Exception {  
        // 入力データ送信
        template.sendBodyAndHeader("direct:start", "No.1", "type", "A");
        template.sendBodyAndHeader("direct:start", "No.2", "type", "B");
        template.sendBodyAndHeader("direct:start", "No.3", "type", "C");
        template.sendBodyAndHeader("direct:start", "No.4", "type", "A");
        template.sendBodyAndHeader("direct:start", "No.5", "type", "A");
        template.sendBodyAndHeader("direct:start", "No.6", "type", "B");
        template.sendBodyAndHeader("direct:start", "No.7", "type", "B");
    }

ログを見ると、10行目でtypeがAであるメッセージが3つ揃ったので、CSV形式に集約されていることが分かります。 また、集約されても</aggregate>タグの次には流れないことが分かります。

集約されるのは<aggregate> - </aggregate>の中だけ。</aggregate>を抜けると、もとのメッセージが流れます。

[main] (1) before: A : No.1
[main] (3) end: A - No.1
[main] (1) before: B : No.2
[main] (3) end: B - No.2
[main] (1) before: C : No.3
[main] (3) end: C - No.3
[main] (1) before: A : No.4
[main] (3) end: A - No.4
[main] (1) before: A : No.5
[main] (2) aggregated: No.1,No.4,No.5
[main] (3) end: A - No.5
[main] (1) before: B : No.6
[main] (3) end: B - No.6
[main] (1) before: B : No.7
[main] (2) aggregated: No.2,No.6,No.7
[main] (3) end: B - No.7

最後に、使用したMyAggregationStrategyを示します。 MyAggregationStrategyは、AggregationStrategyを実装します。集約するメッセージを受けとると、aggregateメソッドを実行します。

aggregateメソッドは、集約中のデータを保持するoldExchangeとこれから集約するデータを持つnewExchangeを受けとります。oldExchangenewExchangeのIn MessageのBodyを取得し何らかの加工を行って、oldExchangeのIn MassegaeのBodyに再設定して返すのが一般的です。

public class MyAggregationStrategy implements AggregationStrategy {

    /**
     * <p>メッセージを集約します。</p>
     * 
     * 取得したString型のメッセージを<code>List&lt;String&gt;</code>に追加します。
     * 
     * ある集約識別子単位で初めての<code>newExchange</code>を受け取った場合、
     * <code>oldExchange</code>は<code>null</code>です。
     * 
     * @param oldExchange 集約中のExchange
     * @param newExchange 集約するExchange
     * @return 集約したExchange
     */
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // 初めてのExchange
        if (oldExchange == null) {
            return newExchange;
        }

        // 集約中のCSVを取得
        String oldBody = oldExchange.getIn().getBody(String.class);
        // 新規に集約する文字列を習得
        String newBody = newExchange.getIn().getBody(String.class);
        // CSV形式の文字列を生成して再設定
        oldExchange.getIn().setBody(oldBody + "," + newBody);

        return oldExchange;
    }
}

応用

完了条件

上記で使用したcompletionSize以外にも完了条件があります。これらの条件を複数設定することもできます。

完了条件 説明
completionSize集約するメッセージの数を設定します。ヘッダ等の値をもとに動的に設定することも可能です。
completionTimeoutメッセージが指定した時間(ms)集約されない場合に集約を完了します。集約識別子単位にスケジュールされます。固定の時間(ms)もしくは動的に設定することも可能です。completionInternalと同時に設定することはできません。
completionInternal一定期間毎(ms)に集約を完了します。
completionPredicate集約が完了する条件をSimple言語などで設定します。
completionFromBatchConsumer バッチコンシューマから取得したメッセージを、バッチコンシューマが設定したヘッダCamelBatchSizeをもとに集約します。MyBatisコンポーネントなどで、1回のポーリングで取得した複数のメッセージを集約する場合に使用します。
forceCompletionOnStopコンテキストが終了する場合に集約を完了します。

completionSizeのみ設定した場合、設定した数のメッセージが揃わないと集約したメッセージが流れません。これを回避するには、completionTimeoutをあわせて設定します。

動的な完了条件の設定 (completionSize, completionTimeout)

completionSizeの値を、固定値ではなくヘッダのcompletionSizeの値によって動的に設定するには、 <aggregate>の属性ではなく子要素として設定し、Simpe言語などでヘッダcompletionSizeを設定します。

<aggregate strategyRef="MyAggregationStrategy"> 
    <!-- 集約識別子 -->
    <correlationExpression>
        <simple>${header.type}</simple>
     </correlationExpression>
     <!-- 完了条件を動的に設定 -->
     <completionSize>
         <simple>${header.completionSize}</simple>
     </completionSize>
      (snip)
</aggreagte>      

completionTimeoutも同様です。

completionPredicate

メッセージの数やタイムアウト以外の条件を完了条件とする場合、completionPredicateを使用します。

例えば、集約中のExchangeのヘッダcompletiontrueであることを完了条件とする場合は、

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <!-- (1) 集約前 -->
            <log message="(1) before: ${header.type} : ${body}" />
            <!-- Aggregation Strategy -->
            <aggregate strategyRef="MyPredicateAggregationStrategy"> 
                <!-- 集約識別子 -->
                <correlationExpression>
                    <simple>${header.type}</simple>
                </correlationExpression>
                <!-- 完了条件 集約中のExchangeのヘッダ"completion"が"true" -->
                <completionPredicate>
                    <simple>${header.completion}</simple>
                </completionPredicate>
                <!-- (2) 集約した各データ -->
                <log message="(2) aggregated: ${body}" />
            </aggregate>    
            <!-- (3) 集約終了後 -->
            <log message="(3) end: ${header.type} - ${body}" />
            <to uri="mock:result"/>
        </route>
    </camelContext>
    <!-- Aggregation Strategy -->
    <bean id="MyPredicateAggregationStrategy" 
          class="com.buildria.camel.example.eip.aggregator.MyPredicateAggregationStrategy" />

のように、<aggregate>の子要素に以下を追加します。

                <!-- 完了条件 集約中のExchangeのヘッダ"completion"が"true" -->
                <completionPredicate>
                    <simple>${header.completion}</simple>
                </completionPredicate>

デフォルトでは、集約中のExchange(oldExchange)に対しての条件となりますが、 <aggregate>の属性eagerCheckCompletiontrueを設定すると、入力のExchange(newExchange)が対象になります。

関連項目