Change the REST service to use event sourcing and Cassandra
Event Sourcing in the REST service
In the book
Implementing Domain Driven Design
there is a chapter with a C# implementation of event sourcing. I have implemented
my own version of event sourcing based on this example from the book.
The implementation is under the following package
ninja.disruptor.battleapp.eventstore
in
this repository.
Under ninja.disruptor.battleapp.user.entity.event
we have to create the required events for the User
.
Our user has the following properties:
private String id;
private String nickname;
private String firstName;
private String lastName;
Therefore we need these event classes:
@AllArgsConstructor
@Getter
public class UserEvent implements CoreEvent {
private final String id;
}
/**
 * Event stating that a user with the given id has been created.
 */
public class UserCreated extends UserEvent {

    /**
     * @param id id of the created user
     */
    public UserCreated(String id) {
        super(id);
    }
}
/**
 * Event stating that the user with the given id has been deleted.
 */
public class UserDeleted extends UserEvent {

    /**
     * @param id id of the deleted user
     */
    public UserDeleted(String id) {
        super(id);
    }
}
@Getter
public class UserFirstNameChanged extends UserEvent {
private final String firstName;
public UserFirstNameChanged(String id, String firstName) {
super(id);
this.firstName = firstName;
}
}
@Getter
public class UserLastNameChanged extends UserEvent {
private final String lastName;
public UserLastNameChanged(String id, String lastName) {
super(id);
this.lastName = lastName;
}
}
@Getter
public class UserNicknameChanged extends UserEvent {
private final String nickname;
public UserNicknameChanged(String id, String nickname) {
super(id);
this.nickname = nickname;
}
}
Next, we have to adapt our User
entity itself:
/**
 * User entity whose state is changed by applying events.
 *
 * <p>The command methods ({@code create}, {@code changeFirstName}, ...) build an
 * event, record it in {@code changes} and then update the fields from it.
 * Replaying events through the list constructor or {@link #mutate(CoreEvent)}
 * updates the fields without recording anything.
 */
@NoArgsConstructor
@ToString
@XmlAccessorType(XmlAccessType.FIELD)
@Data
public class User {

    private String id;
    private String nickname;
    private String firstName;
    private String lastName;

    /** Events recorded by {@link #apply(CoreEvent)} on this instance. */
    @XmlTransient
    private final List<CoreEvent> changes = new ArrayList<>();

    /**
     * Copies id, nickname, first name and last name of another user.
     * The recorded changes of {@code copy} are not carried over.
     *
     * @param copy the user to copy the fields from
     */
    public User(User copy) {
        this.id = copy.getId();
        this.nickname = copy.getNickname();
        this.firstName = copy.getFirstName();
        this.lastName = copy.getLastName();
    }

    /**
     * Rebuilds the user state by replaying the given events in order.
     *
     * @param events the events to replay
     */
    public User(List<CoreEvent> events) {
        events.forEach(this::mutate);
    }

    /** Records and applies a {@link UserCreated} event for the given id. */
    public void create(String id) {
        apply(new UserCreated(id));
    }

    /** Records and applies a {@link UserDeleted} event for this user's id. */
    public void delete() {
        apply(new UserDeleted(id));
    }

    /** Records and applies a {@link UserFirstNameChanged} event. */
    public void changeFirstName(String firstName) {
        apply(new UserFirstNameChanged(id, firstName));
    }

    /** Records and applies a {@link UserLastNameChanged} event. */
    public void changeLastName(String lastName) {
        apply(new UserLastNameChanged(id, lastName));
    }

    /** Records and applies a {@link UserNicknameChanged} event. */
    public void changeNickname(String nickname) {
        apply(new UserNicknameChanged(id, nickname));
    }

    /**
     * Adds the event to the recorded changes and then updates the state from it.
     *
     * @param event the new event
     */
    public void apply(CoreEvent event) {
        changes.add(event);
        mutate(event);
    }

    /**
     * Updates the state from the event without recording it.
     *
     * @param event the event to replay
     */
    public void mutate(CoreEvent event) {
        when(event);
    }

