Survive fixing data on PROD (Golang)
Sooner or later, any system in production requires data fixes. The most common reasons are:
- data corruption caused by an internal bug
- breaking changes, when a new data model becomes incompatible with the previous one
- mass errors caused by improper use of the system by end users
A few years ago it was common practice to write SQL scripts that fix the data. But as time goes by, things get more complicated, and this approach no longer works in general, for the following reasons:
- modern applications usually use several storages at the same time (a relational database, full-text search, a distributed cache, OLAP storage)
- in a micro-service environment, corrupted data can span multiple services
- complex data fixes are very difficult to write and test as SQL scripts; they are unreliable and can lead to further errors and data corruption
- execution of SQL scripts is difficult to trace and monitor in a production environment
- execution of SQL scripts isn’t always possible due to customer policy
Requirements
First, let’s list the basic requirements for a solution that makes data fixes reliable and efficient:
- ability to fix data across multiple storages
- ability to write complex logic in a high-level programming language
- testability with unit tests as well as integration tests
- control over execution remotely without physical access to the production environment
- tracing and monitoring: storing execution history, checking progress, checking execution results
Solution
Here I’ll show you the approach I usually use in my projects.
These are the architectural highlights:
- data fixes are implemented in a separate micro-service. Let’s name it “datafix”. It has its own CI/CD pipeline, authorization mechanism, REST API and storage
- every execution is uniquely identified. The same data fix can be executed several times
- execution can take input parameters
- REST API provides endpoints to initiate, interrupt and monitor executions
- the service supports immediate execution as well as scheduled execution defined by a cron expression
- execution history is stored permanently and accessible via the API. Storing errors and execution details is mandatory
- support for interrupting long-running executions
- the “datafix” service has access to all the micro-services as well as direct access to the storages, although direct interaction with storages should be avoided
- reusing the existing APIs of the business services in the data fix logic, instead of developing new code, is preferred whenever possible
Implementation
As you can see in the figure below, “datafix” is implemented as a separate service that can connect directly to all the other services and storages.
Internally it consists of the following parts:
- tasks and jobs
- repositories
- implementation of the data fixes
- REST API
Tasks and jobs
A task is something that can be executed with support for all the features described above. There should be a task for each data fix, containing basic info such as a tracking number and a description of the changes. A task must be registered in the TaskManager,
which is responsible for dispatching tasks.
To schedule task execution there is a job object which contains the execution details. Each execution has status and progress information.
You can see the basic types and interfaces in the code snippet below:
package datafix

import (
    "context"
    "time"
)

// Task is a single data fix that can be executed by the scheduler
type Task interface {
    // GetId returns the unique ID of the task
    GetId() string
    // GetJiraTaskNumber returns the JIRA issue number related to the task
    GetJiraTaskNumber() string
    // GetDescription returns the task description (describe what you are doing and why)
    GetDescription() string
    // Execute is called by the task scheduler
    Execute(ctx context.Context, job *Job, execution *JobExecution) (map[string]interface{}, error)
}

// ProgressHandler reports progress for a named stage
type ProgressHandler func(stage string, val uint64)

// TaskProgress allows a task to support execution progress
type TaskProgress interface {
    // SetHandlers must be implemented by a task that supports progress info
    SetHandlers(onInit, onProgress ProgressHandler)
}

// TaskManager is responsible for task registration
type TaskManager interface {
    // Register registers a task
    Register(task Task)
    // GetTask retrieves a task by ID
    GetTask(ctx context.Context, taskId string) (Task, error)
}

// JobSchedule defines scheduling params
type JobSchedule struct {
    StartAt *time.Time // StartAt specifies a timestamp when execution has to start
    CronExp *string    // CronExp specifies a cron expression
}

// JobRequest is a request for a new job
type JobRequest struct {
    TaskId   string                 // TaskId is the ID of the task to be executed
    Schedule *JobSchedule           // Schedule specifies scheduling params
    Params   map[string]interface{} // Params defines arbitrary input params
}

// Job is a job instance
type Job struct {
    Id            string                 // Id is the job ID
    TaskId        string                 // TaskId is the ID of the task to execute
    Schedule      *JobSchedule           // Schedule specifies scheduling params
    CancelledAt   *time.Time             // CancelledAt is the timestamp of job cancellation
    NextExecution *time.Time             // NextExecution is the next execution time
    LastExecution *time.Time             // LastExecution is the last execution time
    Params        map[string]interface{} // Params are arbitrary input params
}

// JobExecution is a single execution of a job
type JobExecution struct {
    Id         string                     // Id is the execution ID
    JobId      string                     // JobId is the job ID
    TaskId     string                     // TaskId is the task ID
    StartedAt  *time.Time                 // StartedAt is when the execution started
    FinishedAt *time.Time                 // FinishedAt is when the execution finished
    Status     string                     // Status is the execution status
    Details    map[string]interface{}     // Details are execution details
    Error      string                     // Error is populated if the execution failed
    Progress   JobExecutionProgressStages // Progress represents progress info
}

// ProgressStage is a single stage of progress info
type ProgressStage struct {
    TotalItems     uint64 // TotalItems is the total number of items to process
    ProcessedItems uint64 // ProcessedItems is the number of items that have already been processed
}

// JobExecutionProgressStages holds multiple progress stages per execution
type JobExecutionProgressStages map[string]*ProgressStage
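For illustration, a minimal in-memory implementation of TaskManager could look like the following sketch. The map-based storage, the constructor name and the error message are assumptions, not part of the original design:

package datafix

import (
    "context"
    "fmt"
    "sync"
)

// inMemoryTaskManager is a hypothetical TaskManager backed by a map
type inMemoryTaskManager struct {
    mu    sync.RWMutex
    tasks map[string]Task
}

// NewInMemoryTaskManager creates an empty task registry
func NewInMemoryTaskManager() TaskManager {
    return &inMemoryTaskManager{tasks: make(map[string]Task)}
}

// Register stores the task under its ID so it can be dispatched later
func (m *inMemoryTaskManager) Register(task Task) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.tasks[task.GetId()] = task
}

// GetTask looks up a registered task by its ID
func (m *inMemoryTaskManager) GetTask(ctx context.Context, taskId string) (Task, error) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    task, ok := m.tasks[taskId]
    if !ok {
        return nil, fmt.Errorf("task %q is not registered", taskId)
    }
    return task, nil
}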
Repositories
Repositories allow data fixes to interact with the outside world (internal and external services, storages, and so on).
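To give a concrete picture, a repository is simply an interface the task depends on. The order repository below is hypothetical and only illustrates the idea; it could be backed by another service’s REST API or by direct storage access:

package datafix

import "context"

// OrderRepository is a hypothetical repository a data fix could use
// to find and repair affected orders
type OrderRepository interface {
    // FindCorrupted returns IDs of the orders affected by the bug being fixed
    FindCorrupted(ctx context.Context) ([]string, error)
    // FixOrder applies the fix to a single order
    FixOrder(ctx context.Context, orderId string) error
}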
Implementation of data fixes
Implementation of a new data fix requires going through the following steps:
- creating a new task object by implementing the Task interface (see the sketch after this list)
- writing the data fix logic according to the requested changes
- writing unit tests by mocking the repository interfaces
- registering the new task in the TaskManager; once the task is registered, it becomes available for execution
- writing integration tests with sampled data
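To make these steps more concrete, here is a minimal sketch of a data fix task. The task ID, JIRA number, description and the use of the hypothetical OrderRepository are assumptions made only for illustration:

package datafix

import "context"

// fixCorruptedOrdersTask is a hypothetical data fix built on top of OrderRepository
type fixCorruptedOrdersTask struct {
    orders OrderRepository
}

func (t *fixCorruptedOrdersTask) GetId() string             { return "fix-corrupted-orders" }
func (t *fixCorruptedOrdersTask) GetJiraTaskNumber() string { return "PROJ-1234" }
func (t *fixCorruptedOrdersTask) GetDescription() string {
    return "Recalculates order totals corrupted by an internal bug"
}

// Execute finds the affected orders and fixes them one by one
func (t *fixCorruptedOrdersTask) Execute(ctx context.Context, job *Job, execution *JobExecution) (map[string]interface{}, error) {
    ids, err := t.orders.FindCorrupted(ctx)
    if err != nil {
        return nil, err
    }
    fixed := 0
    for _, id := range ids {
        if err := t.orders.FixOrder(ctx, id); err != nil {
            return nil, err
        }
        fixed++
    }
    // Return arbitrary details about what was done
    return map[string]interface{}{"fixedOrders": fixed}, nil
}

After that, the task only needs to be registered, for example taskManager.Register(&fixCorruptedOrdersTask{orders: ordersRepo}), and it becomes available for execution via the API.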
REST API
Tasks and jobs are exposed via a REST API for job execution and monitoring:
- GET .../datafixes/tasks/{taskId} - retrieves a task by ID
- POST .../datafixes/tasks/{taskId}/jobs - creates a new job for the task
- GET .../datafixes/jobs/{jobId} - retrieves a job by ID
- GET .../datafixes/jobs/{jobId}/executions - retrieves job executions
- GET .../datafixes/executions/{execId} - retrieves an execution
- GET .../datafixes/executions/{execId}/progress - retrieves execution progress
- POST .../datafixes/executions/{execId}/interrupt - interrupts an ongoing execution
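As an example, starting a job for a task could look roughly like the snippet below from a client’s point of view. The host name, the exact path, the JSON field names (mirroring JobRequest) and the response shape are all assumptions:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
)

func main() {
    // Request body mirroring JobRequest: run the fix every night at 02:00
    body, _ := json.Marshal(map[string]interface{}{
        "schedule": map[string]interface{}{"cronExp": "0 2 * * *"},
        "params":   map[string]interface{}{"dryRun": true},
    })

    // Create a new job for the task (host and path prefix are assumptions)
    resp, err := http.Post(
        "https://datafix.internal/datafixes/tasks/fix-corrupted-orders/jobs",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    var job map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&job); err != nil {
        log.Fatal(err)
    }
    fmt.Println("created job:", job["id"])
}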
Tracing progress
It’s up to the task implementation how to populate the progress info. Typically, a task calculates the total number of items to be processed before processing starts and then periodically updates the number of items processed.
Progress info is very useful for anyone tracing a job execution who wants to make sure the execution isn’t stuck.
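A task that wants to expose progress can implement the TaskProgress interface and report through the handlers it receives. The sketch below assumes the scheduler calls SetHandlers before Execute; the stage name and the items being processed are made up for illustration:

package datafix

import "context"

// progressAwareTask is a hypothetical task that reports progress for a single stage
// (GetId, GetJiraTaskNumber and GetDescription are omitted for brevity)
type progressAwareTask struct {
    onInit     ProgressHandler
    onProgress ProgressHandler
}

// SetHandlers stores the handlers so Execute can report progress
func (t *progressAwareTask) SetHandlers(onInit, onProgress ProgressHandler) {
    t.onInit = onInit
    t.onProgress = onProgress
}

// Execute reports the total number of items up front and the processed count as it goes
func (t *progressAwareTask) Execute(ctx context.Context, job *Job, execution *JobExecution) (map[string]interface{}, error) {
    items := []string{"a", "b", "c"} // stand-in for the real items to process

    // Initialize the "orders" stage with the total number of items
    t.onInit("orders", uint64(len(items)))

    for i := range items {
        // ... apply the fix to items[i] here ...
        t.onProgress("orders", uint64(i+1))
    }
    return nil, nil
}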
Conclusion
At first glance, fixing data is nothing more than writing and executing SQL scripts. But this is no longer the case, at least in a micro-service environment, which requires a more sophisticated and reliable technique.
I hope my experience will give someone new ideas on how to improve their own systems and processes.