Our Search for a better Search : ES Indexing Architecture

On our mission to ensure that millions meet their soulmates – ‘search‘ plays a crucial role in finding our perfect matches.

But when we wanted to introduce new fields in ElasticSearch(ES) to add new search feature requirements, it wasn’t scalable with our old architecture. After going live, it would have taken us 4-5 days for the field to be indexed in ES for millions of users. And this was affecting the velocity of our project.

There were also intermittent database load issues which impacted the systems that talk to databases and were affecting our users’ experience. 

These were long-standing issues and in instances where real-time data and velocity mattered it became crucial to think of an architecture which was more real-time, scalable, robust and one which would make intelligent use of resources.

Time to RETHINKour Indexing Architecture

At shaadi.com for all search features we use ElasticSearch!

But what is Elasticsearch ?
It is a distributed, open source search and analytics engine for all types of data – including textual, numerical, geospatial, structured, and unstructured data.  Indexing and Search are two main operations of Elasticsearch.
Indexing – Raw data flows into Elasticsearch from a variety of sources. Data ingestion is the process by which raw data is parsed, normalized, and enriched before it is written in Elasticsearch.
Search – Once indexed in Elasticsearch, we can run complex queries against this data and use aggregations to retrieve complex summaries of this data.

Let’s first understand the old architecture which led us to redesign ES indexing architecture.

THE Old Indexing Architecture
Our old architecture was struggling to survive and seriously needed some repairs
  • Traditional cron based architecture and not real-time
  • Scaling issues – With the ever increasing traffic it was painful to scale
  • Maintaining users pool table – Each service modifying profile info of user had to write to this table for it to be picked up by a cron and indexed to ES
  • Maintaining pointer of processed records and status log file
  • Controlling indexing rate was not easy – Either increasing or reducing indexing rate
  • Backpopulation of document was a big challenge
  • Database overhead

Following is the old indexing architecture flow which will give you a better idea of our challenges

When a member updates their profile information like city/state/profession/etc, their user id and timestamp gets tracked in the index_pool table waiting to be picked up by a cron(scheduled to run every 5 mins). 

The cron job then picks chunks of unprocessed members from the index_pool table and then pulls up all the profile demographics of each member from the db (joins over dozens of tables to select hundred of fields) and bulk indexes the entire document to Elasticsearch. 

On a successful index operation it will mark that chunk of members as processed in the index_pool table and updating status_log file with timestamp till members were processed. To overcome these drawbacks we conceptualized and implemented a new indexing style.

Before jumping to the new architecture, let’s understand some of its building components!

Components of THe new indexing architecture

Click to expand

Binlog

The binary log is a set of log files that contains information about data modifications made to a MySQL server instance. Read more about Binary logshow binlog event , python mysql replication here.

Binlog Events :
A single event is generated per insert/update/delete operation per row i.e. each event contains metadata of single row on which operation was performed. Common metadata of events are database name, table name, type of operation, row data, etc.

1. update event
 – Contains an old field which has a set of columns and values before the update operation, while data contains an entire row (column name and values) after update. In the below example member_status was updated from ‘Active’ to ‘Hidden’.

{"database":"fooDB","table":"member_data","type":"update","ts":1591341780,"xid":296392985,"commit":true,"data":{"id":23232,"name":"Vishal S","member_id":"SH123456","regdate":20190723152935,"member_status":"Hidden","logindate":20200522204607,"gender":"Male","country":"India"},"old":{"member_status":"Active"}}

2. insert event – It does not contain an old field like update event since new record is inserted so it has a data field which contains an entire row – (column name and its values) that was inserted.

{"database":"fooDB","table":"member_data","type":"insert","ts":1591342963,"xid":296461450,"commit":true,"data":{"id":23233,"name":"Vishal S","member_id":"SH123456","regdate":20190723152935,"member_status":"Active","logindate":20200522204607,"gender":"Male","country":"India"}}

