#KafkaProducer
Explore tagged Tumblr posts
daintilyultimateslayer · 2 days ago
Text
Kafka Management
LivestreamIQ
LivestreamIQ – Kafka-hosted, web-based GUI that offers intelligent alerting and monitoring tools to reduce the risk of downtime, streamline troubleshooting, surface key metrics, and accelerate issue resolution. It helps offload monitoring costs related to storing historical data and is built on Confluent’s deep understanding of data in motion infrastructure. LiveStreamIQ empowers businesses to proactively manage their Kafka infrastructure, ensuring optimal performance, reliability, and security. It is a niche product for Kafka Environment Management that provides Intelligent Alerting, Unified Notification Gateway with a scalable architecture ensuring the Messaging system is up and running as per Business critical Needs.
Our Features!
LiveStreamIQ empowers businesses to proactively manage their Kafka infrastructure, ensuring optimal performance, reliability, and security.
OUR ADDRESS
403, 4TH FLOOR, SAKET CALLIPOLIS, Rainbow Drive, Sarjapur Road, Varthurhobli East Taluk, Doddakannelli, Bengaluru Karnataka 560035
OUR CONTACTS
+91 97044 56015
0 notes
bigdataschool-moscow · 1 year ago
Link
0 notes
abhilashkrish · 6 years ago
Text
Kafka as Enterprise Service Bus [Scala]
We are modelling a system where customer consults stock price. We are using Apache Kafka as Enterprise Service Bus (ESB) which acts as the middleware.
Enterprise Service Bus (ESB) consists of taking one or more events from an event stream and applying actions over those events. The most common actions performed by ESB are:
Data Transformation
Event Handling
Protocol Conversion
Data Mapping
In our example we are performing Event Modelling through Apache Kafka as ESB.
The first step in the event modellling is to express the event in English in the following form:
Subject-verb-direct object
For this example we are modelling the event customer consults stock price
The subject in the sentence is customer
The verb in the sentence is consults
The direct object in the sentence is stock price
We represent our message in JSON format.
The sample message in JSON format is,
{ "event": "CUSTOMER_CONSULTS_STOCKPRICE", "customer": { "id": "13548310", "name": "Abhilash, Krishnan", "ipAddress": "185.86.151.11" }, "stock": { "name": "GOOG", "price": "USD" }, "timestamp": "2018-09-28T08:08:14Z" }
In our example we have Readers or Consumers, Writers or Producers and a Processing Engine.
The process flow can be depicted as:
Kafka Producer Console -> input-topic
Reader or Consumer <- input-topic
Process the received message
Writer or Producer -> output-topic
Kafka Consumer Console <- output-topic
Now let's develop the Scala application to receive messages from Kafka Producer Console and process the messages and write the messages to Kafka Consumer Console.
import java.util.Properties trait Consumer { def createConfig(servers: String): Properties def run(producer: Producer): Unit }
import java.time.Duration import java.util.{Collections, Properties} import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} class Reader(servers: String, groupId: String, topic: String) extends Consumer { val consumer: KafkaConsumer[String,String] = new KafkaConsumer[String, String](createConfig(servers)) override def createConfig(servers: String): Properties = { val config = new Properties(); config.setProperty("bootstrap.servers", servers) config.setProperty("group.id", groupId) config.setProperty("enable.auto.commit", "true") config.setProperty("auto.commit.interval.ms", "1000") config.setProperty("auto.offset.reset", "earliest") config.setProperty("session.timeout.ms", "30000") config.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") config.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") config } override def run(producer: Producer): Unit = { this.consumer.subscribe(Collections.singletonList(this.topic)) while (true) { val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofMillis(100)) records.forEach( record => { println("Read record value " + record.value() + " from topic " + topic) producer.process(record.value()) }) } } }
import java.util.Properties trait Producer { def createConfig(servers: String): Properties def process(message: String) def write(message: String) }
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} class Writer(servers: String, topic: String) extends Producer { val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](createConfig(servers)) override def createConfig(servers: String): Properties = { val config = new Properties() config.setProperty("bootstrap.servers", servers) config.setProperty("acks", "all") config.setProperty("retries", "0") config.setProperty("batch.size", "1000") config.setProperty("linger.ms", "1") config.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") config.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") config } override def process(message: String): Unit = { write(message) } override def write(message: String): Unit = { val record = new ProducerRecord[String, String](topic, message) producer.send(record) println("Write Message: " + message + " to topic " + topic) } }
object ProcessingEngine { def main(args: Array[String]): Unit = { val servers: String = "localhost:9093" val groupId: String = "stock" val sourceTopic: String = "input-topic" val targetTopic: String = "output-topic" val reader: Reader = new Reader(servers, groupId, sourceTopic) val writer: Writer = new Writer(servers, targetTopic) reader.run(writer) } }
To run this application do the following steps:
Start Zookeeper with the following command:
zookeeper-server.sh zookeeper.properties
Minimal zookeeper.properties file entries are,
# the directory where the snapshot is stored. dataDir=/kafka/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0
Start Kafka broker instance running on localhost at port 9093 with the following command:
kafka-server-start.sh server.properties
Minimal server.properties file entries are,
broker.id=1 port=9093 zookeeper.connect=localhost:2181 log.dirs=/tmp/kafka/server-1-logs offsets.topic.replication.factor=1
Start Kafka Producer console
kafka-console-producer.sh --broker-list localhost:9093 --topic input-topic
Start Kafka Consumer console
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic output-topic
Run the Scala application we have developed.
Send the following JSON message from Kafka Producer console opened in Step 3.
{ "event": "CUSTOMER_CONSULTS_STOCKPRICE", "customer": { "id": "13548310", "name": "Abhilash, Krishnan", "ipAddress": "185.86.151.11" }, "stock": { "name": "GOOG", "price": "USD" }, "timestamp": "2018-09-28T08:08:14Z" }
You will be able to see the JSON message in the Kafka Consumer console opened in Step 4.
In the next set of tutorials I will be explaing more on how we can validate the data, enrichment of messages, extracting the data, write custom serializer etc.
0 notes
vigneshwaranr · 6 years ago
Text
Digging further into KafkaProducer - Tuning batch.size and linger.ms
Hi. Several months ago, I posted a deep dive into the anatomy of Kafka Producer. (Much like what Christopher-Stoll had done to Pokemons.)
This is a follow-up to that post but focusing only on how the Producer accumulates data in memory and send it to the network. This is so that it can give you better clarity in tuning producer configurations for better scaling.
Let's get back to our curious monkey and elephant on their journey.
Master! We need more throughput from Kafka. How can we squeeze more out of it?
Hmm.. For any distributed system, it always helps to do the following:
Understand the overview of its system architecture
Have a look at every single configuration it has and spot what you can tune with safe assumptions at first and then keep tweaking by experimenting with different values.
If still not helping, then it's time to do such deep diving to find where is the bottleneck.
For Kafka, I found it helpful to tweak these three configurations:
batch.size
Kafka can accumulate multiple records of a particular topic-partition in batches and send multiple batches of records together instead of sending individual records.
batch.size sets the maximum size for a single batch.
Once this size is reached, the batch will be sent over the network if the Sender thread is free.
linger.ms
linger.ms determines how long since a batch was created must it linger in memory before being sent over the network IFF the batch hasn't reached batch.size yet.
This means we introduce a latency of at least linger.ms if the rate of producing is too slow for batches to reach batch.size within this time.
IF you have a high latency between your producer and broker nodes, you can increase linger.ms and batch.size to pile up and send bigger batches for efficient throughput.
max.request.size
This specifies the maximum size of a single request to a broker node.
Because a broker node can be leader for multiple topic-partitions, the producer will group the batches for those partitions together and send them in a single request.
This setting acts as an upper bound for a request containing multiple batches for a broker node.
That's was very informative.. .. but quite terse like all your previous explanations 😒..
Ok. I hear this complaint a lot. This time I will explain with diagrams. I tried my best to lay out the RecordAccumulator architecture.
A quick overview first
When you send a message using KafkaProducer, it stores them in RecordAccumulator in batches.
Whenever a batch reaches its batch.size, the RecordAccumulator will notify the Sender thread.
If the Sender thread is not busy sending data, it will look for batches that are ready to send (full size batches or smaller batches that have lingered longer than linger.ms)
The Sender thread will send the batch to appropriate broker node which is leader for that batch.
Configurations At Play..
In this diagram, I demonstrate a fictional situation over 60ms period in KafkaProducer for explaining purpose.
Kafka setup:
2 Topics
Topic1 has 2 partitions
Topic2 has 1 partition
2 Brokers
Broker1 is leader for Topic1-Partition1
Broker2 is leader for Topic1-Partition2 and Topic2-Partition1
Configurations:
batch.size = 15kB
linger.ms = 20ms
The RecordAccumulator holds a concurrent map where keys are TopicPartitions and values are Queues of associated batches.
Topic1-Partition1 between 0 and 30ms timelines:
Topic1-Partition1 continuously receives lot of messages (about 24kB) in this period.
Because batch.size is 15kB, it is accumulated as two batches in the queue for Topic1-Partition1
As soon as Batch1 is filled, RecordAccumulator notifies Sender thread just in case it is idle.
At 20ms timeline, the Sender is indeed idle, so it will start sending this batch over the network to its leader node Broker1.
Topic1-Partition2 between 0 and 30ms timelines:
Topic1-Partition2 receives a full batch of messages between 10 and 30ms timeline and RecordAccumulator notifies Sender that batch is full.
If the Sender had already finished sending Batch1, it'll pick up and send Batch4 to its leader Broker2
Topic2-Partition1 received only 9kB of messages so the Batch5 just lingers in memory similar to Batch2
Between 30 and 60ms timelines:
In this example, the Sender got free of things to do at 40ms timeline. So it looks around for any batches to send.
It notices that the Batch2 and Batch5 were lingering in memory longer than the configured limit of 20ms or more so it picks them up and sends them to their appropriate leaders.
I leave the possibilities of Batch3's fate to your imagination.
Wow! Now we can tweak these configurations with this understanding! Arigatou Sensei!
Great! It would also help to note the following learnings.
Measure how long it takes for your messages to be delivered between producer and consumer. If latency is negligible, you don't need to worry much.
I faced where there was high latency between Producer and Broker nodes because they are in separate faraway physical locations. The Producer frequently sent data to Broker and it hurt throughput.
To solve that, I set a large enough value for batch.size, linger.ms and max.request.size. This will make Producer wait, accumulate and send bigger bulk of data instead of being chatty. But not too large enough that it slows down everything.
Make sure the bandwith of your network is big enough to accommodate such large payload. And check your broker configurations too whether they can receive your payload and tweak as per your needs.
P.S. Influences for the extravagant style of this article come from the following :)
Head First Design Patterns
Why's (poignant) guide to ruby
Learn You a Haskell for Great Good!
Ruby Koans
0 notes
gaeraecom · 5 years ago
Link
Java Producer API인 KafkaProducer Client의 내부 구조를 설명하고, KafkaProducer의 주요 설정이 실제 내부 동작에서 어떻게 적용되는지 알려주는 글입니다.💻 #kafka
0 notes
gozealouscloudcollection · 5 years ago
Text
【Kafka】生产者客户端小结(java)
基本用法
实例化KafkaProducer
一个简单的生产端代码如下:
public class KafkaProducerDemo {
private static final …
from 【Kafka】生产者客户端小结(java) via KKNEWS
0 notes
theresawelchy · 6 years ago
Text
A Minimalist Guide to Apache Flume
Apache Flume is used to collect, aggregate and distribute large amounts of log data. It can operate in a distributed manor and has various fail-over and recovery mechanisms. I've found it most useful for collecting log lines from Kafka topics and grouping them together into files on HDFS.
The project started in 2011 with some of the earliest commits coming from Jonathan Hsieh, Hari Shreedharan and Mike Percy, all of whom either currently, or at one point, worked for Cloudera. As of this writing the code base is made up of 95K lines of Java.
The building blocks of any Flume agent's configuration is one or more sources of data, one or more channels to transmit that data and one or more sinks to send the data to. Flume is event-driven, it's not something you'd trigger on a scheduled basis. It runs continuously and reacts to new data being presented to it. This contrasts tools like Airflow which run scheduled batch operations.
In this post I'll walk through feeding Nginx web traffic logs into Kafka, enriching them using Python and feeding Flume those enriched records for storage on HDFS.
Installing Prerequisites
The following was run on a fresh Ubuntu 16.04.2 LTS installation. The machine I'm using has an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of RAM and 1 TB of mechanical storage capacity.
First I've setup a standalone Hadoop environment following the instructions from my Hadoop 3 installation guide. Below I've installed Kafkacat for feeding and reading off of Kafka, libsnappy as I'll be using Snappy compression on the Kafka topics, Python, Screen for running applications in the background and Zookeeper which is used by Kafka for coordination.
$ sudo apt update $ sudo apt install \ kafkacat \ libsnappy-dev \ python-pip \ python-virtualenv \ screen \ zookeeperd
I've created a virtual environment for the Python-based dependencies I'll be using. In it I've installed a web traffic log parser, MaxMind's IPv4 location lookup bindings, Pandas, Snappy bindings for Python and a browser agent parser.
$ virtualenv ~/.enrich $ source ~/.enrich/bin/activate $ pip install \ apache-log-parser \ geoip2 \ kafka-python \ pandas \ python-snappy \ user-agents
MaxMind's database is updated regularly. Below I've downloaded the latest version and stored it in my home folder.
$ wget -c http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz $ tar zxf GeoLite2-City.tar.gz $ mv GeoLite2-City_*/GeoLite2-City.mmdb ~/
Flume & Kafka Up & Running
Below I've installed Flume and Kafka from their respective binary distributions.
$ DIST=http://www-eu.apache.org/dist $ wget -c -O flume.tar.gz $DIST/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz $ wget -c -O kafka.tgz $DIST/kafka/1.1.1/kafka_2.11-1.1.1.tgz
I've stripped the documentation from Flume as it creates ~1,500 files. My view is that documentation should live anywhere but production.
$ sudo mkdir -p /opt/{flume,kafka} $ sudo tar xzvf kafka.tgz \ --directory=/opt/kafka \ --strip 1 $ sudo tar xvf flume.tar.gz \ --directory=/opt/flume \ --exclude=apache-flume-1.9.0-bin/docs \ --strip 1
I'll create and take ownership of the Kafka logs folder so that I can run the service without needing elevated permissions. Make sure to replace mark with the name of your UNIX account.
$ sudo mkdir -p /opt/kafka/logs $ sudo chown -R mark /opt/kafka/logs
I'll launch the Zookeeper service and for the sake of simplicity, I'll run Kafka in a screen. I recommend Supervisor for keeping Kafka up and running in production.
$ sudo /etc/init.d/zookeeper start
$ screen $ /opt/kafka/bin/kafka-server-start.sh \ /opt/kafka/config/server.properties
Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.
I'll create two Kafka topics. The first, nginx_log, will be fed the traffic logs as they were generated by Nginx. I'll then have a Python script that will parse, enrich and store the logs in CSV format in a separate topic called nginx_enriched. Since this is a standalone setup with a single disk I'll use a replication factor of 1.
$ for TOPIC in nginx_log nginx_enriched; do /opt/kafka/bin/kafka-topics.sh \ --zookeeper 127.0.0.1:2181 \ --create \ --partitions 1 \ --replication-factor 1 \ --topic $TOPIC done
Below is the configuration for the Flume agent. It will read messages off the nginx_enriched Kafka topic and transport them using a memory channel to HDFS. The data will initially live in a temporary folder on HDFS until the record limit has been reached, at which point it'll store the resulting files under a /kafka topic name/year/month/day naming convention for the folder hierarchy. The records are stored in CSV format. Later on Hive will have a table pointed at this folder giving SQL access to the data as it comes in.
$ vi ~/kafka_to_hdfs.conf
feed1.sources = kafka-source-1 feed1.channels = hdfs-channel-1 feed1.sinks = hdfs-sink-1 feed1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource feed1.sources.kafka-source-1.channels = hdfs-channel-1 feed1.sources.kafka-source-1.topic = nginx_enriched feed1.sources.kafka-source-1.batchSize = 1000 feed1.sources.kafka-source-1.zookeeperConnect = 127.0.0.1:2181 feed1.channels.hdfs-channel-1.type = memory feed1.channels.hdfs-channel-1.capacity = 1000 feed1.channels.hdfs-channel-1.transactionCapacity = 1000 feed1.sinks.hdfs-sink-1.channel = hdfs-channel-1 feed1.sinks.hdfs-sink-1.hdfs.filePrefix = hits feed1.sinks.hdfs-sink-1.hdfs.fileType = DataStream feed1.sinks.hdfs-sink-1.hdfs.inUsePrefix = tmp/ feed1.sinks.hdfs-sink-1.hdfs.path = /%{topic}/year=%Y/month=%m/day=%d feed1.sinks.hdfs-sink-1.hdfs.rollCount = 100 feed1.sinks.hdfs-sink-1.hdfs.rollSize = 0 feed1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true feed1.sinks.hdfs-sink-1.hdfs.writeFormat = Text feed1.sinks.hdfs-sink-1.type = hdfs
If you run into out of memory issues you can change the channel's type of "memory" to either "spillablememory" or "file". The Flume documentation covers how to tune these types of channels.
I'll launch the Flume agent in a screen. This is another candidate for running under Supervisor in production.
$ screen $ /opt/flume/bin/flume-ng agent \ -n feed1 \ -c conf \ -f ~/kafka_to_hdfs.conf \ -Dflume.root.logger=INFO,console
Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.
Feeding Data into Kafka
I've created a sample Nginx web traffic log file. Here are what the first three lines of content look like.
1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET / HTTP/1.1" 200 7032 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-" 1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET /theme/images/mark.jpg HTTP/1.1" 200 9485 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-" 1.2.3.4 - - [17/Feb/2019:08:41:55 +0000] "GET /architecting-modern-data-platforms-book-review.html HTTP/1.1" 200 10822 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
I'll feed these logs into the nginx_log Kafka topic. Each line will exist as an individual message in that topic.
$ cat access.log \ | kafkacat -P \ -b localhost:9092 \ -t nginx_log \ -z snappy
I can then check that the logs are stored as expected in Kafka.
$ kafkacat -C \ -b localhost:9092 \ -t nginx_log \ -o beginning \ | less -S
Enriching Nginx Logs
I'm going to use a Python script to read each of the log lines from Kafka, parse, enrich and store them back onto a new Kafka topic. The enrichment steps include attempting to look up the city of each visitor's IP address and parsing the user agent string into a simple browser name and version.
I've used a group identifier for consuming Kafka topics so that I can run multiple instances of this script and they can share the workload. This is handy for scaling out enrichment tasks that are bound by the compute resources of a single process.
I'll flush the newly created messages to Kafka every 500 messages. Note that this scripts expects there is always more data to push things along. If you have a finite ending to your dataset there would need to be logic in place to push the un-flushed records into Kafka.
from StringIO import StringIO import apache_log_parser import geoip2.database as geoip from kafka import (KafkaConsumer, KafkaProducer) import pandas as pd from urlparse import urlparse from user_agents import parse as ua_parse geo_lookup = geoip.Reader('GeoLite2-City.mmdb') log_format = r'%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"' line_parser = apache_log_parser.make_parser(log_format) group_id = 'nginx_log_enrichers' consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'], group_id=group_id, auto_offset_reset='smallest') producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=5, acks='all') consumer.subscribe(['nginx_log']) for msg_count, msg in enumerate(consumer): out = {} try: req = line_parser(msg.value) except apache_log_parser.LineDoesntMatchException as exc: print exc continue url_ = urlparse(req['request_url']) out['url_scheme'] = url_.scheme out['url_netloc'] = url_.netloc out['url_path'] = url_.path out['url_params'] = url_.params out['url_query'] = url_.query out['url_fragment'] = url_.fragment for key in ('remote_host', 'request_method', 'request_http_ver', 'status', 'response_bytes_clf',): out[key] = None if req.get(key, None): if type(req.get(key, None)) is bool: out[key] = req.get(key) elif len(req.get(key).strip()): out[key] = req.get(key).strip() agent_ = ua_parse(req['request_header_user_agent']) for x in range(0, 3): try: out['browser_%d' % x] = \ agent_.browser[x][0] if x == 1 else agent_.browser[x] except IndexError: out['browser_%d' % x] = None location_ = geo_lookup.city(req['remote_host']) out['loc_city_name'] = location_.city.name out['loc_country_iso_code'] = location_.country.iso_code out['loc_continent_code'] = location_.continent.code output = StringIO() pd.DataFrame([out]).to_csv(output, index=False, header=False, encoding='utf-8') producer.send('nginx_enriched', output.getvalue().strip()) if msg_count and not msg_count % 500: producer.flush()
The enriched log lines look like the following prior to being serialised into CSV format.
{'browser_0': 'Chrome', 'browser_1': 72, 'browser_2': '72.0.3626', 'loc_city_name': u'Tallinn', 'loc_continent_code': u'EU', 'loc_country_iso_code': u'EE', 'remote_host': '1.2.3.4', 'request_http_ver': '1.1', 'request_method': 'GET', 'response_bytes_clf': '7032', 'status': '200', 'url_fragment': '', 'url_netloc': '', 'url_params': '', 'url_path': '/', 'url_query': '', 'url_scheme': ''}
While the above script is running I can see the following being reported by the Flume agent.
.. kafka.SourceRebalanceListener: topic nginx_enriched - partition 0 assigned. .. hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false .. hdfs.BucketWriter: Creating /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp .. hdfs.HDFSEventSink: Writer callback called. .. hdfs.BucketWriter: Closing /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp .. hdfs.BucketWriter: Renaming /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp to /nginx_enriched/year=2019/month=02/day=20/hits.1550663242571
Setting Up Hive Tables
With the data landing in HDFS I'll create a table in Hive that will point to the CSV-formatted data. I'll also create a separate table that will hold a copy of that data in compressed, columnar form using ORC formatted-files. Presto will be used to convert the CSV-formatted data into ORC later on. Columnar form can be two orders of magnitude quicker to query and an order of magnitude smaller than row-oriented data.
CREATE EXTERNAL TABLE hits ( browser_0 STRING, browser_1 INTEGER, browser_2 STRING, loc_city_name STRING, loc_continent_code VARCHAR(4), loc_country_iso_code VARCHAR(3), remote_host VARCHAR(15), request_http_ver FLOAT, request_method VARCHAR(10), response_bytes_clf BIGINT, security_researcher STRING, status SMALLINT, url_fragment STRING, url_netloc STRING, url_params STRING, url_path STRING, url_query STRING, url_scheme STRING ) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2)) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/nginx_enriched/';
CREATE TABLE hits_orc ( browser_0 STRING, browser_1 INTEGER, browser_2 STRING, loc_city_name STRING, loc_continent_code VARCHAR(4), loc_country_iso_code VARCHAR(3), remote_host VARCHAR(15), request_http_ver FLOAT, request_method VARCHAR(10), response_bytes_clf BIGINT, security_researcher STRING, status SMALLINT, url_fragment STRING, url_netloc STRING, url_params STRING, url_path STRING, url_query STRING, url_scheme STRING ) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2)) STORED AS orc;
The data is partitioned by year, month and day on HDFS; both month and day can have leading zeros so I'll use the VARCHAR type to store them. I'll run the following to add any new partitions to the Hive metastore.
MSCK REPAIR TABLE hits; MSCK REPAIR TABLE hits_orc;
I can now check that Hive can see the existing partition.
year=2019/month=02/day=20
Converting CSVs to ORC Format
Finally, I'll convert the CSV-formatted table contents into a separate, ORC-formatted table using Presto. I've found Presto to be the fastest query engine for converting CSV data into ORC format.
$ presto \ --server localhost:8080 \ --catalog hive \ --schema default
INSERT INTO hits_orc SELECT * FROM hits;
With the data loaded into ORC format I can run aggregate queries on the dataset.
SELECT loc_city_name, COUNT(*) FROM hits_orc GROUP BY 1;
loc_city_name | _col1 ---------------+------- Tallinn | 119
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via
LinkedIn
.
DataTau published first on DataTau
0 notes
bigdataschool-moscow · 1 year ago
Link
0 notes
bigdataschool-moscow · 1 year ago
Link
0 notes
bigdataschool-moscow · 1 year ago
Link
0 notes
bigdataschool-moscow · 2 years ago
Link
0 notes
bigdataschool-moscow · 2 years ago
Link
0 notes
bigdataschool-moscow · 2 years ago
Link
0 notes
bigdataschool-moscow · 2 years ago
Link
0 notes
bigdataschool-moscow · 4 years ago
Text
Как повысить отказоустойчивость продюсера Kafka: 5 практик по настройке ТОП-10 конфигураций
Tumblr media
В этой статье поговорим про практическое обучение Apache Kafka и рассмотрим, как сделать продюсеров еще более отказоустойчивыми, чтобы улучшить общую надежность всей Big Data системы. Читайте далее про наиболее важные конфигурации продюсеров Kafka и эффективные рекомендации по их настройке.
10 самых важных параметров продюсера Apache Kafka
Из множества конфигурационных параметров для отправителей сообщений в топики Kafka – продюсеров (producer), наиболее значимыми в прикладном смысле являются следующие [1, 2]: Acks – подтверждения того, что лидер получил сообщение, прежде чем продюсер может считать запрос завершенным – нужно для контроля долговечности отправляемых записей;  replica.lag.time.max.ms – время, в течение которого лидер не удаляет follower’а из набора синхронизированных реплик (In-Sync Replicas, ISR), даже если он не отправлял никаких запросов на выборку или не использовал до конца смещения лог лидера. в Apache Kafka 7 этот параметр конфигурации относится не только ко времени, прошедшему с момента последнего запроса на выборку из реплики, но и ко времени с момента последней обработки реплики. Реплики, которые все еще получают сообщения от лидеров, но не догнали последние сообщения в течение replica.lag.time.max.ms, будут считаться рассинхронизированными.  insync.replicas - минимальное количество реплик, которые должны подтвердить запись, чтобы запись считалась успешной;  retries – разрешение повторных попыток отправить сообщения, которые ранее не удалось записать. Обычно вместо этой конфигурации пользователи задают параметр timeout.ms для управления поведением повторных попыток.  enable.idempotent – идемпотентность (свойство объекта или операции при повторном применении возвращать тот же результат, что и при первом), включается автоматически, если продюсер авторизован для определенного идентификатора транзакции (transactional.id). max.in.flight.requests.per.connection - максимальное количество неподтвержденных запросов, которые клиент отправит в одном соединении до блокировки. Когда значение этого параметра больше 1 и есть неудачные отправки, то возможно изменение порядка сообщений из-за повторных попыток, если они разрешены.  buffer.memory – общий объем памяти в байтах, который продюсер может использовать для буферизации записей, ожидающих отправки на сервер.  max.block.ms – время, в течение которого методы send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() и abortTransaction() будут блокироваться в KafkaProducer.  linger.ms – время искусственной задержки перед отправкой сообщений в топики Kafka, чтобы объединить записи в пакет;  batch.size – размер пакета, в который объединяются сообщения для записи в Kafka. Примечательно, что все эти конфигурации дают эффект не сами по себе, а тесно взаимосвязаны друг с другом. Поэтому чаще всего, настраивая один параметр, следует также изменить и другой. Как это сделать, чтобы повысить надежность продюсера Apache Kafka, мы рассмотрим далее.
Acks (acknowledges) и синхронизированные реплики
Acks (acknowledges) - это подтверждение, которое продюсер получает от брокера Kafka, чтобы гарантировать успешную передачу ему отправленного сообщения, прежде чем зафиксировать commit. Значение по умолчанию – 1 означает, что пока производитель получает подтверждение от ведущего брокера этого топика, он считает фиксацию успешной и переходит к следующему сообщению. Acks = 0 не дает никаких подтверждений о фиксации. А acks = all гарантирует, что продюсер получит подтверждения от всех синхронизированных реплик (ISR) этого топика, что обеспечивает максимальную долговечность сообщений, но требует много времени и увеличивает общую задержку. Максимальное количество реплик равно количеству брокеров в кластере Apache Kafka. Среди реплик есть лидер, который обрабатывает все запросы на чтение и запись, а последователи пассивно копируют лидера. Синхронизированная реплика полностью догоняет лидера за время, указанное в replica.lag.time.max.ms. Если брокер выйдет из строя или возникнут проблемы с сетью и последователь не сможет связаться с лидером, и через replica.lag.time.max.ms секунд этот брокер будет удален из ISR. Конфигурация min.insync.replicas определяет, сколько реплик должен получить producer, прежде чем считать фиксацию успешной. Этот параметр добавляется поверх acks = all и делает хранение сообщения более надежным за счет некоторого увеличения временной задержки. Т. е. в этом случае разработчик и администратор Apache Kafka должен искать приемлемый баланс.
Повторные попытки отправки сообщений в случае неудачи
Можно разрешить продюсеру повторно отправлять сообщения, задав конфигурации retries значение больше 0 – максимальное количество повторных попыток, которое производитель сделает, если фиксация не удалась. Например, установив retries = 5, producer попытается повторить отправку максимум 5 раз. При этом в логе продюсера не отразится количество повторных попыток, т.к. там запис��вается только факт успешной или неуспешной фиксации в общем итоге. Но заметить повторную попытку можно из лога на стороне брокера по инкременту параметра retries.
Дублирование сообщений и как его предупредить
Бывают ситуации, когда сообщение было фактически передано всем синхронизированным репликам, но брокер не смог отправить ответное подтверждение, к примеру, из-за проблем с сетью. А если вышерассмотренная конфигурация retries>0, producer будет повторно отправлять сообщения, что может привести к их дублированию в топике Kafka. Предупр��дить это позволяет семантика строго однократной доставки сообщений (exactly once), о которой мы рассказывали здесь. Это можно обеспечить с помощью идемпотентности продюсера, включив ее в конфигурации enable.idempotent. Сообщения отправляются пакетами, каждый из которых имеет порядковый номер. На стороне брокера он отслеживает наибольший порядковый номер для каждого раздела. Если приходит пакет с меньшим или равным порядковым номером, брокер не запишет его в топик Kafka, что также обеспечивает порядок.
Отправка сообщений по порядку
Еще одна важная конфигурация для обеспечения порядка - max.in.flight.requests.per.connection - количество неподтвержденных запросов, которые могут быть помещены в буфер на стороне продюсера. Если количество повторных попыток больше 1 и первый запрос завершился неудачно, но второй запрос был успешным, то первый запрос будет отправлен повторно, а сообщения будут в неправильном порядке. Однако, как мы уже отметили ранее, при значении этой конфигурации больше 1 и есть риск изменения порядка сообщений из-за повторных попыток, если они разрешены. Когда при отключенной идемпотентности продюсера нужно сохранить порядок записи сообщений, вам следует установить для параметра max.in.flight.requests.per.connection значение 1. А если включив идемпотентность через CLI-скрипт kafka-acls.sh, определять конфигурацию max.in.flight.requests.per.connection уже не нужно: фремворк сам выберет подходящие значения для этого параметра. При установке несовместимых значений будет выброшено исключение ConfigException.
Буфер памяти и быстрая отправка сообщений
Когда producer вызывает send(), сообщения не отправляются в топик Kafka немедленно, а добавляются во внутренний буфер, размер которого по умолчанию равен 32 МБ. Если продюсер отправляет сообщения быстрее, чем они могут быть переданы брокеру, или случились перебои с сетью, накопленные сообщения не вмещаются в buffer.memory. Тогда вызов send() будет заблокирован на время, указанное в конфигурации max.block.ms (по умолчанию 1 минута). Эту проблему можно решить, увеличив оба значения. А также можно настроить еще 2 конфигурации: linger.ms и batch.size. linger.ms - время задержки до того, как пакеты будут готовы к отправке. Увеличение linger.ms снижает количество запросов и повышает пропускную способность, но приводит к тому, что в памяти хранится больше сообщений. Batch.size – максимальный размер одного пакета предупреждает слишком долгое ожидание сообщений перед отправкой. Конфигурации linger.ms и batch.size дополняют друг друга: пакеты будут отправлены при достижении любого из этих 2 лимитов. Узнайте больше про администрирование кластеров Apache Kafka и разработку распределенных приложений потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: Администрирование кластера Kafka  Apache Kafka для разработчиков  Источники 1. https://towardsdatascience.com/10-configs-to-make-your-kafka-producer-more-resilient-ec6903c63e3f  2. https://kafka.apache.org/documentation/#producerapi Read the full article
0 notes
bigdataschool-moscow · 5 years ago
Text
Роль Python в мире Big Data: 5 причин освоить этот язык программирования
Tumblr media
Сегодня мы расскажем, почему каждый Big Data специалист должен знать этот язык программирования и как «Школа Больших Данных» поможет вам освоить его на профессиональном уровне. Читайте в нашей статье, кому и зачем нужны корпоративные курсы по Python в области Big Data, Machine Learning и других методов Data Science.
Чем хорош Python: 3 главных достоинства
При том, что Python считается универсальным языком программирования, который используется, в т.ч. для веб-разработки и создания специальных решений, наибольшую популярность он приобрел в области Big Data и Data Science благодаря следующим ключевым преимуществам [1]: ·       низкий порог входа из-за простоты и лаконичности даже сложных логических конструкций. Python в разы проще Java и Scala, а аналогичный код на этом языке будет намного короче; ·       множество готовых библиотек для машинного обучения и других методов искусственного интеллекта, статистических вычислений и интеллектуального анализа данных: TensorFlow, PyTorch, SKlearn, Matplotlib, Scipy, Pandas и пр.; ·       наличие Python-API в большинстве фреймворков для обработки и хранения больших данных, например, Apache Kafka, Spark, Hadoop и пр., что облегчает работу программиста Big Data решений и инженера данных. Подробнее об этом мы поговорим далее.
От администратора до аналитика больших данных: кому в Big Data нужен Python
Итак, благодаря вышеперечисленным достоинствам, Python необходим практически каждому специалисту Big Data и вот почему: ·       Data Scientist с помощью этого языка программирования может решать практически все свои профессиональные задачи, от подготовки датасета к анализу до интерпретации результатов ML-моделирования; ·       Аналитик данных имеет возможность быстро проанализировать большие объемы «сырой» информации за счет специальных библиотек и команд, например, исключить повторяющиеся значения в массиве или выявить тренды; ·       Инженер данных обеспечивает аналитика и Data Scientist’a данными, организуя конвейеры сборы, передачи и обработки информации (data pipelines) с помощью Python. В частности, можно написать собственный продюсер данных для Apache Kafka с использованием KafkaProducer API, создать скрипт обработчика потоковых распределенных данных в Apache Spark на PySpark [2] или считать данные из Hadoop HDFS посредством PyArrow [3]; ·       Разработчик распределенных приложений и других Big Data решений организует интеграцию данных и систем, используя Python API. Например, посылая логи из Apache Kafka в NoSQL-СУБД Cassandra через приложение Python [4]. ·       Администратор облачных или локальных кластеров может проверять подлинность конечных пользователей Data Lake по одному или нескольким факторов, используя приложения Python [5]. Аналогично возможна аутентификация между службами, например, в Azure Data Lake Storage [6]. Все эти нюансы рассматриваются в соответствующих курсах «Школы Больших Данных» по администрированию и разработке Big Data решений. Однако большинство наших курсов ориентировано на опытных профессионалов. Освоить все эти учебные программы без знания статистики, методов Data Mining, и навыков программирования на языках Python, Java или Scala, достаточно сложно. Чтобы восполнить этот пробел и подготовить вас к дальнейшему развитию в области технологий Big Data, мы запустили отдельный проект Python-School – специализированные курсы по языку Python в больших данных и машинном обучении. Курсы по Python ведут преподаватели-практики, специалисты по работе с большими данными в сфере Machine Learning и нейронных сетей, лично участвующие в реальных проектах Big Data и Data Science. Поэтому если вам нужны профессиональные знания и навыки Python для анализа больших данных, разработки ML-моделей и распределенных приложений, создания data pipelines или администрирования кластеров, приходите на специализированные курсы по Python в наш лицензированный учебный центр повышения квалификации и обучения ИТ-специалистов (разработчиков, архитекторов, инженеров и аналитиков Big Data) в Москве: ·       Введение в нейронные сети на Python ·       Введение в машинное обучение на Python ·       Подготовка данных для Data Mining на Python   Источники 1.       https://python-school.ru/why-you-need-python/ 2.       https://habr.com/ru/post/451160/ 3.       https://thegurus.tech/hadoop-python/ 4.       https://dzone.com/articles/data-pipeline-send-logs-from-kafka-to-cassandra 5.       https://docs.microsoft.com/ru-ru/azure/data-lake-store/data-lake-store-end-user-authenticate-python 6.       https://docs.microsoft.com/ru-ru/azure/data-lake-store/data-lake-store-service-to-service-authenticate-python Read the full article
0 notes