Java EE の標準バッチAPI -- JSR-352 Batch Applications for the Java Platform --

f:id:Naotsugu:20150224011617p:plain


Java Batchとは

JSR-352 Batch Applications for the Java Platform として仕様化されたバッチ処理APIJava EE 7 に取り込まれた。通称 jBatch (日本のみ?)。

構造は Spring Batch とほとんど同じ。標準化されるのは良いことだ。

Job の種類と実行制御

Job の処理は2種類ある。

  • STEPA のように 入力・処理・出力 をある塊で処理していくもの(Chunk)
  • STEPB のように なんらかのタスクとして処理するもの(Batchlet)

f:id:Naotsugu:20150217232500p:plain

実行順序の制御は xml ファイルで定義する。 以下のような実行制御ができる。

  • シーケンシャルに実行することもできるし、パラレルで実行することもできる
  • チェックポイントを設けて、チェックポイントからリランなどできる
  • エラー発生時に、その対象をスキップして処理を継続するなど実行制御できる

Job作成の流れ

javax.batch.api.chunk.ItemReader javax.batch.api.chunk.ItemProcessor javax.batch.api.chunk.ItemWriter というインターフェースの実装クラスをそれぞれ作成し、以下のような Job 定義でMyReader MyProcessor MyWriter などと指定する。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <step id="mychunk">
    <chunk>
      <reader ref="MyReader"></reader>
      <processor ref="MyProcessor"></processor>
      <writer ref="MyWriter"></writer>
    </chunk>
  </step>
</job>

               

Jobの起動は以下のように BatchRuntime を介して行う。

JobOperator jobOperator = BatchRuntime.getJobOperator();
long execID = jobOperator.start("simplejob", new Properties());

start の第一引数にはxmlで定義した job の id を指定する。

なお、インターフェースの他、 AbstractItemReaderAbstractItemProcessorAbstractItemItemWriter といった抽象クラスも用意されている。

Chunk の実装例

前述の通り、ItemReader、ItemProcessor、ItemWriter を作成する。

ItemReader

ItemReader は対象アイテムの読み込み処理を書く。

public class MyReader implements ItemReader {

    private BufferedReader breader;
    @Inject
    JobContext jobCtx;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        String fileName = jobCtx.getProperties().getProperty("input_file");
        breader = new BufferedReader(new FileReader(fileName));
    }

    @Override
    public void close() throws Exception {
        breader.close();
    }

    @Override
    public Object readItem() throws Exception {
        String line = breader.readLine();
        return line;
    }
}

DI される JobContext から外部定義したプロパティを取得できる。 この例では指定ファイルを読み込むだけ。

ItemProcessor

アイテムに対する処理を書く。

public class MyProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object obj) throws Exception {
        String line = (String) obj;
        return line.toUpperCase();
    }
}

ここでは toUpperCase しているのみ。

ItemWriter

処理結果の書き込み。

public class MyWriter implements ItemWriter {
    private BufferedWriter bwriter;
    @Inject
    private JobContext jobCtx;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        String fileName = jobCtx.getProperties().getProperty("output_file");
        bwriter = new BufferedWriter(new FileWriter(fileName));
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (int i = 0; i < items.size(); i++) {
            String line = (String) items.get(i);
            bwriter.write(line);
            bwriter.newLine();
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
}

writeItems() の引数が List になっている通り、processItem での結果は塊(Chunk) で扱われる。 デフォルトでは 10 個のアイテムが Chunk として扱われる。 checkpointInfo() については後述。

Job 定義

定義は以下のように xml で定義する。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <properties>
    <property name="input_file" value="input.txt"/>
    <property name="output_file" value="output.txt"/>
  </properties>

  <step id="mychunk">
    <chunk>
      <reader ref="MyReader"></reader>
      <processor ref="MyProcessor"></processor>
      <writer ref="MyWriter"></writer>
    </chunk>
</job>

propertie を合わせて定義している。 MyReader などはパッケージ含めたクラス名を書く。

Batchlet の簡単な例

タスク指向のバッチステップは Batchlet で扱う。

javax.batch.api.Batchlet を実装する。 もちろん javax.batch.api.AbstractBatchlet もある。

public class MyBatchlet implements Batchlet {
    @Inject
    private JobContext jobCtx;
    