3. delete event – This event like insert has only data field and doesn’t have old field, data field is an entire row (column name and values) that was deleted from mysql db table.

{"database":"fooDB","table":"member_data","type":"delete","ts":1591343073,"xid":296469225,"commit":true,"data":{"id":23233,"name":"Vishal S","member_id":"SH123456","regdate":20190723152935,"member_status":"Active","logindate":20200522204607,"gender":"Male","country":"India"}}

Maxwell

We used Maxwell’s daemon to publish binlog event to kafka. Maxwell daemon is an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. It has low operational overhead and provides bootstrap(replay binlog events) functionality. Read more about Maxwell here

1. Run maxwell via cli (or config file)
Prerequisite is you should create a Maxwell user in db granting replication privileges. With the below Maxwell config we produce binlog events from db and tables that are included in the filter options to kafka. client_id has to be unique for each maxwell daemon
bin/maxwell
–user=’maxwell’ –password=’xxxxxx’ –host=’127.0.0.1′ –producer=kafka –kafka.bootstrap.servers=kafka-broker1.example:9092,kafka-broker2.example:9092 –kafka_topic=sample_db_stream –replica_server_id 1 –client_id db_realtime –filter=’exclude: *.*, include: fooDB.member_data, include: fooDB.profession_table, include: fooDB.subscription_table’

2. Bootstrapping
With the below SQL insert in Maxwell.bootstrap table we trigger Maxwell to bootstrap records/rows events on Maxwell utility running with a client id max_client_backfill from member_data table of fooDB database, only the records of members which are active and having lastlogindate less than 20200528172000

insert into maxwell.bootstrap (database_name, table_name, where_clause, client_id) values (‘fooDB’, ‘member_data’, ‘member_status = “Active” and login_date <= 20200528172000’,  ‘max_client_backfill’);

es-data-service

Binlog gives information about an operation on a single row. But what if we need to index a field in Elasticsearch which is dependant on various tables or multiple rows of the same table? ES fields can be categorized as below into two types – simple and derived.

Simple Fields: are fields which are not dependant on any other tables or rows. Such fields can be indexed with/without requiring simple logic like type-casting or text-id transformation.

Example:  marital_status_code and location_id are simple fields as there are no joins or group by

SELECT marital_status_code, location_id FROM member_data where member_id = 'SH123456';

Derived Fields: are the fields for which the value needs to be derived from multiple tables or multiple rows of the same table.

Example: photo_rank can be termed as a derived field as it requires to join over other tables to derive its value.

SELECT if(fooBazTable.photo="Yes" and BazBarTable.is_visible="Yes",1,0) as photo_rank
FROM fooBazTable
LEFT JOIN BazBarTable ON fooBazTable.member_id = BazBarTable.member_login
WHERE fooBazTable.member_id = 'SH123456';


HOW do we INDEX DERIVED FIELDs ? 

The answer is es data service!
It is a microservice written in Golang which will take the db_name.column_name as input and in response returns a list of ES fields with its values fetched from the database.

Kafka

Apache Kafka is a distributed streaming platform.

A streaming platform has three key capabilities:
– Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system
– Store streams of records in a fault-tolerant durable way
– Process streams of records as they occur
Kafka is generally used for two broad classes of applications:
– Building real-time streaming data pipelines that reliably get data between systems or applications
– Building real-time streaming applications that transform or react to the streams of data

More about Kafka here.

our new Indexing architecture

The new architecture has two flows – real-time and backpopulation(non-realtime)

Real-time indexing flow
In real-time flow, data is available in ES within milliseconds of data being updated.

USER UPDATES profile data  –> MYSQL  –> BINLOG –> MAXWELL publishes binlog event to –> KAFKA -> KAFKA CONSUMER reads binlog event(enriches data, may/may not call ES-DATA-SERVICE) –> ELASTICSEARCH

