Introduction to Hazelcast Jet

Ultra Fast Stream and Batch Processing

Hazelcast Jet is an application-embeddable, distributed computing engine built on top of the Hazelcast In-Memory Data Grid (IMDG). With Hazelcast IMDG providing the storage layer, Hazelcast Jet performs parallel execution to enable data-intensive applications to operate in near real-time.

Hazelcast Jet Vertex and Sink diagram

Fast Big Data

Data streams are potentially unbounded, infinite sequences of events. Typical use cases include online trades, sensor updates in internet-of-things (IoT) architectures, system log events, in-store e-commerce systems and social media platforms. In these examples, processing data quickly is as important as processing large volumes of data. Hazelcast Jet distributes processing tasks across the Jet cluster to parallelize computation, allowing it to scale and process large data volumes.

Hazelcast Jet is built on top of a low-latency streaming core. It processes incoming records as soon as possible, as opposed to accumulating them into micro-batches, which lowers latency for applications. Because Hazelcast IMDG is embedded in Hazelcast Jet, all the services of an IMDG are available to Jet jobs without any additional deployment effort.

Hazelcast Jet starts fast, scales automatically, handles failures, and communicates with the data processing pipeline using asynchronous messages. There are numerous use cases that benefit from processing data fast, including:

  • Real-time analytics
  • In-store e-commerce systems
  • Complex event processing
  • Monitoring and stats
  • Social media platforms
  • IoT data ingestion, processing and storage
  • Keeping systems of record in sync
  • Data processing microservice architectures
  • Implementing Event Sourcing and CQRS (maintaining derived data, “materialised views”)

This introduction covers the techniques Hazelcast Jet uses to process data at scale in near real-time.


Why Hazelcast Jet?

There are plenty of solutions which can solve some of these issues, so why choose Hazelcast Jet?

When speed and simplicity are important

Hazelcast Jet gives you all the infrastructure you need to build a distributed data processing pipeline within a single 10 MB Java JAR: processing, storage and clustering.

As it is built on top of Hazelcast IMDG, Hazelcast Jet comes with in-memory operational storage that's available out of the box. This storage is partitioned, distributed and replicated across the Hazelcast Jet cluster for capacity and resiliency. It can be used as an input data buffer, to publish the results of a Hazelcast Jet computation, to connect multiple Hazelcast Jet jobs, or as a lookup cache for data enrichment. See Integration with Hazelcast IMDG.
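
As a minimal sketch of that idea (the map names here are hypothetical), an IMap is obtained straight from the embedded Jet instance and IMaps serve as both the source and the sink of a pipeline:

JetInstance jet = Jet.newJetInstance();                  // embedded Jet member with IMDG inside
IMap<String, Long> results = jet.getMap("daily-totals"); // operational storage, readable by the application

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String, Long>map("raw-events"))      // an IMap as the input data buffer
 .drainTo(Sinks.map("daily-totals"));                    // the same IMap used to publish results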

Hazelcast Jet Features

You can still use other storage such as HDFS, Kafka or even another remote IMDG for data input and output. However, you don’t have to. Keep it simple.

The more of the data pipeline that is kept in memory, the bigger the performance gain Hazelcast Jet provides by making use of data locality. See performance to understand the secret sauce.

Hazelcast Jet Word Count Benchmark

Setting up a Hazelcast Jet cluster is simple. When a Hazelcast Jet node starts, it automatically discovers the running Hazelcast Jet cluster and joins it. Running computations are scaled up to make use of the new computing resources. If a node fails, the fault-tolerance mechanism kicks in and computations keep running.
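
As a rough sketch of how little is involved: with the default configuration, each instance started below discovers the other over the local network and they form a single cluster.

// With the default configuration these two members discover each other automatically
// (multicast on the local network) and form one Hazelcast Jet cluster.
JetInstance node1 = Jet.newJetInstance();
JetInstance node2 = Jet.newJetInstance();   // joins node1; running jobs can be scaled up to use it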

Hazelcast Jet provides all of these features in a single Java library. Embed it into your application to simplify distribution, or run it from the command line. All of this literally works out-of-the-box. Try it!

Hazelcast Jet… Distributed Computing, Simplified.

Streaming Execution Model

Hazelcast Jet uses a stream processing paradigm to enable low latency and a continuous stream of results.

Once started, the streaming application runs continuously until terminated. When it sees a new event on an input, it processes it immediately. Hazelcast Jet can achieve millisecond latencies while processing millions of events (gigabytes of data) per second, as there is no need to start new processes or “batch” the data before processing.

Hazelcast Jet applies a streaming execution model for both bounded (often referred to as “batch” workloads) and unbounded (infinite streams) data.

Fault Tolerance and Elasticity

One challenge of stream processing is fault tolerance. A streaming application usually has state that changes continuously as data flows through it, e.g. the offset of the last item read from the stream or the rolling state of the computation. This state has to stay consistent even in the case of a failure.

A Hazelcast Jet job keeps its state in-memory to maintain low latency.

The job state is periodically saved using a distributed state snapshot algorithm. Snapshots can be configured for exactly-once or at-least-once processing, trading off correctness against performance.
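
A minimal sketch of that configuration, assuming `jet` is a JetInstance and `p` a Pipeline from the surrounding context:

JobConfig jobConfig = new JobConfig()
        .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)  // or AT_LEAST_ONCE for lower latency
        .setSnapshotIntervalMillis(10_000);                        // take a state snapshot every 10 s
jet.newJob(p, jobConfig);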

In Hazelcast Jet, snapshots are distributed across the cluster and held in multiple replicas to provide redundancy. Hazelcast Jet can tolerate multiple kinds of faults, such as node failure, network partition or job execution failure. Snapshots are created and backed up periodically. If a node fails, Jet uses the latest state snapshot and automatically restarts all jobs in which the failed node was a participant.

The same approach can be used to up-scale the cluster: when a new node joins the cluster, running jobs can be restarted from the last snapshot using the new cluster topology.

Hazelcast Jet uses Hazelcast IMaps to store job metadata and snapshots. Because the IMap is replicated across the cluster, the state remains available in the case of a failure. Moreover, no additional infrastructure, such as a distributed file system, is necessary.

Windowing and Late Events

The typical way to deal with unbounded data is to look at it in terms of “windows”, where a window represents a slice of the data stream defined by time or count. This allows you to run grouping operations such as sum or order by on an infinite input. Types of windowing supported by Hazelcast Jet can be summarized as follows:

  • Fixed/Tumbling: time is partitioned into same-length, non-overlapping chunks. Each event belongs to exactly one window.
  • Sliding: windows have a fixed length but are separated by a time interval (step) which can be smaller than the window length. Typically the window length is a multiple of the step.
  • Session: windows have varying sizes and are defined by the data itself, which should carry session identifiers.

Windowing and Late Events diagram
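
A small sketch of how the three types are expressed with the Pipeline API's WindowDefinition factories (the durations are arbitrary examples):

WindowDefinition tumbling = WindowDefinition.tumbling(60_000);        // fixed 1-minute windows
WindowDefinition sliding  = WindowDefinition.sliding(60_000, 1_000);  // 1-minute windows, sliding by 1 s
WindowDefinition session  = WindowDefinition.session(30_000);         // closes after 30 s of inactivity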

When dealing with time series data, users can simply timestamp events as they arrive (referred to as processing time) or use the timestamps contained within each event itself (referred to as event time). In both cases, the timestamp is used to assign events into windows.

As events can arrive out of order, the user needs to take this into account when using event time. Crucially, a decision needs to be taken on how long to wait for late data before the window can be closed and sent to the downstream computation.

Windowing and Late Events diagram 2

Hazelcast Jet takes a simple approach and strictly triages events into “still on time” and “late”, discarding the latter. It uses heuristics combined with an allowed lateness setting.

Implementing Data Pipelines in Jet

The general shape of any data processing pipeline is drawFromSource -> transform -> drainToSink. The Pipeline API in Hazelcast Jet allows users to implement computations by linking the respective transforms, then connecting those to the upstream data sources and downstream sinks.

The Pipeline API is a Java 8 API. It contains a set of transform, source and sink connectors which are used in the pipeline (see the full list). The library can be extended with custom transforms, sources or sinks.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.fileWatcher(tempDir.toString()))      // watch a directory for appended log lines
 .map(LogLine::parse)                                    // parse each line into a LogLine object
 .filter(line -> line.getResponseCode() >= 200
         && line.getResponseCode() < 400)                // keep 2xx and 3xx responses only
 .addTimestamps(LogLine::getTimestamp, 1000)             // use event time, allow events up to 1 s late
 .window(WindowDefinition.sliding(10_000, 1_000))        // 10 s windows, sliding by 1 s
 .groupingKey(LogLine::getEndpoint)                      // group by the requested resource
 .aggregate(AggregateOperations.counting())              // count requests per resource and window
 .drainTo(Sinks.logger());                               // write each windowed result to the log

This pipeline continuously parses HTTP server log files and counts accesses per resource. See AccessStreamAnalyzer.java.
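
To actually run it, the pipeline is submitted to a Jet instance; a minimal sketch (an embedded member here, but a Jet client works the same way):

JetInstance jet = Jet.newJetInstance();
jet.newJob(p).join();   // join() blocks; an unbounded streaming job runs until it is cancelled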

In a more complex scenario, you’ll have several sources, each starting its own pipeline branch. Then you can join them. Symmetrically, you can fork the pipeline to output to more than one destination.
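
As a small sketch of forking, the single source stage below is drained to two different sinks (the list names are hypothetical):

Pipeline p = Pipeline.create();
BatchStage<String> lines = p.drawFrom(Sources.<String>list("events"));
lines.drainTo(Sinks.list("all-events"));            // branch 1: archive every event
lines.filter(line -> line.contains("ERROR"))
     .drainTo(Sinks.list("errors"));                // branch 2: keep only the errors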

For more examples, see demos.

Parallel Pipeline Execution

After the Pipeline is submitted for execution, it's translated into a DAG representing the data flow and is executed by the distributed computation engine of Hazelcast Jet. The DAG is replicated to the whole Hazelcast Jet cluster and executed in parallel on each member.

Jet Distribution

Each operator in the pipeline can specify how many parallel instances will run per cluster member; every member will have the same number of parallel instances, called Processors.
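
The same notion is visible in the lower-level Core (DAG) API, which the Pipeline API is translated into; a sketch with hypothetical map names, where each vertex declares its per-member parallelism via localParallelism():

DAG dag = new DAG();
Vertex read  = dag.newVertex("read",  SourceProcessors.readMapP("input")).localParallelism(2);
Vertex write = dag.newVertex("write", SinkProcessors.writeMapP("output")).localParallelism(1);
dag.edge(Edge.between(read, write));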

Job execution is done on a user-configurable number of threads which use work stealing to balance the amount of work being done on each thread. Each worker thread has a list of processors that it is in charge of, and as the processors complete at different rates, the remaining ones are moved between workers to keep the load balanced.

Jet Parallelism Model

This approach, called cooperative multi-threading, is one of the core features of Hazelcast Jet and can be roughly compared to green threads; each processor can do a small amount of work each time it is invoked, then yield back to the Hazelcast Jet engine. The engine manages a thread pool of fixed size, and on each thread the instances take their turn in a round-robin fashion.

The purpose of cooperative multi-threading is to improve performance. Several factors contribute to this:

  • The overhead of context switching is much lower since the operating system’s thread scheduler is not involved.
  • The worker thread driving the instances stays on the same core for longer periods, preserving the CPU cache lines.
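
The size of this fixed thread pool is user-configurable per member; a minimal sketch (it defaults to the number of available processors):

JetConfig config = new JetConfig();
config.getInstanceConfig().setCooperativeThreadCount(
        Runtime.getRuntime().availableProcessors());   // size of the cooperative worker pool
JetInstance jet = Jet.newJetInstance(config);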

Clustering

There are plenty of cluster and resource management tools available. Hazelcast Jet isn't tied to any of them, nor does it try to replicate what they do. Instead, Hazelcast Jet focuses on issues relating to running jobs and dealing with topology changes. Cluster resource allocation is left to a tool of your choice.

Hazelcast Jet automatically discovers new nodes joining the cluster and rescales running jobs to use all the resources available. The nodes in a Hazelcast Jet cluster are symmetrical – there is no dedicated master node or worker node. The Hazelcast Jet cluster assigns these roles dynamically. Roles may change over time as the cluster topology changes.

Delegating resource management to a separate layer allows for simple deployments where you launch Jet nodes manually by copying the necessary JARs and running Jet from the command line. You can also use an orchestrator such as Ansible.
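
For such static deployments you can, for example, swap the default multicast discovery for an explicit TCP/IP member list; a sketch with placeholder addresses:

JetConfig config = new JetConfig();
JoinConfig join = config.getHazelcastConfig().getNetworkConfig().getJoin();
join.getMulticastConfig().setEnabled(false);                  // turn off multicast discovery
join.getTcpIpConfig().setEnabled(true)
    .addMember("10.0.0.1").addMember("10.0.0.2");             // placeholder addresses of other members
JetInstance jet = Jet.newJetInstance(config);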

To make your infrastructure more dynamic and automated, you can deploy Hazelcast Jet to resource managers such as YARN, Mesos, Kubernetes or Pivotal Cloud Foundry.

Embedding Jet

Hazelcast Jet supports two modes of deployment: “embedded member”, where the JVM containing application code participates in the Hazelcast Jet cluster directly, and “client-server”, whereby a secondary JVM (which may be on the same host, or on a different one) is responsible for joining the Hazelcast Jet cluster.

Hazelcast Jet Deployment Options

The main advantage of the embedded topology is simplicity: there are no separate moving parts to manage. This applies especially when the Hazelcast Jet cluster is tied directly to the embedding application, making it a great fit for microservices and OEMs.

The client-server topology brings isolation: the Hazelcast Jet cluster uses its own resources instead of sharing them with the application.
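
Both modes are reached through the same entry points; a minimal sketch (the `pipeline` variable stands for any Pipeline):

JetInstance member = Jet.newJetInstance();   // embedded: this JVM joins the cluster and does the work
JetInstance client = Jet.newJetClient();     // client-server: connects to a separately deployed cluster

client.newJob(pipeline);                     // both expose the same API for submitting jobs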

Integration with Hazelcast IMDG

As Hazelcast Jet is built on top of the Hazelcast IMDG platform, there is a tight integration between the two.

Hazelcast Jet embeds Hazelcast IMDG. Therefore, Hazelcast Jet can use IMDG maps, caches and lists on the embedded cluster as sources and sinks of data and make use of data locality. Hazelcast Jet can also use any remote IMDG instance via an IMDG connector.

Moreover, Hazelcast Jet is able to process the stream of changes (Event Journal) of an IMap and ICache. This allows you to process the IMap/ICache data in a streaming way.
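
A sketch of consuming such a change stream, assuming a hypothetical "trades" IMap whose event journal has been enabled in the IMDG configuration:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.mapJournal("trades", JournalInitialPosition.START_FROM_CURRENT))
 .drainTo(Sinks.logger());   // each insert/update on the IMap arrives as a Map.Entry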

Using IMDG with Jet
