The query REST service

We create a second REST service as the query service. The service contains the following files:

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ninja.disruptor</groupId>
    <artifactId>battleapp.query</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>
    <dependencies>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.12</version>
        </dependency>
        <dependency>
            <groupId>com.airhacks</groupId>
            <artifactId>porcupine</artifactId>
            <version>0.0.4</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>battleapp</finalName>
    </build>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>
</project>

Dockerfile

FROM jboss/keycloak-adapter-wildfly:2.4.0.Final

MAINTAINER Robert Brem <brem_robert@hotmail.com>

ENV DEPLOYMENT_DIR ${JBOSS_HOME}/standalone/deployments/

ADD target/battleapp.war ${DEPLOYMENT_DIR}

build.js

#!/usr/bin/jjs -fv

var version = $ENV.VERSION;
var username = $ENV.REGISTRY_USERNAME;
var password = $ENV.REGISTRY_PASSWORD;
var email = $ENV.REGISTRY_EMAIL;

var registry = "disruptor.ninja:30500";
var image = "robertbrem/battleapp-query";
var completeImageName = registry + "/" + image + ":" + version;

var dockerBuild = "docker build -t " + completeImageName + " .";
execute(dockerBuild);

var dockerLogin = "docker login --username=" + username + " --password=" + password + " --email=" + email + " " + registry;
execute(dockerLogin);

var push = "docker push " + completeImageName;
execute(push);

function execute(command) {
    $EXEC(command);
    print($OUT);
    print($ERR);
}

beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="all">
</beans>

keycloak.json

{
  "realm": "${env.REALM_NAME}",
  "bearer-only": true,
  "auth-server-url": "${env.AUTH_SERVER_URL}",
  "ssl-required": "none",
  "resource": "battleapp-query",
  "enable-cors": true
}