    @Override
    public String process() throws Exception {
        String fileName = jobCtx.getProperties().getProperty("output_file");
        System.out.println(""+(new File(fileName)).length());
        return "COMPLETED";
    }
}

           

定義ファイルは以下のようになる。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
  <properties>
    <property name="input_file" value="input.txt"/>
    <property name="output_file" value="output.txt"/>
  </properties>
  <step id="mytask">
    <batchlet ref="MyBatchlet"></batchlet>
    <end on="COMPLETED"/>
  </step>
</job>

batchlet 要素で作成した MyBatchlet を指定している。

バッチステータス

バッチ のステータスは JobExecution を介して取得できる。

long execID = jobOperator.start("simplejob", new Properties());
JobExecution jobExec = jobOperator.getJobExecution(execID);
String status = jobExec.getBatchStatus().toString();

        

以下のような enum が用意されている。

public enum BatchStatus {
STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED }

ステータスの意味は、

Value Description
STARTING ジョブがバッチランタイムにサブミットされた
STARTED ジョブの実行中.
STOPPING ジョブ中断のリクエスト受付済み
STOPPED ジョブが中断された
FAILED エラーによりジョブ実行が終了した
COMPLETED ジョブが成功終了した
ABANDONED ジョブが破棄としてマークされた

Job Specification Language

ジョブの実行制御は xml ファイルで指定する。配置場所は以下。

  • jar の場合 META-INF/batch-jobs/
  • warの場合 WEB-INF/classes/META-INF/batch-jobs/

ファイル名はジョブIDと一致させる必要があり、以下の場合は simplejob.xml とする。

<?xml version="1.0" encoding="UTF-8"?>
<job id="simplejob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    ・・・・
</job>

Job 定義の要素

job 要素がトップレベルとなる。

<job id="jobname" restartable="true">
  <listeners>
    <listener ref="com.xyz.pkg.ListenerBatchArtifact"/>
  </listeners>
  <properties>
    <property name="propertyName1" value="propertyValue1"/>
    <property name="propertyName2" value="propertyValue2"/>
  </properties>
  <step ...> ... </step>
  <step ...> ... </step>
  <decision ...> ... </decision>
  <flow ...> ... </flow>
  <split ...> ... </split>
</job>

実行制御は以下で行う。

  • ステップ(Steps)
  • フロー(Flows)
  • スプリット(Splits)
  • 条件分岐(Decision elements)

listeners でイベントをインタセプトするリスナを定義する。 properties で Job に渡すプロパティを定義する。

Step 要素

step 要素に next で次Jobを指定する。

<job id="loganalysis" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">

  <step id="logprocessor" next="cleanup">
    <chunk checkpoint-policy="item" item-count="10">
      <reader ref="com.xyz.pkg.LogItemReader"></reader>
      <processor ref="com.xyz.pkg.LogItemProcessor"></processor>
      <writer ref="com.xyz.pkg.LogItemWriter"></writer>
    </chunk>
  </step>

