# Apache Kafka

"A high-throughput distributed messaging system." [site](http://kafka.apache.org/)

Notes taken from [source](http://www.slideshare.net/miguno/apache-kafka-08-basic-training-verisign)

## Overview

* Created at LinkedIn (open sourced in 2011)
* Implemented in Scala and some Java

Design Requirements

* High throughput to support high-volume event feeds
* Support real-time processing of these feeds to create new, derived feeds
* Support large data backlogs to handle periodic ingestion from offline systems
* Support low-latency delivery to handle more traditional messaging use cases
* Guarantee fault tolerance in the presence of machine failures

Writes

* Writes go to the page cache of the OS, i.e. RAM

Reads

* Direct transfer from page cache to network socket with `sendfile`, which avoids copying data into the Kafka application for sending
* A healthy Kafka cluster shows minimal read activity against disk, since data is served primarily from cache

LinkedIn Usage Stats

* 300+ brokers
* 18K+ topics
* 140K+ partitions
* 220 billion messages per day
* 40 TB in
* 160 TB out
* Peak: 3.25 million msg/sec, 5.5 Gbit/sec in, 18 Gbit/sec out

General Use Case

* Kafka + (Storm, Samza, Spark Streaming, homegrown)

## Who uses it

* LinkedIn: activity streams, ops metrics, data bus
* Netflix: real-time monitoring, event processing
* Twitter: with Storm for real-time data pipelines
* Spotify: log delivery to Hadoop
* Loggly: log collection and processing
* Mozilla: telemetry data

## Architecture

* Kafka brokers handle reads and writes
* Hierarchy: topic -> partition -> replica
* ZooKeeper manages and shares state for brokers and consumers (brokers only as of Kafka v0.9)
* A topic/partition is an append-only, immutable sequence
* Retention in a topic/partition is governed by `age`, `max size`, or `key`
* The offset, a monotonically increasing integer, determines a message's location in the queue
* Consumers use the offset to track their position in the queue
* Replicas are solely for data-loss prevention (clients never read from or write to them)
* Servers with large RAM are used for serving data from cache (LinkedIn: 64 GB, roughly 60 GB for the page cache and 4 GB for the broker)
* RAID10 with 14 spindles
  * more spindles -> higher disk throughput
  * cache on the RAID controller with battery backup
* 1 Gbit Ethernet
* ZooKeeper servers run **only** ZooKeeper, one ZooKeeper instance per host
  * SSDs recommended
* Kafka clusters don't span data centers (latency to ZooKeeper servers?)
* A ZooKeeper ensemble needs a majority up, so n nodes tolerate floor((n-1)/2) failures (e.g. a 5-node ensemble tolerates 2)
  * LinkedIn runs 5-node ensembles
  * Twitter runs 13-node ensembles

## Interface

### Topic Creation

cli

```bash
kafka-topics.sh --zookeeper 1.2.3.4:2181 --create --topic topic.name \
  --partitions 3 --replication-factor 2 --config x=y
```

auto create

```
auto.create.topics.enable = true
```

### Current Topics

```bash
kafka-topics.sh --zookeeper 1.2.3.4:2181 --describe --topic topic.name
```

## Producers

* Synchronous or asynchronous
  * sync blocks the client on `send()`
  * async sends messages in the background
    * allows batching of messages
    * pools of sync producers
    * can drop messages if the send queue is full
* Write message ack
  * a message is considered committed when "any required" in-sync replicas for that partition have applied it to their data log
  * "any required" is defined by the producer via `request.required.acks`
    * `0` - fire and forget (no ack required)
    * `1` - wait for the leader to ack
    * `-1` - wait for all ISR (in-sync replicas) to ack
  * only committed messages are given to consumers
* `request.timeout.ms`, if set too low, can cause the client to receive an error followed by an ack from a broker
* Batch writes
  * higher risk of data loss if the client dies before pending messages are sent
* The default partitioner selects a partition based on the hash of the key
  * so by default, Kafka guarantees that all data for the same key goes to the same partition
  * if no key is set, the client pushes to a random partition **and sticks to it** for a period of time, then switches to another random partition
  * the key is retained with the message in the broker
* Brokers provide producers with the current list of topic/partition leaders, bootstrapped via `metadata.broker.list` (see the sketch after this list)
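To make the producer settings above concrete, here is a minimal sketch using the 0.8-era Java producer API (the same API as the Producer Example linked under Tutorials). The broker hosts, topic name, and key are placeholder values:

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "Bootstrap" brokers used for metadata/leader discovery (a VIP is recommended)
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1"); // 0 = fire and forget, 1 = leader ack, -1 = all ISR ack
        props.put("producer.type", "sync");      // "async" batches in the background but may drop on a full queue

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // With the default partitioner, messages with the same key hash to the same partition
        producer.send(new KeyedMessage<String, String>("topic.name", "some-key", "hello kafka"));
        producer.close();
    }
}
```

With `producer.type = async`, `send()` returns immediately and messages are batched; the trade-off is the queue-overflow risk noted under Tales of Caution.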
## Brokers

* Some brokers are designated as "bootstrap" brokers
* At least one "bootstrap" broker must be up, or producers stop working
* A VIP is recommended for the "bootstrap" brokers

## Consumers

* Storm 0.9.2 uses the simple consumer API to integrate well with Storm's model of guaranteed message processing
* Consumers pull from Kafka
* Consumers are responsible for tracking their offset position
  * high-level consumer: stores the offsets in ZooKeeper
  * simple consumer: responsible for tracking offsets itself
* Consumers can rewind, bounded by the max age or max size of the topic/partition
* A consumer can select a subset of a topic's partitions to consume
* Config (both settings appear in the sketch below):
  * `group.id` assigns the consumer to a "group"
  * `zookeeper.connect` broker/topic/etc. discovery
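A minimal sketch of the high-level consumer described above, using the 0.8 Java API; the ZooKeeper address, group id, and topic name are placeholders:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "1.2.3.4:2181"); // placeholder ensemble address
        props.put("group.id", "example-group");         // placeholder group name

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Request one stream (one consuming thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("topic.name", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicCountMap);

        // Iterate messages; the high-level consumer commits offsets to ZooKeeper behind the scenes
        ConsumerIterator<byte[], byte[]> it = streams.get("topic.name").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
```

Running several such processes with the same `group.id` spreads the topic's partitions across them; a rebalance redistributes partitions when members join or leave.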
## Monitoring

* [System Tools](https://cwiki.apache.org/confluence/display/KAFKA/System+Tools)
* [Replication Tools](https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools)
* [stormkafkamon](https://github.com/otoolep/stormkafkamon)
* [JMX](https://kafka.apache.org/documentation.html#monitoring)
* [dropwizard/metrics](https://github.com/dropwizard/metrics)
* Garbage collection time
* Data size on disk should be balanced across brokers/disks
  * data balance is more important than partition balance
* Leader partition count should be balanced (avoid a hot node, since all reads/writes go against the leader of a topic/partition)
* Track network utilization
  * a common cause of under-replicated partitions
* Request latency (on SSDs it should be essentially < 1 ms)
* Outstanding requests (an increasing count means items are backing up)
* LinkedIn is working on adding "auditing" functionality
  * not yet released

## Performance Tuning

* `vm.swappiness = 0`
* Allow more dirty pages but less dirty cache: `vm.dirty_*_ratio`
* Lengthen the flush interval (LinkedIn uses 120 s!? They can tolerate 2 minutes' worth of data loss because of replicas)
* Additional spindles (RAID10 with 14 disks)
* `num.io.threads`: should be >= # disks (initialize to # disks)
* `num.network.threads`: adjust based on the number of concurrent producers and consumers, and the replication factor
* Compression
  * Snappy and gzip are supported; Snappy is recommended: a lower compression ratio, but not as CPU intensive

## Ops

* [FAQ](https://cwiki.apache.org/confluence/display/KAFKA/FAQ)
* [Operations](https://kafka.apache.org/documentation.html#operations)
* [Replication Tools](https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools)

## Tutorials

* [Producer Example](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example)
* [Multi-broker on a single node](http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/)
* [Synchronous Producers](https://kafka.apache.org/documentation.html) (blocks on send)

## Tales of Caution

### General

* Don't break data up into separate topics unless it is truly independent
* Keep time-related messages in the same partition
* Async producers can drop messages when the send queue is full (default `queue.buffering.max.messages` is 10,000)
* Producer `request.timeout.ms`, if set too low, can cause the client to receive an error followed by an ack from a broker
* Use VIPs for the bootstrap broker list (`metadata.broker.list`) to cope with "bootstrap" brokers going up and down

### Aphyr - Call Me Maybe blog

[kafka post](http://aphyr.com/posts/293-call-me-maybe-kafka)

### Security

* Kafka was not originally designed with security in mind
* As of June 2014, security features are being added (TLS, data encryption at rest)

### Common Initial Issues

* Garbage collection
  * the G1 ("garbage-first") collector is suggested
* Educating and coaching on Kafka use
* Expanding/reducing the size of a Kafka cluster
* Monitoring consumer apps ("My stuff stopped working, what did Kafka do?")
* Consumer lag
  * consumers too slow
  * too much GC
  * loss of connection to ZooKeeper servers or brokers
  * bug or design flaw
* Rebalancing
  * multiple new nodes can trigger repeated rebalancing that hits Kafka's rebalancing limit, `rebalance.max.retries` (default 4)
  * new script in v0.8.1 to balance data/partitions across brokers
* Problems with partition healing (CAP)

## Questions

* How to measure replication lag?
* Does Kafka fsync on each write? If not, what is the flush interval?
* What is the leader election algorithm?
* How do you ensure unique broker ids in an automated environment?
* How to measure the health of a ZooKeeper server?
* Review the puppet-kafka recipe
* What happens to a consumer that restarts and loses its offset value: does it start at the beginning or the end of the queue? (see the sketch below)
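On the last question: with the 0.8 high-level consumer, the starting point when no committed offset exists (or the stored offset is out of range) is controlled by `auto.offset.reset`. A minimal sketch with placeholder connection values:

```java
import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "1.2.3.4:2181"); // placeholder ensemble address
        props.put("group.id", "example-group");         // placeholder group name
        // Where to start when the group has no committed offset (or it is out of range):
        //   "smallest" -> the oldest message still retained
        //   "largest"  -> only new messages (the 0.8 default)
        props.put("auto.offset.reset", "smallest");

        ConsumerConfig config = new ConsumerConfig(props);
        // Pass `config` to Consumer.createJavaConsumerConnector(...) as in the consumer sketch above
    }
}
```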