Flink Batch Processing from 0 to 1


I. DataSet API: Data Sources

Introduction:

Flink ships with a large number of ready-made source implementations. You can also define your own: implement the SourceFunction interface for a non-parallel source, or implement the ParallelSourceFunction interface / extend RichParallelSourceFunction for a parallel source (a minimal sketch follows).
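
As a minimal sketch (the class name and emission logic are illustrative, not from the original post), a non-parallel custom source built on the SourceFunction interface mentioned above could look like this:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative non-parallel custom source: emits an increasing counter until cancel() is called.
public class MyNoParallelSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;
    private long count = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count++);   // emit one element
            Thread.sleep(1000);     // slow down, for demonstration only
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

Such a source would be attached with env.addSource(new MyNoParallelSource()) on a StreamExecutionEnvironment; a parallel version would implement ParallelSourceFunction or extend RichParallelSourceFunction instead.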

Types:
File-based

readTextFile(path): reads a text file following the TextInputFormat rules and returns it line by line.

Collection-based

fromCollection(Collection): creates a data stream from a Java Collection; all elements in the collection must be of the same type.

Code examples:
1. fromCollection
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingFromCollectionScala {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions for the Scala API
    import org.apache.flink.api.scala._

    val data = List(10, 15, 20)

    val text = env.fromCollection(data)

    // add 1 to every element received by map
    val num = text.map(_ + 1)

    num.print().setParallelism(1)

    env.execute("StreamingFromCollectionScala")
  }

}

The Java batch version of the same idea, feeding a word count from a collection:
package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

public class BatchWordCountJava {

    public static void main(String[] args) throws Exception{
        // sample input lines
        List<String> data = Arrays.asList("hello you", "hello me");
        String outPath = "D:\\data\\result";
        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // create a DataSet from the collection
        DataSet<String> text = env.fromCollection(data);
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

2. readTextFile
package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception{
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the file contents
        DataSource<String> text = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

II. DataSet API: Transformations

Introduction:

  1. Map: takes one element and returns one element; cleaning or conversion work can be done in between (a combined Map/Filter/Reduce sketch follows this list)

  2. FlatMap: takes one element and returns zero, one, or more elements

  3. MapPartition: like Map, but processes a whole partition at a time [recommended when the map processing needs to obtain a third-party resource such as a connection]

  4. Filter: filter function; each element is evaluated and only those satisfying the condition are kept

  5. Reduce: aggregates the data by combining the current element with the value returned by the previous reduce call, producing a new value

  6. Aggregate: sum, max, min, and so on

  7. Distinct: returns the distinct elements of a dataset, e.g. data.distinct()

  8. Join: inner join

  9. OuterJoin: outer join

  10. Cross: Cartesian product of two datasets

  11. Union: union of two datasets; the element types must match

  12. First-n: returns the first N elements of a dataset

  13. Sort Partition: sorts all partitions of the dataset locally; chain sortPartition() calls to sort on multiple fields
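
Map, Filter, and Reduce do not get their own numbered example below (FlatMap already appears in the word-count code above), so here is a minimal combined sketch; the class name and sample numbers are made up for illustration:

import java.util.Arrays;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchDemoBasicTransformations {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> nums = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

        DataSet<Integer> sum = nums
                // Map: one element in, one element out (add 1 to each value)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) throws Exception {
                        return value + 1;
                    }
                })
                // Filter: keep only the elements that satisfy the condition
                .filter(new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer value) throws Exception {
                        return value > 2;
                    }
                })
                // Reduce: combine the current element with the previous reduce result
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });

        sum.print();
    }
}

With input 1..5, the map step yields 2..6, the filter keeps 3..6, and the reduce sums them to 18.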

Code examples:
1. broadcast (broadcast variables)
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
 * Broadcast variables.
 *
 * Requirement:
 * Flink reads user names from a data source, and we need to print
 * each user's name together with their age.
 *
 * Analysis:
 * the age therefore has to be looked up inside the intermediate map operation,
 * so it is recommended to distribute the user relation dataset as a broadcast variable.
 *
 * Note: if several operators need the same dataset, the broadcast variable
 * must be registered separately on each of those operators.
 */
public class BatchDemoBroadcast {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1: prepare the data that needs to be broadcast
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs", 18));
        broadData.add(new Tuple2<>("ls", 20));
        broadData.add(new Tuple2<>("ww", 17));
        DataSet<Tuple2<String, Integer>> tupleData = env.fromCollection(broadData);

        // 1.1: convert the broadcast dataset into a map (key = user name, value = age)
        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });

        // source data
        DataSource<String> data = env.fromElements("zs", "ls", "ww");

        // note: a RichMapFunction is required here in order to access the broadcast variable
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            /**
             * This method runs only once, so initialization work can be done here;
             * that makes open() the right place to fetch the broadcast variable.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 3: fetch the broadcast data
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName"); // 2: register the broadcast dataset
        result.print();
    }

}

2. IntCounter (accumulators)
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

/**
 * Global accumulator (counter).
 *
 * Requirement:
 * count how many records the map function processed.
 *
 * Note: the accumulator value is only available after the job has finished.
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoCounter {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {

            // 1: create the accumulator
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: register the accumulator
                getRuntimeContext().addAccumulator("num-lines", this.numLines);
            }

            //int sum = 0;
            @Override
            public String map(String value) throws Exception {
                // with parallelism 1 a plain field sum would work, but with several
                // parallel instances a plain sum no longer gives the correct total
                //sum++;
                //System.out.println("sum:"+sum);
                this.numLines.add(1);
                return value;
            }
        }).setParallelism(8);

        //result.print();

        result.writeAsText("d:\\data\\count10");

        JobExecutionResult jobResult = env.execute("counter");
        // 3: read the accumulator result
        int num = jobResult.getAccumulatorResult("num-lines");
        System.out.println("num:" + num);
    }

}


3. cross (Cartesian product)
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;

/**
 * Cartesian product of two datasets.
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoCross {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // first dataset: user names
        ArrayList<String> data1 = new ArrayList<>();
        data1.add("zs");
        data1.add("ww");

        // second dataset: numeric ids
        ArrayList<Integer> data2 = new ArrayList<>();
        data2.add(1);
        data2.add(2);

        DataSource<String> text1 = env.fromCollection(data1);
        DataSource<Integer> text2 = env.fromCollection(data2);

        CrossOperator.DefaultCross<String, Integer> cross = text1.cross(text2);

        cross.print();
    }

}



4. registerCachedFile (distributed cache)
package xuwei.tech.batch.batchAPI;

import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Distributed Cache
 */
public class BatchDemoDisCache {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1: register a file; files on HDFS or S3 can be used as well
        env.registerCachedFile("d:\\data\\file\\a.txt", "a.txt");

        DataSource<String> data = env.fromElements("a", "b", "c", "d");

        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // 2: access the cached file
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                List<String> lines = FileUtils.readLines(myFile);
                for (String line : lines) {
                    this.dataList.add(line);
                    System.out.println("line:" + line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                // dataList can be used here
                return value;
            }
        });

        result.print();
    }

}

5. distinct
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

public class BatchDemoDistinct {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");

        DataSource<String> text = env.fromCollection(data);

        FlatMapOperator<String, String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.toLowerCase().split("\\W+");
                for (String word : split) {
                    System.out.println("word:" + word);
                    out.collect(word);
                }
            }
        });

        flatMapData.distinct() // deduplicate the whole records
                .print();
    }

}



6. first-n and sorting
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * Get the first N elements of a dataset.
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoFirstN {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(2, "zs"));
        data.add(new Tuple2<>(4, "ls"));
        data.add(new Tuple2<>(3, "ww"));
        data.add(new Tuple2<>(1, "xw"));
        data.add(new Tuple2<>(1, "aw"));
        data.add(new Tuple2<>(1, "mw"));

        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        // first 3 records, in insertion order
        text.first(3).print();
        System.out.println("==============================");

        // group by the first field and take the first 2 elements of each group
        text.groupBy(0).first(2).print();
        System.out.println("==============================");

        // group by the first field, sort within each group by the second field (ascending),
        // then take the first 2 elements of each group
        text.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
        System.out.println("==============================");

        // no grouping: sort by the first field ascending and the second field descending,
        // then take the first 3 elements
        text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3).print();
    }

}

="">
7. join
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

public class BatchDemoJoin {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // Tuple2<user id, user city>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(3, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        text1.join(text2).where(0)      // index of the join field in the first dataset
                .equalTo(0)             // index of the join field in the second dataset
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second)
                            throws Exception {
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();

        System.out.println("==================================");

        // note: using map here produces exactly the same result as with() above
        /*text1.join(text2).where(0)    // index of the join field in the first dataset
                .equalTo(0)             // index of the join field in the second dataset
                .map(new MapFunction<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> map(Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>> value) throws Exception {
                        return new Tuple3<>(value.f0.f0, value.f0.f1, value.f1.f1);
                    }
                }).print();*/
    }
}
8. outerJoin
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.ArrayList;

/**
 * Outer joins:
 *
 * left outer join
 * right outer join
 * full outer join
 */
public class BatchDemoOuterJoin {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Tuple2<user id, user name>
        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        // Tuple2<user id, user city>
        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "beijing"));
        data2.add(new Tuple2<>(2, "shanghai"));
        data2.add(new Tuple2<>(4, "guangzhou"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        /**
         * Left outer join.
         *
         * Note: the "second" tuple may be null.
         */
        text1.leftOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();

        System.out.println("=============================================================================");

        /**
         * Right outer join.
         *
         * Note: the "first" tuple may be null.
         */
        text1.rightOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        }
                        return new Tuple3<>(first.f0, first.f1, second.f1);
                    }
                }).print();

        System.out.println("=============================================================================");

        /**
         * Full outer join.
         *
         * Note: both "first" and "second" may be null.
         */
        text1.fullOuterJoin(text2)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
                    @Override
                    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        if (first == null) {
                            return new Tuple3<>(second.f0, "null", second.f1);
                        } else if (second == null) {
                            return new Tuple3<>(first.f0, first.f1, "null");
                        } else {
                            return new Tuple3<>(first.f0, first.f1, second.f1);
                        }
                    }
                }).print();
    }

}


9. union
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoUnion {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<>();
        data1.add(new Tuple2<>(1, "zs"));
        data1.add(new Tuple2<>(2, "ls"));
        data1.add(new Tuple2<>(3, "ww"));

        ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<>();
        data2.add(new Tuple2<>(1, "lili"));
        data2.add(new Tuple2<>(2, "jack"));
        data2.add(new Tuple2<>(3, "jessic"));

        DataSource<Tuple2<Integer, String>> text1 = env.fromCollection(data1);
        DataSource<Tuple2<Integer, String>> text2 = env.fromCollection(data2);

        UnionOperator<Tuple2<Integer, String>> union = text1.union(text2);

        union.print();
    }

}



III. DataSet API: partitioning

Introduction:
  1. Rebalance: rebalances (repartitions) the dataset to eliminate data skew

  2. Hash-Partition: partitions the dataset on the hash of the given key

  3. partitionByHash()

  4. Range-Partition: range-partitions the dataset on the given key

  5. partitionByRange()

  6. Custom Partitioning: custom partitioning rules

  7. Custom partitioning requires implementing the Partitioner interface (a minimal sketch follows this list)

  8. partitionCustom(partitioner, "someKey")

  9. or partitionCustom(partitioner, 0)
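
Custom partitioning has no numbered example below, so here is a minimal sketch of the Partitioner interface; the class name and the modulo routing rule are only illustrative:

import org.apache.flink.api.common.functions.Partitioner;

// Illustrative custom partitioner: route each record by key modulo the number of partitions.
public class MyPartitioner implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        return key % numPartitions;
    }
}

It would then be applied with partitionCustom(new MyPartitioner(), 0) on a dataset whose field 0 is an Integer key, matching the partitionCustom(partitioner, 0) form listed above.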

Code examples:
1. partitionByRange or partitionByHash
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * Hash-Partition
 *
 * Range-Partition
 *
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoHashRangePartition {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
        data.add(new Tuple2<>(1, "hello1"));
        data.add(new Tuple2<>(2, "hello2"));
        data.add(new Tuple2<>(2, "hello3"));
        data.add(new Tuple2<>(3, "hello4"));
        data.add(new Tuple2<>(3, "hello5"));
        data.add(new Tuple2<>(3, "hello6"));
        data.add(new Tuple2<>(4, "hello7"));
        data.add(new Tuple2<>(4, "hello8"));
        data.add(new Tuple2<>(4, "hello9"));
        data.add(new Tuple2<>(4, "hello10"));
        data.add(new Tuple2<>(5, "hello11"));
        data.add(new Tuple2<>(5, "hello12"));
        data.add(new Tuple2<>(5, "hello13"));
        data.add(new Tuple2<>(5, "hello14"));
        data.add(new Tuple2<>(5, "hello15"));
        data.add(new Tuple2<>(6, "hello16"));
        data.add(new Tuple2<>(6, "hello17"));
        data.add(new Tuple2<>(6, "hello18"));
        data.add(new Tuple2<>(6, "hello19"));
        data.add(new Tuple2<>(6, "hello20"));
        data.add(new Tuple2<>(6, "hello21"));

        DataSource<Tuple2<Integer, String>> text = env.fromCollection(data);

        /*text.partitionByHash(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> it = values.iterator();
                while (it.hasNext()) {
                    Tuple2<Integer, String> next = it.next();
                    System.out.println("current thread id:" + Thread.currentThread().getId() + "," + next);
                }
            }
        }).print();*/

        text.partitionByRange(0).mapPartition(new MapPartitionFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public void mapPartition(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
                Iterator<Tuple2<Integer, String>> it = values.iterator();
                while (it.hasNext()) {
                    Tuple2<Integer, String> next = it.next();
                    System.out.println("current thread id:" + Thread.currentThread().getId() + "," + next);
                }
            }
        }).print();
    }

}



2. mapPartition
package xuwei.tech.batch.batchAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchDemoMapPartition {

    public static void main(String[] args) throws Exception{

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ArrayList<String> data = new ArrayList<>();
        data.add("hello you");
        data.add("hello me");

        DataSource<String> text = env.fromCollection(data);

        /*text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // open a database connection -- note: with map, a connection is opened for every single record
                // process the data
                // close the connection
                return value;
            }
        });*/

        DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
                // open a database connection -- here one connection is opened per partition
                // (advantage: one connection per partition instead of one per record)
                // values holds all the data of one partition
                // process the data
                Iterator<String> it = values.iterator();
                while (it.hasNext()) {
                    String next = it.next();
                    String[] split = next.split("\\W+");
                    for (String word : split) {
                        out.collect(word);
                    }
                }
                // close the connection
            }
        });

        mapPartitionData.print();
    }

}



IV. DataSet API: Data Sinks

Introduction:
  1. writeAsText(): writes elements line by line as strings, obtained by calling each element's toString() method (a short sketch follows this list)

  2. writeAsCsv(): writes tuples as comma-separated value files; the row and field delimiters are configurable, and each field value comes from the object's toString() method

  3. print(): prints each element's toString() value to standard output or standard error
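
Only writeAsCsv is exercised in the code below; as a rough sketch (the class name and output path are illustrative, not from the original post), writeAsText and print could be used like this:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchSinkDemo {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> data = env.fromElements("a", "b", "c");

        // print(): writes each element's toString() to standard output
        // (in the DataSet API, print() triggers execution on its own)
        data.print();

        // writeAsText(): one line per element, using the element's toString()
        data.writeAsText("D:\\data\\result-text").setParallelism(1);
        env.execute("sink demo");
    }
}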

Code:
1. writeAsCsv
package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by xuwei.tech on 2018/10/8.
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception{
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the file contents
        DataSource<String> text = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

 

Dedicated to big data, machine-learning algorithms, and AI study, shared for anyone who needs it. I hope the shared content is useful; feel free to forward and follow.