– For every DML operation Maxwell daemon ‘maxwell-streaming-etl-shaadi-db1’ produces its binlog events as a message to Kafka topic ‘streaming_realtime’.
– Kafka consumer ‘es-indexing-service’ written in Python subscribes to Kafka topic ‘streaming_realtime’ reads and enriches data or applies some business rules before indexing it to ES.
– If consumer ‘es-indexing-service’ receives update events of a simple field we directly index to ES. No SQL queries are needed here! 
– 
If consumer ‘es-indexing-service’ consumer receives binlog events of a derived field we call ‘es-data-service’ which in response returns a list of ES field-value to be indexed in ES.

Backpopulation indexing flow

Let me first explain the purpose of backpopulation indexing. 
Suppose you want to introduce a new field ‘education’ in ES, in the real-time flow ‘education’ will get indexed in ES when education is updated by user. How do you index ‘education’ of an existing user who is not updating his/her education? Backpopulation of field will do that!
We used separate set of maxwell-kafka topic-consumer so when we backpopulate for millions of users it won’t lag and impact real-time events


Now how does backpopulation work?

 – Maxwell daemon ‘maxwell-shaadi-es-backfill’ connects to DB1 and based on bootstrapping condition given, replays and produces binlog events to Kafka topic ‘streaming_non_realtime’.
 – ‘es-backfill-service’ written in Python acts as a consumer to Kafka topic ‘streaming_db1_backfill’.
– ‘es-backfill-service’ calls es-data-service to get values of fields that need to be backpopulated and indexes it to ES.

THE New Architecture is Awesome!
  • Near to real-time indexing – Data is available in ES within milliseconds of being updated
  • Single true source of data – Since all DML events are read from the binlog irrespective of which application updates profile data, it gets written to ES. Incase of old architecture, if any application misses writing to pool table than update won’t go to ES
  • Robust and scalable
  • Containerized CI/CD deployment
  • Can easily control indexing rate – We can increase/reduce consumer service containers to control our indexing rate
  • Exponential backoff technique
    Exponential backoff is a technique where we add incremental delay in the Fibonacci fashion, having an upper limit when underlying systems/infra like Elasticsearch/DB/etc. are facing downtime or are under stress so that we do not overload them and allow them to recover by throwing less/no traffic. After each delay we check if the subsequent request succeeds or not, if the request fails again delay is increased, if it succeeds the delay is reduced.

    For example when the number of requests in the ES queue reaches its maximum capacity, ES starts throwing write rejections. In this scenario, if we keep sending requests at usual speed then ES will keep throwing rejections, so it’s better to offload ES with lesser number of requests till it recovers.

    We have implemented an auto-recovery logic where once the underlying systems are healthy again, on every subsequent successful request we reduce delay exponentially and indexing rate is restored to normal.
  • Lesser database overhead and reduced load on db 
    Since updates are read from binlog events there is no need to separately query db to get the profile info, hence reduced load on db. In the new architecture we require querying db but only for derived fields but they are very few in number as compared to simple fields.
  • Multiple data sources – We can have various data sources like Mysql, DynamoDb, Mongo or some application producing to Kafka for it to be indexed in ES
IT’s now Time to sit back and relax

* Our real-time data in ES – available within milliseconds of update
* Backpopulation of a field for millions of user is done within 8-10 hours compared to the 4-5 days it used to take earlier
* Apart from this, as explained above we now a have scalable, robust and less db intensive architecture

How are our services performing?

~18 million indexing operations/day(~200 req/sec) and new indexing pipeline scales well to handle surge in traffic

es-indexing-service(real-time indexing) latency distribution – 50% of docs are indexed within 40ms, 75% request done in 60ms, 90% within 180ms

Latency distribution of es-data-service(written in Golang) – 50% request done within 2ms, 95% within 4ms and 99% within 10ms

I hope this article helps if you are having similar issues with your legacy systems. You can pick some ideas from here which solve your use case and build interesting applications.
😀