Event Sourcing & CQRS ... & Android

GDG NL DevFest Season 2017
Kalin Maldzhanski @djodjoni

Architectural Patterns ?

GUI Architectural Patterns ?

  • MVC
  • MVP
  • MVVM
  • MVI

Their Purpose:

  • Model interaction between a human and a computer
  • Do it in an effective way

Event Driven ?

What do we use already

  • Functional Programming
  • Event Streams/Reactive streams
  • Push Notifications
  • HTTP SSE text/event-stream
  • WebSocket, Firebase, Deepstream
  • Kafka, Spark, RabbitMQ
  • Actor Model, Akka, Vertx

How are we using events?

Broadcast/Fire and Forget

  • Notify "post viewed"
  • Notify "http request failed"
  • Request to send email confirmation

  • Push Notifications
  • UI events Notifications

State Transfer

  • Live Data
  • BehaviorSubject (RxBus)
  • Room, SQLBrite
  • Firebase

Request Data

  • Query via event (event bus) and receive response

Event Sourcing?

No Wiki Page ?!?!
A pattern where the state of the domain model is represented as a sequence of events(changes) rather than a single snapshot.
Memento
Memento poster.jpg
50 First Dates
50FirstDates.jpg
Actually there is a wiki page :)
History of accounting ...
"more than 7,000 years ago in Mesopotamia"
Do you use event sourced system?

Not just Simple events

  • Events are the result of a successfully executed command
  • Events are immutable
  • Events are things that happened in the past
  • Events cannot be deleted
  • Events store enough information to recover the complete system state
  • Events are published to subscribers once they are stored
  • Events have a single issuer/producer that belongs to a particular domain

ES - The Good

  • Simplicity
  • Flexibility
  • Performance
  • Audit Trail

ES - The Bad

  • Not trivial to query
  • Design of domain events required
  • More storage space needed
  • increased complexity when doing cross domain operations
  • loading large event logs

ES - The Cool

  • Intent Trail
  • Extra Business Value, BI, machine learning, temporal queries
  • Time Travel (undo)
  • Debugging
  • Testing
  • Helps capturing more complete business domain

CQRS

Command Query Responsibility Segregation
Before
After

Wait a minute I know that from DDD?

Domain Driven Design

Domain-driven design (DDD) is an approach to developing software for complex needs by deeply connecting the implementation to an evolving model of the core business concepts.
http://dddcommunity.org/learning-ddd/what_is_ddd/
Some more DDD terminology:
  • Aggregate - autonomous cluster of associated objects
  • Aggregate Root - the entry point to access the aggregate
  • Value Object - immutable, identity-free data
  • Entity - An object defined by its identity
  • Bounded Context - isolated part of a larger system with its own Domain Model
  • SAGA - coordinates long running process (process manager)

ES Considerations

How to delete and edit events?

  • retroactove events - reject or correct previous events
  • Rewrite Event Log

What if there are 1 billion events to replay?

  • Snapshots
  • Rewrite event log
  • Better design aggregates

Event types and versioning?

  • Rewrite Event Log
  • design and document good event taxonomy
  • ex: "https://myapp.io/tax/incrementQty"

CQRS & Command handling

  • Commands can be rejected or accepted
  • Command validation
  • Domain validation
  • Validation across multiple domains/aggregates
    • Use Query module
    • Use Process Managers
  • distribute validation workload across shards
When to use CQRS and ES?
  • Complex Domains
  • Distributed Systems
  • Task Based Applications
Refactoring my Android App?
todo-mvp-rxjava
Peek Inside Repository

     override fun completeTask(task: Task) {
        checkNotNull(task)
        mTasksRemoteDataSource.completeTask(task)
        mTasksLocalDataSource.completeTask(task)

        val completedTask = Task(task.title, task.description, task.id, true)

        // Do in memory cache update to keep the app UI up to date
        if (mCachedTasks == null) {
            mCachedTasks = LinkedHashMap()
        }
        mCachedTasks!!.put(task.id, completedTask)
    }
            
Peek Inside Repository

    override fun getTasks(): Flowable<List<Task>> {
        // Respond immediately with cache if available and not dirty
        if (mCachedTasks != null && !mCacheIsDirty) {
            return Flowable.fromIterable(mCachedTasks!!.values).toList().toFlowable()
        } else if (mCachedTasks == null) {
            mCachedTasks = LinkedHashMap()
        }

        val remoteTasks = andSaveRemoteTasks

        if (mCacheIsDirty) {
            return remoteTasks
        } else {
            // Query the local storage if available. If not, query the network.
            val localTasks = andCacheLocalTasks
            return Flowable.concat(localTasks, remoteTasks)
                    .filter { tasks -> !tasks.isEmpty() }
                    .firstOrError()
                    .toFlowable()
        }
    }
            
