
Introduction to Hazelcast Jet

Ultra Fast Stream and Batch Processing

Hazelcast Jet is an application embeddable, distributed computing engine for streaming and fast batch processing. Hazelcast Jet is built on top of Hazelcast In-Memory Data Grid (IMDG). With Hazelcast IMDG providing storage functionality, Hazelcast Jet performs parallel execution to enable data-intensive applications to operate in near real-time.

Hazelcast Jet Vertex and Sink diagram

Fast Big Data

Events happen continuously in the real world. It is natural to handle and process them as continuous, potentially unbounded sequences of event records – data streams. There are numerous use cases that benefit from data presented as streams:

  • Real-time analytics and predictions
  • Fast business insights
  • Fraud and anomaly detection
  • Monitoring and computing stats
  • Complex event processing (If A happened after B do C)
  • Continuous ETL (Maintaining derived data)
  • Implementing Event-Driven Architectures
  • IoT data ingestion, processing and storage

In these application examples, processing data quickly is as important as processing large volumes of data. With continuous stream processing, incoming records are processed as soon as they arrive, rather than first being accumulated into batches or micro-batches. This results in very low latency (the time between an event's origination and the availability of its result).

Hazelcast Jet is built on top of a low latency continuous streaming core. Moreover, the processing tasks are distributed across a Jet cluster to parallelize computation. Therefore, Jet is able to scale and process big data volumes quickly.

Hazelcast Jet starts fast, scales automatically, handles failures and communicates with the data processing pipeline using asynchronous messages.

As Hazelcast IMDG is embedded in Hazelcast Jet, all the services of an IMDG are available for Jet jobs without any additional deployment effort.

This introduction covers the techniques Hazelcast Jet uses to process data at scale in near real-time.


Why Hazelcast Jet?

There are plenty of solutions which can solve some of these issues, so why choose Hazelcast Jet?

When speed and simplicity are important

Hazelcast Jet gives you all the infrastructure you need to build a distributed data processing pipeline within a single 10 MB Java JAR: processing, storage and clustering.

As it is built on top of Hazelcast IMDG, Hazelcast Jet comes with in-memory operational storage that’s available out-of-the box. This storage is partitioned, distributed and replicated across the Hazelcast Jet cluster for capacity and resiliency. It can be used as an input data buffer, to publish the results of a Hazelcast Jet computation, to connect multiple Hazelcast Jet jobs or as a lookup cache for data enrichment. See Integration with Hazelcast IMDG.
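As a sketch of that integration, a pipeline can read one embedded IMap and publish results into another. The map names "raw-events" and "clean-events" here are hypothetical examples, not part of any Jet API:

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

public class MapToMap {
    public static void main(String[] args) {
        // Start an embedded Jet member (it discovers and joins a running cluster)
        JetInstance jet = Jet.newJetInstance();

        Pipeline p = Pipeline.create();
        // Read entries from the "raw-events" IMap on this cluster...
        p.drawFrom(Sources.<String, String>map("raw-events"))
         // ...transform each entry...
         .map(e -> Util.entry(e.getKey(), e.getValue().toUpperCase()))
         // ...and publish the results into the "clean-events" IMap
         .drainTo(Sinks.map("clean-events"));

        jet.newJob(p).join();
        Jet.shutdownAll();
    }
}
```

Because the source and sink maps live on the same cluster that runs the computation, Jet can exploit data locality and avoid network hops.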

Jet 3.0 Architecture Diagram

You can still use other storage such as HDFS, Kafka or even another remote IMDG for data input and output. However, you don’t have to. Keep it simple.

The more of the data pipeline that is kept in-memory, the bigger the performance gain Hazelcast Jet provides by making use of data locality. See performance to understand the secret sauce.

Hazelcast Jet Word Count Benchmark

Setting up a Hazelcast Jet cluster is simple. When a Hazelcast Jet node starts, it automatically discovers the running cluster and joins it. Running computations are scaled up to make use of the new computing resources. If a node fails, a fault-tolerance mechanism kicks in and computations keep running.

Hazelcast Jet provides all of these features in a single Java library. Embed it into your application to simplify distribution, or run it from the command line. All of this literally works out-of-the-box. Try it!

Hazelcast Jet… Distributed Computing, Simplified.

Streaming Execution Model

Hazelcast Jet uses a stream processing paradigm to enable low latency and a continuous stream of results.

Once started, the streaming application runs continuously until terminated. When it sees a new event on an input, it processes it immediately. Hazelcast Jet can achieve millisecond latencies while processing millions of events (gigabytes of data) per second, as there is no need to start new processes or batch the data before processing.

Hazelcast Jet applies a streaming execution model for both bounded (often referred to as “batch” workloads) and unbounded (infinite streams) data.

Fault Tolerance and Elasticity

One challenge in streaming is fault tolerance. A streaming application usually has state that changes continuously as data flows through it, e.g. the offset of the last item read from the stream, or the rolling state of the computation. This state has to stay consistent even in the case of a failure.

A Hazelcast Jet job keeps its state in-memory to maintain low latency.

The job state is periodically saved using a distributed state snapshot algorithm. Snapshots can be configured for exactly-once or at-least-once processing guarantees, trading off correctness against performance.

In Hazelcast Jet, snapshots are distributed across the cluster and held in multiple replicas to provide redundancy. Hazelcast Jet is able to tolerate multiple faults, such as node failure, network partition or job execution failure. Snapshots are periodically created and backed up. If a node fails, Jet uses the latest state snapshot and automatically restarts all jobs in which the failed node participated.

The same approach can be used to up-scale the cluster: when a new node joins the cluster, running jobs can be restarted from the last snapshot using the new cluster topology.

Hazelcast Jet uses Hazelcast IMaps to store job metadata and snapshots. Since IMap is replicated across the cluster, the state remains available in the case of failure. Moreover, no additional infrastructure, such as a distributed file system, is necessary. Jet is fault-tolerant out-of-the-box.

Jet Enterprise contains the Lossless Restart feature for full cluster recovery from either planned shutdowns or cluster-wide crashes. When enabled, Jet continuously persists the state of the cluster members on disk in a format specifically designed for restart performance and to work in concert with SSDs.

Windowing and Late Events

The typical way to deal with unbounded data is to look at it in terms of “windows”, where a window represents a slice of the data stream defined by time or count. This allows you to run grouping operations such as sum or order by on an infinite input. Types of windowing supported by Hazelcast Jet can be summarized as follows:

  • Fixed/Tumbling: time is partitioned into same-length, non-overlapping chunks. Each event belongs to exactly one window.
  • Sliding: windows have a fixed length, but are separated by a time interval (step) which can be smaller than the window length. Typically the window length is a multiple of the step.
  • Session: windows have various sizes and are defined based on the data, which should carry some session identifier.
Windowing and Late Events diagram
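The three window types correspond to factory methods on Jet's WindowDefinition. A sketch with arbitrary example values (all durations are in milliseconds):

```java
import com.hazelcast.jet.pipeline.WindowDefinition;

// Tumbling: non-overlapping 1-second chunks
WindowDefinition tumbling = WindowDefinition.tumbling(1_000);

// Sliding: 10-second windows advancing in 1-second steps
WindowDefinition sliding = WindowDefinition.sliding(10_000, 1_000);

// Session: a per-key window closes after 5 seconds of inactivity
WindowDefinition session = WindowDefinition.session(5_000);
```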

When dealing with time series data, users can simply timestamp events as they arrive (referred to as processing time) or use the timestamps contained within each event itself (referred to as event time). In both cases, the timestamp is used to assign events into windows.

As events can arrive out of order, the user needs to take this into account when using event time. Crucially, a decision needs to be taken on how long to wait for late data before the window can be closed and sent to the downstream computation.

Windowing and Late Events diagram 2

Hazelcast Jet takes a simple approach and strictly triages events into “still on time” and “late”, discarding the latter. It uses heuristics combined with an allowed lateness setting.

Implementing Data Pipelines in Jet

The general shape of any data processing pipeline is drawFrom(source) -> transform -> drainTo(sink). The Pipeline API in Hazelcast Jet allows users to implement computations by linking the respective transforms, then connecting them to the upstream data sources and downstream sinks.

The Pipeline API is a Java 8 API. It contains a set of transform, source and sink connectors which are used in the pipeline (see full list). The library can be extended by custom transforms, sources or sinks.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher(tempDir.toString()))
         .map(LogLine::parse)
         .filter(line -> line.getResponseCode() >= 200 && line.getResponseCode() < 400)
         .addTimestamps(LogLine::getTimestamp, 1000)
         .window(WindowDefinition.sliding(10_000, 1_000))
         .groupingKey(LogLine::getEndpoint)
         .aggregate(AggregateOperations.counting())
         .drainTo(Sinks.logger());

This pipeline continuously parses HTTP server log files and counts accesses per resource. See AccessStreamAnalyzer.java.

In a more complex scenario, you’ll have several sources, each starting its own pipeline branch. Then you can join them. Symmetrically, you can fork the pipeline to output to more than one destination.
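A minimal sketch of a fork, assuming hypothetical list names ("lines", "errors", "normalized"): the same upstream stage can be drained to several sinks.

```java
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

Pipeline p = Pipeline.create();
BatchStage<String> lines = p.drawFrom(Sources.<String>list("lines"));

// Fork: both branches consume the same upstream stage
lines.filter(l -> l.startsWith("ERROR"))
     .drainTo(Sinks.list("errors"));
lines.map(String::toLowerCase)
     .drainTo(Sinks.list("normalized"));
```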

For more examples, see demos.

Parallel Pipeline Execution

After the Pipeline is submitted for execution, it is translated into a DAG representing the data flow and is executed by the distributed computation engine of Hazelcast Jet. The DAG is replicated to the whole Hazelcast Jet cluster and executed in parallel on each member.

Jet Distribution

Each operator in the pipeline can specify how many parallel instances will run per cluster member; every member runs the same number of parallel instances, called Processors.
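For illustration, per-operator parallelism can be set directly on a pipeline stage. A sketch in which the value 4 and the list names are arbitrary examples:

```java
// Assumes p is a Pipeline; Sources and Sinks are from com.hazelcast.jet.pipeline
p.drawFrom(Sources.<String>list("input"))
 .map(String::toUpperCase)
 .setLocalParallelism(4)   // run 4 Processor instances per cluster member
 .drainTo(Sinks.list("output"));
```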

Job execution is done on a user-configurable number of threads. Each worker thread has a list of processors that it is in charge of. As the processors complete at different rates, the unutilized processors back off and stay idle, allowing busy workers to catch up to keep the load balanced.

Jet Parallelism Model

This approach, called cooperative multi-threading, is one of the core features of Hazelcast Jet and can be roughly compared to green threads; each processor can do a small amount of work each time it is invoked, then yield back to the Hazelcast Jet engine. The engine manages a thread pool of fixed size, and on each thread the instances take their turn in a round-robin fashion.

The purpose of cooperative multi-threading is to improve performance. Several factors contribute to this:

  • The overhead of context switching is much lower since the operating system’s thread scheduler is not involved.
  • The worker thread driving the instances stays on the same core for longer periods, preserving the CPU cache lines.

Clustering

There are plenty of cluster and resource management tools available. Hazelcast Jet isn't tied to any of them, nor does it try to replicate what they do. Instead, Hazelcast Jet focuses on issues related to running jobs and dealing with topology changes. Cluster resource allocation is left to a tool of your choice.

Hazelcast Jet automatically discovers new nodes joining the cluster and rescales running jobs to use all the available resources. The nodes in a Hazelcast Jet cluster are symmetrical – there is no dedicated master or worker node. The Hazelcast Jet cluster assigns these roles dynamically, and they may change over time as the cluster topology changes.

Delegating resource management to a different layer allows for very simple deployments: launch Jet nodes manually by copying the necessary JARs and running Jet from the command line, or use an orchestrator such as Ansible.

Cloud Native

To make your infrastructure more dynamic and automated, you can deploy Hazelcast Jet to many cloud environments and easily extend it via Cloud Discovery Plugins.

A Hazelcast supported Docker image is available on Docker Hub for container deployments. Hazelcast Jet Docker images are Kubernetes-ready. There is a Helm chart available that bootstraps Hazelcast Jet deployments on a Kubernetes cluster using the Helm package manager.

Jet Enterprise includes the certified native images from Hazelcast enabling you to run Jet Enterprise in the leading enterprise cloud-container environments: Pivotal Cloud Foundry and Red Hat OpenShift.

Embedding Jet

Hazelcast Jet supports two modes of deployment: “embedded member”, where the JVM containing application code participates in the Hazelcast Jet cluster directly, and “client-server”, whereby a secondary JVM (which may be on the same host, or on a different one) is responsible for joining the Hazelcast Jet cluster.

Hazelcast Jet Deployment Options

The main advantage of using embedded topology is simplicity. There are no separate moving parts to manage. This applies especially when the Hazelcast Jet cluster is tied directly to the embedded application making it a great fit for microservices and OEMs.

The client-server topology brings isolation: the Hazelcast Jet cluster uses its own resources instead of sharing them with the application.
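Both topologies expose the same API. A sketch of obtaining a JetInstance either way:

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

// Embedded: this JVM becomes a full cluster member
JetInstance member = Jet.newJetInstance();

// Client-server: connect to an existing cluster as a lightweight client
JetInstance client = Jet.newJetClient();

// Jobs are submitted identically in both cases:
// member.newJob(pipeline).join();  or  client.newJob(pipeline).join();
```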

Integration with Hazelcast IMDG

As Hazelcast Jet is built on top of the Hazelcast IMDG platform, there is a tight integration between the two.

Hazelcast Jet embeds Hazelcast IMDG. Therefore, Hazelcast Jet can use IMDG maps, caches and lists on the embedded cluster as sources and sinks of data and make use of data locality. Hazelcast Jet can also use any remote IMDG instance via an IMDG connector.

Moreover, Hazelcast Jet is able to process the stream of changes (Event Journal) of an IMap and ICache. This allows you to process the IMap/ICache data in a streaming way.
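A sketch of consuming an IMap's Event Journal as an unbounded stream. The map name "counters" is a hypothetical example, and the event journal must be enabled for that map in the IMDG configuration:

```java
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

Pipeline p = Pipeline.create();
// Stream every update made to the "counters" IMap, starting from
// the oldest event still retained in the journal
p.drawFrom(Sources.<String, Long>mapJournal("counters",
        JournalInitialPosition.START_FROM_OLDEST))
 .drainTo(Sinks.logger());
```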

Using IMDG with Jet

The in-memory data structures of Hazelcast IMDG are useful companions for stream processing:

  • Data ingestion: Use Hazelcast clients (available for all major programming languages) to load data into Hazelcast prior to processing.
  • Messaging: Connect multiple Jet jobs using the IMDG as a buffer to keep the jobs loosely coupled.
  • Caching: Cache data, such as fact tables from a remote database, in Hazelcast for fast in-memory enrichment making use of data locality.
  • Distributing processed data: Use the elastic in-memory NoSQL storage of Hazelcast, with low-latency access, querying and event listeners.
  • Coordination: Hazelcast offers linearizable, distributed implementations of the Java concurrency primitives (locks, atomics, semaphores, latches), backed by the Raft consensus algorithm.