    /**
     * Maps an event onto the field it changes. Events of any other type
     * (including {@link UserDeleted}) leave the fields untouched.
     *
     * @param event the event to interpret
     */
    public void when(CoreEvent event) {
        if (event instanceof UserCreated) {
            this.id = event.getId();
            return;
        }
        if (event instanceof UserFirstNameChanged) {
            this.firstName = ((UserFirstNameChanged) event).getFirstName();
            return;
        }
        if (event instanceof UserLastNameChanged) {
            this.lastName = ((UserLastNameChanged) event).getLastName();
            return;
        }
        if (event instanceof UserNicknameChanged) {
            this.nickname = ((UserNicknameChanged) event).getNickname();
        }
    }
}
Implement a save
method in our UserService
EJB:
...
// Event store from which the user event streams are loaded and to which new events are appended.
@Inject
EventStore store;
/**
 * Creates the user and applies every non-null name attribute of {@code user}
 * as a separate change.
 *
 * <p>All event store calls happen before this method returns; the returned
 * supplier only builds the {@code 201 Created} response.
 *
 * @param user the user data to store
 * @param info request URI information used to build the location of the new user
 * @return supplier of the response, whose entity is a copy of the stored user
 */
public Supplier<Response> save(User user, UriInfo info) {
    User current = create(user.getId());

    if (user.getFirstName() != null) {
        current = changeFirstName(current.getId(), user.getFirstName());
    }
    if (user.getLastName() != null) {
        current = changeLastName(current.getId(), user.getLastName());
    }
    if (user.getNickname() != null) {
        current = changeNickname(current.getId(), user.getNickname());
    }

    // Location of the new resource: absolute request path plus the user id.
    final URI location = info.getAbsolutePathBuilder().path("/" + current.getId()).build();

    // Copy of the final state, so the supplier hands out a fresh instance per call.
    final User snapshot = new User(current);
    return () -> Response.created(location).entity(new User(snapshot)).build();
}
/**
 * Creates a new user with the given id and appends the recorded changes to
 * the user's event stream, passing {@code 0L} as the version.
 *
 * @param id id of the user to create
 * @return the newly created user
 */
public User create(String id) {
    final User created = new User(new ArrayList<>());
    created.create(id);

    final EventIdentity identity = new EventIdentity(User.class, id);
    store.appendToStream(identity, 0L, created.getChanges());
    return created;
}
/**
 * Loads the user from its event stream, changes the first name and appends
 * the recorded change using the version of the loaded stream.
 *
 * @param id        id of the user to change
 * @param firstName the new first name
 * @return the user after the change
 */
public User changeFirstName(String id, String firstName) {
    final EventIdentity identity = new EventIdentity(User.class, id);
    final EventStream history = store.loadEventStream(identity);

    final User user = new User(history.getEvents());
    user.changeFirstName(firstName);

    store.appendToStream(identity, history.getVersion(), user.getChanges());
    return user;
}
/**
 * Loads the user from its event stream, changes the last name and appends
 * the recorded change using the version of the loaded stream.
 *
 * @param id       id of the user to change
 * @param lastName the new last name
 * @return the user after the change
 */
public User changeLastName(String id, String lastName) {
    final EventIdentity identity = new EventIdentity(User.class, id);
    final EventStream history = store.loadEventStream(identity);

    final User user = new User(history.getEvents());
    user.changeLastName(lastName);

    store.appendToStream(identity, history.getVersion(), user.getChanges());
    return user;
}
/**
 * Loads the user from its event stream, changes the nickname and appends
 * the recorded change using the version of the loaded stream.
 *
 * @param id       id of the user to change
 * @param nickname the new nickname
 * @return the user after the change
 */
