読者です 読者をやめる 読者になる 読者になる

Payara Micro clustered CDI Event Bus -インスタンスを跨いだCDIイベント-


f:id:Naotsugu:20161002032336p:plain

Clustered CDI Event Bus

Payara Micro は Glassfish を元にしたマイクロサービス向けの JavaEE コンテナです。

Payara Micro の簡単な概要は以下を参照してください。

etc9.hatenablog.com

Payara は Hazelcast でクラスタサポートをしており、Payara Micro では Hazelcast 経由で CDI イベントをクラスタ間でやり取りできるような拡張が行われています。

イベントの送信

イベントの送信は普通のCDIイベントと同じように、Event をインジェクトして、event.fire() とするだけです。

違いは @Outbound の限定子を付ける点。

  @Inject
  @Outbound
  Event<Message> event;

何かしらのメッセージを fire。

  Message message = // ・・・
  event.fire(message);

イベントの受信

受信側もCDIイベントと同じですが、@Inbound の限定子を付けます。

    public void observe(@Observes @Inbound Message message) {
    }

Clustered

アプリケーション側のコードでは、@Outbound@Inbound のアノテーションを加えるだけで、Payara Micro のクラスタ間でイベントの送受信が可能となります。

f:id:Naotsugu:20170107021554p:plain

CDI イベントは以下の EventBus で送受信されます。

@Service(name = "payara-event-bus")
@RunLevel(StartupRunLevel.VAL)
public class EventBus {

    @Inject
    private HazelcastCore hzCore;

    public boolean publish(String topic, ClusterMessage message) {
        boolean result = false;
        if (hzCore.isEnabled()) {
            hzCore.getInstance().getTopic(topic).publish(message);
            result = true;
        }
        return result;
    }
}

ご覧のように EventBus は Glassfish のサービスになっており、この EventBus から Hazelcast の Topic 経由でメッセージが publish されます。

受信側も EventBus を介し、TopicListener にて処理されます。

@Service(name = "payara-event-bus")
@RunLevel(StartupRunLevel.VAL)
public class EventBus {

    public boolean addMessageReceiver(String topic, MessageReceiver mr) {
        boolean result = false;
        if (hzCore.isEnabled()) {
            TopicListener tl = messageReceivers.get(topic);
            if (tl == null) {
                // create a topic listener on the specified topic
                TopicListener newTL = new TopicListener(topic);
                String regId = hzCore.getInstance().getTopic(topic).addMessageListener(newTL);
                messageReceivers.put(topic, newTL);
                tl = newTL;
                tl.setRegistrationID(regId);
            }
            
            tl.addMessageReceiver(mr);
            result = true;
        }
        return result;
    }

TopicListener は以下のようになっており、onMessage() にて処理されます。

public class TopicListener implements MessageListener {

    private HashSet<MessageReceiver> receivers;

    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message msg) {
        for (MessageReceiver receiver : receivers) {
            receiver.receiveMessage((ClusterMessage)msg.getMessageObject());
        }
    }

ちなみに、TopicListener は fish.payara.nucleus.eventbus.TopicListener MessageListener は com.hazelcast.core.MessageListener となります。

イベント送信側の実装

簡単なサンプルを作ってみましょう。

JAX-RS の Application クラスも用意しておきます。

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("/rs")
public class MyApplication extends Application {
}

イベント送信側。

import fish.payara.micro.cdi.ClusteredCDIEventBus;
import fish.payara.micro.cdi.Outbound;

import javax.annotation.PostConstruct;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

@Path("/")
public class MyResource {

    @Inject
    ClusteredCDIEventBus bus;

    @Inject
    @Outbound
    Event<String> event;

    @PostConstruct
    private void postConstruct() {
        bus.initialize();
    }

