The command REST service
We're going to change the existing REST service to be the command service.
We delete the InMemoryCache
class because this class was only responsible
for querying.
Delete the getUsers
method in the UserResource
class.
Delete the getUsers
and the getUsersFilteredByNickname
methods in the
UserService
class.
Add Kafka
We use Kafka for pub sub messaging. Therefore we need the following dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
With the clients library we're able to implement a provide for the Kafka consumer and producer:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import javax.annotation.PostConstruct;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.inject.Produces;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
@Startup
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class KafkaProvider {
public static final String KAFKA_ADDRESS = System.getenv("KAFKA_ADDRESS");
public static final String TOPIC = "battleapp";
public static final String GROUP_ID = "battleapp";
private KafkaProducer<String, String> producer;
private KafkaConsumer<String, String> consumer;
@PostConstruct
public void init() {
this.producer = createProducer();
this.consumer = createConsumer();
}
@Produces
public KafkaProducer<String, String> getProducer() {
return producer;
}
@Produces
public KafkaConsumer<String, String> getConsumer() {
return consumer;
}
public KafkaProducer<String, String> createProducer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_ADDRESS);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(properties);
}
public KafkaConsumer<String, String> createConsumer() {
Properties properties = new Properties();
properties.put("bootstrap.servers", KAFKA_ADDRESS);
properties.put("group.id", GROUP_ID + UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(TOPIC));
return consumer;
}
}
And a worker thread that handles the initialization events of the query services:
import com.airhacks.porcupine.execution.boundary.Dedicated;
import ninja.disruptor.battleapp.eventstore.control.EventStore;
import ninja.disruptor.battleapp.eventstore.control.JsonConverter;
import ninja.disruptor.battleapp.user.entity.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.PostConstruct;
import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonObject;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@Startup
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class KafkaConsumerWorker {
public static final Charset CHARSET = Charset.forName("UTF-8");
public static final String TOPIC_NAME = "topicName";
@Dedicated
@Inject
ExecutorService kafka;
@Inject
JsonConverter converter;
@Inject
EventStore store;
@Inject
KafkaProducer<String, String> producer;
@Inject
KafkaConsumer<String, String> consumer;
@PostConstruct
public void init() {
CompletableFuture
.runAsync(this::handleKafkaEvent, kafka);
}
public void handleKafkaEvent() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
switch (record.topic()) {
case KafkaProvider.TOPIC:
try {
handleInitializationEvent(record);
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
break;
default:
System.out.println("ERROR: Illegal topic: " + record.topic());
}
}
}
}
private void handleInitializationEvent(ConsumerRecord<String, String> record) {
String jsonAsString = record.value();
InputStream inputStream = new ByteArrayInputStream(jsonAsString.getBytes(CHARSET));
JsonObject event = Json.createReader(inputStream).readObject();
if (event == null) {
return;
}
String topicName = event.getString(TOPIC_NAME);
if (topicName == null) {
return;
}
String eventsAsJsonString = converter
.convertToJson(store
.loadEventStream(User.class.getName())
.getEvents())
.toString();
producer.send(new ProducerRecord<>(
topicName,
eventsAsJsonString));
}
}
In the EventStore
we've to replace the JavaEE events with Kafka events.
Replace the Event<String> bus
with the KafkaProducer<String, String> producer
:
...
@Inject
Event<String> bus;
...
String eventsAsJsonString = converter
.convertToJson(events)
.toString();
bus.fire(eventsAsJsonString);
...
...
@Inject
KafkaProducer<String, String> producer;
...
String eventsAsJsonString = converter
.convertToJson(events)
.toString();
producer.send(new ProducerRecord<>(
KafkaProvider.TOPIC,
eventsAsJsonString));
...