Chapter 2
Ray: Design, Architecture, and Core Abstractions
Ray is more than just another distributed system; it is a rethinking of how we orchestrate large-scale computing. This chapter pulls back the curtain on Ray's architectural blueprint: how thousands of tasks, actors, and resources are coordinated efficiently. It examines the design decisions, flexible abstractions, and cluster-level features that let Ray run demanding applications across clouds and data centers with minimal friction.
2.1 Ray Architecture Deep Dive
Ray's architecture is a meticulously engineered composition of modular components designed to provide a unified platform for distributed computing, with a strong emphasis on scalability, fault tolerance, and low-latency task scheduling. At its core, Ray decomposes the system into a hierarchy of layers: individual worker nodes, a distributed object store, a decentralized scheduler, and a fault-tolerant runtime. Each layer plays a specialized role with clear interfaces and responsibilities, enabling robust execution of complex, dynamic workloads.
Node-Level Infrastructure and Object Store
Each Ray node consists primarily of two foundational elements: a set of local worker processes and a shared memory object store. Workers execute Python tasks or actors, processing units of work either scheduled locally or received from remote nodes. The object store operates as a zero-copy shared-memory repository that stores immutable data objects, facilitating high-throughput data access and efficient inter-worker communication on the same node.
The object store is built on Plasma, a shared-memory object store that originated in the Apache Arrow project, enabling fine-grained memory sharing with minimal serialization overhead. Stored objects are reference counted, allowing safe, transparent data lifecycle management. Once an object is no longer referenced, it is automatically garbage collected, preventing memory leaks across distributed tasks.
Moreover, the locality-awareness inherent in the object store reduces network bandwidth consumption and latency by favoring intra-node data access where possible. When remote access is required, Ray employs a gRPC-based mechanism to transfer objects between stores across nodes asynchronously, optimizing distributed task output retrieval.
Decentralized Scheduling and Task Distribution
Ray's scheduler follows a decentralized design pattern, avoiding single points of failure and bottlenecks. The decentralized scheduler consists of a global scheduler (or scheduler cluster) and multiple local schedulers co-located with the worker nodes. The local schedulers make admission control and placement decisions for tasks submitted on that node, executing lightweight scheduling functions to assign tasks to workers locally when resources permit.
When local nodes experience resource contention or overload, task assignment requests are escalated to the global scheduler. The global scheduler maintains a cluster-wide view of available computational resources and can reassign or offload tasks to less utilized nodes. This hierarchical approach combines the scalability of distributed task handling with centralized coordination for global optimization.
Ray's scheduling algorithm leverages a dataflow graph representation of task dependencies. Tasks are queued and prioritized according to resource availability, task readiness, and locality constraints implied by object dependencies in the distributed object store. This ensures that tasks execute as soon as their inputs become available, minimizing latency and maximizing resource utilization.
State Management and Fault-Tolerant Runtime
Ray's runtime orchestrates system-wide state through a distributed control plane implemented atop a fault-tolerant key-value store (Ray's Global Control Store, historically backed by Redis). This control plane persistently records cluster metadata, task statuses, resource allocations, and object lineage, enabling recovery and consistent scheduling decisions despite node failures.
The design choice to separate control state from worker and scheduler processes is deliberate: it facilitates modularity and resilience. Ray can restart or reschedule worker processes without jeopardizing global system integrity, because the critical state resides in the backing store. This lets Ray recover automatically from machine failures by reconstructing lost objects and resubmitting unfinished tasks based on lineage records.
Lineage-based fault recovery is a pivotal innovation. Every remote object's lineage, which records the task that produced it and that task's dependencies, is maintained within the system state. Should an object be lost due to worker failure, Ray transparently recomputes only the subset of tasks required to regenerate the lost data, avoiding excessive recomputation. This supports both fine-grained fault tolerance and efficient use of cluster resources.
Component Separation and Design Philosophy
Ray's modular component separation aligns with contemporary distributed system design principles that emphasize loosely coupled, highly cohesive subsystems. The object store, schedulers, workers, and control plane collectively form a microservices-like architecture, each encapsulating specific concerns and communicating via well-defined protocols.
This strategy achieves several systemic objectives. First, it enhances scalability: each component can be independently scaled or upgraded, preventing monolithic bottlenecks. Second, it improves maintainability and extensibility, allowing future enhancements to one component (e.g., upgrading the scheduler with smarter algorithms) without disrupting others. Third, it fortifies fault isolation, whereby failure containment boundaries limit propagation and simplify recovery procedures.
The separation also supports the heterogeneous workloads inherent in modern AI and data science applications. By isolating task execution from control state and data transport, Ray gracefully manages diverse computation patterns, including stateless parallel tasks, stateful actors, and streaming pipelines, within a unified framework.
Integration and Performance Implications
The interplay between Ray's components ensures that distributed computations achieve low latency and high throughput while maintaining robustness. Data locality awareness minimizes remote data fetches, and local schedulers reduce scheduling latency for high-frequency small tasks. Decentralized scheduling balances load effectively across large clusters, avoiding central bottlenecks.
Persistent control state and lineage tracking enable rapid restart and fault recovery without user intervention, essential for reliable long-running machine learning training jobs or interactive analytic workloads. This level of automation significantly simplifies application development and operational management.
Ray's layered architecture thereby supports a spectrum of deployment configurations, from a single machine harnessing multiple cores and GPUs to sprawling cloud clusters executing millions of tasks, demonstrating the synergy between architectural modularity, system reliability, and computational efficiency.
2.2 Ray Remote Functions and Task Model
At the core of Ray lies a model for distributed task parallelism that abstracts the complexity of parallel and distributed computing behind a straightforward programming interface. Ray remote functions, or tasks, encapsulate stateless computations that are executed asynchronously across a heterogeneous cluster, enabling fine-grained parallelism and scalability. This section delves into the semantics of Ray remote functions, the futures mechanism for expressing task dependencies, and the architectural principles that govern task scheduling and execution efficiency.
A remote function in Ray is a Python function or method decorated with @ray.remote, which marks it for execution on a remote worker process. When such a function is invoked, instead of executing immediately, it returns a future (in Ray, an ObjectRef), a placeholder for the eventual result. This future encapsulates the task's execution state and result, allowing the program to continue asynchronously. The following snippet demonstrates a simple remote function definition and invocation:
```python
import ray

ray.init()

@ray.remote
def square(x):
    return x * x

future = square.remote(4)   # returns immediately with an ObjectRef
result = ray.get(future)    # blocks until the task completes; 16
```