public User changeNickname(String id, String nickname) {
    final EventIdentity identity = new EventIdentity(User.class, id);
    final EventStream history = store.loadEventStream(identity);

    final User user = new User(history.getEvents());
    user.changeNickname(nickname);

    store.appendToStream(identity, history.getVersion(), user.getChanges());
    return user;
}
...
To expose the save
method we have to add a REST service method to the UserResource
as well.
/**
 * Stores the posted user and resumes the suspended request with the result.
 *
 * <p>{@code service.save(user, info)} is evaluated on the calling thread; the
 * supplier it returns is executed on {@code usersPool}.
 *
 * @param response the suspended response to resume
 * @param info     URI information of the current request
 * @param user     the user to store
 */
@POST
public void save(@Suspended AsyncResponse response, @Context UriInfo info, User user) {
    CompletableFuture
            .supplyAsync(service.save(user, info), usersPool)
            .thenAccept(response::resume)
            // Without this stage a failing supplier would never resume the
            // response and the client would wait until the request times out.
            .exceptionally(failure -> {
                // Failures of earlier stages arrive wrapped in a CompletionException.
                Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
                response.resume(cause);
                return null;
            });
}
Our event sourcing implementation uses Cassandra; therefore we have to add the following dependencies:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.2</version>
</dependency>
To configure Cassandra we also have to create a CassandraProvider
:
/**
 * Produces the Cassandra {@code Session} for the {@code battleapp} keyspace.
 *
 * <p>The session is created on first request and then reused. The contact
 * point is taken from the {@code CASSANDRA_ADDRESS} environment variable and
 * falls back to {@code localhost} when the variable is unset or empty.
 */
@Singleton
public class CassandraProvider {

    /** Name of the environment variable holding the Cassandra address. */
    public static final String CASSANDRA_ADDRESS = "CASSANDRA_ADDRESS";

    /** Keyspace the session connects to. */
    public static final String KEYSPACE = "battleapp";

    private Session session;

    /**
     * @return the session, created on the first call
     */
    @Produces
    public Session getSession() {
        if (session != null) {
            return session;
        }

        final String configured = System.getenv(CASSANDRA_ADDRESS);
        final boolean hasConfigured = configured != null && !configured.isEmpty();
        final String address = hasConfigured ? configured : "localhost";

        session = Cluster.builder()
                .addContactPoints(address)
                .build()
                .connect(KEYSPACE);
        return session;
    }
}
To display the created events we need a database or in our case just a simple Map
to store the
users.
/**
 * In-memory view of all users, keyed by user id.
 *
 * <p>The map is filled from the {@code EventRepository} at startup and then
 * kept up to date from the JSON event strings observed by
 * {@link #onEvent(String)}.
 *
 * <p>The bean uses bean-managed concurrency, so the container does not
 * serialise access. The map is therefore a {@code ConcurrentHashMap}, which
 * keeps concurrent updates and iteration over {@code getUsers()} safe.
 * A {@code ConcurrentHashMap} rejects {@code null} keys, so users and events
 * must carry a non-null id.
 * NOTE(review): the cached {@code User} instances are still mutated in place
 * by {@link #handle(CoreEvent)} - confirm whether readers need a stronger
 * guarantee than that.
 */
@Startup
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
public class InMemoryCache {

    @Getter
    private final Map<String, User> users = new java.util.concurrent.ConcurrentHashMap<>();

    @Inject
    JsonConverter converter;

    @Inject
    EventRepository repository;

    /**
     * Loads all users known to the repository into the cache.
     */
    @PostConstruct
    public void init() {
        // Plain sequential iteration: filling the map is cheap and needs no parallel stream.
        repository
                .findAllUsers()
                .forEach(u -> users.put(u.getId(), u));
    }

    /**
     * Converts the observed JSON string into events and handles them in order.
     *
     * @param jsonEventString JSON representation of one or more events
     */
    public void onEvent(@Observes String jsonEventString) {
        List<CoreEvent> events = converter.convertToEvents(jsonEventString);
        for (CoreEvent event : events) {
            handle(event);
        }
    }

