Introduction to Hazelcast Jet
Hazelcast Jet is a distributed computing platform for fast stream and batch processing of data. With Hazelcast IMDG providing storage functionality, Jet performs parallel execution to enable data-intensive applications to operate in near real-time. Using directed acyclic graphs (DAG) to model relationships between individual steps in the data processing pipeline, Hazelcast Jet can execute both batch and stream-based data processing applications. Jet handles the parallel execution using the green thread approach to optimize the utilization of the computing resources.
Breakthrough application speed is achieved by the execution model of Jet and by keeping both the computation and data storage in memory. The embedded Hazelcast IMDG provides elastic in-memory storage and is a great tool for storing the results of a computation or as a cache for data sets to be used during the computation. Extremely low end-to-end latencies can be achieved this way. See Jet Performancee for the architecture aspects that drive high performance.
It is extremely lightweight – in particular Jet can be fully embedded for OEMs and for Microservices – making it is easier for manufacturers to build and maintain next generation systems. Also, Jet uses Hazelcast discovery for finding the members in the cluster, which can be used in both on-premise and cloud environments.
Directed Acyclic Graphs
At the core of Jet is the distributed computation engine based on the paradigm of dataflow programming and a directed acyclic graphs (DAG). In the graph, vertices are units of data processing and edges are units of data routing and transfer.
A Vertex is the main unit of work in a Jet computation. Conceptually, it receives input from its inbound edges and emits data to its outbound edges. Jet uses only one kind of vertex, but in practice there is an important distinction between:
Computational vertex which accepts input and transforms it into output;
Source vertex which generates output without receiving anything;
Sink vertex which consumes input and doesn’t emit anything.
Each vertex’s computation is implemented by a Processor. On each member there are one or more instances of the processor running in parallel for a single vertex; (see Parallel Execution). Practically, it is a number of Processor instances which receive each its own part of the full stream traveling over the inbound edges, and likewise emits its own part of the full stream going down the outbound edges. Generally the processor is implemented by the user, but there are numerous ready-made implementations in Jet’s library.
Data sources and sinks are implemented as Processors as well and are used for the terminal vertices of the DAG. A source can be distributed, which means that on each member of the Jet cluster a different slice of the full data set will be read. Similarly, a sink can also be distributed so each member can write a slice of the result data to its local storage. Data partitioning is used to route each slice to its target member. Examples of distributed sources supported by Jet are HDFS files and Hazelcast’s IMap/IList.
Edges transfer data from one vertex to the next and contain the partitioning logic which ensures that each item is sent to its target processor.
A Job is the unit of work which is executed. A Job is described by a DAG, which describes the computation to be performed, and the inputs and outputs of the computation. Job is a handle to the execution of a DAG.
After a Job is created, the DAG is replicated to the whole Jet cluster and executed in parallel on each member (see Parallel Execution).
When a request to execute a Job is made, the corresponding DAG and additional resources are deployed to the Jet cluster. An execution plan for the DAG is built on each node, which creates the associated tasklets for each Vertex and connects them to their inputs and outputs.
Using Hazelcast Jet
There are two ways to use Jet: build the DAG using the Core API or use java.util.stream as a high-level API. The java.util.stream operations are mapped to a DAG and then executed, then the result is returned to the user in the same manner as in JDK’s implementation.
Whereas java.util.stream provides more convenience, only the Core API exposes the full potential of Jet. There are plans to develop higher-level batching and streaming APIs that would be better at exploiting Jet’s full potential while also being more expressive and convenient than the Core API.
Batch and Stream Processing
The data processing is traditionally divided to batch and stream. Batch processing refers to running a job on a data set that is available in a data center – it is bounded, finite. Stream processing deals with the in-flight data prior to it’s storage. Stream datasets are generally unbounded and infinite.
Bounded data (“batch”) processing benefits from optimizations resulting from knowing the data set. Unbounded data (“stream”) processing offers much lower latency – data are processed on-the-fly, so one doesn’t have to wait for the whole data set to arrive in order to run a computation. This is a significant benefit for use-cases where data tend to lose it’s value with time (real-time business insights, stats, ad-placement, anomaly and fraud detection).
Jet is based on a low-latency execution engine built for high-performance stream processing. Infinite data streams are first class citizens. Bounded (“batch”) data sets are treated as data streams that accidentally end.
The typical way to deal with unbounded, streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.
Jet contains several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarized as follows:
Tumbling Windows: Fixed-size, non-overlapping and contiguous intervals. For example a tumbling window of 1 minute would mean that each window is of 1 minute length, are not overlapping, and contiguous to each other.
Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
Session Windows: Typically captures a burst of events which are separated by a period of inactivity. This can be used to window events by a user session, for example.
Jet supports the notion of “event-time” where events can have their own timestamp and can arrive out of order. This is achieved by watermarks. Watermark is a timestamped item inserted into the stream that tells us “from this point on there will be no more items with timestamp less than this”.
Computing the watermark is a matter of educated guessing and there is always a chance some items will arrive that violate its claim. If we do observe such an offending item, we categorize it as “too late” and just filter it out.
Hazelcast Jet takes a simple approach and strictly triages stream items into “still on time” and “late”, discarding the latter based on WatermarkPolicy selected.
Each vertex is implemented by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member; every member will have the same number of processors.
Job execution is done on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread.
Each instance of a Processor is wrapped in one Tasklet which is repeatedly executed until it reports it is done. A vertex with a parallelism of 8 running on 4 nodes would have a total of 32 tasklets running at the same time. Each node will have the same number of tasklets running.
Each worker thread has a list of tasklets it is in charge of and as tasklets complete at different rates, the remaining ones are moved between workers to keep the load balanced.
Cooperative multi-threading is one of the core features of Jet and can be roughly compared to “green threads”. It is purely a library-level feature and does not involve any low-level system or JVM tricks; the Processor API is simply designed in such a way that the processor can do a small amount of work each time it is invoked, then yield back to the Jet engine. The engine manages a thread pool of fixed size and on each thread, the processors take their turn in a round-robin fashion.
The point of cooperative multi-threading is better performance. Several factors contribute to this:
The overhead of context switching between processors is much lower since the operating system’s thread scheduler is not involved.
The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.
The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).
Integration with Hazelcast IMDG
As Hazelcast Jet is built on top of the Hazelcast IMDG platform, there is a tight integration between Jet and IMDG. Hazelcast Operations are used for different actions that can be performed on a job. Jet can also be used with the Hazelcast Client, which uses the Hazelcast Open Binary Protocol to communicate different actions to the server instance.
Jet embeds Hazelcast IMDG. Therefore, Jet can use Hazelcast IMDG maps, caches and lists on the embedded cluster as sources and sinks of data and make use of data locality. Jet can also use any remote Hazelcast IMDG instance via Hazelcast IMDG connector.
Members and Clients
A Hazelcast Jet instance is a unit where the processing takes place. There can be multiple instances per JVM, however this only makes sense for testing. An instance becomes a member of a cluster: it can join and leave clusters multiple times during its lifetime. Any instance can be used to access a cluster, giving an appearance that the entire cluster is available locally.
On the other hand, a client instance is just an accessor to a cluster and no processing takes place in it.
In its current version, Hazelcast Jet can only detect a failure in one of the cluster members that was running the computation, and abort the job. A feature planned for the future is fault tolerance: the ability to go back to a saved snapshot of the computation state and resume the computation without the failed member.
Hazelcast Jet supports the scenario where a new member joins the cluster while a job is running. Currently the ongoing job will not be re-planned to start using the member, though; this is on the road map for a future version. The new member can also leave the cluster while the job is running and this won’t affect its progress.
Handling Back Pressure
Jet uses single producer/single consumer Ringbuffers to transfer the data between processors on the same member. Ringbuffers, being bounded Queues, introduce natural back pressure into the system; if a consumer’s Ringbuffer is full, the producer will have to back off until it can enqueue the next item. When data is sent to another member over the network, there is no natural back pressure, so Jet uses explicit signaling in the form of adaptive receive windows.
Discovery and Networking
Jet uses Hazelcast IMDG discovery for finding the members in the cluster. On the wire Jet uses custom lightweight serialization for small and frequently used types (primitive types and strings) and delegates to Hazelcast serialization for the rest.