Design a Robust Solr Data Indexing Pipeline and Solr Cloud Setup Best Practices
At ASDA, we recently moved from the Endeca search engine to Solr for our search.
Through this blog, I will explain the architecture of our indexing pipeline, how we went about designing it given the challenges we faced, and finally the best practices to follow while setting up Solr and indexes/collections.
Key Challenges/Points
- Read/Write Scalability.
- Disaster Recovery.
- Data Replication.
- Index Data Backup.
- Solr Schema Changes, without downtime.
- Handling full indexing — Delete all data and reingest.
- Solr commit settings during full indexing.
Data Replication and Disaster Recovery
Data replication is a critical aspect of any modern application. Data loss, network issues across data centers, and similar failures are unavoidable.
At any point, the Search Service and the Search Engine must remain accessible and functional after any such event, with minimal or no manual intervention.
How did we solve this?
Solr Cluster Architecture: We have 2 Solr clusters, one in West US and the other in South Central US (SC-US).
In each cluster, we have 9 replicas.
Both the clusters are in active-active mode, meaning both will be serving the live traffic.
Similarly, we have deployed our search service in both SC-US and West US: the SC-US Search Service points to the SC-US Solr cluster, and likewise the West US service points to the West US cluster.
What happens if one of the Solr clusters is down or unreachable?
- Each service exposes a /health API that checks the health of the Solr cluster it points to (sketched below).
- When that Solr cluster goes down, /health returns a 500.
- The load balancer probes this /health API at regular intervals, so when one of the clusters goes down, traffic to the corresponding service is automatically cut off at the load balancer.
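For illustration, such a /health endpoint could be a small Spring Boot controller that pings the live alias via SolrJ and returns a 500 when the local cluster is unreachable. This is a minimal sketch, not our exact implementation; the ZooKeeper hosts and alias name are placeholders.

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.SolrPing;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Optional;

@RestController
public class SolrHealthController {

    // Points at the ZooKeeper ensemble of the cluster this service fronts (West US or SC-US).
    private final SolrClient solr = new CloudSolrClient.Builder(
            List.of("zk1:2181", "zk2:2181", "zk3:2181"), Optional.empty()).build();

    @GetMapping("/health")
    public ResponseEntity<String> health() {
        try {
            // Ping the live alias; any exception or non-zero status means the cluster is unhealthy.
            int status = new SolrPing().process(solr, "catalog_live").getStatus();
            return status == 0 ? ResponseEntity.ok("UP")
                               : ResponseEntity.status(500).body("DOWN");
        } catch (Exception e) {
            // The load balancer sees the 500 and cuts traffic to this region.
            return ResponseEntity.status(500).body("DOWN");
        }
    }
}
```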
The ideal way to solve this is to implement the Circuit Breaker pattern and redirect requests to a fallback cluster when a certain percentage of requests fails. For the search service in South Central US, the fallback cluster is West US, and vice versa.
We currently have both implementations in our service.
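As a hedged sketch of the fallback idea (not our exact code), here is how it could look with Resilience4j: queries normally hit the local cluster, and once the failure rate trips the breaker, or the local query fails, requests go to the other region. The client names, alias, and threshold are illustrative.

```java
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;

public class SearchClient {

    // Open the circuit once half of the recent calls to the local cluster have failed.
    private final CircuitBreaker breaker = CircuitBreaker.of("local-solr",
            CircuitBreakerConfig.custom().failureRateThreshold(50).build());

    private final SolrClient localCluster;    // e.g. SC-US
    private final SolrClient fallbackCluster; // e.g. West US

    public SearchClient(SolrClient localCluster, SolrClient fallbackCluster) {
        this.localCluster = localCluster;
        this.fallbackCluster = fallbackCluster;
    }

    public QueryResponse search(SolrQuery query) throws Exception {
        try {
            // Counted by the breaker; failures beyond the threshold open the circuit.
            return breaker.executeCallable(() -> localCluster.query("catalog_live", query));
        } catch (Exception e) {
            // Circuit open or the local query failed: fall back to the other region's cluster.
            return fallbackCluster.query("catalog_live", query);
        }
    }
}
```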
Solr Index/Collection — Data Back-Up/Snapshot Storage
A backup/snapshot of the indexed data is essential: in case of a disaster, data needs to be re-ingested into the Solr collections quickly.
We can use any DB/file system that provides fast read/write throughput for snapshots.
Examples: HDFS, HBase, Cassandra, MongoDB, etc. Each has its own advantages.
We chose to use Cassandra as our snapshot store. The following reasons were the key factors in picking Cassandra.
- Parallel reads and faster writes.
- Key-based lookup to support partial updates in Solr. We use sku_id as the partition key to support indexed document lookup.
- Cassandra was already part of our tech stack and readily available through Walmart managed services.
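Tying back to the sku_id partition key above, a key-based lookup against the snapshot table might look like this with the DataStax Java driver. This is illustrative only; the keyspace, table, and column names are hypothetical.

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;

public class SnapshotStore {

    private final CqlSession session = CqlSession.builder()
            .withKeyspace("catalog_snapshot")
            .build();

    // Key-based lookup: fetch one indexed document's snapshot by its sku_id partition key,
    // which is what makes partial updates to Solr cheap to assemble.
    public Row findBySkuId(String skuId) {
        return session.execute(
                "SELECT * FROM catalog WHERE sku_id = ?", skuId).one();
    }
}
```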
Full Indexing and Schema Changes — Zero Downtime
In most cases, Solr schema changes require data re-ingestion. Read more here about when re-indexing is needed.
Full indexing, i.e. deleting and re-ingesting all data, is a good practice. When a document is updated in Solr, the existing data/segment is not modified in place; instead, the new version is appended and the previous document is marked as deleted. This grows the index size, and therefore the query time, with every update, since searches scan these segment files sequentially.
Deleting all documents drops the whole index along with the stale data. Read more about the strategy here.
How do we drop the index and keep serving read traffic?
Create 2 copies of a Collection/index:
- One serves the live read traffic, and
- The other is open to any schema change or re-indexing.
Now the question is, where do we maintain the 2 copies of the same Collection?
- In different clusters, or
- In the same cluster.
Both approaches have their own advantages and disadvantages.
Different Solr Cluster Approach:
In this approach, we maintain 2 Solr clusters: say, a PROD1 cluster and a PROD2 cluster. These clusters can be either in the same datacenter or in completely different datacenters.
We can have a load balancer that forwards read and real-time update requests to, say, the PROD1 Solr cluster, while we make all the changes (full indexing or schema changes) in the PROD2 cluster.
Once the changes in the PROD2 cluster are done and tested, we point the load balancer to forward all read traffic to the PROD2 Solr cluster, which has the new changes.
Same Solr Cluster Approach:
Solr has a Collection Aliasing feature, which allows you to create an alias and link it to any collection.
In the same cluster, we can create:
- Two collections catalog_a and catalog_b and
- Two aliases catalog_live and catalog_shadow.
The live alias may point to either catalog_a or catalog_b at any point in time.
The idea is to run full indexing (delete and reload) and make any schema changes always against the “shadow” alias, and to always read from the “live” alias.
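A minimal SolrJ sketch of this setup and the switch, assuming the collection and alias names above (CREATEALIAS re-points an alias that already exists, so the same call serves both the initial setup and the swap after full indexing):

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class AliasManager {

    // catalog_live serves reads; catalog_shadow is where full indexing / schema changes happen.
    public void pointAliases(SolrClient solr, String liveCollection, String shadowCollection)
            throws Exception {
        CollectionAdminRequest.createAlias("catalog_live", liveCollection).process(solr);
        CollectionAdminRequest.createAlias("catalog_shadow", shadowCollection).process(solr);
    }
}

// Usage: after a successful full index into catalog_b, swap the aliases:
//   aliasManager.pointAliases(solrClient, "catalog_b", "catalog_a");
```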
The diagram below depicts this:
At ASDA, we chose the same-cluster approach because we figured:
- It would be less complex to implement such architecture.
- We wanted to run both Solr clusters in active-active mode, meaning we stay in operation even if one DC goes down.
- Solr itself has APIs that support this feature.
Full Indexing Pipeline — Batch Job
Our indexing pipeline architecture is an implementation of the Lambda Architecture, where we make use of both batch and real-time data processing.
Orchestrator App
The Orchestrator App is a Spring Boot Container App that provides all the necessary APIs to support the Batch pipeline and the real-time data feed.
The Orchestrator App:
- Has Kafka consumers for the real-time data updates to Solr and Cassandra.
- Provides APIs for starting and stopping the Kafka consumers on demand (see the sketch after this list).
- Triggers/kills Spark batch and streaming jobs on demand by calling the Livy APIs.
- Maintains the status of all jobs at each stage and saves the job state to the Status DB (MySQL).
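The start/stop API could look roughly like this with Spring Kafka's listener registry; the listener ids and endpoint paths are illustrative, not our actual API.

```java
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerControlController {

    private final KafkaListenerEndpointRegistry registry;

    public ConsumerControlController(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // e.g. POST /consumers/realtime-solr/stop before full indexing starts.
    @PostMapping("/consumers/{listenerId}/{action}")
    public ResponseEntity<String> control(@PathVariable String listenerId,
                                          @PathVariable String action) {
        var container = registry.getListenerContainer(listenerId);
        if (container == null) {
            return ResponseEntity.notFound().build();
        }
        // Pause or resume consumption for this listener on demand.
        if ("stop".equals(action)) {
            container.stop();
        } else {
            container.start();
        }
        return ResponseEntity.ok(listenerId + ": " + action);
    }
}
```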
Apache Livy
Apache Livy is an open-source REST service that provides APIs to submit, monitor, and kill Spark batch/streaming jobs.
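A hedged sketch of how the Orchestrator might submit the Spark aggregator through Livy's POST /batches endpoint; the Livy host, jar path, and class name are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LivyClient {

    private final HttpClient http = HttpClient.newHttpClient();

    public String submitFullIndexJob() throws Exception {
        String body = """
                {
                  "file": "hdfs:///jobs/solr-aggregator.jar",
                  "className": "com.asda.search.FullIndexJob",
                  "args": ["catalog_shadow"]
                }
                """;
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://livy-host:8998/batches"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        // Livy responds with a batch id, which the Orchestrator can store in the Status DB,
        // poll via GET /batches/{id}, and kill via DELETE /batches/{id} if needed.
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```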
Spark Aggregator and Cassandra Tables
Our Cassandra cluster is set up across 2 datacenters (West US and South Central US).
We maintain 2 tables in Cassandra, our snapshot DB:
- Catalog table: stores all common attributes across all ASDA stores
- Store-Inventory table: stores inventory and Store-specific data
The Spark aggregator takes care of merging the Catalog and Store-Inventory tables and pushing the result to Solr.
Here, the Store-Inventory data is stored as nested documents inside Solr.
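An illustrative SolrJ snippet of what such nested documents look like: the catalog record is the parent and each store's inventory row becomes a child document. The field names are hypothetical.

```java
import org.apache.solr.common.SolrInputDocument;

public class NestedDocBuilder {

    public SolrInputDocument build(String skuId) {
        // Parent document: attributes common to all stores.
        SolrInputDocument parent = new SolrInputDocument();
        parent.addField("id", skuId);
        parent.addField("title", "Example product");

        // One child document per store's inventory record.
        SolrInputDocument storeDoc = new SolrInputDocument();
        storeDoc.addField("id", skuId + "_store_4565");
        storeDoc.addField("store_id", "4565");
        storeDoc.addField("in_stock", true);

        parent.addChildDocument(storeDoc);
        return parent;
    }
}
```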
Why did we use Spark?
We needed Apache Spark for its distributed computing capability, to read records from Cassandra and process them in parallel.
In Spark, we throttle writes to Solr by controlling the number of parallel tasks created; for each task, we decide on the number of document writes per second to Solr.
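Below is a sketch of such an aggregation job, assuming the spark-cassandra-connector for reads and the Lucidworks spark-solr data source for writes. The keyspace, table names, ZooKeeper hosts, and option values are placeholders, and the nested-document assembly is simplified to a flat join.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FullIndexJob {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("solr-full-index").getOrCreate();

        Dataset<Row> catalog = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "catalog_snapshot").option("table", "catalog")
                .load();
        Dataset<Row> storeInventory = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "catalog_snapshot").option("table", "store_inventory")
                .load();

        // Merge parent (catalog) and store-level (inventory) data on sku_id.
        Dataset<Row> merged = catalog.join(storeInventory, "sku_id");

        // Throttling: the partition count bounds the number of parallel writer tasks,
        // and batch_size bounds docs per request, which together cap writes/sec to Solr.
        merged.repartition(20)
                .write()
                .format("solr")
                .option("zkhost", "zk1:2181,zk2:2181,zk3:2181")
                .option("collection", "catalog_shadow")
                .option("batch_size", "500")
                .mode("overwrite")
                .save();
    }
}
```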
Key Steps in Full Indexing — In the following order
1. Stop all the real-time update Kafka consumers — done by the Orchestrator App.
2. Create a new Kafka consumer to process data from the batch topics.
3. Process data in the new Kafka consumer and push it to the snapshot/key-value store (Cassandra).
4. Once all data is pushed to Cassandra, the Spark job is triggered by the Orchestrator App with the help of the Apache Livy REST APIs.
5. Now, inside the Spark job:
- Turn off all commit settings (soft and hard commit) in Solr for the SHADOW alias, using the Config Overlay Solr API.
- Drop all data in the SHADOW alias inside Solr and commit the changes.
- Pull data from Cassandra, merge parent and nested docs, and push them to the SHADOW alias of both Solr clusters (West US and SC-US).
- Run sanity-test APIs on the newly indexed data in both clusters.
- Verify that all Solr replicas are healthy, using the Solr Ping API.
- Switch aliases — point the live alias to the freshly built shadow collection and vice versa.
6. After the data push, re-enable the commit settings and start the Kafka consumers for real-time updates. A condensed sketch of the Solr-side calls in steps 5 and 6 follows below.
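Here is that condensed, hedged sketch: disabling auto commits through the Config Overlay API, clearing and pinging the shadow collection, and switching the aliases. The base URLs, collection, and alias names are illustrative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.SolrPing;

public class FullIndexSolrSteps {

    private final HttpClient http = HttpClient.newHttpClient();

    // Config overlay: pass -1 to disable auto (soft) commits before the bulk load,
    // and the normal maxTime values to re-enable them afterwards.
    public void setAutoCommit(String solrBase, String collection, long maxTimeMs) throws Exception {
        String body = String.format(
                "{\"set-property\": {\"updateHandler.autoCommit.maxTime\": %d,"
                + " \"updateHandler.autoSoftCommit.maxTime\": %d}}", maxTimeMs, maxTimeMs);
        HttpRequest req = HttpRequest.newBuilder()
                .uri(URI.create(solrBase + "/" + collection + "/config"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        http.send(req, HttpResponse.BodyHandlers.ofString());
    }

    // Drop all data in the shadow collection and commit the deletion.
    public void clearShadow(SolrClient solr, String shadowCollection) throws Exception {
        solr.deleteByQuery(shadowCollection, "*:*");
        solr.commit(shadowCollection);
    }

    // Sanity check: ping the freshly loaded collection before switching aliases.
    public boolean isHealthy(SolrClient solr, String collection) throws Exception {
        return new SolrPing().process(solr, collection).getStatus() == 0;
    }

    // Switch: point the live alias at the freshly built collection and vice versa.
    public void switchAliases(SolrClient solr, String newLive, String newShadow) throws Exception {
        CollectionAdminRequest.createAlias("catalog_live", newLive).process(solr);
        CollectionAdminRequest.createAlias("catalog_shadow", newShadow).process(solr);
    }
}
```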
Near Real-Time Index Update
Our Catalog and Inventory services take care of pushing any changed data into the separate Kafka topics that we maintain for real-time updates.
Components in the real-time data update
- Messaging — Kafka
- Separate Kafka topics are created for real-time updates.
- Data Processor — listeners on the real-time topics; these could be Kafka Consumers, Kafka Connect, Kafka Streams, or Spark Streaming jobs.
Data Processor Tasks
- Transforms documents into a Solr-indexable format using the DataTransformer.
- Publishes data to the registered subscribers, synchronously.
- Pushes a notification in case of any failure while processing a record, and continues processing.
- Throttles writes per second to all subscribers.
Registered Subscribers in Data Processor are:
- Cassandra Observer — Snapshot DB
- Solr West US Observer
- Solr Southcentral US Observer
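A minimal sketch of the Data Processor's synchronous fan-out to these subscribers; the functional interfaces and the failure hook are illustrative stand-ins for our actual classes.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.solr.common.SolrInputDocument;

public class DataProcessor {

    // The DataTransformer step, modelled here as a simple function.
    private final Function<String, SolrInputDocument> transformer;
    // Registered subscribers: Cassandra (snapshot DB), Solr West US, Solr SC-US.
    private final List<Consumer<SolrInputDocument>> subscribers;
    // Hypothetical failure hook, e.g. publishing to an alerting topic.
    private final Consumer<String> failureNotifier;

    public DataProcessor(Function<String, SolrInputDocument> transformer,
                         List<Consumer<SolrInputDocument>> subscribers,
                         Consumer<String> failureNotifier) {
        this.transformer = transformer;
        this.subscribers = subscribers;
        this.failureNotifier = failureNotifier;
    }

    public void process(String kafkaRecordValue) {
        try {
            // Transform the raw Kafka record into a Solr-indexable document.
            SolrInputDocument doc = transformer.apply(kafkaRecordValue);
            // Publish synchronously to every subscriber, so Cassandra and both Solr regions stay in step.
            subscribers.forEach(subscriber -> subscriber.accept(doc));
        } catch (Exception e) {
            // Notify on failure and continue with the next record.
            failureNotifier.accept(kafkaRecordValue + ": " + e.getMessage());
        }
    }
}
```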
We update both the LIVE and SHADOW collections in each cluster.
The SHADOW collection is also updated because, if for any reason we need to switch back to it, we need the most up-to-date data there.
The diagram below represents the real-time data flow into Solr and Cassandra:
Wrap up
In this blog, I talked about our indexing architecture and the Solr practices we follow during indexing.
Our architecture went through many changes over time, and we kept introducing new changes to make it a robust and reliable system.
There is still a lot to improve from this point. Our next goals are to:
- Support Solr in-place updates.
- Improve the throughput of the ingestion pipeline beyond the current 15k writes/second.
- Move to a Kappa Architecture (real-time updates only) and do full loads only from the snapshot DB.