    @GET
    @Produces("text/plain")
    public String get() {
        event.fire("hello");
        return "Hello world";
    }
}

JAX-RS のリソースでGETリクエスト受けた場合に固定で hello という文字列を送信します。

CDI Event Bus を利用する場合には、ClusteredCDIEventBus の初期化が必要となるため、 @PostConstructbus.initialize() を読んでやる必要があります。


加えて beans.xml を用意します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="all">
</beans>

送信側の @Outboundbean-discovery-mode="all" となっていないと現状の Payara Micro(4.1.1.164) ではインジェクションできないので注意が必要です。

bean-discovery-mode については以下を参照してください。

etc9.hatenablog.com

イベント受信側の実装

受信側もは単なる CDI Bean にしてsysout するだけにします。

import fish.payara.micro.cdi.ClusteredCDIEventBus;
import fish.payara.micro.cdi.Inbound;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Observes;
import javax.inject.Inject;

@ApplicationScoped
public class MessageReceiver {

    @Inject
    ClusteredCDIEventBus bus;

    public void handle(@Observes @Initialized(ApplicationScoped.class) Object event) {
        bus.initialize();
    }

    public void observe(@Observes @Inbound String event) {
        System.out.println("observe:" + event);
    }
}

こちら側でも ClusteredCDIEventBus の初期化が必要なので、CDI の @Initialized で無理やり初期化しときます。

@ApplicationScoped の CDI Bean は、どこかに Inject されたタイミングでインスタンス化されますが、@Initialized(ApplicationScoped.class) のイベントを受けることでアプリケーションの起動時にインスタンス化することができます(リソースクラスなどにInjectして利用する場合はこのようなイベントによる初期化は不要です)。

ビルド

以下のようなGradleスクリプトでビルドします(特になんでもいいです)。

apply plugin: 'war'

[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
sourceCompatibility = targetCompatibility = '1.8'

repositories { jcenter() }

dependencies {
    compile 'fish.payara.extras:payara-micro:4.1.1.164'
}


task explodedWar(type: Copy) {
    into "$buildDir/exploded"
    with war
}

war {
    archiveName = 'app.war'
    rootSpec.exclude('**/payara/**')
    rootSpec.exclude('**/payara-micro*.jar')
    dependsOn explodedWar
}

task uber(type: JavaExec) {
    dependsOn war
    classpath = sourceSets.main.runtimeClasspath
    main = 'fish.payara.micro.PayaraMicro'
    args '--deploy', war.archivePath.path, '--outputUberJar', "$buildDir/uber.jar"
}

gradlew war で war 作成した後、gradlew uber とすると uber.jar が出来上がります。

実行

今回は同一マシン上で試してみます。一つ目のコンソールから作成したuber.jar を単に実行します。

java -jar uber.jar --port 8090

起動が完了したら、もう一つのコンソールでも 別ポートで実行します。

java -jar uber.jar --port 8080

二つ目の起動のタイミングで以下のログが出てクラスタが自動構成されていることがわかります。

Members [2] {
    Member [XX.XX.XX.XX]:5900 - 6956fbc3-82fc-4f78-9395-c1891c19ec52 this
    Member [XX.XX.XX.XX]:5901 - 1a424ce2-7299-4539-bce1-1e49794547f4
}
]]


ブラウザから http://localhost:8090/app/rs/(一つ目のコンソールで起動したアプリケーション) にアクセスすると、二つ目のコンソールに以下のログが出力され、CDI イベントが送受信されていることがわかります。

observe:hello


簡単ですね!

おまけ

以下のようなmainクラスを用意しておいてIDEから起動することもできます。

import fish.payara.micro.PayaraMicro;
import fish.payara.micro.PayaraMicroRuntime;
import java.io.File;

public class BootMicro {

    public static void main(String[] args) throws Exception {

        PayaraMicro micro = PayaraMicro.getInstance();
        micro.setHttpAutoBind(true);

        PayaraMicroRuntime instance = micro.bootStrap();
        instance.deploy("app", "app", new File("build/exploded/"));
    }
}

payara-micro の依存を compile にしていたのはこのためで、Uber.jar 作るだけなら providedCompile でいいです。