初涉Flink领域,你是否觉得安装、设置、编写代码的过程让人头疼?许多人遇到这些入门难题,但实际上,只要掌握了正确的方法,就能轻松入门。
Flink基础回顾
aliMaven aliyun Maven http://Maven.aliyun.com/nexus/content/groups/public/ central
Flink有其独特的基本原理、架构和众多组成部分。这些构成了掌握Flink的关键。比如,掌握其架构有助于理解数据的流动和处理过程。有位朋友在学习Flink时,跳过基础原理直接尝试案例,结果遇到了不少难题。只有对基础知识有深入理解,后续操作才会更加得心应手。在真实的开发环境中,许多团队都要求开发者先掌握Flink的基础知识。
在大数据领域,Flink的相关知识同样至关重要。众多大数据企业在招聘过程中,往往会涉及这一话题。熟练掌握Flink,有助于求职者在竞争激烈的市场中脱颖而出。
org.apache.flink flink-java 1.6.1 provided org.apache.flink flink-streaming-java_2.11 1.6.1 provided
选择开发语言
org.apache.flink flink-scala_2.11 1.6.1 provided org.apache.flink flink-streaming-scala_2.11 1.6.1 provided
在编写Flink程序时,可以选择Java或Scala。Scala在实现函数式编程时显得更为精炼。我的一位同事起初用Java编写Flink程序,代码冗长,维护起来颇为不易。后来他转而使用Scala,程序状况得到了显著提升。尽管如此,Java也有其独特优势,比如众多开发者对Java更为熟悉。开发语言的选择受到多种因素影响,包括开发环境和个人偏好。在有些项目里,团队成员普遍擅长Java,那么选用Java来开发Flink程序也是合情合理的。
配置国内镜像与依赖管理
使用阿里云的Maven仓库镜像,需要调整.conf/.xml文件。在Maven中管理依赖项,必须先完成配置。我遇到一个项目,由于Maven仓库镜像配置不当,依赖项下载变得极其缓慢。同样,在Maven项目的pom.xml文件中也需要进行配置。不同版本需要去Maven仓库寻找相应的配置。例如,之前的一个小项目使用的是旧版本,配置过程颇为复杂。
不同语言的配置差异
package xuwei.tech.streaming;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.utils.ParameterTool;
import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.Flink.runtime.state.filesystem.FsStateBackend;
import org.apache.Flink.runtime.state.memory.MemoryStateBackend;
import org.apache.Flink.streaming.api.DataStream.DataStream;
import org.apache.Flink.streaming.api.DataStream.DataStreamSource;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.windowing.time.Time;
import org.apache.Flink.util.Collector;
/**
* 单词计数之滑动窗口计算
*
* Created by xuwei.tech
*/
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception{
//获取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9000--Java");
port = 9000;
}
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String hostname = "hadoop100";
String delimiter = "n";
//连接Socket获取输入的数据
DataStreamSource text = env.socketTextStream(hostname, port, delimiter);
// a a c
// a 1
// a 1
// c 1
DataStream windowCounts = text.flatMap(new FlatMapFunction
() {
public void flatMap(String value, Collector out) throws
Exception {
String[] splits = value.split("\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2s,指定时间间隔为1s
.sum("count");//在这里使用sum或者reduce都可以
/*.reduce(new ReduceFunction() {
public WordWithCount reduce(WordWithCount a,
WordWithCount b) throws Exception {
return new WordWithCount(a.word,a.count+b.count);
}
})*/
//把数据打印到控制台并且设置并行度
windowCounts.print().setParallelism(1);
//这一行代码一定要实现,否则程序不执行
env.execute("Socket window count");
}
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word,long count){
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + ''' +
", count=" + count +
'}';
}
}
}
在Java编程中需要加入特定的Java配置,Scala编程同样需要相应的配置。我之前曾将这两种语言的配置搞混。一旦配置错误,程序便可能无法正常运作。此外,不同版本的配置要求各异,有时为了与旧版本兼容,还需付出额外努力。
在开发工具中运行和打包的要点
package xuwei.tech.streaming
import org.apache.Flink.api.Java.utils.ParameterTool
import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment
import org.apache.Flink.streaming.api.windowing.time.Time
/**
* 单词计数之滑动窗口计算
*
* Created by xuwei.tech
*/
object SocketWindowWordCountScala {
def main(args: Array[String]): Unit = {
//获取Socket端口号
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
}catch {
case e: Exception => {
System.err.println("No port set. use default port 9000--Scala")
}
9000
}
//获取运行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//连接Socket获取输入数据
val text = env.socketTextStream("hadoop100",port,'n')
//解析数据(把数据打平),分组,窗口计算,并且聚合求sum
//注意:必须要添加这一行隐式转行,否则下面的FlatMap方法执行会报错
import org.apache.Flink.api.Scala._
val windowCounts = text.flatMap(line => line.split("\s"))//打平,把每一行单词都切开
.map(w => WordWithCount(w,1))//把单词转成word , 1这种形式
.keyBy("word")//分组
.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间
.sum("count");// sum或者reduce都可以
//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))
//打印到控制台
windowCounts.print().setParallelism(1);
//执行任务
env.execute("Socket window count");
}
case class WordWithCount(word: String,count: Long)
}
[root@hadoop100 soft]# nc -l 9000 a b a
使用IDEA等开发环境执行代码时,记得将依赖配置中的scope属性注释掉。而在制作JAR包时,则需确保这一属性被启用。举例来说,若之前某个项目在打包时遗漏了这一步骤,结果生成的JAR包体积庞大,还附带了许多不必要的依赖。Flink具备延迟计算的特性,只有调用特定方法才会启动执行。在调试过程中,我为此特性耗费了不少精力,不过它也确实简化了复杂程序的编写。
案例需求分析与数据处理
WordWithCount{word='a', count=1}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=1}
手工生成单词,Flink实时获取数据,并在特定窗口内进行汇总计算。需要为多种语言添加Maven依赖。之前曾遇到过类似需求,仅配置依赖就耗费了大量时间。按照要求完成依赖添加后,在IDEA中执行代码即可获得结果。此外,之后还可以尝试使用Flink的Batch离线批处理功能。
package xuwei.tech.batch;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.DataSet;
import org.apache.Flink.api.Java.ExecutionEnvironment;
import org.apache.Flink.api.Java.operators.DataSource;
import org.apache.Flink.api.Java.tuple.Tuple2;
import org.apache.Flink.util.Collector;
/**
*单词计数之离线计算
*
* Created by xuwei.tech
*/
public class BatchWordCountJava {
public static void main(String[] args) throws Exception{
String inputPath = "D:\data\file";
String outPath = "D:\data\result";
//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//获取文件中的内容
DataSource text = env.readTextFile(inputPath);
DataSet<Tuple2> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
counts.writeAsCsv(outPath,"n"," ").setParallelism(1);
env.execute("batch word count");
}
public static class Tokenizer implements FlatMapFunction<String,Tuple2>{
public void flatMap(String value, Collector<Tuple2> out)
throws Exception {
String[] tokens = value.toLowerCase().split("\W+");
for (String token: tokens) {
if(token.length()>0){
out.collect(new Tuple2(token,1));
}
}
}
}
}
最后我想请教各位,在学习Flink的过程中,大家觉得哪一部分最让人感到棘手?期待大家的点赞和转发。
package xuwei.tech.batch
import org.apache.Flink.api.Scala.ExecutionEnvironment
/**
* 单词计数之离线计算
* Created by xuwei.tech
*/
object BatchWordCountScala {
def main(args: Array[String]): Unit = {
val inputPath = "D:\data\file"
val outPut = "D:\data\result"
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(inputPath)
//引入隐式转换
import org.apache.Flink.api.Scala._
val counts = text.flatMap(_.toLowerCase.split("\W+"))
.filter(_.nonEmpty)
.map((_,1))
.groupBy(0)
.sum(1)
counts.writeAsCsv(outPut,"n"," ").setParallelism(1)
env.execute("batch word count")
}
}

