---
title: Flink Batch Example JAVA
---
## Flink Batch Example JAVA

Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.

### Prerequisites

* Unix-like environment (Linux, Mac OS X, Cygwin)
* git
* Maven (we recommend version 3.0.4)
* Java 7 or 8
* IntelliJ IDEA or Eclipse IDE

Clone the Flink repository and build it with Maven:

```
git clone https://github.com/apache/flink.git
cd flink
mvn clean package -DskipTests # this will take up to 10 minutes
```

### Datasets

For the batch processing we'll use the MovieLens dataset available here: [datasets](http://files.grouplens.org/datasets/movielens/ml-latest-small.zip)

In this example we'll use movies.csv and ratings.csv. Create a new Java project and put them in an ml-latest-small folder in the application base directory, since that is the path the code below reads them from.

### Example

We're going to compute the average rating per movie genre across the entire dataset.

**Environment and datasets**

First, create a new Java file; I'm going to name it AverageRating.java.

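A minimal skeleton for that class could look like the sketch below. The import list is an assumption based on the snippets that follow (the classic Flink DataSet API); adjust it to the Flink version you're using.

```
import java.util.List;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;

public class AverageRating {

  public static void main(String[] args) throws Exception {
    // The snippets from the rest of this example go inside this method.
  }
}
```
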
The first thing we'll do is create the execution environment and load the CSV files into DataSets, like this:

```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// movies.csv columns: movieId, title, genres
DataSet<Tuple3<Long, String, String>> movies = env.readCsvFile("ml-latest-small/movies.csv")
  .ignoreFirstLine()
  .parseQuotedStrings('"')
  .ignoreInvalidLines()
  .types(Long.class, String.class, String.class);

// ratings.csv columns: userId, movieId, rating, timestamp -- keep only movieId and rating
DataSet<Tuple2<Long, Double>> ratings = env.readCsvFile("ml-latest-small/ratings.csv")
  .ignoreFirstLine()
  .includeFields(false, true, true, false)
  .types(Long.class, Double.class);
```

Here we build a DataSet of <Long, String, String> tuples for the movies (movieId, title, genres), skipping the header line, parsing quoted strings and ignoring invalid lines, and a DataSet of <Long, Double> tuples for the ratings (movieId, rating), skipping the header and keeping only the second and third columns via includeFields.

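If you want to sanity-check that the files are being read correctly, you can print a few parsed records. This is an optional debugging step, not part of the final program; note that print() triggers execution on its own.

```
// Optional: inspect a few records from each DataSet.
movies.first(5).print();
ratings.first(5).print();
```
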
**Flink Processing**

Here we process the DataSets with Flink. The result will be a List of <String, Double> tuples, where the String holds the genre and the Double holds the average rating for that genre.

First we join the ratings DataSet with the movies DataSet on the movieId present in both.
From each joined pair we create a new tuple with the movie name, genre and score.
Then we group these tuples by genre, sum the scores and count the entries per genre, and finally divide the total score by the count to get the average.

```
// Join movies and ratings on movieId (field 0 of both tuples),
// producing (name, genre, score) tuples.
List<Tuple2<String, Double>> distribution = movies.join(ratings)
  .where(0)
  .equalTo(0)
  .with(new JoinFunction<Tuple3<Long, String, String>, Tuple2<Long, Double>, Tuple3<StringValue, StringValue, DoubleValue>>() {
    // The mutable Value objects and the result tuple are reused for every record.
    private StringValue name = new StringValue();
    private StringValue genre = new StringValue();
    private DoubleValue score = new DoubleValue();
    private Tuple3<StringValue, StringValue, DoubleValue> result = new Tuple3<>(name, genre, score);

    @Override
    public Tuple3<StringValue, StringValue, DoubleValue> join(Tuple3<Long, String, String> movie, Tuple2<Long, Double> rating) throws Exception {
      name.setValue(movie.f1);
      // Keep only the first genre listed for each movie.
      genre.setValue(movie.f2.split("\\|")[0]);
      score.setValue(rating.f1);
      return result;
    }
  })
  // Group by genre (field 1) and compute the average rating per genre.
  .groupBy(1)
  .reduceGroup(new GroupReduceFunction<Tuple3<StringValue, StringValue, DoubleValue>, Tuple2<String, Double>>() {
    @Override
    public void reduce(Iterable<Tuple3<StringValue, StringValue, DoubleValue>> iterable, Collector<Tuple2<String, Double>> collector) throws Exception {
      StringValue genre = null;
      int count = 0;
      double totalScore = 0;
      for (Tuple3<StringValue, StringValue, DoubleValue> movie : iterable) {
        genre = movie.f1;
        totalScore += movie.f2.getValue();
        count++;
      }

      collector.collect(new Tuple2<>(genre.getValue(), totalScore / count));
    }
  })
  .collect();
```
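
To see the output you can simply print the collected list; here's a minimal sketch (the exact formatting is up to you):

```
for (Tuple2<String, Double> genreRating : distribution) {
  System.out.println(genreRating.f0 + ": " + genreRating.f1);
}
```

Note that collect() already triggers execution of the job, so no separate env.execute() call is needed here.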
|  | 
 | ||
|  | With this you'll have a working batch processing flink application. Enjoy!. |