web.xml

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

    <security-constraint>
        <web-resource-collection>
            <web-resource-name>health</web-resource-name>
            <url-pattern>/resources/health</url-pattern>
        </web-resource-collection>
        <!-- OMIT auth-constraint -->
    </security-constraint>

    <security-constraint>
        <web-resource-collection>
            <web-resource-name>cors</web-resource-name>
            <url-pattern>/*</url-pattern>
            <http-method>GET</http-method>
            <http-method>POST</http-method>
            <http-method>PUT</http-method>
            <http-method>DELETE</http-method>
        </web-resource-collection>
        <auth-constraint>
            <role-name>user</role-name>
        </auth-constraint>
    </security-constraint>

    <login-config>
        <auth-method>KEYCLOAK</auth-method>
        <realm-name>this is ignored currently</realm-name>
    </login-config>

    <security-role>
        <role-name>admin</role-name>
    </security-role>
    <security-role>
        <role-name>user</role-name>
    </security-role>
</web-app>

Copy all the events under ninja.disruptor.battleapp.user.entity.event and the CoreEvent.

All the events have to be in the same package as the events from the command side.

Create the following classes:

package ninja.disruptor.battleapp.health.boundary;

import com.airhacks.porcupine.execution.boundary.Dedicated;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@Path("health")
public class HealthResource {

    @Dedicated
    @Inject
    ExecutorService healthPool;

    @GET
    public void getHealth(@Suspended AsyncResponse response) {
        CompletableFuture
                .supplyAsync(() -> "everything OK!")
                .thenAccept(response::resume);
    }

}
package ninja.disruptor.battleapp.kafka.control;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

import javax.annotation.PostConstruct;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.enterprise.inject.Produces;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;

@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class KafkaProvider {
    public static final String TOPIC = "battleapp";
    public static final String KAFKA_ADDRESS = System.getenv("KAFKA_ADDRESS");
    public static final String GROUP_ID = "battleapp";

    private KafkaProducer<String, String> producer;
    private KafkaConsumer<String, String> consumer;

    @PostConstruct
    public void init() {
        this.producer = createProducer();
        this.consumer = createConsumer();
    }

    @Produces
    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    @Produces
    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    public KafkaProducer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_ADDRESS);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }

    public KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", KAFKA_ADDRESS);
        properties.put("group.id", GROUP_ID + UUID.randomUUID().toString());
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(TOPIC));
        return consumer;
    }

}
package ninja.disruptor.battleapp.user.boundary;

import com.airhacks.porcupine.execution.boundary.Dedicated;

import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@Path("users")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class UserResource {

    @Dedicated
    @Inject
    ExecutorService usersPool;

    @Inject
    UserService service;

    @GET
    public void getUsers(@Suspended AsyncResponse response, @QueryParam("nickname") String nickname) {
        CompletableFuture
                .supplyAsync(service.getUsersFilteredByNickname(nickname), usersPool)
                .thenAccept(response::resume);
    }

}
package ninja.disruptor.battleapp.user.boundary;

import ninja.disruptor.battleapp.InMemoryCache;
import ninja.disruptor.battleapp.user.entity.User;

import javax.ejb.Stateless;
import javax.inject.Inject;
import javax.ws.rs.core.GenericEntity;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@Stateless
public class UserService {

    @Inject
    InMemoryCache cache;

    public Supplier<GenericEntity<Set<User>>> getUsersFilteredByNickname(String nickname) {
        if (nickname == null || nickname.isEmpty()) {
            return () -> new GenericEntity<Set<User>>(getUsers()) {
            };
        } else {
            return () -> new GenericEntity<Set<User>>(getUsers()
                    .parallelStream()
                    .filter(u -> u.getNickname().toLowerCase().contains(nickname.toLowerCase()))
                    .collect(Collectors.toSet())) {
            };
        }
    }

    public Set<User> getUsers() {
        return new HashSet<>(cache.getUsers().values());
    }

}
package ninja.disruptor.battleapp.user.entity;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import ninja.disruptor.battleapp.eventstore.entity.CoreEvent;
import ninja.disruptor.battleapp.user.entity.event.UserCreated;
import ninja.disruptor.battleapp.user.entity.event.UserFirstNameChanged;
import ninja.disruptor.battleapp.user.entity.event.UserLastNameChanged;
import ninja.disruptor.battleapp.user.entity.event.UserNicknameChanged;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import java.util.List;

@NoArgsConstructor
@ToString
@XmlAccessorType(XmlAccessType.FIELD)
@Data
public class User {
    private String id;
    private String nickname;
    private String firstName;
    private String lastName;

    public User(List<CoreEvent> events) {
        for (CoreEvent event : events) {
            mutate(event);
        }
    }

    public void mutate(CoreEvent event) {
        when(event);
    }

    public void when(CoreEvent event) {
        if (event instanceof UserCreated) {
            this.id = event.getId();
        } else if (event instanceof UserFirstNameChanged) {
            this.firstName = ((UserFirstNameChanged) event).getFirstName();
        } else if (event instanceof UserLastNameChanged) {
            this.lastName = ((UserLastNameChanged) event).getLastName();
        } else if (event instanceof UserNicknameChanged) {
            this.nickname = ((UserNicknameChanged) event).getNickname();
        }
    }

}
package ninja.disruptor.battleapp;

import com.airhacks.porcupine.execution.boundary.Dedicated;
import lombok.Getter;
import ninja.disruptor.battleapp.eventstore.entity.CoreEvent;
import ninja.disruptor.battleapp.kafka.control.KafkaProvider;
import ninja.disruptor.battleapp.user.entity.User;
import ninja.disruptor.battleapp.user.entity.event.UserCreated;
import ninja.disruptor.battleapp.user.entity.event.UserDeleted;
import ninja.disruptor.battleapp.user.entity.event.UserEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.PostConstruct;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonObject;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@Startup
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class InMemoryCache {

    @Getter
    private Map<String, User> users = new HashMap<>();

    @Inject
    KafkaConsumer<String, String> consumer;

    @Inject
    KafkaProducer<String, String> producer;

    @Dedicated
    @Inject
    ExecutorService kafka;

    @Inject
    JsonConverter converter;

    @PostConstruct
    public void onInit() {
        String topicName = getTopicName();
        JsonObject event = Json.createObjectBuilder()
                .add("topicName", topicName)
                .build();

        CompletableFuture
                .runAsync(this::handleKafkaEvent, kafka);

        consumer.subscribe(Arrays.asList(KafkaProvider.TOPIC, topicName));

        producer.send(new ProducerRecord<>(
                KafkaProvider.TOPIC,
                event.toString()));
    }

    public String getTopicName() {
        InetAddress localHost = null;
        try {
            localHost = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        return "replayAllFromStore" + localHost.getHostName();
    }

    public void handleKafkaEvent() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case KafkaProvider.TOPIC:
                        handeEvents(record);
                        break;
                    default:
                        handeEvents(record);
                        break;
                }
            }
        }
    }

    private void handeEvents(ConsumerRecord<String, String> record) {
        try {
            String eventText = record.value();
            System.out.println("eventText = " + eventText);
            List<CoreEvent> events = converter.convertToEvents(eventText);
            for (CoreEvent event : events) {
                handle(event);
            }
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
            e.printStackTrace();
        }
    }

    public void handle(CoreEvent event) {
        if (event instanceof UserCreated) {
            List<CoreEvent> events = new ArrayList<>();
            events.add(event);
            User user = new User(events);
            users.put(event.getId(), user);
        } else if (event instanceof UserDeleted) {
            User user = users.get(event.getId());
            if (user == null) {
                System.out.println("rejected!");
                return;
            }
            users.remove(user.getId());
        } else if (event instanceof UserEvent) {
            User user = users.get(event.getId());
            if (user == null) {
                System.out.println("rejected!");
                return;
            }
            user.mutate(event);
        } else {
            System.out.println("Event not found: " + event.toString());
        }
    }

}
package ninja.disruptor.battleapp;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("resources")
public class JaxRsConfiguration extends Application {
}
package ninja.disruptor.battleapp;

import ninja.disruptor.battleapp.eventstore.entity.CoreEvent;
import ninja.disruptor.battleapp.user.entity.event.*;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class JsonConverter {

    public JsonObjectBuilder convertToJson(CoreEvent event) {
        JsonObjectBuilder jsonEvent = Json.createObjectBuilder()
                .add("name", event.getClass().getName())
                .add("id", event.getId());
        if (event instanceof UserCreated) {
            // no more to do
        } else if (event instanceof UserDeleted) {
            // no more to do
        } else if (event instanceof UserFirstNameChanged) {
            UserFirstNameChanged changedEvent = (UserFirstNameChanged) event;
            jsonEvent = jsonEvent
                    .add("firstName", changedEvent.getFirstName());
        } else if (event instanceof UserLastNameChanged) {
            UserLastNameChanged changedEvent = (UserLastNameChanged) event;
            jsonEvent = jsonEvent
                    .add("lastName", changedEvent.getLastName());
        } else if (event instanceof UserNicknameChanged) {
            UserNicknameChanged changedEvent = (UserNicknameChanged) event;
            jsonEvent = jsonEvent
                    .add("nickname", changedEvent.getNickname());
        } else {
            throw new NotImplementedException();
        }
        return jsonEvent;
    }

    public List<CoreEvent> convertToEvents(String jsonAsString) {
        List<CoreEvent> events = new ArrayList<>();
        InputStream inputStream = new ByteArrayInputStream(jsonAsString.getBytes(Charset.forName("UTF-8")));
        JsonArray eventArray = Json.createReader(inputStream).readArray();
        for (int i = 0; i < eventArray.size(); i++) {
            JsonObject eventObj = eventArray.getJsonObject(i);
            String name = eventObj.getString("name");
            String id = eventObj.getString("id");
            if (UserCreated.class.getName().equals(name)) {
                UserCreated event = new UserCreated(id);
                events.add(event);
            } else if (UserDeleted.class.getName().equals(name)) {
                UserDeleted event = new UserDeleted(id);
                events.add(event);
            } else if (UserFirstNameChanged.class.getName().equals(name)) {
                String firstName = eventObj.getString("firstName");
                UserFirstNameChanged event = new UserFirstNameChanged(id, firstName);
                events.add(event);
            } else if (UserLastNameChanged.class.getName().equals(name)) {
                String lastName = eventObj.getString("lastName");
                UserLastNameChanged event = new UserLastNameChanged(id, lastName);
                events.add(event);
            } else if (UserNicknameChanged.class.getName().equals(name)) {
                String nickname = eventObj.getString("nickname");
                UserNicknameChanged event = new UserNicknameChanged(id, nickname);
                events.add(event);
            } else {
                throw new NotImplementedException();
            }
        }
        return events;
    }
}

Create also a Jenkins pipeline similar to the existing REST service pipeline.