How to Install, Tune, and Run Apache Kafka
Tuning Guide
Apache Kafka | Bare metal or any cloud instance |
OS (Operating System) | Ubuntu 20.04 and Oracle Linux 8 |
Version (Kafka and Scala) | 3.2.0 |
Nodes in Kafka Cluster | 1 or 3 |
Producer VM | Larger than or equal to the single node configuration |
Note:
1. Characters in red are hyperlinks.
2. Characters in italics are configurations you can change on your setup.
3. Use 1 IP address for a 1 node setup; when using 3 IP addresses, use the 3 node cluster configurations as marked in the comments in the code.
Attach a disk to the instance. After attaching it, format the disk as "xfs" (or your preferred filesystem).
Create a /data folder to install the bits.
Mount the disk on the /data directory created above.
Change the owner of the mounted disk so that the Kafka service can read from and write to it.
Copy or download the latest Kafka and JDK to the "/data" directory and untar them (use the hyperlink to go to the download page and download the bits).
Instead of untarring the JDK and providing its path, it is simpler to install the JDK with "apt" or "dnf".
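The disk preparation steps above can be sketched as a small script. This is only an illustration under stated assumptions: the device name /dev/sdb and the service account "kafka" are hypothetical and are not given in this guide, so check `lsblk` and your own service user first. By default the script only prints the commands it would run.

```shell
# Sketch of the disk preparation steps. DEVICE and KAFKA_USER are assumptions;
# confirm the real device name with `lsblk` before running anything for real.
DEVICE="${DEVICE:-/dev/sdb}"      # hypothetical device name of the attached disk
KAFKA_USER="${KAFKA_USER:-kafka}" # hypothetical account that runs the Kafka service
DRY_RUN="${DRY_RUN:-1}"           # 1 = only print the commands, 0 = execute them

# Print the command in dry-run mode, otherwise execute it.
run() {
    if [ "$DRY_RUN" = "1" ]; then
        echo "+ $*"
    else
        "$@"
    fi
}

prepare_data_disk() {
    run sudo mkfs.xfs "$DEVICE"                          # format as xfs (destroys existing data)
    run sudo mkdir -p /data                              # folder for the bits
    run sudo mount "$DEVICE" /data                       # mount the disk on /data
    run sudo chown -R "$KAFKA_USER":"$KAFKA_USER" /data  # let the service user read/write
}

prepare_data_disk
```

Set DRY_RUN=0 only after checking the printed commands. A mount made this way does not survive a reboot unless a matching /etc/fstab entry is added.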
Apache Kafka Broker Network settings for the OS
Run the following commands as root:
sysctl net.ipv4.ip_local_port_range="1024 65535"
sysctl net.ipv4.tcp_max_syn_backlog=65535
sysctl net.core.rmem_max=8388607
sysctl net.core.wmem_max=8388607
sysctl net.ipv4.tcp_rmem="4096 8388607 8388607"
sysctl net.ipv4.tcp_wmem="4096 8388607 8388607"
sysctl net.core.somaxconn=65535
sysctl net.ipv4.tcp_autocorking=0
To verify that a setting was applied, query it by name. For example, to view net.ipv4.ip_local_port_range:
sysctl net.ipv4.ip_local_port_range
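Settings changed with the sysctl command are lost at reboot. One way to keep them, sketched here, is to write the same values to a sysctl.d-style file. The file name 99-kafka-network.conf is a hypothetical choice, not something this guide prescribes; the script writes the file to the current directory so it can be reviewed before being installed.

```shell
# Write the network settings to a sysctl.d-style file for review.
# CONF_FILE is a hypothetical name chosen for this sketch.
CONF_FILE="${CONF_FILE:-99-kafka-network.conf}"

cat > "$CONF_FILE" <<'EOF'
net.ipv4.ip_local_port_range = 1024 65535
net.ipv4.tcp_max_syn_backlog = 65535
net.core.rmem_max = 8388607
net.core.wmem_max = 8388607
net.ipv4.tcp_rmem = 4096 8388607 8388607
net.ipv4.tcp_wmem = 4096 8388607 8388607
net.core.somaxconn = 65535
net.ipv4.tcp_autocorking = 0
EOF

# Report how many settings were written.
echo "wrote $(grep -c '=' "$CONF_FILE") settings to $CONF_FILE"

# To install and apply the file (requires root):
#   sudo cp "$CONF_FILE" /etc/sysctl.d/ && sudo sysctl --system
```

After installing the file, the verification command shown above can be used to confirm each value.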
Copy or download Kafka and the JDK to any directory and untar them.
Instead of untarring the JDK, it is easier to install it via "apt" or "dnf".
Changes to the ZooKeeper and the Server property files:
config/zookeeper.properties
dataDir=/data/zookeeper/data
clientPort=2181
# Use only the block for the node being configured. Properties files do not
# support trailing comments, so the notes are kept on their own lines.
#server-1
server.1=0.0.0.0:2888:3888
# for 3 node cluster configurations
server.2=10.0.1.168:2888:3888
server.3=10.0.1.58:2888:3888
#server-2 (for 3 node cluster configurations)
server.1=10.0.1.105:2888:3888
server.2=0.0.0.0:2888:3888
server.3=10.0.1.58:2888:3888
#server-3 (for 3 node cluster configurations)
server.1=10.0.1.105:2888:3888
server.2=10.0.1.168:2888:3888
server.3=0.0.0.0:2888:3888
tickTime=2000
initLimit=10
syncLimit=5
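The per-node server.N entries above follow one pattern: each node lists every member of the ensemble but writes its own entry as 0.0.0.0. A minimal sketch of a helper that prints those lines for a given node is shown below; the function name zk_server_lines is hypothetical, and the ports 2888 and 3888 are the ones used in this guide.

```shell
# Print the server.N lines for one ZooKeeper node.
# Usage: zk_server_lines MYID IP1 [IP2 IP3 ...]
# The entry whose position matches MYID is written as 0.0.0.0.
zk_server_lines() {
    myid="$1"
    shift
    n=1
    for ip in "$@"; do
        if [ "$n" -eq "$myid" ]; then
            echo "server.$n=0.0.0.0:2888:3888"
        else
            echo "server.$n=$ip:2888:3888"
        fi
        n=$((n + 1))
    done
}

# Example: the lines for server-2 of the 3 node cluster.
zk_server_lines 2 10.0.1.105 10.0.1.168 10.0.1.58
```

For a 1 node setup, `zk_server_lines 1 10.0.1.105` prints only the server.1 line. The MYID argument is the same number that goes into that node's myid file.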
Create a myid file on each ZooKeeper server with a unique ID:
Server 1: echo "1" > /data/zookeeper/data/myid
Server 2: echo "2" > /data/zookeeper/data/myid # for 3 node cluster configurations
Server 3: echo "3" > /data/zookeeper/data/myid # for 3 node cluster configurations
config/server.properties
# Must be unique for each broker in the cluster.
broker.id=0
# The port must match the one used in bootstrap.servers on the clients.
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
host.name=<Host IP name>
# Use only 1 IP address for 1 node. While using 3 IP addresses use 3 nodes.
zookeeper.connect=10.0.1.105:2181,10.0.1.168:2181,10.0.1.58:2181
delete.topic.enable=true
num.network.threads=24
num.io.threads=32
socket.send.buffer.bytes=-1
socket.receive.buffer.bytes=-1
Java Heap Changes
File: bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"
File: bin/zookeeper-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Note: If more memory is available, you can increase the heap size.
On the Producer/Consumer VM, copy or download Kafka and the JDK to any directory and untar them.
Instead of untarring the JDK, it is easier to install it via "apt" or "dnf".
Changes to the Producer/Consumer property files:
config/producer.properties
bootstrap.servers=<Kafka Server IP>:9092
compression.type=none
# The producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
linger.ms=3
# The default batch size in bytes when batching multiple records sent to a partition
batch.size=8192
# The total bytes of memory the producer can use to buffer records waiting to be sent to the server
buffer.memory=67108864
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
acks=1
Note: Setting linger.ms to 0 degraded latency; in some cases, changing it to 5 improved latency.
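Assuming the note above refers to linger.ms, the effect can be checked by running the producer test once per candidate value. The sketch below is not part of the original procedure: the function name sweep_linger_ms is hypothetical, the record count is shortened to 1000000 to keep each run brief, and it relies on the assumption that values passed with --producer-props take precedence over those in the --producer.config file. By default it only prints the commands.

```shell
# Sketch: run the producer perf test once per linger.ms value.
DRY_RUN="${DRY_RUN:-1}"   # 1 = only print the commands, 0 = execute them

# Print the command in dry-run mode, otherwise execute it.
run() {
    if [ "$DRY_RUN" = "1" ]; then
        echo "+ $*"
    else
        "$@"
    fi
}

# Usage: sweep_linger_ms VALUE [VALUE ...]
sweep_linger_ms() {
    for linger in "$@"; do
        echo "== linger.ms=$linger =="
        # --num-records is reduced here (assumption) so the sweep finishes quickly.
        run bin/kafka-producer-perf-test.sh \
            --topic psrtest \
            --num-records 1000000 \
            --throughput -1 \
            --record-size 100 \
            --producer.config config/producer.properties \
            --producer-props "linger.ms=$linger"
    done
}

sweep_linger_ms 0 3 5
```

Run it from the Kafka installation directory with DRY_RUN=0 and compare the average and 99th percentile latency reported for each value.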
config/consumer.properties
# Use only 1 IP address for 1 node. While using 3 IP addresses use 3 nodes.
# bootstrap.servers must point at the Kafka broker port, not the ZooKeeper port (2181).
bootstrap.servers=10.0.1.105:9092,10.0.1.168:9092,10.0.1.58:9092
fetch.min.bytes=10240
fetch.max.wait.ms=50
Edit the /etc/hosts file so that the Kafka VM hostname resolves:
$ vi /etc/hosts
<Kafka VM host IP>    <Kafka VM hostname>
Example: 10.0.1.25 kafkaServer
On the Kafka server
Command: bin/zookeeper-server-start.sh config/zookeeper.properties
Command: bin/kafka-server-start.sh config/server.properties
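Command: bin/kafka-topics.sh --create --topic psrtest --partitions 16 --replication-factor 3 --config retention.ms=86400000 --config min.insync.replicas=2 --bootstrap-server
Note: --replication-factor 3 requires the 3 node cluster; on a 1 node setup use --replication-factor 1.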
On the Producer/Consumer VM
Command: bin/kafka-producer-perf-test.sh --topic psrtest --num-records 100000000 --throughput -1 --producer.config config/producer.properties --print-metrics --record-size 100
Command: bin/kafka-consumer-perf-test.sh --bootstrap-server
Recommended Testing Methodology / Benchmark Measure
Run 4 producers against the Kafka cluster, collect the throughput (MB/sec or records/sec) reported by each producer, and report the sum of the values.
Run 1 producer against the Kafka cluster and collect the average and 99th percentile latency from the output.
Sample Output
1000000 records sent, 16285.584001 records/sec (139.78 MB/sec), 449.00 ms avg latency, 2207.00 ms max latency, 410 ms 50th, 982 ms 95th, 1356 ms 99th, 2013 ms 99.9th.
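The two measurements described above can be pulled out of the producer output with a short awk script. This is a sketch that assumes the summary line has the shape shown in the sample output; the function names sum_throughput and latency_summary are hypothetical. Only the final summary line of a run carries the percentile fields, so the script matches on "99th" to skip the periodic progress lines.

```shell
# Sum records/sec and MB/sec over the summary lines of one or more producer logs.
# Usage: sum_throughput producer1.log producer2.log ...   (or pipe the logs in)
sum_throughput() {
    cat "$@" | awk '
        /records sent/ && /99th/ {
            for (i = 1; i <= NF; i++) {
                if ($i == "records/sec") rps += $(i - 1)
                if ($i ~ /^MB\/sec\)/) { v = $(i - 1); sub(/^\(/, "", v); mbps += v }
            }
            runs++
        }
        END { printf "%d runs, %.2f records/sec, %.2f MB/sec\n", runs, rps, mbps }
    '
}

# Print the average and 99th percentile latency from each summary line.
latency_summary() {
    cat "$@" | awk '
        /records sent/ && /99th/ {
            for (i = 1; i <= NF; i++) {
                if ($i == "avg")   avg = $(i - 2)
                if ($i == "99th,") p99 = $(i - 2)
            }
            printf "avg %s ms, p99 %s ms\n", avg, p99
        }
    '
}

# Demonstration using the sample output line from this guide.
sample='1000000 records sent, 16285.584001 records/sec (139.78 MB/sec), 449.00 ms avg latency, 2207.00 ms max latency, 410 ms 50th, 982 ms 95th, 1356 ms 99th, 2013 ms 99.9th.'
printf '%s\n%s\n' "$sample" "$sample" | sum_throughput
printf '%s\n' "$sample" | latency_summary
```

In practice, redirect each producer's output to its own log file and pass the four files to sum_throughput.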
Apache Kafka is an open-source distributed streaming platform that provides a highly scalable and fault-tolerant system for handling real-time data streams. Originally created at LinkedIn and now developed under the Apache Software Foundation, Kafka is designed to efficiently handle high volumes of data, making it well suited to use cases such as event sourcing, real-time analytics, log aggregation, and messaging systems.
At its core, Kafka follows a publish-subscribe model, where data is organized into topics, and producers publish messages to these topics, while consumers subscribe to specific topics to receive and process the data. The architecture of Kafka is distributed, allowing it to handle massive data streams across multiple nodes or clusters, ensuring high availability and fault tolerance.
Kafka's design emphasizes durability and reliability, as it stores data in a fault-tolerant manner, using distributed commit logs and replication mechanisms. This ensures that data is persisted and available even in the event of hardware failures or system crashes.
One of Kafka's key features is its ability to process data in real-time, enabling low-latency data streaming and continuous data processing. It also provides strong support for data integration, offering connectors for various data systems and frameworks, allowing seamless integration with existing data pipelines and applications.
With its robust architecture, scalability, fault tolerance, and real-time processing capabilities, Apache Kafka has become a popular choice for building modern data-intensive applications, enabling organizations to efficiently handle and process large volumes of data streams in a reliable and scalable manner.