    /**
     * Applies a single event to the cache: creates, removes or updates the
     * cached user with the event's id.
     *
     * @param event the event to apply
     * @throws NotImplementedException if the event is not a {@code UserEvent}
     */
    public void handle(CoreEvent event) {
        if (event instanceof UserCreated) {
            List<CoreEvent> events = new ArrayList<>();
            events.add(event);
            User user = new User(events);
            users.put(event.getId(), user);
        } else if (event instanceof UserDeleted) {
            User user = users.get(event.getId());
            if (user == null) {
                // Nothing cached under this id, so there is nothing to delete.
                System.out.println("rejected!");
                return;
            }
            users.remove(user.getId());
        } else if (event instanceof UserEvent) {
            User user = users.get(event.getId());
            if (user == null) {
                // Change for a user that is not cached.
                System.out.println("rejected!");
                return;
            }
            user.mutate(event);
        } else {
            System.out.println("Event not found: " + event.toString());
            throw new NotImplementedException();
        }
    }
}
Now we can change our users
REST service to use the users from the db instead of the hardcoded
test users. In the UserService
we've to include the following parts:
...
// Cache whose user map is read by getUsers().
@Inject
InMemoryCache cache;
/**
 * Returns a supplier of the users whose nickname contains the given text,
 * ignoring case. A {@code null} or empty {@code nickname} selects all users.
 *
 * <p>Users without a nickname never match a non-empty filter. They can
 * exist because a nickname is only applied when one was posted.
 *
 * @param nickname text the nickname has to contain, may be {@code null} or empty
 * @return supplier of the matching users wrapped in a {@code GenericEntity}
 */
public Supplier<GenericEntity<Set<User>>> getUsersFilteredByNickname(String nickname) {
    if (nickname == null || nickname.isEmpty()) {
        return () -> new GenericEntity<Set<User>>(getUsers()) {
        };
    }

    // Lower-case the search term once instead of once per user.
    final String needle = nickname.toLowerCase();
    return () -> new GenericEntity<Set<User>>(getUsers()
            .stream()
            // Guard against users that have no nickname.
            .filter(u -> u.getNickname() != null
                    && u.getNickname().toLowerCase().contains(needle))
            .collect(Collectors.toSet())) {
    };
}
/**
 * @return a new set containing the users currently held by the cache
 */
public Set<User> getUsers() {
    final Set<User> snapshot = new HashSet<>();
    snapshot.addAll(cache.getUsers().values());
    return snapshot;
}
...
And in our UserResource
:
/**
 * Resumes the suspended request with the users whose nickname matches the
 * optional {@code nickname} query parameter. The lookup runs on {@code usersPool}.
 *
 * @param response the suspended response to resume
 * @param nickname optional nickname filter taken from the query string
 */
