diff --git a/.gitignore b/.gitignore index 3540bb2..dfa28af 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ Spring-Security/springsecurity.iml rocketmqdemo/rocketmqdemo.iml # target +Flink/target JdkLearn/target Spring/target Spring-AOP/target diff --git a/Flink/pom.xml b/Flink/pom.xml new file mode 100644 index 0000000..a450dbc --- /dev/null +++ b/Flink/pom.xml @@ -0,0 +1,88 @@ + + + + 4.0.0 + + org.example + Flink + 1.0-SNAPSHOT + + Flink + + http://www.example.com + + + UTF-8 + 11 + 11 + 11 + + + + + org.slf4j + slf4j-api + 1.7.28 + + + + junit + junit + 4.11 + test + + + + org.apache.flink + flink-clients_2.12 + 1.13.6 + + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + diff --git a/Flink/src/main/java/org/example/App.java b/Flink/src/main/java/org/example/App.java new file mode 100644 index 0000000..ed8c8b3 --- /dev/null +++ b/Flink/src/main/java/org/example/App.java @@ -0,0 +1,23 @@ +package org.example; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Collections; + +/** + * Hello world! + * + */ +public class App +{ + public static void main( String[] args ) throws Exception { + //var logger = LoggerFactory.getLogger(App.class); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSource dataSource = env.fromCollection(Collections.singletonList("Hello Word")); + dataSource.print(); + System.out.println( "Hello World!" ); + //logger.info("hello flink"); + } +} diff --git a/Flink/src/main/java/org/example/BatchWordCount.java b/Flink/src/main/java/org/example/BatchWordCount.java new file mode 100644 index 0000000..61cad45 --- /dev/null +++ b/Flink/src/main/java/org/example/BatchWordCount.java @@ -0,0 +1,33 @@ +package org.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.flink.util.Collector; + +// 离线批处理 +public class BatchWordCount { + public static void main(String[] args) throws Exception{ + ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt"; + DataSource dataSource = environment.readTextFile(fileName); + dataSource + .flatMap(new FlatMapFunction>() { + public void flatMap(String s, Collector> collector) throws Exception{ + //对读取到的每一行数据按照空格分割 + String[] split = s.split(" "); + //将每个单词放入collector中作为输出,格式类似于{word:1} + for (String word : split) { + collector.collect(new Tuple2(word, 1)); + } + } + }) + .groupBy(0) + .sum(1) + .print(); + //environment.execute(); + } +} diff --git a/Flink/src/main/java/org/example/SocketStreamWordCount.java b/Flink/src/main/java/org/example/SocketStreamWordCount.java new file mode 100644 index 0000000..ddc73fb --- /dev/null +++ b/Flink/src/main/java/org/example/SocketStreamWordCount.java @@ -0,0 +1,43 @@ +package org.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +public class SocketStreamWordCount { + public static void main(String[] args) throws Exception { + //获取Flink批处理执行环境 + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + final String host = "localhost"; + final int port = 8000; + //从socket中获取数据源 + DataStreamSource source = environment.socketTextStream(host, port); + //单词计数 + source + //将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1 + .flatMap(new FlatMapFunction>() { + public void flatMap(String s, Collector> collector) throws Exception { + //对读取到的每一行数据按照空格分割 + String[] split = s.split(" "); + //将每个单词放入collector中作为输出,格式类似于{word:1} + for (String word : split) { + collector.collect(new Tuple2(word, 1)); + } + } + }) + //聚合算子,按照第一个字段(即word字段)进行分组 + .keyBy(v -> v.f0) + //聚合算子,对每一个分租内的数据按照第二个字段进行求和 + .sum(1) + .print(); + + environment.execute(); + } +} + diff --git a/Flink/src/main/java/org/example/StreamWordCount.java b/Flink/src/main/java/org/example/StreamWordCount.java new file mode 100644 index 0000000..c502f6c --- /dev/null +++ b/Flink/src/main/java/org/example/StreamWordCount.java @@ -0,0 +1,37 @@ +package org.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +public class StreamWordCount { + public static void main(String[] args)throws Exception{ + StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); + + //从文件中获取数据源 + final String fileName = "F:\\java\\JavaSourceCodeLearning\\Flink\\src\\main\\java\\org\\example\\word-count.txt"; + DataStreamSource source = environment.readTextFile(fileName); + //单词计数 + source + //将一行句子按照空格拆分,输入一个字符串,输出一个2元组,key为一个单词,value为1 + .flatMap(new FlatMapFunction>() { + public void flatMap(String s, Collector> collector) throws Exception { + //对读取到的每一行数据按照空格分割 + String[] split = s.split(" "); + //将每个单词放入collector中作为输出,格式类似于{word:1} + for (String word : split) { + collector.collect(new Tuple2(word, 1)); + } + } + }) + //聚合算子,按照第一个字段(即word字段)进行分组 + .keyBy(v -> v.f0) + //聚合算子,对每一个分租内的数据按照第二个字段进行求和 + .sum(1) + .print(); + + environment.execute(); + } +} diff --git a/Flink/src/main/java/org/example/word-count.txt b/Flink/src/main/java/org/example/word-count.txt new file mode 100644 index 0000000..8cba333 --- /dev/null +++ b/Flink/src/main/java/org/example/word-count.txt @@ -0,0 +1,4 @@ +hello word +hello flink +hello java +java is best \ No newline at end of file diff --git a/Flink/src/test/java/org/example/AppTest.java b/Flink/src/test/java/org/example/AppTest.java new file mode 100644 index 0000000..6a1d2d7 --- /dev/null +++ b/Flink/src/test/java/org/example/AppTest.java @@ -0,0 +1,20 @@ +package org.example; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit test for simple App. + */ +public class AppTest +{ + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() + { + assertTrue( true ); + } +} diff --git a/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java b/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java index 1db2858..018cc33 100644 --- a/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java +++ b/JdkLearn/src/main/java/com/learnjava/lambda/LambdaComparatorDemo.java @@ -2,7 +2,6 @@ import lombok.AllArgsConstructor; import lombok.Data; -import sun.java2d.pipe.SpanShapeRenderer; import java.text.SimpleDateFormat; import java.util.Arrays;