Introduction to Hazelcast Jet
Ultra Fast Stream and Batch Processing
Hazelcast Jet is an application-embeddable, distributed computing engine built on top of Hazelcast In-Memory Data Grid (IMDG). With Hazelcast IMDG providing storage functionality, Hazelcast Jet performs parallel execution to enable data-intensive applications to operate in near real-time.
Fast Big Data
Data streams are potentially unbounded (infinite) sequences of events. Typical application use cases include online trades, sensor updates in internet-of-things (IoT) architectures, system log events, in-store e-commerce systems and social media platforms. In these applications, processing data quickly is as important as processing large volumes of it. Hazelcast Jet distributes processing tasks across the nodes of a Jet cluster to parallelize computation, which lets it scale out to handle large data volumes.
Hazelcast Jet is built on top of a low-latency streaming core. It processes incoming records as soon as possible, as opposed to accumulating them into micro-batches, which lowers latency for applications. As Hazelcast IMDG is embedded in Hazelcast Jet, all the services of an IMDG are available to Jet jobs without any additional deployment effort.
Hazelcast Jet starts fast, scales automatically, handles failures, and passes data between the stages of the processing pipeline using asynchronous messages. Numerous use cases benefit from processing data fast, including:
- Real-time analytics
- In-store e-commerce systems
- Complex event processing
- Monitoring and stats
- Social media platforms
- IoT data ingestion, processing and storage
- Keeping systems of record in sync
- Data processing microservice architectures
- Implementing Event Sourcing and CQRS (maintaining derived data, “materialised views”)
This introduction will cover the techniques Hazelcast Jet uses to process data at scale in near real-time:
- Streaming Execution Model for low latency
- Regular system backups using distributed snapshots for fault tolerance and elasticity
- Windowing to manage unbounded (infinite) and unordered data streams
- Data pipelines modeled as flow graphs
- Cooperative threading model for ultra-low latency
- Separating processing and deployment concerns
- Resilient in-memory storage reducing the number of moving parts
Why Hazelcast Jet?
There are plenty of solutions which can solve some of these issues, so why choose Hazelcast Jet?
When speed and simplicity are important
Hazelcast Jet gives you all the infrastructure you need to build a distributed data processing pipeline (processing, storage and clustering) within a single 10 MB Java JAR.
As it is built on top of Hazelcast IMDG, Hazelcast Jet comes with in-memory operational storage that’s available out of the box. This storage is partitioned, distributed and replicated across the Hazelcast Jet cluster for capacity and resiliency. It can be used as an input data buffer, to publish the results of a Hazelcast Jet computation, to connect multiple Hazelcast Jet jobs or as a lookup cache for data enrichment. See Integration with Hazelcast IMDG.
You can still use other storage such as HDFS, Kafka or even another remote IMDG for data input and output. However, you don’t have to. Keep it simple.
The more of the data pipeline you keep in memory, the bigger the performance gain Hazelcast Jet delivers by making use of data locality. See performance to understand the secret sauce.
Setting up a Hazelcast Jet cluster is simple. When a Hazelcast Jet node starts, it discovers the running Hazelcast Jet cluster automatically and joins it. Running computations are scaled up to make use of the new computing resources. If a node fails, a fault tolerance mechanism kicks in and computations keep running.
Hazelcast Jet provides all of these features in a single Java library. Embed it into your application to simplify distribution, or run it from the command line. All of this literally works out-of-the-box. Try it!
Hazelcast Jet… Distributed Computing, Simplified.
Streaming Execution Model
Hazelcast Jet uses a stream processing paradigm to enable low latency and a continuous stream of results.
Once started, the streaming application runs continuously until terminated. When it sees a new event on an input, it processes it immediately. Hazelcast Jet can achieve millisecond latencies for millions of events/gigabytes per second with streaming as there is no need to start new processes or “batch” the data before processing.
Hazelcast Jet applies a streaming execution model for both bounded (often referred to as “batch” workloads) and unbounded (infinite streams) data.
Fault Tolerance and Elasticity
One challenge in streaming is fault tolerance. A streaming application usually has state that is changed continuously by the data flowing through it, for example the offset of the last item read from the stream or the rolling state of a computation. This state has to stay consistent even in the case of a failure.
A Hazelcast Jet job keeps its state in-memory to maintain low latency.
The job state is periodically saved using a distributed state snapshot algorithm. Snapshots can be created to support either exactly-once or at-least-once processing, trading off correctness against performance.
In Hazelcast Jet, snapshots are distributed across the cluster and held in multiple replicas to provide redundancy. Hazelcast Jet is able to tolerate multiple faults such as node failure, network partition or job execution failure. Snapshots are periodically created and backed up. If a node fails, Jet uses the latest state snapshot to automatically restart all jobs that had the failed node as a participant.
The same approach can be used to scale the cluster up: when a new node joins the cluster, running jobs can be restarted from the last snapshot using the new cluster topology.
Hazelcast Jet uses Hazelcast IMaps to store job metadata and snapshots. Because IMaps are replicated across the cluster, the state remains available in the case of a failure, and no additional infrastructure, such as a distributed file system, is necessary.
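The snapshot-and-restart cycle can be illustrated with a toy, single-process model in plain Java. This is a simplified sketch under stated assumptions, not Jet's implementation: the class and method names are hypothetical, the source is a replayable in-memory list, and a "failure" is simulated by discarding the in-memory state. The point it shows is that the source offset and the running state are saved together, so restoring a snapshot rewinds both consistently.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of snapshot-based recovery (hypothetical names, not Jet's API).
class SnapshotDemo {

    // What a job needs to resume: the source offset plus the running aggregate.
    static final class State {
        final int offset;
        final long sum;

        State(int offset, long sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums a replayable source, taking a snapshot every snapshotInterval items.
     * If failAt >= 0, the job "fails" once when it reaches that offset: its
     * in-memory state is discarded and it resumes from the latest snapshot.
     * Returns {final sum, number of items processed including replays}.
     */
    static long[] run(List<Integer> source, int snapshotInterval, int failAt) {
        State snapshot = new State(0, 0);   // empty initial snapshot
        int offset = 0;
        long sum = 0;
        long processed = 0;
        boolean failed = false;

        while (offset < source.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = snapshot.offset;   // rewind the source...
                sum = snapshot.sum;         // ...and the state, consistently
                continue;
            }
            sum += source.get(offset);
            offset++;
            processed++;
            if (offset % snapshotInterval == 0) {
                snapshot = new State(offset, sum);  // offset and state saved together
            }
        }
        return new long[] {sum, processed};
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        long[] clean = run(source, 3, -1);
        long[] recovered = run(source, 3, 7);
        System.out.println("no failure: sum=" + clean[0] + ", processed=" + clean[1]);       // sum=55, processed=10
        System.out.println("with failure: sum=" + recovered[0] + ", processed=" + recovered[1]); // sum=55, processed=11
    }
}
```

With a failure injected at offset 7, the item at offset 6 is processed twice, yet the final sum matches the failure-free run because the state was rolled back together with the offset. Any side effect emitted for that item outside the snapshotted state, such as a write to an external sink, would happen twice, which is why output guarantees also depend on how sinks handle replays.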
Windowing and Late Events
The typical way to deal with unbounded data is to look at it in terms of “windows”, where a window represents a slice of the data stream defined by time or count. This makes it possible to run operations that need a finite set of data, such as grouping or ordering, on an infinite input. Types of windowing supported by Hazelcast Jet can be summarized as follows:
- Fixed/Tumbling: time is partitioned into same-length, non-overlapping chunks. Each event belongs to exactly one window.
- Sliding: windows have a fixed length but are separated by a time interval (step) that can be smaller than the window length, in which case windows overlap. Typically the window length is a multiple of the step.
- Session: windows have varying sizes and are defined by the data, which should carry some session identifier.
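To make the tumbling and sliding definitions concrete, the following plain-Java sketch computes which windows an event timestamp belongs to. It assumes millisecond timestamps, windows aligned to multiples of their size or step starting from 0, and a window length that is a multiple of the step; the class and method names are hypothetical and are not part of Jet's API. Session windows are left out because they depend on the data rather than on fixed arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

// Window-assignment arithmetic (hypothetical helper, not Jet's API).
// Timestamps are in milliseconds; windows are aligned to 0.
class WindowAssignment {

    // Tumbling: the single window [start, start + size) that contains ts.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Sliding: the start of every window [start, start + size) that contains ts,
    // where a new window begins every `step` ms. Yields size / step windows
    // when size is a multiple of step.
    static List<Long> slidingStarts(long ts, long size, long step) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - Math.floorMod(ts, step);  // latest window start <= ts
        for (long start = latest; start > ts - size; start -= step) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(12_345, 10_000));
        // prints 10000
        System.out.println(slidingStarts(12_345, 10_000, 1_000));
        // prints [12000, 11000, 10000, 9000, 8000, 7000, 6000, 5000, 4000, 3000]
    }
}
```

With a 10,000 ms window sliding by 1,000 ms, an event at 12,345 ms falls into ten overlapping windows, whereas a 10,000 ms tumbling window places it only in the window starting at 10,000 ms.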
When dealing with time series data, users can simply timestamp events as they arrive (referred to as processing time) or use the timestamps contained within each event itself (referred to as event time). In both cases, the timestamp is used to assign events into windows.
As events can arrive out of order, the user needs to take this into account when using event time. Crucially, a decision needs to be made on how long to wait for late data before a window can be closed and its result sent to the downstream computation.
Hazelcast Jet takes a simple approach and strictly triages events into “still on time” and “late”, discarding the latter. It uses heuristics combined with an allowed lateness setting.
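A rough model of this triage, assuming a watermark computed as the highest timestamp seen so far minus an allowed lag, is sketched below in plain Java. It is an illustrative simplification: Jet's actual heuristics are not described here and may differ, and the class and member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified on-time/late triage (hypothetical names; Jet's real heuristics may differ).
class LatenessTriage {
    private final long allowedLag;
    private long maxSeen = Long.MIN_VALUE;  // highest event timestamp seen so far
    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    LatenessTriage(long allowedLag) {
        this.allowedLag = allowedLag;
    }

    // Events with a timestamp below the watermark are considered late.
    long watermark() {
        return maxSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeen - allowedLag;
    }

    void onEvent(long ts) {
        if (ts < watermark()) {
            late.add(ts);                    // windows covering ts may already be closed: discard
        } else {
            onTime.add(ts);                  // still on time: would be assigned to its windows
            maxSeen = Math.max(maxSeen, ts); // may advance the watermark
        }
    }

    public static void main(String[] args) {
        LatenessTriage triage = new LatenessTriage(1_000);
        for (long ts : new long[] {5_000, 4_500, 7_000, 5_500, 6_100}) {
            triage.onEvent(ts);
        }
        System.out.println("on time: " + triage.onTime + ", late: " + triage.late);
        // prints "on time: [5000, 4500, 7000, 6100], late: [5500]"
    }
}
```

The event at 4,500 is still on time because the watermark is only 4,000 at that point, but once 7,000 arrives the watermark moves to 6,000 and the event at 5,500 is discarded as late.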
Implementing Data Pipelines in Jet
The general shape of any data processing pipeline is drawFromSource -> transform -> drainToSink. The Pipeline API in Hazelcast Jet allows users to implement computations by linking the respective transforms, then connecting those to the upstream data sources and downstream sinks.
The Pipeline API is a Java 8 API. It contains a library of transforms and of source and sink connectors to use in the pipeline (see full list). The library can be extended with custom transforms, sources or sinks.
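```java
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;

Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher(tempDir.toString()))   // stream lines appended to files in tempDir
 .map(LogLine::parse)                                 // parse each raw line into a LogLine
 .filter(line -> line.getResponseCode() >= 200 && line.getResponseCode() < 400) // keep 2xx and 3xx
 .addTimestamps(LogLine::getTimestamp, 1000)          // event time, with an allowed lag of 1000
 .window(WindowDefinition.sliding(10_000, 1_000))     // window of 10_000 sliding by 1_000
 .groupingKey(LogLine::getEndpoint)                   // one count per endpoint
 .aggregate(AggregateOperations.counting())
 .drainTo(Sinks.logger());                            // log each window result
```
This pipeline continuously parses HTTP server log files and, for each endpoint, counts the successful and redirected (2xx and 3xx) accesses over a sliding window. See AccessStreamAnalyzer.java.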
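To make the semantics of the individual stages concrete, here is a plain-Java sketch, with no Jet dependency, that applies the same parse, filter, window and count steps to a finite list of log lines for a single window. The LogLine class and its "<timestamp> <endpoint> <responseCode>" line format are hypothetical stand-ins for the sample's own LogLine, and only one window is evaluated, whereas the Jet pipeline evaluates every sliding window continuously.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java version of the pipeline's stages for ONE window (no Jet involved).
class AccessCountSketch {

    // Hypothetical stand-in for the sample's LogLine; assumed line format:
    // "<timestampMillis> <endpoint> <responseCode>"
    static final class LogLine {
        final long timestamp;
        final String endpoint;
        final int responseCode;

        LogLine(long timestamp, String endpoint, int responseCode) {
            this.timestamp = timestamp;
            this.endpoint = endpoint;
            this.responseCode = responseCode;
        }

        static LogLine parse(String raw) {
            String[] parts = raw.trim().split("\\s+");
            return new LogLine(Long.parseLong(parts[0]), parts[1], Integer.parseInt(parts[2]));
        }
    }

    // Counts 2xx/3xx accesses per endpoint among lines timestamped in [start, start + size).
    static Map<String, Long> countWindow(List<String> rawLines, long start, long size) {
        return rawLines.stream()
                .map(LogLine::parse)                                              // parse stage
                .filter(l -> l.responseCode >= 200 && l.responseCode < 400)      // filter stage
                .filter(l -> l.timestamp >= start && l.timestamp < start + size) // one window
                .collect(Collectors.groupingBy(l -> l.endpoint,                  // grouping key
                        TreeMap::new, Collectors.counting()));                   // counting aggregate
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1000 /a 200", "2000 /a 404", "3000 /b 301", "4000 /a 200", "15000 /a 200");
        System.out.println(countWindow(lines, 0, 10_000)); // prints {/a=2, /b=1}
    }
}
```

The 404 line is removed by the response-code filter and the line at 15000 falls outside the window, leaving two accesses to /a and one to /b.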
In a more complex scenario, you’ll have several sources, each starting its own pipeline branch. Then you can join them. Symmetrically, you can fork the pipeline to output to more than one destination.
For more examples, see demos.
Parallel Pipeline Execution
After the pipeline is submitted for execution, it’s translated into a DAG (directed acyclic graph) representing the data flow and is executed by the distributed computation engine of Hazelcast Jet. The DAG is replicated to the whole Hazelcast Jet cluster and executed in parallel on each member.
Each operator in the pipeline can specify how many parallel instances will run per cluster member; every member runs the same number of parallel instances, called processors.
Job execution is done on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread. Each worker thread has a list of processors that it is in charge of, and as the processors complete at different rates, the remaining ones are moved between workers to keep the load balanced.
This approach, called cooperative multi-threading, is one of the core features of Hazelcast Jet and can be roughly compared to green threads; each processor can do a small amount of work each time it is invoked, then yield back to the Hazelcast Jet engine. The engine manages a thread pool of fixed size, and on each thread the instances take their turn in a round-robin fashion.
The purpose of cooperative multi-threading is to improve performance. Several factors contribute to this:
- The overhead of context switching is much lower since the operating system’s thread scheduler is not involved.
- The worker thread driving the instances stays on the same core for longer periods, preserving the CPU cache lines.
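The idea behind cooperative multi-threading can be shown with a single-threaded, plain-Java sketch: each tasklet does a bounded amount of work per call and then returns, and a loop gives every tasklet a turn in round-robin order. The Tasklet interface and helper names here are hypothetical and do not reproduce Jet's internals; the sketch also omits the fixed-size thread pool and work stealing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Single-threaded model of cooperative scheduling (hypothetical names, not Jet's internals).
class CooperativeScheduler {

    // A unit of work that does a little processing per call, then yields by returning.
    interface Tasklet {
        /** Performs a bounded amount of work; returns true once the tasklet is finished. */
        boolean call();
    }

    // Gives every unfinished tasklet a turn, round-robin, until all are done.
    // "Switching" between tasklets is just a method return, not an OS context switch.
    static void runRoundRobin(List<Tasklet> tasklets) {
        List<Tasklet> active = new ArrayList<>(tasklets);
        while (!active.isEmpty()) {
            Iterator<Tasklet> it = active.iterator();
            while (it.hasNext()) {
                if (it.next().call()) {
                    it.remove();
                }
            }
        }
    }

    // Example tasklet: handles at most batchSize of its total items per call.
    static Tasklet counter(String name, int total, int batchSize, List<String> log) {
        return new Tasklet() {
            private int done = 0;

            @Override
            public boolean call() {
                int limit = Math.min(done + batchSize, total);
                for (; done < limit; done++) {
                    log.add(name + done);
                }
                return done == total;
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        runRoundRobin(Arrays.asList(counter("a", 3, 2, log), counter("b", 2, 2, log)));
        System.out.println(log); // prints [a0, a1, b0, b1, a2]
    }
}
```

Neither tasklet ever blocks the thread: tasklet "a" handles two items, hands control back, and gets its next turn only after "b" has had one.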
There are plenty of cluster and resource management tools available. Hazelcast Jet isn’t tied to any of them, nor does it try to replicate what they do. Instead, Hazelcast Jet focuses on issues relating to running jobs and dealing with topology changes. Cluster resource allocation is left to a tool of your choice.
Hazelcast Jet automatically discovers new nodes joining the cluster and rescales running jobs to use all the available resources. The nodes in a Hazelcast Jet cluster are symmetrical: there is no dedicated master or worker node. The Hazelcast Jet cluster assigns these roles dynamically, and roles may change over time as the cluster topology changes.
Delegating resource management to other layers allows for simple deployments, where you launch Jet nodes manually by copying the necessary JARs and running Jet from the command line. You can also use an orchestrator such as Ansible.
Hazelcast Jet supports two modes of deployment: “embedded member”, where the JVM containing the application code participates in the Hazelcast Jet cluster directly, and “client-server”, where the application connects as a client to a Hazelcast Jet cluster running in separate JVMs (which may be on the same host or on different ones).
The main advantage of the embedded topology is simplicity: there are no separate moving parts to manage. This applies especially when the Hazelcast Jet cluster is tied directly to the embedded application, making it a great fit for microservices and OEMs.
The client-server topology brings isolation: the Hazelcast Jet cluster uses its own resources instead of sharing them with the application.
Integration with Hazelcast IMDG
As Hazelcast Jet is built on top of the Hazelcast IMDG platform, there is a tight integration between the two.
Hazelcast Jet embeds Hazelcast IMDG. Therefore, Hazelcast Jet can use IMDG maps, caches and lists on the embedded cluster as sources and sinks of data and make use of data locality. Hazelcast Jet can also use any remote IMDG instance via an IMDG connector.
Moreover, Hazelcast Jet is able to process the stream of changes (Event Journal) of an IMap and ICache. This allows you to process the IMap/ICache data in a streaming way.
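Conceptually, an event journal is an append-only log of map changes that a streaming job reads from a remembered position. The plain-Java sketch below models that idea; the JournaledMap class, its Change entries and the readFrom method are hypothetical and are not the IMap or Event Journal API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy map with an append-only change journal (hypothetical, not the IMap Event Journal API).
class JournaledMap<K, V> {

    // One entry in the journal: which key changed, from what, to what.
    final class Change {
        final long sequence;
        final K key;
        final V oldValue;  // null if the key was previously absent
        final V newValue;

        Change(long sequence, K key, V oldValue, V newValue) {
            this.sequence = sequence;
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return sequence + ":" + key + " " + oldValue + "->" + newValue;
        }
    }

    private final Map<K, V> data = new HashMap<>();
    private final List<Change> journal = new ArrayList<>();

    // Every update is applied to the map AND appended to the journal.
    void put(K key, V value) {
        V old = data.put(key, value);
        journal.add(new Change(journal.size(), key, old, value));
    }

    V get(K key) {
        return data.get(key);
    }

    // A streaming reader asks for all changes from a sequence number onwards.
    List<Change> readFrom(long sequence) {
        int from = (int) Math.min(Math.max(sequence, 0), journal.size());
        return new ArrayList<>(journal.subList(from, journal.size()));
    }

    public static void main(String[] args) {
        JournaledMap<String, Integer> map = new JournaledMap<>();
        map.put("a", 1);
        map.put("b", 2);
        long next = 0;                          // the consumer's position in the journal
        for (JournaledMap<String, Integer>.Change c : map.readFrom(next)) {
            System.out.println(c);              // prints "0:a null->1", then "1:b null->2"
            next = c.sequence + 1;
        }
        map.put("a", 3);                        // a later update...
        System.out.println(map.readFrom(next)); // ...is read from the saved position: [2:a 1->3]
    }
}
```

Because the consumer only stores the sequence of the next change it needs, it can keep polling and process each update as it happens, which is the streaming view of map data that an event journal provides.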