Introduction to Hazelcast Jet
Hazelcast Jet is a distributed computing platform for fast stream and batch processing of data. With Hazelcast IMDG providing storage functionality, Jet performs parallel execution to enable data-intensive applications to operate in near real-time. Using directed acyclic graphs (DAG) to model relationships between individual steps in the data processing pipeline, Jet can execute both batch and stream-based data processing. Jet handles the parallel execution using the green thread approach to optimize the utilization of computing resources.
Breakthrough application speed is achieved by the execution model of Hazelcast Jet and by keeping both the computation and data storage in memory. The embedded Hazelcast IMDG provides elastic in-memory storage and is a great tool for storing the results of a computation, or acting as a cache for data sets to be used during the computation. Extremely low end-to-end latencies can be achieved this way. See Jet Performance for the architecture aspects that drive high performance.
Hazelcast Jet is extremely lightweight – in particular Jet can be fully embedded for OEMs and for microservices – making it easier for manufacturers to build and maintain next generation systems. Also, Jet uses Hazelcast discovery for finding members in the cluster, which can be used both on-premise and in cloud environments.
Directed Acyclic Graphs
Hazelcast Jet provides high-performance, in-memory data processing by modeling the computation as a directed acyclic graph (DAG) where vertices represent computation, and edges represent data connections. A vertex receives data from its inbound edges, performs a step in the computation, and then emits data to its outbound edges. A single vertex’s computation work is performed in parallel by many instances of the Processor type around the cluster.
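To make the vertex-and-edge model concrete, here is a minimal sketch in plain Java. The SimpleDag class and its methods are hypothetical names invented for this illustration and are not Jet's own DAG API. It stores vertices and directed edges and uses Kahn's algorithm to confirm the graph is acyclic, producing an order in which every vertex follows its upstream vertices.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified DAG model; not Jet's own DAG class.
class SimpleDag {
    // vertex name -> names of its downstream vertices
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    SimpleDag vertex(String name) {
        edges.putIfAbsent(name, new ArrayList<>());
        return this;
    }

    SimpleDag edge(String from, String to) {
        vertex(from).vertex(to);
        edges.get(from).add(to);
        return this;
    }

    // Kahn's algorithm: returns the vertices so that each one comes after
    // all of its upstream vertices; throws if the graph contains a cycle.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String v : edges.keySet()) {
            inDegree.put(v, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.remove();
            order.add(v);
            for (String t : edges.get(v)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        SimpleDag dag = new SimpleDag()
                .edge("source", "map")
                .edge("map", "sink");
        System.out.println(dag.topologicalOrder());
    }
}
```

A real engine attaches processing logic to each vertex and routing rules to each edge; this sketch only captures the graph shape.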
One of the major reasons to divide the full computation task into several vertices is data partitioning: the ability to split the data stream traveling over an edge into slices which can then be processed independently of each other. To make this work, a function must be defined which computes the partitioning key for each item and makes all related items map to the same key. The computation engine can then route all such items to the same processor instance. This makes it easy to parallelize the computation; each processor will have the full picture for its slice of the entire stream.
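The routing idea can be sketched in a few lines of plain Java. The KeyPartitioning class, the route method and the hash-modulo rule are assumptions made for illustration; Jet's actual partitioner may work differently. What matters is that equal keys always land in the same slice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of key-based routing; Jet's own partitioner may differ.
class KeyPartitioning {
    // Maps a key to one of processorCount slots; equal keys get the same slot.
    static int partitionFor(Object key, int processorCount) {
        return Math.floorMod(key.hashCode(), processorCount);
    }

    // Splits the items into one slice per processor using the key function.
    static <T, K> List<List<T>> route(List<T> items, Function<T, K> keyFn, int processorCount) {
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < processorCount; i++) {
            slices.add(new ArrayList<>());
        }
        for (T item : items) {
            int slot = partitionFor(keyFn.apply(item), processorCount);
            slices.get(slot).add(item);
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "blueberry", "cherry");
        // The partitioning key is the first letter, so words sharing a letter stay together.
        List<List<String>> slices = route(words, w -> w.charAt(0), 3);
        for (int i = 0; i < slices.size(); i++) {
            System.out.println("processor " + i + ": " + slices.get(i));
        }
    }
}
```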
Edges determine how the data is routed from individual source processors to individual destination processors. Different edge properties offer precise control over the flow of data.
A job is the unit of work which is executed. A job is described by a DAG, which describes the computation to be performed, and the inputs and outputs of the computation. A Job is a handle to the execution of a DAG.
After a job is created, the DAG is replicated to the whole Jet cluster and executed in parallel on each member (see Parallel Execution).
When a request to execute a job is made, the corresponding DAG and additional resources are deployed to the Jet cluster. An execution plan for the DAG is built on each node, which creates the associated tasklets for each vertex and connects them to their inputs and outputs.
Using Hazelcast Jet
Jet Job in a Data Processing Pipeline
The general shape of any data processing pipeline is drawFromSource -> transform -> drainToSink. The Pipeline API of Jet follows this pattern. You implement your computation by linking the respective transforms, then connecting those to the upstream data sources and downstream sinks. The Jet library contains a set of transform, source and sink adapters to be used in your pipeline. The library can be extended by custom transforms, sources or sinks.
Under the hood, the Pipeline API is translated into a DAG representing the Jet job and it is executed by the distributed computation engine of Jet.
// Read strings from the list "input", upper-case them, write them to the list "result".
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>readList("input"))
 .map(String::toUpperCase)
 .drainTo(Sinks.writeList("result"));
In a more complex scenario, you’ll have several sources, each starting its own pipeline branch. Then you can merge them in a multi-input transformation such as joining. Symmetrically, the output of a stage can be sent to more than one destination.
Data Sources and Sinks
Hazelcast Jet supports these data sources and sinks:
- Hadoop Distributed File System (HDFS)
- Kafka topic
- TCP socket
- A directory on the filesystem
The simplest kind of transformation is one that can be done on each item individually and independent of other items. Examples are:
- map transforms each item to another item
- filter discards items that don’t match its predicate
- flatMap transforms each item into zero or more output items.
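The three item-by-item transforms can be tried out without a cluster. The sketch below uses the plain JDK java.util.stream API rather than Jet's Pipeline API, and the ItemwiseTransforms class and its method are names invented for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration with the JDK stream API; not Jet's Pipeline API.
class ItemwiseTransforms {
    static List<String> longWordsUpperCased(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // one line becomes several words
                .filter(word -> word.length() > 3)               // discard words of 3 letters or fewer
                .map(String::toUpperCase)                        // turn each word into another item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("the quick brown fox", "jumps over");
        System.out.println(longWordsUpperCased(lines));
    }
}
```

Each step looks at a single item at a time, which is why such transforms are easy to run in parallel.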
Stepping up from the simplest transforms, we come to groupBy, the quintessential finite stream transform. It groups the data items by a key computed for each item and performs an aggregate operation over all the items in a group.
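As a rough illustration of grouping followed by aggregation, the sketch below counts items per key using the JDK's Collectors.groupingBy. The GroupByCount class is a hypothetical name, and the code runs on a local list rather than on a Jet pipeline stage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustration with the JDK stream API; not Jet's groupBy transform.
class GroupByCount {
    // Groups items by the computed key and counts the items in each group.
    static Map<String, Long> countByKey(List<String> items, Function<String, String> keyFn) {
        return items.stream()
                .collect(Collectors.groupingBy(keyFn, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana");
        // Key: the first letter of each word.
        System.out.println(countByKey(words, w -> w.substring(0, 1)));
    }
}
```

Counting is only one possible aggregate operation; a sum or an average would use the same grouping step with a different downstream collector.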
A more complex variety of pipeline transforms are those that merge several input stages into a single resulting stage. In Hazelcast Jet there are two such transforms of special interest:
- coGroup is a generalization of groupBy to more than one contributing data stream. For example, your goal may be correlating (grouping) events coming from different systems.
- hashJoin is a specialization of a general “join” operation, optimized for the use case of data enrichment.
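Data enrichment usually means attaching reference data to each item of a larger stream. A minimal sketch of that idea in plain Java follows. It assumes the reference data fits into an in-memory hash table; the EnrichmentJoin class, the ticker data and the "unknown" fallback are all invented for illustration and do not reflect how Jet implements hashJoin.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of enrichment by hash lookup; not Jet's hashJoin implementation.
class EnrichmentJoin {
    // Attaches a company name to each ticker by looking it up in the table.
    static List<String> enrich(List<String> tickers, Map<String, String> companyByTicker) {
        List<String> enriched = new ArrayList<>();
        for (String ticker : tickers) {
            String company = companyByTicker.getOrDefault(ticker, "unknown");
            enriched.add(ticker + " (" + company + ")");
        }
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, String> companies = new HashMap<>();
        companies.put("AAPL", "Apple");
        companies.put("MSFT", "Microsoft");
        System.out.println(enrich(Arrays.asList("MSFT", "AAPL", "XYZ"), companies));
    }
}
```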
Core API and java.util.stream
The Pipeline API is the primary API of Hazelcast Jet. Introduced in Jet 0.5, it is still under construction and we plan to add more transforms in 0.6. The major missing feature is the windowing of infinite streams (sliding, tumbling, session windows). We also plan to add more batch transforms (distinct, for example). These gaps may be a reason to build the DAG using the Core API, or to use java.util.stream as a high-level API.
The Core API exposes the full potential of Jet. Developers have to deal with the complexity of a distributed DAG computation in order to implement the processing job properly.
java.util.stream provides more convenience to Java developers as it has been a Java SE API since version 8. The java.util.stream operations are mapped to a DAG and then executed, with the result returned to the user in the same manner as a JDK implementation. Nevertheless, it is limited to filter-map-reduce operations on top of a bounded data set.
Batch and Stream Processing
Data processing is traditionally divided into batch and stream. Batch processing refers to running a job on a data set that is available in a data center – it is bounded and finite. Stream processing deals with the in-flight data prior to its storage. Stream datasets are generally unbounded and infinite.
Bounded data (“batch”) processing benefits from optimizations because it knows the data set. Unbounded data (“stream”) processing offers much lower latency – data is processed on-the-fly, so it doesn’t have to wait for the whole data set to arrive in order to run a computation. This is a significant benefit for use cases where data loses its value with time (real-time business insights, stats, ad placement, anomaly and fraud detection).
Jet is based on a low latency execution engine built for high performance stream processing. Infinite data streams are first class citizens. Bounded (“batch”) data sets are treated as data streams that accidentally end.
The typical way to deal with unbounded, streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.
Jet contains several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarized as follows:
- A very basic windowing policy is the tumbling window, which splits the data into batches based on event time. Each event gets assigned to the batch whose time interval covers the event’s timestamp. The result of this is very similar to running a sequence of batch jobs, one per time interval.
- Sliding Window aggregates over a period of given size extending from now into the recent past. As the time passes, the window continuously slides forward and the fresh data gradually displaces the old.
- Session Windows typically capture a burst of events which are separated by a period of inactivity. For example, this can be used to window events by a user session.
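Two of the window types above reduce to simple arithmetic on timestamps, sketched here in plain Java. The WindowSketch class and its methods are hypothetical and operate on an in-memory list of already sorted timestamps, which a real stream processor cannot assume. Sliding windows are left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helpers illustrating window assignment; not Jet's windowing processors.
class WindowSketch {
    // Start of the tumbling window [start, start + size) that covers the timestamp.
    static long tumblingWindowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Splits sorted timestamps into sessions: a gap larger than maxGap starts a new session.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long maxGap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > maxGap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(1234, 1000));
        System.out.println(sessions(Arrays.asList(1L, 2L, 3L, 10L, 11L, 30L), 5));
    }
}
```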
Hazelcast Jet supports the notion of “event time” where events can have their own timestamp and can arrive out of order. This is achieved by watermarks. A Watermark is a timestamped item inserted into the stream that tells us “from this point on there will be no more items with timestamp less than this”.
Computing the watermark is a matter of educated guessing and there is always a chance some items will arrive that violate its claim. If we observe such an item, we categorize it as “too late” and just filter it out.
Hazelcast Jet takes a simple approach and strictly triages stream items into “still on time” and “late”, discarding the latter based on the selected WatermarkPolicy.
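The on-time versus late triage can be sketched with one possible rule: let the watermark trail the highest timestamp seen so far by a fixed lag. This rule, the LagWatermark class and its method names are assumptions chosen for illustration; they are not taken from Jet's WatermarkPolicy implementations.

```java
// Hypothetical fixed-lag watermark; one possible policy, not Jet's own code.
class LagWatermark {
    private final long lag;
    private long watermark = Long.MIN_VALUE;

    LagWatermark(long lag) {
        this.lag = lag;
    }

    // Observes an item's timestamp. Returns true if the item is still on time,
    // false if it is late (its timestamp is below the current watermark).
    boolean accept(long timestamp) {
        if (timestamp < watermark) {
            return false; // violates the watermark's claim, so it is dropped
        }
        // The watermark never moves backwards.
        watermark = Math.max(watermark, timestamp - lag);
        return true;
    }

    long currentWatermark() {
        return watermark;
    }

    public static void main(String[] args) {
        LagWatermark wm = new LagWatermark(5);
        long[] timestamps = {10, 7, 20, 12, 15};
        for (long ts : timestamps) {
            System.out.println(ts + " on time? " + wm.accept(ts) + ", watermark=" + wm.currentWatermark());
        }
    }
}
```

A larger lag tolerates more disorder at the cost of later results; a smaller lag does the opposite.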
Each vertex is implemented by one or more instances of Processor on each member. Each vertex can specify how many of its processors will run per cluster member; every member will have the same number of processors.
Job execution is done on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread.
Each instance of a Processor is wrapped in one Tasklet which is repeatedly executed until it reports it’s done. A vertex with a parallelism of 8 running on 4 nodes would have a total of 32 tasklets running at the same time. Each node will have the same number of tasklets running.
Each worker thread has a list of tasklets that it is in charge of, and as tasklets complete at different rates, the remaining ones are moved between workers to keep the load balanced.
Cooperative multi-threading is one of the core features of Jet and can be roughly compared to “green threads”. It is purely a library-level feature and does not involve any low-level system or JVM tricks; the Processor API is simply designed in such a way that the processor can do a small amount of work each time it is invoked, then yield back to the Jet engine. The engine manages a thread pool of fixed size and on each thread, the processors take their turn in a round-robin fashion.
The point of cooperative multi-threading is better performance. Several factors contribute to this:
- The overhead of context switching between processors is much lower since the operating system’s thread scheduler is not involved.
- The worker thread driving the processors stays on the same core for longer periods, preserving the CPU cache lines.
- The worker thread has direct knowledge of the ability of a processor to make progress (by inspecting its input/output buffers).
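The cooperative model described above can be imitated on a single thread in plain Java. In this sketch the Tasklet interface, the countdown helper and the run loop are hypothetical stand-ins, not Jet's internal classes. Each tasklet does one small unit of work per call and then returns control, and the loop visits the remaining tasklets round-robin until all report completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-threaded imitation of cooperative scheduling.
class CooperativeWorker {
    // Stand-in for a tasklet: does a little work, returns true once finished.
    interface Tasklet {
        boolean call();
    }

    // A tasklet that needs the given number of invocations and logs each one.
    static Tasklet countdown(String name, int steps, List<String> log) {
        int[] remaining = {steps};
        return () -> {
            log.add(name + ":" + remaining[0]);
            return --remaining[0] <= 0;
        };
    }

    // Runs all tasklets on the calling thread, round-robin, until every one is done.
    static void run(List<Tasklet> tasklets) {
        List<Tasklet> active = new ArrayList<>(tasklets);
        while (!active.isEmpty()) {
            for (Iterator<Tasklet> it = active.iterator(); it.hasNext(); ) {
                if (it.next().call()) {
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        run(Arrays.asList(countdown("a", 2, log), countdown("b", 3, log)));
        System.out.println(log);
    }
}
```

No tasklet blocks, so no operating system context switch is needed to move from one to the next.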
Integration with Hazelcast IMDG
As Hazelcast Jet is built on top of the Hazelcast IMDG platform, there is a tight integration between Jet and IMDG. Hazelcast operations are used for different actions that can be performed on a job. Jet can also be used with the Hazelcast client, which uses the Hazelcast Open Binary Protocol to communicate different actions to the server instance.
Hazelcast Jet embeds Hazelcast IMDG. Therefore, Jet can use IMDG maps, caches and lists on the embedded cluster as sources and sinks of data and make use of data locality. Jet can also use any remote IMDG instance via the IMDG connector.
Moreover, Jet is able to process the stream of changes (Event Journal) of an IMap and ICache. This allows you to process the IMap/ICache data in a streaming way.
Members and Clients
A Hazelcast Jet instance is a unit where the processing takes place. There can be multiple instances per JVM; however, this only makes sense for testing. An instance becomes a member of a cluster. It can join and leave clusters multiple times during its lifetime. Any instance can be used to access a cluster, giving an appearance that the entire cluster is available locally.
On the other hand, a client instance is just an accessor to a cluster and no processing takes place in it.
A Hazelcast Jet job keeps its internal state in memory. The state contains information such as values of running aggregations, content of open windows or a reading offset for sources. In the case of failure, the state is lost. The job has to be restarted from the beginning to recompute its state. This can be complicated, especially for long running stream jobs.
In order to prevent the state loss, the job state has to be periodically saved.
Hazelcast Jet supports distributed state snapshots and automatic restarts. Snapshots are periodically created and backed up. When there is a failure in the cluster, Jet uses the latest state snapshot and automatically restarts all jobs that contain the failed member as a job participant from this snapshot.
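The following single-process sketch illustrates why saving the aggregation state together with the source reading offset allows a restart without recomputing from the beginning. The SnapshotSketch class, the simulated crash and the in-memory "snapshot" variables are all assumptions made for this example. It does not model Jet's distributed snapshot protocol or its backup in the cluster.

```java
// Hypothetical single-process model of snapshot and restore; not Jet's protocol.
class SnapshotSketch {
    // Sums the source, saving (sum, offset) every snapshotInterval items.
    // A crash is simulated once, just before the item at crashAtOffset is read;
    // pass -1 for a run without a crash.
    static long sumWithRecovery(int[] source, int snapshotInterval, int crashAtOffset) {
        long snapshotSum = 0;   // latest saved aggregation state
        int snapshotOffset = 0; // latest saved reading offset
        long sum = 0;           // live in-memory state
        int offset = 0;
        boolean crashed = false;
        while (offset < source.length) {
            if (!crashed && offset == crashAtOffset) {
                // Live state is lost; restart from the latest snapshot.
                crashed = true;
                sum = snapshotSum;
                offset = snapshotOffset;
                continue;
            }
            sum += source[offset++];
            if (offset % snapshotInterval == 0) {
                snapshotSum = sum;
                snapshotOffset = offset;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] source = {1, 2, 3, 4, 5, 6, 7};
        System.out.println("no crash:   " + sumWithRecovery(source, 3, -1));
        System.out.println("crash at 5: " + sumWithRecovery(source, 3, 5));
    }
}
```

In the crash run, the items read after the last snapshot are processed a second time, yet the final sum is the same because the restored state and offset are consistent with each other.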
Jet internally uses Hazelcast IMaps to store job metadata and snapshots. Remember, IMap is replicated across the cluster, so the state remains available in the case of failure. Moreover, no additional infrastructure such as a distributed file system is necessary.
You can choose between exactly-once and at-least-once processing. The trade-off is between correctness and performance. It’s configured per job.
Hazelcast Jet supports the scenario where a new member joins the cluster while a job is running. Currently, the ongoing job will not be replanned to start using the member, although this is on the road map for future versions. The new member can also leave the cluster while the job is running and this won’t affect its progress.
Handling Back Pressure
Hazelcast Jet uses single producer/single consumer Ringbuffers to transfer the data between processors on the same member. Ringbuffers, being bounded Queues, introduce natural back pressure into the system; if a consumer’s Ringbuffer is full, the producer will have to back off until it can enqueue the next item. When data is sent to another member over the network, there is no natural back pressure, so Jet uses explicit signaling in the form of adaptive receive windows.
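The natural back pressure of a bounded queue can be observed with the JDK's ArrayBlockingQueue, used here as a stand-in for Jet's single producer/single consumer ring buffers. The BackPressureSketch class and offerAll method are hypothetical. The producer stops at the first rejected item and resumes from that index once the consumer has made room.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical sketch; ArrayBlockingQueue stands in for Jet's ring buffers.
class BackPressureSketch {
    // Enqueues items in order starting at index from, stopping when the queue is full.
    // Returns the index of the first item that has not been enqueued yet.
    static int offerAll(ArrayBlockingQueue<Integer> queue, int[] items, int from) {
        int i = from;
        while (i < items.length && queue.offer(items[i])) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] items = {1, 2, 3, 4, 5};

        int next = offerAll(queue, items, 0);
        System.out.println("producer backs off at index " + next);

        System.out.println("consumer takes " + queue.poll());

        next = offerAll(queue, items, next);
        System.out.println("producer backs off at index " + next);
    }
}
```

The non-blocking offer call fits the cooperative model: a producer that cannot enqueue simply yields and tries again on its next turn.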
Discovery and Networking
Hazelcast Jet uses Hazelcast IMDG discovery for finding members in the cluster. On the wire, Jet uses custom lightweight serialization for small and frequently used types (primitive types and strings) and delegates to Hazelcast serialization for the rest.