To Be ... "simpler"
Command:

data class Command(
        val subjectID: String?,
        val userID: String?,
        val datetime: Long,
        val cmdID: String,
        val cmdData: Any?
)
                       
Event:

data class Event(
        val subjectID: String?,
        val userID: String?,
        val datetime: Long,
        val eventHash: String,
        val eventID: String,
        val eventData: Any?
)
                       
Identify Commands & Events

            class TaskDetailPresenter(...
             ...
                fun deleteTask()
                //deleteTask / taskDeleted
                fun activateTask()
                //activateTask / taskActivated
                fun completeTask()
                //completeTask / taskCompleted
            
            

            class AddEditTaskPresenter
             ...
                fun saveTask(title: String, description: String)
                //updateTitle / titleUpdated
                //updateContent / contentUpdated
            
            

              class TasksPresenter(...
              ...
                    fun addNewTask()
                    //createTask / taskCreated
            
            

class TasksAR {
    fun createTask() {}
    fun updateTitle(taskID:Int, title: String) {}
    fun updateContent(taskID:Int, content: String) {}
    fun activateTask(taskID:Int) {}
    fun completeTask(taskID:Int) {}
    fun deleteTask(taskID:Int) {}
}
            
            
mTasksRepository.XXX --> mTasksAR.XXX
Persist events locally

@Entity(tableName = "events")
data class Event(
        @ColumnInfo(name = "subjectID")
        var subjectID: String?,
        @ColumnInfo(name = "userID")
        var userID: String?,
        @ColumnInfo(name = "datetime")
        var datetime: Long,
        @ColumnInfo(name = "eventHash")
        var eventHash: String,
        @ColumnInfo(name = "eventID")
        var eventID: String,
        @ColumnInfo(name = "eventData")
        var eventData: Any?
)
                

@Dao interface EventsPublisher {
    @Insert
    fun publishEvent(task: Event)

}
                

class TasksAR
@Inject constructor(val eventsPublisher:EventsPublisher) {
    fun createTask() {
        eventsPublisher.publishEvent(Event("createTask"))
    }
    fun updateTitle(taskID:Int, title:String) {
        doValidate("updateTitle", taskID, title)
        eventsPublisher.publishEvent(Event("updateTitle", taskID, title))
    }
    fun updateContent(taskID:Int, content:String) {
        doValidate("createTask", taskID, content)
        eventsPublisher.publishEvent(Event("createTask", taskID, content))
    }
    fun activateTask(taskID:Int) {
        doValidate("createTask", taskID)
        eventsPublisher.publishEvent(Event("createTask", taskID))
    }
    fun comcompleteTask(taskID:Int) {
        doValidate("createTask", taskID)
        eventsPublisher.publishEvent(Event("createTask", taskID))
    }
    fun deleteTask(taskID:Int) {
        doValidate("createTask", taskID)
        eventsPublisher.publishEvent(Event("createTask", taskID))
    }
}
                
Firebase as remote event store
  • Easy and Serverless
  • Reactive - can subscribe to events
  • Can easily hook up to Cloud Functions
Side effect: things like this will not be needed in your app

  viewHolder.bindToPost(model, new View.OnClickListener() {
                    @Override
                    public void onClick(View starView) {
                        // Need to write to both places the post is stored
                        DatabaseReference globalPostRef = mDatabase.child("posts").child(postRef.getKey());
                        DatabaseReference userPostRef = mDatabase.child("user-posts").child(model.uid).child(postRef.getKey());
                        // Run two transactions
                        onStarClicked(globalPostRef);
                        onStarClicked(userPostRef);
                    }
                });
            
            
We need to modify publisher listener to post to firebase
and we need to listen for incoming events

FirebaseDatabase.getInstance().getReference().child("events")
    .addChildEventListener(new ChildEventListener()
    {
        fun onChildAdded(dataSnapshot: DataSnapshot, s: String) {
            postCommand(Command(dataSnapshot))
        }
    }
                
Use Cloud Functions

  functions.database.ref('/events/{eventID}')
    .onWrite(event => {
      const newVal = process(event.data);
      makeHttpCall(newVal);
      return readModel.ref.child(event.params.subjectId).set(newVal);
    });
            

           // TODO: Many more options and scenarios to consider.
        

Takeaways

  • Be pragmatic but keep your eyes open
  • Model your domain not your data storage
  • Think functional when things are functional
  • Actor Model is cool
  • Divide only when needed, but always consider it
  • Process managers are great when they can fit in your head

Happy Event Sourcing!

Thank You!