From 94b1abc7b7a83db9d63f4aae39340f179780e018 Mon Sep 17 00:00:00 2001
From: tiptok <785409885@qq.com>
Date: Thu, 28 Apr 2022 16:49:42 +0800
Subject: [PATCH] feat: add flink
---
.gitignore | 1 +
Flink/pom.xml | 88 +++++++++++++++++++
Flink/src/main/java/org/example/App.java | 23 +++++
.../main/java/org/example/BatchWordCount.java | 33 +++++++
.../org/example/SocketStreamWordCount.java | 43 +++++++++
.../java/org/example/StreamWordCount.java | 37 ++++++++
.../src/main/java/org/example/word-count.txt | 4 +
Flink/src/test/java/org/example/AppTest.java | 20 +++++
.../lambda/LambdaComparatorDemo.java | 1 -
9 files changed, 249 insertions(+), 1 deletion(-)
create mode 100644 Flink/pom.xml
create mode 100644 Flink/src/main/java/org/example/App.java
create mode 100644 Flink/src/main/java/org/example/BatchWordCount.java
create mode 100644 Flink/src/main/java/org/example/SocketStreamWordCount.java
create mode 100644 Flink/src/main/java/org/example/StreamWordCount.java
create mode 100644 Flink/src/main/java/org/example/word-count.txt
create mode 100644 Flink/src/test/java/org/example/AppTest.java
diff --git a/.gitignore b/.gitignore
index 3540bb2..dfa28af 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ Spring-Security/springsecurity.iml
rocketmqdemo/rocketmqdemo.iml
# target
+Flink/target
JdkLearn/target
Spring/target
Spring-AOP/target
diff --git a/Flink/pom.xml b/Flink/pom.xml
new file mode 100644
index 0000000..a450dbc
--- /dev/null
+++ b/Flink/pom.xml
@@ -0,0 +1,88 @@
+
+
+
+ 4.0.0
+
+ org.example
+ Flink
+ 1.0-SNAPSHOT
+
+ Flink
+
+ http://www.example.com
+
+
+ UTF-8
+ 11
+ 11
+ 11
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.28
+
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+
+ org.apache.flink
+ flink-clients_2.12
+ 1.13.6
+
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 3.1.0
+
+
+
+ maven-resources-plugin
+ 3.0.2
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+
+ maven-surefire-plugin
+ 2.22.1
+
+
+ maven-jar-plugin
+ 3.0.2
+
+
+ maven-install-plugin
+ 2.5.2
+
+
+ maven-deploy-plugin
+ 2.8.2
+
+
+
+ maven-site-plugin
+ 3.7.1
+
+
+ maven-project-info-reports-plugin
+ 3.0.0
+
+
+
+
+
diff --git a/Flink/src/main/java/org/example/App.java b/Flink/src/main/java/org/example/App.java
new file mode 100644
index 0000000..ed8c8b3
--- /dev/null
+++ b/Flink/src/main/java/org/example/App.java
@@ -0,0 +1,23 @@
+package org.example;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Collections;
+
+/**
+ * Hello world!
+ *
+ */
+public class App
+{
+ public static void main( String[] args ) throws Exception {
+ //var logger = LoggerFactory.getLogger(App.class);
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSource dataSource = env.fromCollection(Collections.singletonList("Hello Word"));
+ dataSource.print();
+ System.out.println( "Hello World!" );
+ //logger.info("hello flink");
+ }
+}
diff --git a/Flink/src/main/java/org/example/BatchWordCount.java b/Flink/src/main/java/org/example/BatchWordCount.java
new file mode 100644
index 0000000..61cad45
--- /dev/null
+++ b/Flink/src/main/java/org/example/BatchWordCount.java
@@ -0,0 +1,33 @@
+package org.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.flink.util.Collector;
+
+// 离线批处理
+public class BatchWordCount {
+ public static void main(String[] args) throws Exception{
+ ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+ final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt";
+ DataSource dataSource = environment.readTextFile(fileName);
+ dataSource
+ .flatMap(new FlatMapFunction>() {
+ public void flatMap(String s, Collector> collector) throws Exception{
+ //对读取到的每一行数据按照空格分割
+ String[] split = s.split(" ");
+ //将每个单词放入collector中作为输出,格式类似于{word:1}
+ for (String word : split) {
+ collector.collect(new Tuple2(word, 1));
+ }
+ }
+ })
+ .groupBy(0)
+ .sum(1)
+ .print();
+ //environment.execute();
+ }
+}
diff --git a/Flink/src/main/java/org/example/SocketStreamWordCount.java b/Flink/src/main/java/org/example/SocketStreamWordCount.java
new file mode 100644
index 0000000..ddc73fb
--- /dev/null
+++ b/Flink/src/main/java/org/example/SocketStreamWordCount.java
@@ -0,0 +1,43 @@
+package org.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+public class SocketStreamWordCount {
+ public static void main(String[] args) throws Exception {
+ //获取Flink批处理执行环境
+ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final String host = "localhost";
+ final int port = 8000;
+ //从socket中获取数据源
+ DataStreamSource source = environment.socketTextStream(host, port);
+ //单词计数
+ source
+ //将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1
+ .flatMap(new FlatMapFunction>() {
+ public void flatMap(String s, Collector> collector) throws Exception {
+ //对读取到的每一行数据按照空格分割
+ String[] split = s.split(" ");
+ //将每个单词放入collector中作为输出,格式类似于{word:1}
+ for (String word : split) {
+ collector.collect(new Tuple2(word, 1));
+ }
+ }
+ })
+ //聚合算子,按照第一个字段(即word字段)进行分组
+ .keyBy(v -> v.f0)
+ //聚合算子,对每一个分租内的数据按照第二个字段进行求和
+ .sum(1)
+ .print();
+
+ environment.execute();
+ }
+}
+
diff --git a/Flink/src/main/java/org/example/StreamWordCount.java b/Flink/src/main/java/org/example/StreamWordCount.java
new file mode 100644
index 0000000..c502f6c
--- /dev/null
+++ b/Flink/src/main/java/org/example/StreamWordCount.java
@@ -0,0 +1,37 @@
+package org.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+public class StreamWordCount {
+ public static void main(String[] args)throws Exception{
+ StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ //从文件中获取数据源
+ final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt";
+ DataStreamSource source = environment.readTextFile(fileName);
+ //单词计数
+ source
+ //将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1
+ .flatMap(new FlatMapFunction>() {
+ public void flatMap(String s, Collector> collector) throws Exception {
+ //对读取到的每一行数据按照空格分割
+ String[] split = s.split(" ");
+ //将每个单词放入collector中作为输出,格式类似于{word:1}
+ for (String word : split) {
+ collector.collect(new Tuple2(word, 1));
+ }
+ }
+ })
+ //聚合算子,按照第一个字段(即word字段)进行分组
+ .keyBy(v -> v.f0)
+ //聚合算子,对每一个分租内的数据按照第二个字段进行求和
+ .sum(1)
+ .print();
+
+ environment.execute();
+ }
+}
diff --git a/Flink/src/main/java/org/example/word-count.txt b/Flink/src/main/java/org/example/word-count.txt
new file mode 100644
index 0000000..8cba333
--- /dev/null
+++ b/Flink/src/main/java/org/example/word-count.txt
@@ -0,0 +1,4 @@
+hello word
+hello flink
+hello java
+java is best
\ No newline at end of file
diff --git a/Flink/src/test/java/org/example/AppTest.java b/Flink/src/test/java/org/example/AppTest.java
new file mode 100644
index 0000000..6a1d2d7
--- /dev/null
+++ b/Flink/src/test/java/org/example/AppTest.java
@@ -0,0 +1,20 @@
+package org.example;
+
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest
+{
+ /**
+ * Rigorous Test :-)
+ */
+ @Test
+ public void shouldAnswerWithTrue()
+ {
+ assertTrue( true );
+ }
+}
diff --git a/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java b/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java
index 1db2858..018cc33 100644
--- a/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java
+++ b/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java
@@ -2,7 +2,6 @@
import lombok.AllArgsConstructor;
import lombok.Data;
-import sun.java2d.pipe.SpanShapeRenderer;
import java.text.SimpleDateFormat;
import java.util.Arrays;