Kafka streams - power without weight (jeeconf)
KStreamBuilder builder =
    new KStreamBuilder();
KStream<String, String> textLines =
    builder.stream(inputTopic);
KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> toWords(value))
    .groupBy((key, word) -> word)
    .count("Counts");
wordCounts.toStream().to(outputTopic);
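As a rough illustration of what the DSL topology above computes, the following plain-Java sketch applies the same flatMapValues, groupBy and count steps to an in-memory list. It deliberately uses no Kafka classes. The slides do not define `toWords`, so the version here (lowercase, split on non-word characters) is an assumption, as are the class and method names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java stand-in for the DSL word count; no Kafka classes are involved.
final class WordCountSketch {

    // Assumed behaviour of the slide's toWords(value):
    // lowercase the line and split it on non-word characters.
    static List<String> toWords(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // flatMapValues -> groupBy(word) -> count, collapsed into one loop.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : toWords(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("hello kafka", "Hello streams")));
    }
}
```

The real topology differs in that the input never ends: the counts table keeps being updated as new lines arrive on the input topic.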
// State store that holds the current count per word
StateStoreSupplier store =
    Stores.create("Store")
        .withStringKeys()
        .withLongValues()
        .persistent()
        .build();
// process() of the "WORDS_COUNT" processor
@Override
public void process(byte[] key, String value) {
    Optional<Long> count =
        Optional.ofNullable(stateStore.get(value));
    Long incrementedCount = count.orElse(0L) + 1;
    stateStore.put(value, incrementedCount);
    processorContext.forward(value, incrementedCount);
    processorContext.commit();
}
// Wire source, processor, state store and sink together
KStreamBuilder builder =
    new KStreamBuilder();
builder.addSource("SOURCE", inputTopic)
    .addProcessor("WORDS_COUNT",
        new ProcessorSupplier(store.name()), "SOURCE")
    .addStateStore(store, "WORDS_COUNT")
    .addSink("OUTPUT", outputTopic, "WORDS_COUNT");
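The per-record logic of the low-level processor can be tried out without a broker. The sketch below is a plain-Java approximation, not the Processor API itself: a `HashMap` stands in for the persistent state store and a list stands in for `processorContext.forward`. The class name and the `countOf` and `forwarded` helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java approximation of the WORDS_COUNT processor's process() method.
final class WordCountProcessorSketch {

    // Stands in for the persistent key-value state store.
    private final Map<String, Long> stateStore = new HashMap<>();
    // Stands in for records handed downstream via processorContext.forward(...).
    private final List<String> forwarded = new ArrayList<>();

    // One record at a time: read the old count, increment, store, forward.
    void process(String value) {
        long incrementedCount = stateStore.getOrDefault(value, 0L) + 1;
        stateStore.put(value, incrementedCount);
        forwarded.add(value + "=" + incrementedCount);
    }

    Long countOf(String word) {
        return stateStore.get(word);
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        WordCountProcessorSketch processor = new WordCountProcessorSketch();
        for (String word : new String[] {"a", "b", "a"}) {
            processor.process(word);
        }
        System.out.println(processor.forwarded());
    }
}
```

Every incoming record produces one forwarded update, which is the main behavioural difference from a batch count that only reports final totals.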
elasticity and scalability
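Notes 19 to 21 below explain that work is split by input topic partition, so instances beyond the partition count stay idle. The following sketch illustrates that limit with a simple round-robin assignment. It is a simplification and not the assignor Kafka Streams actually uses; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin model of spreading partitions over app instances.
final class PartitionAssignmentSketch {

    // Returns, for each instance, the list of partition numbers it processes.
    static List<List<Integer>> assign(int partitions, int instances) {
        if (instances <= 0) {
            throw new IllegalArgumentException("need at least one instance");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            assignment.get(partition % instances).add(partition);
        }
        return assignment;
    }

    // Instances that received no partition have nothing to do.
    static long idleInstances(int partitions, int instances) {
        return assign(partitions, instances).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 3));
        System.out.println(idleInstances(10, 12));
    }
}
```

With 10 partitions, going from 1 to 3 instances spreads the work, while a 12th instance would receive nothing.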
Local/Remote state in stream processing
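Notes 23 to 25 below describe finding the instance whose local store holds a given key and then calling it over some RPC layer. The lookup step can be sketched as below. This is an assumption-laden model: it hashes keys with `String.hashCode` rather than Kafka's partitioner, the endpoints are made-up `host:port` strings in the spirit of `application.server`, and the class is hypothetical rather than part of the Interactive Queries API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of "which instance holds the state for this key?".
final class KeyRoutingSketch {

    private final int numPartitions;
    private final String localEndpoint;
    // partition number -> endpoint (host:port) of the instance owning it
    private final Map<Integer, String> partitionOwner = new HashMap<>();

    KeyRoutingSketch(int numPartitions, String localEndpoint) {
        this.numPartitions = numPartitions;
        this.localEndpoint = localEndpoint;
    }

    void assign(int partition, String endpoint) {
        partitionOwner.put(partition, endpoint);
    }

    // Assumption: keys are spread by hashCode; Kafka itself hashes differently.
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    String endpointFor(String key) {
        return partitionOwner.get(partitionFor(key));
    }

    // Local keys can be read from the local store; others need an RPC.
    boolean isLocal(String key) {
        return localEndpoint.equals(endpointFor(key));
    }

    public static void main(String[] args) {
        KeyRoutingSketch routing = new KeyRoutingSketch(3, "host-b:7070");
        routing.assign(0, "host-a:7070");
        routing.assign(1, "host-b:7070");
        routing.assign(2, "host-c:7070");
        System.out.println(routing.endpointFor("a") + " local=" + routing.isLocal("a"));
    }
}
```

What happens after the lookup (REST, Thrift or something else) is left to the application.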

Editor's Notes

  1. Request-response, or request-reply, is one of the basic methods computers use to communicate with each other: the first computer sends a request for some data and the second computer responds to it. Usually there is a series of such interchanges until the complete message is sent; browsing a web page is an example of request-response communication. The important thing to remember is that such an application can serve only future requests.
  2. The important thing to remember is that such an application can serve only historical data.
  3. A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set, where unbounded means "of unknown or of unlimited size". A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  4. The who's who: Kafka distinguishes producers, consumers, and brokers. In short, producers publish data to Kafka brokers, and consumers read published data from Kafka brokers. Producers and consumers are totally decoupled. A Kafka cluster consists of one or more brokers. The data: Data is stored in topics. The topic is the most important abstraction provided by Kafka: it is a category or feed name to which data is published by producers. Every topic in Kafka is split into one or more partitions, which are replicated across Kafka brokers for fault tolerance. Parallelism: Partitions of Kafka topics, and especially their number for a given topic, are also the main factor that determines the parallelism of Kafka with regard to reading and writing data. Because of their tight integration, the parallelism of Kafka Streams is heavily influenced by and depends on Kafka's parallelism.
  5. For each topic, the Kafka cluster maintains a partitioned log that looks like this: Each partition is an ordered, immutable sequence of records that is continually appended to (a structured commit log). The records in the partitions are each assigned a sequential ID number called the offset, which uniquely identifies each record within the partition.
  6. The Kafka cluster retains all published records, whether or not they have been consumed, using a configurable retention period. For example, if the retention policy is set to two days, then for the two days after a record is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is not a problem. In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but since the position is controlled by the consumer, it can consume records in any order it likes. For example, a consumer can reset to an older offset to reprocess data from the past, or skip ahead to the most recent record and start consuming from "now". This combination of features means that Kafka consumers are very cheap: they can come and go without much impact on the cluster or on other consumers. For example, you can use the Kafka command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers. The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions, so it can handle an arbitrary amount of data. Second, they act as the unit of parallelism (more on that in a bit).
  7. The first option for processing streams, if you have Kafka, is to do it yourself: no frameworks, no libraries. Consume some messages from Kafka, process them, and produce output.
  8. Using the Kafka APIs directly works well for simple things. It doesn't pull any heavy dependencies into your app. We called this "hipster stream processing" since it is a kind of low-tech solution that appealed to people who liked to roll their own. This works well for simple one-message-at-a-time processing, but the problem comes when you want to do something more involved, say compute aggregations or join streams. In this case, inventing a solution on top of the Kafka consumer APIs is fairly involved.
  9. It's hard: you need to deal with a lot of things and reinvent the wheel. Ordering? Partitioning? Fault tolerance? State management? Window operations? How to reprocess data?
  10. Pulling in a full-fledged stream processing framework gives you easy access to these more advanced operations. But the cost for a simple application is an explosion of complexity. This makes everything difficult, from debugging to performance optimization to monitoring to deployment. This is even worse if your app has both synchronous and asynchronous pieces, as you then end up splitting your code between the stream processing framework and whatever mechanism you have for implementing services or apps. It's just really hard to build and operationalize a critical part of your business in this way. This isn't such a problem in all domains. After all, if you are already using Spark to build a batch workflow and you want to add a Spark Streaming job into this mix for some real-time bits, the additional complexity is pretty low and it reuses the skills you already have. However, if you are deploying a Spark cluster for the sole purpose of this new application, that is definitely a big complexity hit.
  11. Same as note 10; the remarks about framework complexity apply to this slide as well.
  12. With Kafka Streams it looks much simpler. The inputs and outputs are just Kafka topics. The data model is just Kafka's keyed record data model throughout. The partitioning model is just Kafka's partitioning model; a Kafka partitioner works for streams too. The group membership mechanism that manages partitions, assignment, and liveness is just Kafka's group membership mechanism. Tables and other stateful computations are just log-compacted topics. Metrics are unified across the producer, consumer, and streams app, so there is only one type of metric to capture for monitoring. The position of your app is maintained by the application's offsets, just as for any Kafka consumer.
  13. The idea is for the core abstractions in Kafka to be the primitives for stream processing. We wanted to be able to give something that provides you what you would get out of a stream processing framework, but which has very little additional operational complexity beyond the normal Kafka producer and consumer APIs. In other words, we were aiming for something like this.
  14. Kafka Streams has a strong focus on usability and a great developer experience. It offers all the necessary stream processing primitives to allow applications to read data from Kafka as streams, process the data, and then either write the resulting data back to Kafka or send the final output to an external system. Developers can choose between a high-level DSL with commonly used operations like filter, map, and join, and a low-level API for developers who need maximum control and flexibility.
  15. Designed as a lightweight library in Apache Kafka, much like the Kafka producer and consumer client libraries. You can easily embed and integrate Kafka Streams into your own applications, which is a significant departure from framework-based stream processing tools that dictate many requirements upon you, such as how you must package and submit processing jobs to their cluster. Has no external dependencies on systems other than Apache Kafka and can be used in any Java application. Read: you do not need to deploy and operate a separate cluster for your stream processing needs. Your Operations and Info Sec teams, among others, will surely be happy to hear this. Leverages Kafka as its internal messaging layer instead of (re)implementing a custom messaging layer like many other stream processing tools. Notably, Kafka Streams uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees. This ensures high performance, scalability, and operational simplicity for production environments. A key benefit of this design decision is that you do not have to understand and tune two different messaging layers: one for moving data streams at scale (Kafka) plus a separate one for your stream processing tool. Similarly, any performance and reliability improvements of Kafka will automatically be available to Kafka Streams too, thus tapping into the momentum of Kafka's strong developer community.
  16. Employs one-record-at-a-time processing to achieve low processing latency, which is crucial for a variety of use cases.
  17. Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time, and the different revisions of the table can be represented as a changelog stream (second column).
  18. Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column).
  19. Figure 1: Before adding capacity, only a single instance of your Kafka Streams application is running. At this point the corresponding consumer group of your application contains only a single member (this instance). All data is being read and processed by this single instance.
  20. After adding capacity, two additional instances of your Kafka Streams application are running, and they have automatically joined the application's consumer group for a total of three current members. These three instances automatically split the processing work between each other. The splitting is based on the Kafka topic partitions from which data is being read.
  21. If one of the application instances is stopped (e.g. intentional reduction of capacity, maintenance, machine failure), it will automatically leave the application's consumer group, which causes the remaining instances to automatically take over the stopped instance's processing work. How many instances can or should you run for your application? Is there an upper limit for the number of instances and, similarly, for the parallelism of your application? In a nutshell, the parallelism of a Kafka Streams application, similar to the parallelism of Kafka, is primarily determined by the number of partitions of the input topic(s) from which your application is reading. For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your application (note that you can run further instances, but these will be idle).
  22. A common pattern is for the stream processing job to take input records from its input stream and, for each input record, make a remote call to a distributed database. The input stream is partitioned over multiple processors, each of which queries a remote database. And, of course, since this is a distributed database, it is itself partitioned over multiple machines. One possibility is to co-partition the database and the input processing, and then move the data to be directly co-located with the processing.
  23. Interactive Queries enables faster and more efficient use of the application state. Data is local to your application (in memory or possibly on SSDs), so you can access it very quickly. This is especially useful for applications that need to access large amounts of application state, e.g., when they do joins. There is no duplication of data between the store doing aggregation for stream processing and the store answering queries. What do you need to do as a developer to make your Kafka Streams applications queryable? It turns out that Kafka Streams handles most of the low-level querying, metadata discovery and data fault tolerance for you. Depending on the application, you might be able to query straight out of the box with zero work (see local stores section below), or you might have to implement a layer of indirection for distributed querying. We start simple with a single app instance. That instance can query its own local state stores out of the box. It can enumerate all the stores that are in that instance (in any processor node) as well as query the values of keys in those stores. This is simple and useful for apps that have a single instance; however, apps can have multiple instances running on potentially different servers. Kafka Streams will partition the data amongst these instances to scale out the capacity. If I want to get the latest value for key "stock X" and that key is not in my instance, how do I go about finding it?
  24. An application might have multiple instances running, each with its own set of state stores. We have made each instance aware of every other instance's state stores through periodic metadata exchanges, which we provide through Kafka's group membership protocol. Starting with Confluent Platform 3.1 and Apache Kafka 0.10.1, each instance may expose its endpoint information metadata (hostname and port, collectively known as the application.server config parameter) to other instances of the same application. The new Interactive Query APIs allow a developer to obtain the metadata for a given store name and key, and do that across the instances of an application. Hence, you can discover where the store that holds a particular key is by examining the metadata. So now we know which store on which application instance holds our key, but how do we actually query that (potentially remote) store? It sounds like we need some sort of RPC mechanism.
  25. Out of the box, the Kafka Streams library includes all the low-level APIs to query state stores and discover state stores across the running instances of an application. The application can then implement its own query patterns on the data and route the requests among instances as needed. Apps often have intricate processing logic and might need non-trivial routing of requests among Kafka Streams instances. For example, an app might want to locate the instance that holds a particular customer ID, and then route a call to rank that customer's stock to that particular instance. The business logic on that instance would sort the available data and return the result to the app. In more complex instances, we could have scatter-gather query patterns, where a call to an instance results in N calls from that instance to other instances and N results being collated and returned to the original caller. It is clear from these examples that there is no one API for distributed querying. Furthermore, there is no single best transport layer for the RPCs either (some users favor or must use REST within their company, others might opt for Thrift, etc.). RPCs are needed for inter-instance communication (i.e., within the same app); otherwise instance 3 of the app can't actually retrieve state information from instance 1. They are also needed for inter-app communication, where other applications query the original application's state.
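Notes 17 and 18 describe the stream-table duality: a table's revisions form a changelog stream, and replaying that stream rebuilds the table. A minimal plain-Java sketch of the replay direction follows. The user names and pageview numbers are invented for illustration, and the class is hypothetical rather than anything from Kafka Streams.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Rebuilds a table by replaying its changelog, keeping the latest value per key.
final class StreamTableDualitySketch {

    // Convenience factory for one changelog record (key = user, value = pageviews).
    static Map.Entry<String, Long> update(String user, long pageviews) {
        return new SimpleImmutableEntry<>(user, pageviews);
    }

    // Each record overwrites the previous value for its key.
    static Map<String, Long> toTable(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> record : changelog) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> changelog = Arrays.asList(
                update("alice", 1), update("bob", 1), update("alice", 2), update("alice", 3));
        System.out.println(toTable(changelog));
    }
}
```

Replaying only a prefix of the changelog yields the table as it looked at that earlier point in time.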