Wednesday 5 September 2018

Apache Zookeeper Tutorial

ZooKeeper is a distributed coordination service for managing a large set of hosts. Coordinating and managing a service in a distributed environment is a complicated process. ZooKeeper solves this problem with its simple architecture and API, allowing developers to focus on core application logic without worrying about the distributed nature of the application.
The ZooKeeper framework was originally built at “Yahoo!” for accessing their applications in an easy and robust manner. Later, Apache ZooKeeper became a standard coordination service used by Hadoop, HBase, and other distributed frameworks. For example, Apache HBase uses ZooKeeper to track the status of distributed data. This tutorial explains the basics of ZooKeeper, how to install and deploy a ZooKeeper cluster in a distributed environment, and concludes with a few examples using Java programming and sample applications.

Distributed Application

A distributed application runs on multiple systems in a network at the same time, with the systems coordinating among themselves to complete a particular task quickly and efficiently. Complex and time-consuming tasks that would take hours to complete in a non-distributed application (running on a single system) can be done in minutes by a distributed application that uses the computing capabilities of all the systems involved.
The time to complete the task can be further reduced by configuring the distributed application to run on more systems. A group of systems in which a distributed application is running is called a cluster, and each machine running in a cluster is called a node.
A distributed application has two parts: the server application and the client application. Server applications are actually distributed and expose a common interface so that clients can connect to any server in the cluster and get the same result. Client applications are the tools used to interact with a distributed application.

Benefits of Distributed Applications

  • Reliability − Failure of a single system or a few systems does not cause the whole system to fail.
  • Scalability − Performance can be increased as needed by adding more machines, with minor changes to the application configuration and no downtime.
  • Transparency − The system hides its complexity and presents itself as a single entity / application.

Challenges of Distributed Applications

  • Race condition − Two or more machines try to perform a particular task that actually needs to be done by only a single machine at any given time. For example, a shared resource should be modified by only a single machine at any given time.
  • Deadlock − Two or more operations wait for each other to complete indefinitely.
  • Inconsistency − A partial failure leaves the data in an inconsistent state.

What is Apache ZooKeeper Meant For?

Apache ZooKeeper is a service used by a cluster (a group of nodes) to coordinate among its members and maintain shared data with robust synchronization techniques. ZooKeeper is itself a distributed application that provides services for writing distributed applications.
The common services provided by ZooKeeper are as follows −
  • Naming service − Identifying the nodes in a cluster by name. It is similar to DNS, but for nodes.
  • Configuration management − Providing the latest, up-to-date configuration information of the system to a joining node.
  • Cluster management − Tracking the joining / leaving of nodes in the cluster and node status in real time.
  • Leader election − Electing a node as leader for coordination purposes.
  • Locking and synchronization service − Locking data while it is being modified. This mechanism helps in automatic failure recovery when integrating with other distributed applications such as Apache HBase.
  • Highly reliable data registry − Availability of data even when one or a few nodes are down.
Distributed applications offer a lot of benefits, but they also pose a few complex and hard-to-crack challenges. The ZooKeeper framework provides a complete mechanism to overcome these challenges. Race conditions and deadlocks are handled using a fail-safe synchronization approach. Another major drawback is inconsistency of data, which ZooKeeper resolves with atomicity.
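
As a small illustration of these services, the sketch below uses the ZooKeeper Java client to store a piece of shared configuration in a znode and read it back. It is a minimal example, assuming a ZooKeeper server on localhost:2181 and a hypothetical znode path /app-config.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;

public class ZkConfigExample {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // Connect to a ZooKeeper server (assumed to run on localhost:2181)
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Store shared configuration under a znode (hypothetical path /app-config)
        String path = "/app-config";
        if (zk.exists(path, false) == null) {
            zk.create(path, "batch.size=100".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Any node in the cluster can read the same value back
        Stat stat = new Stat();
        byte[] data = zk.getData(path, false, stat);
        System.out.println("config = " + new String(data) + ", version = " + stat.getVersion());

        zk.close();
    }
}

Ephemeral and sequential znodes created through the same API are the building blocks of ZooKeeper recipes such as leader election and distributed locks.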

Benefits of ZooKeeper

Here are the benefits of using ZooKeeper −
  • Simple distributed coordination process
  • Synchronization − Mutual exclusion and cooperation between server processes. This helps, for example, with configuration management in Apache HBase.
  • Ordered Messages
  • Serialization − Encoding data according to specific rules, which ensures your application behaves consistently. This approach can be used in MapReduce to coordinate the queue of running threads.
  • Reliability
  • Atomicity − Data transfers either succeed or fail completely; no transaction is ever partial.

ZooKeeper Components

Client

A client is one of the nodes in our distributed application cluster that accesses information from the server. At regular intervals, every client sends a message to the server to let the server know that it is alive.
Similarly, the server sends an acknowledgement when a client connects. If there is no response from the connected server, the client automatically redirects its requests to another server.
Server

A server is one of the nodes in our ZooKeeper ensemble and provides all the services to clients. It sends an acknowledgement to the client to confirm that the server is alive.

Ensemble

A group of ZooKeeper servers. The minimum number of nodes required for a fault-tolerant ensemble is 3; in general, an ensemble of 2n+1 servers can tolerate the failure of n servers.

Leader

The server node that performs automatic recovery if any of the connected nodes fails. The leader is elected at service startup.

Follower

A server node that follows the leader's instructions.


Monday 3 September 2018

Kafka Commands Cheat sheet for beginners

Topics

1) Creating a New Topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic my-topic

2) Verify the topic
kafka-topics --list --zookeeper localhost:2181

3) Adding Partitions
kafka-topics --zookeeper localhost:2181 --alter --topic my-topic --partitions 16

4) Deleting a Topic
kafka-topics --zookeeper localhost:2181 --delete --topic my-topic

5) Listing All Topics in a Cluster
kafka-topics --zookeeper localhost:2181 --list

6) Describing Topic Details
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe

7) Show Under-replicated Partitions for topics
kafka-topics --zookeeper localhost:2181/kafka-cluster --describe --under-replicated-partitions

8) Change topic retention, i.e. set the SLA
kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=28800000

9) Purge a Topic
kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --delete-config retention.ms

10) Get the earliest offset still in a topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -2

11) Get the latest offset of a topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1

Producers

1) Produce messages from standard input
kafka-console-producer --broker-list localhost:9092 --topic my-topic

2) Produce messages from a file
kafka-console-producer --broker-list localhost:9092 --topic test < messages.txt

3) Produce Avro messages
kafka-avro-console-producer --broker-list localhost:9092 --topic my.Topic --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property schema.registry.url=http://localhost:8081

And enter a few values from the console:
{"f1": "value1"}

Consumers
Consume messages
1) Start a consumer from the beginning of the log
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

2) Consume 1 message
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic  --max-messages 1

3) Consume 1 message from __consumer_offsets
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter' --max-messages 1

4) Consume, specify consumer group
kafka-console-consumer --topic my-topic --new-consumer --bootstrap-server localhost:9092 --consumer-property group.id=my-group

5) Consume Avro messages
kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081 --max-messages 10

kafka-avro-console-consumer --topic position-reports --new-consumer --bootstrap-server localhost:9092 --from-beginning --property schema.registry.url=localhost:8081

Consumers admin operations
1) List Groups
kafka-consumer-groups --new-consumer --list --bootstrap-server localhost:9092

2) Describe Groups
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testgroup

Config
1) Set the retention for the topic
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

2) Show all configuration overrides for a topic
kafka-configs --zookeeper localhost:2181 --describe --entity-type topics --entity-name my-topic

3) Delete a configuration override for retention.ms for a topic
kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

Performance
Producer
kafka-producer-perf-test --topic position-reports --throughput 10000 --record-size 300 --num-records 20000 --producer-props bootstrap.servers="localhost:9092"

ACLs
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic topicA --group groupA

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic topicA

List the ACLs
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic topicA

Zookeeper
Enter the ZooKeeper shell:
zookeeper-shell localhost:2182 ls /

kafkacat
Get the last five messages of a topic
kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e

Apache Kafka Tutorial | Part 1

What is Kafka ?
A) Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. It was initially conceived as a messaging queue.
Kafka is based on an abstraction of a distributed commit log.
As a streaming platform, Apache Kafka provides low-latency, high-throughput, fault-tolerant publish & subscribe pipelines and is able to process streams of events. Kafka provides reliable, millisecond responses to support both customer-facing applications and connecting downstream systems with real-time data.

What is a Streaming Platform ?
A) Streaming analytics means doing analytics in real time as the data comes in, as opposed to running analytics on data that is permanently stored somewhere (such as a data lake). Many data-driven organizations pursuing use cases like recommendation engines, predictive maintenance, or fraud detection are moving toward streaming analytics.

How does Kafka work ?
A) Kafka’s defining feature is its scalability. But even if you don’t need the scaling (yet), there is no other solution available that matches its performance and reliability. Kafka is a distributed, partitioned, and replicated log, and it is optimized for massive throughput. Basically, it stores data in the order it comes in and makes these logs redundant across the nodes of the cluster. Data expires in Kafka, so you need to use it or store it elsewhere; otherwise it will eventually disappear.

Kafka serves and stores data using a publish-subscribe pattern in topics (more or less the equivalent of a folder in a file system) with built-in replication and partitioning. A Kafka cluster can have many topics, and each topic can be configured with different replication factors and numbers of partitions. In Kafka parlance, a producer is an inbound data connection that writes data into a topic, whereas a consumer is an outbound data connection. For example, a program that listens to IoT sensors and writes to a Kafka topic would be a producer, and an application making decisions based on this data would be a consumer.
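
As an illustration of per-topic partition and replication settings, the sketch below creates a topic programmatically with the Java AdminClient; the broker address, topic name, and the 3-partition / replication-factor-1 values are assumptions. It mirrors the kafka-topics --create command in the cheat sheet above.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (a single-broker development setup)
            NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}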

Why Kafka ?
A) Kafka often gets used in real-time streaming data architectures to provide real-time analytics. Since Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system, it is used in cases where JMS, RabbitMQ, and AMQP may not even be considered due to volume and responsiveness. Kafka’s higher throughput, reliability, and replication characteristics make it applicable for things like tracking service calls (it tracks every call) or tracking IoT sensor data, where a traditional MOM might not be considered.
Kafka works with Flume/Flafka, Spark Streaming, Storm, HBase, Flink, and Spark for real-time ingestion, analysis, and processing of streaming data. Kafka is often the data stream used to feed Hadoop big data lakes. Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark. Also, Kafka Streams (a subproject) can be used for real-time analytics.

What are the different Kafka APIs ?
  • The Producer API allows an application to publish a stream of records to one or more Kafka topics (see the sketch after this list).
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams into output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
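
Below is a minimal sketch of the Producer API in Java. It assumes a broker on localhost:9092 and a topic named my-topic; the serializer settings and record contents are illustrative only.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one record to the (assumed) topic "my-topic"; the key influences the partition
            producer.send(new ProducerRecord<>("my-topic", "sensor-1", "temperature=21.5"));
            producer.flush();
        }
    }
}

A matching consumer sketch appears in the "What is a Consumer ?" section further down.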

Kafka use cases ?

In short, Kafka gets used for stream processing, website activity tracking, metrics collection and monitoring, log aggregation, real-time analytics, CEP, ingesting data into Spark, ingesting data into Hadoop, CQRS, replay messages, error recovery, and guaranteed distributed commit log for in-memory computing (microservices).

Who uses Kafka?

A lot of large companies that handle a lot of data use Kafka. LinkedIn, where it originated, uses it to track activity data and operational metrics. Twitter uses it as part of Storm to provide a stream processing infrastructure. Square uses Kafka as a bus to move all system events to various Square data centers (logs, custom events, metrics, and so on), to output to Splunk and Graphite (dashboards), and to implement an Esper-like/CEP alerting system. It is also used by other companies such as Spotify, Uber, Tumblr, Goldman Sachs, PayPal, Box, Cisco, CloudFlare, Netflix, and many more.

Kafka has operational simplicity. Kafka is easy to set up and use, and it is easy to reason about how Kafka works. However, the main reason Kafka is very popular is its excellent performance. It has other characteristics as well, but so do other messaging systems. Kafka has great performance, it is stable, provides reliable durability, has a flexible publish-subscribe/queue that scales well with any number of consumer groups, has robust replication, provides producers with tunable consistency guarantees, and preserves ordering at the shard level (the Kafka topic partition). In addition, Kafka works well with systems that have data streams to process and enables those systems to aggregate, transform, and load into other stores. But none of those characteristics would matter if Kafka were slow. The most important reason Kafka is popular is its exceptional performance.

Why is Kafka so Fast?

Kafka relies heavily on the OS kernel to move data around quickly, using the principle of zero copy. Kafka enables you to batch data records into chunks. These batches of data can be seen end to end from the producer to the file system (the Kafka topic log) to the consumer. Batching allows for more efficient data compression and reduces I/O latency. Kafka writes its immutable commit log to disk sequentially, thus avoiding random disk access and slow disk seeks. Kafka provides horizontal scale through sharding. It shards a topic log into hundreds, potentially thousands, of partitions spread across thousands of servers. This sharding allows Kafka to handle massive load.
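
To make the batching point concrete, here is a sketch of producer settings that control how records are grouped and compressed before being written; the broker address and the specific values are assumptions, not tuning advice.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class BatchingProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);        // batch up to 64 KB per partition
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);            // wait up to 10 ms to fill a batch
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compress whole batches

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... send records; the client groups them into compressed batches per partition
        producer.close();
    }
}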

Kafka: Streaming Architecture

Kafka gets used most often for real-time streaming of data into other systems. Kafka is a middle layer to decouple your real-time data pipelines. Kafka core is not good for direct computations such as data aggregations or CEP. Kafka Streams, which is part of the Kafka ecosystem, does provide the ability to do real-time analytics. Kafka can be used to feed fast lane systems (real-time and operational data systems) like Storm, Flink, Spark Streaming, and your services and CEP systems. Kafka is also used to stream data for batch data analysis. Kafka feeds Hadoop. It streams data into your big data platform or into RDBMS, Cassandra, Spark, or even S3 for some future data analysis. These data stores often support data analysis, reporting, data science crunching, compliance auditing, and backups.
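
As a small taste of Kafka Streams, the sketch below filters one topic into another in real time. The application id, broker address, and topic names (raw-events, error-events) are hypothetical.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class FilterStreamExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "filter-example");    // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("raw-events");       // assumed input topic
        // Keep only error events and write them to a second topic
        events.filter((key, value) -> value.contains("ERROR")).to("error-events");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}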

Kafka Record Retention

A Kafka cluster retains all published records, and if you don’t set a limit, it will keep records until it runs out of disk space. You can set time-based limits (a configurable retention period), size-based limits, or use compaction (which keeps the latest version of a record per key). You can, for example, set a retention policy of three days, two weeks, or a month. The records in the topic log are available for consumption until discarded by time, size, or compaction. Consumption speed is not affected by log size, as Kafka always writes to the end of the topic log.
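
For example, a three-day time-based retention limit can be applied to a topic from Java with the AdminClient (a sketch assuming Kafka 2.3+, a broker on localhost:9092, and a topic named my-topic); the kafka-configs command in the cheat sheet above does the same thing from the shell.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class SetRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // retention.ms = 3 days in milliseconds
            AlterConfigOp setRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", String.valueOf(3L * 24 * 60 * 60 * 1000)),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> update =
                    Collections.singletonMap(topic, Collections.singletonList(setRetention));
            admin.incrementalAlterConfigs(update).all().get();
        }
    }
}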

Kafka Use Cases ?
  • Website activity tracking: The web application sends events such as page views and searches to Kafka, where they become available for real-time processing, dashboards, and offline analytics in Hadoop.
  • Operational metrics: Alerting and reporting on operational metrics. One particularly fun example is having Kafka producers and consumers occasionally publish their message counts to a special Kafka topic; a service can be used to compare counts and alert if data loss occurs.
  • Log aggregation: Kafka can be used across an organization to collect logs from multiple services and make them available in standard format to multiple consumers, including Hadoop and Apache Solr.
  • Stream processing: A framework such as Spark Streaming reads data from a topic, processes it and writes processed data to a new topic where it becomes available for users and applications. Kafka’s strong durability is also very useful in the context of stream processing.

What is a Topic ?
A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of records that is continually appended to—a structured commit log. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

The Kafka cluster durably persists all published records—whether or not they have been consumed—using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published, it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so storing data for a long time is not a problem.

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example, a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".
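
As a sketch of that offset control (the topic name, partition number, and offsets are assumptions):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class SeekExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Take manual control of one partition; no group.id is needed with assign()
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp)); // reprocess from the start
            // consumer.seek(tp, 42L);                               // or jump to a specific offset
            // ... poll() from here as usual
        }
    }
}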

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.


What is a Producer ?

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!
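
A short illustration of the two partitioning styles with the Java producer (the broker address, topic name, and key are made up):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class PartitioningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the default partitioner spreads records across partitions to balance load
            producer.send(new ProducerRecord<>("my-topic", "record without a key"));
            // With a key: every record for "user-42" hashes to the same partition, preserving its order
            producer.send(new ProducerRecord<>("my-topic", "user-42", "keyed record"));
        }
    }
}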

What is a Consumer ?

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

For example, consider a two-server Kafka cluster hosting four partitions (P0-P3) with two consumer groups: consumer group A has two consumer instances and group B has four.

More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.

The way consumption is implemented in Kafka is by dividing up the partitions in the log over the consumer instances so that each instance is the exclusive consumer of a "fair share" of partitions at any point in time. This process of maintaining membership in the group is handled by the Kafka protocol dynamically. If new instances join the group they will take over some partitions from other members of the group; if an instance dies, its partitions will be distributed to the remaining instances.
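
A minimal consumer-group sketch in Java, assuming a broker on localhost:9092, a topic named my-topic, and a group id my-group; running several copies of this program with the same group id splits the partitions among them.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "my-group");                  // instances sharing this id share the partitions
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");         // start from the beginning if no committed offset

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}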

Kafka only provides a total order over records within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over records this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.