This repository contains the components for a simple streaming pipeline:
- Generate data and write it to Apache Kafka
- Process the generated data from Kafka using Apache Flink
- Write the results back to Kafka for further processing
- Analyze the results from Kafka using IPython Notebook
Questions and feedback can be sent to gyfora@apache.org
In this very simple example we will analyse temperature data, generated for European cities in a streaming fashion.
Data is generated as simple Strings in the format of: "City, Temperature"
"Budapest, 30", "Stockholm, 20", "Budapest, 32" …
Our goal is to analyse the incoming data in a continuous fashion, updating our statistics as new data becomes available.
We generate random temperatures using a simple Scala program, which writes them directly to Kafka, making them available for processing.
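To make the record format concrete, here is a minimal Scala sketch of such a generator. The object name, the city list and the temperature range are assumptions made for illustration and are not taken from the repository. The sketch prints records to the console instead of writing them to Kafka.

```scala
import scala.util.Random

object TemperatureGeneratorSketch {
  // Hypothetical city list; the repository's generator may use a different set.
  val cities: Vector[String] = Vector("Budapest", "Stockholm", "Madrid")

  // Builds one record in the "City, Temperature" format.
  def nextRecord(rnd: Random): String = {
    val city = cities(rnd.nextInt(cities.length))
    val temp = 10 + rnd.nextInt(31) // assumed range: 10 to 40 degrees
    s"$city, $temp"
  }

  def main(args: Array[String]): Unit = {
    val rnd = new Random()
    // Print a few records; a real generator would send each one to Kafka instead.
    (1 to 5).foreach(_ => println(nextRecord(rnd)))
  }
}
```

Keeping record construction in its own function separates the format from the transport, so the same function could feed a Kafka producer or a test.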
We then use a Flink program to do the following processing steps:
- Parse the incoming String into the Scala case class Temp(city: String, temp: Double):
    "Budapest, 30" -> ("Budapest", 30)
    "Stockholm, 20" -> ("Stockholm", 20)
Note: An important consideration we need to make when implementing the parsing step is that it should be robust to errors coming from incorrect input formats.
- We compute a historical average of the temperatures for each city:
    ("Budapest", 30) -> Avg: ("Budapest", 30)
    ("Budapest", 40) -> Avg: ("Budapest", 35)
    ("Stockholm", 20) -> Avg: ("Stockholm", 20)
    ("Budapest", 37) -> Avg: ("Budapest", 35.67)
    ("Stockholm", 22) -> Avg: ("Stockholm", 21)
Note: Computing a historical average has a distinctive property compared to the parsing operator: we need to keep some computational state for each city (sum and count) so that we can update the average when the next element arrives.
You can read more about the interesting topic of operator states and stateful stream processing in this wiki article.
- We compute the current global maximum temperature every 5 seconds:
    ("Budapest", 32)
    ("Madrid", 34)      -> Max: ("Madrid", 34)
    ("Stockholm", 20)
    -----------------------------------------
    ("Budapest", 36)
    ("Madrid", 33)      -> Max: ("Budapest", 36)
    ("Stockholm", 23)
Note: This is a typical example of a window computation, where the data stream is discretised into small batches (windows) and an aggregation or transformation is applied independently to each window.
- The computed statistics are written back to Kafka and can be further analysed. In this example we use IPython notebook to provide basic visualisation.
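The three processing steps above can be sketched in plain Scala, without Flink or Kafka, to show the logic each operator has to implement. This is only an illustration on in-memory collections, not the repository's Flink job. The object and function names, and the use of explicit millisecond timestamps to assign records to windows, are assumptions.

```scala
import scala.collection.mutable
import scala.util.Try

object TemperaturePipelineSketch {
  case class Temp(city: String, temp: Double)

  // Step 1: robust parsing. Malformed lines yield None instead of throwing.
  def parse(line: String): Option[Temp] =
    line.split(",").map(_.trim) match {
      case Array(city, t) if city.nonEmpty =>
        Try(t.toDouble).toOption.map(Temp(city, _))
      case _ => None
    }

  // Step 2: historical average per city. The (sum, count) pair is the
  // per-city state that has to survive between elements.
  def runningAverages(temps: Seq[Temp]): Seq[Temp] = {
    val state = mutable.Map.empty[String, (Double, Long)]
    temps.map { t =>
      val (sum, count) = state.getOrElse(t.city, (0.0, 0L))
      val updated = (sum + t.temp, count + 1)
      state(t.city) = updated
      Temp(t.city, updated._1 / updated._2)
    }
  }

  // Step 3: global maximum per fixed-size window. Each event carries an
  // assumed non-negative timestamp in milliseconds; windows are processed
  // independently of each other.
  def maxPerWindow(events: Seq[(Long, Temp)], windowMillis: Long): Seq[(Long, Temp)] =
    events
      .groupBy { case (ts, _) => ts / windowMillis }
      .toSeq
      .sortBy(_._1)
      .map { case (window, es) => (window, es.map(_._2).maxBy(_.temp)) }

  def main(args: Array[String]): Unit = {
    val lines = Seq("Budapest, 30", "not a record", "Budapest, 40", "Stockholm, 20")
    val parsed = lines.flatMap(l => parse(l)) // the malformed line is dropped
    runningAverages(parsed).foreach(println)

    val events = Seq(
      (0L, Temp("Budapest", 32)), (1000L, Temp("Madrid", 34)),
      (5000L, Temp("Budapest", 36)), (6000L, Temp("Madrid", 33)))
    maxPerWindow(events, 5000L).foreach(println)
  }
}
```

Parsing is stateless, averaging needs state per city, and the maximum needs all elements of one window at once. That difference is why a streaming engine treats the three steps as different kinds of operators.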
Prerequisites: JDK 7+, Maven 3.x, Scala 2.10, Python 2.7, IPython notebook
- Download & install Apache Kafka
- Install the Python kafka client
    pip install kafka-python
- Clone the repository and build a jar
    git clone https://github.com/gyfora/summer-school.git
    cd summer-school/flink
    mvn assembly:assembly
- Start the Kafka server and create the topics
    cd <KAFKA-DIR>
    # Start ZooKeeper server (runs in the foreground; keep it in its own terminal)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    # Start Kafka server (also runs in the foreground)
    bin/kafka-server-start.sh config/server.properties
    # Create input and output topics
    bin/kafka-topics.sh --create --topic input --partitions 1 --replication-factor 1 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic output_avg --partitions 1 --replication-factor 1 --zookeeper localhost:2181
    bin/kafka-topics.sh --create --topic output_max --partitions 1 --replication-factor 1 --zookeeper localhost:2181
- Start Flink streaming job
    # Running on a Flink mini cluster (equivalent to executing from the IDE)
    cd <git>/summer-school/flink
    java -classpath target/FlinkExample.jar summerschool.FlinkKafkaExample
To execute the job on an already running cluster (cluster setup guide), we can use either the Flink command-line client or the web client:
    # Start a local Flink cluster
    cd <FLINK-DIR>
    bin/start-cluster-streaming.sh
    # Run the example job using the command-line client
    bin/flink run -c summerschool.FlinkKafkaExample <summer-school-dir>/flink/target/FlinkExample.jar
- Start Data generator
    cd <git>/summer-school/flink
    java -classpath target/FlinkExample.jar summerschool.DataGenerator
- Start IPython Notebook to further analyse the results
    cd <git>/summer-school/python
    ipython notebook