  <step id="cleanup">
    <batchlet ref="com.xyz.pkg.CleanUp"></batchlet>
    <end on="COMPLETED"/>
  </step>
</job>

上記例では next="cleanup" として次ステップを定義している。

chunk 要素では item-count で chunk サイズを指定する。この例では 10 回毎に ItemWriter へ制御が移る。

checkpoint-policyitem が指定されているため、chunk 毎にコミットされる。 checkpoint-policycustom を指定すると checkpoint-algorithm で指定するアルゴリズムでコミットされる。

chunk ステップの例

具体的な chunk ステップの例は以下のような感じになる。

<step id="stepC" next="stepD">
  <chunk checkpoint-policy="item" item-count="5" time-limit="180"
         buffer-items="true" skip-limit="10" retry-limit="3">
    <reader ref="pkg.MyItemReaderImpl"></reader>
    <processor ref="pkg.MyItemProcessorImpl"></processor>
    <writer ref="pkg.MyItemWriterImpl"></writer>
    <skippable-exception-classes>
      <include class="pkg.MyItemException"/>
      <exclude class="pkg.MyItemSeriousSubException"/>
    </skippable-exception-classes>
    <retryable-exception-classes>
      <include class="pkg.MyResourceTempUnavailable"/>
    </retryable-exception-classes>
  </chunk>
</step>

chunk ステップの要素

chunk ステップには以下のような要素を指定可能。

<step id="stepA" next="stepB">
  <properties> ... </properties>
  <listeners>
    <listener ref="MyItemReadListenerImpl"/>
    ...
  </listeners>
  <chunk ...> ... </chunk>
  <partition> ... </partition>
  <end on="COMPLETED" exit-status="MY_COMPLETED_EXIT_STATUS"/>
  <stop on="MY_TEMP_ISSUE_EXIST_STATUS" restart="step0"/>
  <fail on="MY_ERROR_EXIT_STATUS" exit-status="MY_ERROR_EXIT_STATUS"/>
</step>

partition は並列実行を定義する。

end on は終了ステータスに割り当てるバッチステータスを定義する。

stop on はJobをストップする条件を定義する。

fail on は失敗に割り当てる終了状態を定義する。

タスクステップの例

具体的な chunk ステップの例は以下のような感じになる。

<step id="stepD" next="stepE">
  <batchlet ref="pkg.MyBatchletImpl">
    <properties>
      <property name="pname" value="pvalue"/>
    </properties>
  </batchlet>
</step>

タスクステップの要素

タスクステップには以下のような要素を指定可能。

<step id="stepB" next="stepC">
  <batchlet ...> ... </batchlet>
  <properties> ... </properties>
  <listener ref="MyStepListenerImpl"/>
</step>

partition ステップの例

扱うデータを分割して並列に扱う。

plan 利用

<step id="stepE" next="stepF">
  <chunk>
    <reader ...></reader>
    <processor ...></processor>
    <writer ...></writer>
  </chunk>
  <partition>
    <plan partitions="2" threads="2">
      <properties partition="0">
        <property name="firstItem" value="0"/>
        <property name="lastItem" value="500"/>
      </properties>
      <properties partition="1">
        <property name="firstItem" value="501"/>
        <property name="lastItem" value="999"/>
      </properties>
    </plan>
  </partition>
  <reducer ref="MyPartitionReducerImpl"/>
  <collector ref="MyPartitionCollectorImpl"/>
  <analyzer ref="MyPartitionAnalyzerImpl"/>
</step>

mapper 利用

<step id="stepE" next="stepF">
  <chunk>
    <reader ...></reader>
    <processor ...></processor>
    <writer ...></writer>
  </chunk>
  <partition>
    <mapper ref="MyPartitionMapperImpl"/>
    <reducer ref="MyPartitionReducerImpl"/>
    <collector ref="MyPartitionCollectorImpl"/>
    <analyzer ref="MyPartitionAnalyzerImpl"/>
  </partition>
</step>

フロー要素

Job のステップをまとめて一処理単位として扱う。

job flow split の子要素となる。

<flow id="flowA" next="stepE">
  <step id="flowAstepA" next="flowAstepB">...</step>
  <step id="flowAstepB" next="flowAflowC">...</step>
  <flow id="flowAflowC" next="flowAsplitD">...</flow>
  <split id="flowAsplitD" next="flowAstepE">...</split>
  <step id="flowAstepE">...</step>
</flow>

スプリット要素

フローはそれぞれ別スレッドで実行される。 すべて完了した時に次へ遷移する。 複数の業務処理を並列で実行(partition は扱うデータを並列で処理)。

job flow の子要素となる。

<split id="splitA" next="stepB">
  <flow id="splitAflowA" next="splitAflowB">...</flow>
  <flow id="splitAflowB">...</flow>
  <flow id="splitAflowC">...</flow>
</split>

decision 要素

前ステップの完了ステータスにて、次ステップの分岐や続行/継続を判断する。

job flow の子要素となる。

<decision id="decisionA" ref="MyDeciderImpl">
  <fail on="FAILED" exit-status="FAILED_AT_DECIDER"/>
  <end on="COMPLETED" exit-status="COMPLETED_AT_DECIDER"/>
  <stop on="MY_TEMP_ISSUE_EXIST_STATUS" restart="step2"/>
</decision>

分岐。

<decision id="decisionA" ref="MyDeciderImpl">
  <next on="FAILED" to="fooStep"/>
  <end on="COMPLETED" to="barStep"/>
</decision>

JobContext

バッチ処理の各実装クラスには CDI によるコンテキストのDIが可能。

public class MyItemReaderImpl implements ItemReader {
    @Inject
    JobContext jobCtx;