@GET
public void getUsers(@Suspended AsyncResponse response, @QueryParam("nickname") String nickname) {
    CompletableFuture
            .supplyAsync(service.getUsersFilteredByNickname(nickname), usersPool)
            .thenAccept(response::resume)
            // Without this stage a failing lookup would never resume the
            // response and the client would wait until the request times out.
            .exceptionally(failure -> {
                // Failures of earlier stages arrive wrapped in a CompletionException.
                Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
                response.resume(cause);
                return null;
            });
}
Changes to the test environment
So that we can use the Docker repository, we have to create the same
docker-registry
secret
from the production environment for the
test environment too.
kc --namespace test create secret docker-registry registrykey --docker-username=rob --docker-password=1234 --docker-email=brem_robert@hotmail.com --docker-server=disruptor.ninja:30500
The following changes have to be made to the start.js
:
...
// Name used in the kubectl delete commands below and written as the service's metadata name.
var name = "battleapp";
// Value emitted for the CASSANDRA_ADDRESS environment entry below.
var cassandraHost = "cassandra";
// Namespace passed to kubectl via --namespace and written into the generated metadata.
var namespace = "test";
...
// Command that deletes the deployment inside the configured namespace.
var deleteDeployment = kubectl + " --namespace " + namespace + " delete deployment " + name;
...
// NOTE(review): dfw is presumably the writer of the generated deployment descriptor - confirm.
dfw.write("metadata:\n");
dfw.write(" namespace: " + namespace + "\n");
...
// Environment entry CASSANDRA_ADDRESS with cassandraHost as its value.
dfw.write(" - name: CASSANDRA_ADDRESS\n");
dfw.write(" value: \"" + cassandraHost + "\"\n");
...
// Command that deletes the service inside the configured namespace.
var deleteService = kubectl + " --namespace " + namespace + " delete service " + name;
...
// NOTE(review): sfw is presumably the writer of the generated service descriptor - confirm.
sfw.write("metadata:\n");
sfw.write(" name: " + name + "\n");
sfw.write(" namespace: " + namespace + "\n");
...
Changes to the canary release
The following changes have to be made to the start.js
:
...
// Value emitted for the CASSANDRA_ADDRESS environment entry below.
var cassandraHost = "cassandra";
...
// Environment entry CASSANDRA_ADDRESS with cassandraHost as its value.
dfw.write(" - name: CASSANDRA_ADDRESS\n");
dfw.write(" value: \"" + cassandraHost + "\"\n");
...
Changes to the consumer driven contract test
We need the following two dependencies so that we can call the create method of the REST service:
<dependency>
<groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-processing</artifactId>
<version>2.12</version>
<scope>test</scope>
</dependency>
The test itself has to be replaced with the following code:
/**
 * Integration test against the deployed users resource.
 *
 * <p>The tests run in ascending name order: {@code a01} creates the user
 * "rob" and {@code a02} expects the user list to contain that nickname.
 * Host, port and credentials are read from the environment variables
 * {@code HOST}, {@code PORT}, {@code APPLICATION_USER_NAME} and
 * {@code APPLICATION_PASSWORD}.
 */
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class BattleAppIT {

    public static final String ID = "id";
    public static final String NICKNAME = "nickname";
    public static final String FIRST_NAME = "firstName";
    public static final String LAST_NAME = "lastName";
    public static final String LOCATION = "Location";

    @Rule
    public JAXRSClientProvider provider = buildWithURI("http://" + System.getenv("HOST") + ":" + System.getenv("PORT") + "/battleapp/resources/users");

    private String nickname = "rob";
    private String firstName = "Robert";
    private String lastName = "Brem";

    /**
     * Posts a new user and expects {@code 201 Created} with a {@code Location}
     * header that contains the id of the posted user.
     */
    @Test
    public void a01_shouldCreateRob() throws IOException {
        String id = UUID.randomUUID().toString();
        JsonObjectBuilder userBuilder = Json.createObjectBuilder();
        JsonObject playerToCreate = userBuilder
                .add(ID, id)
                .add(NICKNAME, nickname)
                .add(FIRST_NAME, firstName)
                .add(LAST_NAME, lastName)
                .build();

        Response postResponse = provider
                .target()
                .request()
                .header("Authorization", authorizationHeader())
                .post(Entity.json(playerToCreate));
        try {
            System.out.println("postResponse = " + postResponse);
            assertThat(postResponse.getStatus(), is(201));
            // The service builds the location from the request path plus the user id.
            String location = postResponse.getHeaderString(LOCATION);
            assertThat(location, containsString(id));
        } finally {
            // Release the connection held by the response.
            postResponse.close();
        }
    }

    /**
     * Fetches the user list and expects it to contain the nickname created by {@code a01}.
     */
    @Test
    public void a02_shouldReturnRob() throws IOException {
        String response = provider
                .target()
                .request()
                .header("Authorization", authorizationHeader())
                .get(String.class);
        assertThat(response, containsString(nickname));
    }

    /** Requests a token for the configured application user and formats it as a bearer header value. */
    private static String authorizationHeader() throws IOException {
        String token = KeycloakTokenCreator
                .getTokenResponse(
                        System.getenv("APPLICATION_USER_NAME"),
                        System.getenv("APPLICATION_PASSWORD"))
                .getToken();
        return "Bearer " + token;
    }
}