    public MyItemReaderImpl() {}

    @Override
    public void open(Serializable checkpoint) throws Exception {
        String fileName = jobCtx.getProperties()
                                .getProperty("log_file_name");
        ・・・
    }
    ・・・
}

JobContextCDI 経由で取得できる。 ただし コンストラクタで JobContext を取得することがはできない。

Job のスケジュール起動

@Schedule にてJobのスケジュール起動ができる。

@Singleton
public class BatchJobRunner {
    @Schedule(deyOfWeek = "Sun")
    public void scheduleJob() {
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        jobOperator.start("simplejob", new Properties());
}

        

ManagedScheduledExecutorServicd 経由の例。

@Resource(lookup = "java:comp/DefaultManagedScheduledExecutorServicd")
private ManagedScheduledExecutorService executor;

public void scheduleJob() {
    JobOperator jobOperator = BatchRuntime.getJobOperator();
    executor.schedule( () -> jobOperator.start("", new Properties()), 7, TimeUnit.DAYS);
}    

listener

Job 実行の各タイミングで listener が用意されている。

  • Job の実行前後 : javax.batch.api.listener.JobListener
  • ステップの前後 : javax.batch.api.listener.StepListener
  • 各Chunkの実行前後 : javax.batch.api.chunk.listener.ChunkListener
  • 各Itemの読み込み前後 : javax.batch.api.chunk.listener.ItemReadListener
  • 各Itemの処理前後 : javax.batch.api.chunk.listener.ItemProcessListener
  • 各Itemの書き込み前後 : javax.batch.api.chunk.listener.ItemWriteListener
  • リトライ時のItem読み込み時 : javax.batch.api.chunk.listener.RetryReadListener
  • リトライ時の処理時 : javax.batch.api.chunk.listener.RetryProcessListener
  • リトライ時の書き込み時 : javax.batch.api.chunk.listener.RetryWriteListener
  • 読み込みクラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipReadListener
  • 処理クラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipProcessListener
  • 書き込みクラスからのSkip可能例外発生時 : javax.batch.api.chunk.listener.SkipWriteListener

リスナの実装クラスを用意しJob定義の listener 要素として定義する。

CDI 名による参照

Named 定義しておくと

@Dependent
@Named("myItemReaderImpl")
public class MyItemReaderImpl implements ItemReader {
    ・・・
}

@Dependent は CDI1.1 からの CDI デフォルト化 でスコープアノテーションがあるものが CDI 対象となるため付与。

beans.xmlbean-discovery-mode="all" の指定であれば不要。

CDI 名で定義できる

  <chunk>
    <reader ref="myItemReaderImpl"></reader>
    ...
  </chunk>

IDE サポート

Netbeans では jBatch Suite が提供されている。

プラグイン入れる。

f:id:Naotsugu:20150224005135p:plain

f:id:Naotsugu:20150224005201p:plain

GUI で組んで Generate すると、Job定義の xml やら ステップの実装の骨組みを生成してくれる。

f:id:Naotsugu:20150224011632p:plain