Skip to content

Commit a433a10

Browse files
authored
Create CustomForEachSink.java
1 parent 9aba4c7 commit a433a10

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
packagecom.example;
2+
3+
importjava.sql.Connection;
4+
importjava.sql.DriverManager;
5+
importjava.sql.PreparedStatement;
6+
importjava.sql.SQLException;
7+
8+
importorg.apache.spark.sql.ForeachWriter;
9+
importorg.apache.spark.sql.Row;
10+
11+
/*
12+
13+
Prepare the mysql database
14+
1. create database
15+
create database iot;
16+
2. Create a table inside iot database
17+
create table tags(tag varchar(255) primary key not null, count bigint not null);
18+
19+
Add mysql jdbc as maven dependency to the project.
20+
21+
*/
22+
23+
publicclassCustomForEachSinkextendsForeachWriter<Row>{
24+
25+
privatestaticfinallongserialVersionUID = 1L;
26+
privateConnectionconn = null;
27+
privatePreparedStatementstatement = null;
28+
29+
30+
@Override
31+
publicbooleanopen(longpartitionId, longversion){
32+
try{
33+
Class.forName("com.mysql.jdbc.Driver");
34+
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/iot","root","cloudera");
35+
Stringsql = "insert into tags (tag, count) value (?, ?) ON DUPLICATE KEY UPDATE count=?";
36+
statement = conn.prepareStatement(sql);
37+
} catch (Exceptione){
38+
// TODO Auto-generated catch block
39+
e.printStackTrace();
40+
}
41+
returntrue;
42+
}
43+
44+
@Override
45+
publicvoidprocess(Rowrow){
46+
Stringtag = row.getString(0);
47+
Longcount = row.getLong(1);
48+
try{
49+
statement.setString(1, tag);
50+
statement.setLong(2, count);
51+
statement.setLong(3, count);
52+
statement.execute();
53+
} catch (SQLExceptione){
54+
// TODO Auto-generated catch block
55+
e.printStackTrace();
56+
}
57+
System.out.println(tag + ":" + count);
58+
}
59+
60+
@Override
61+
publicvoidclose(ThrowableerrorOrNull){
62+
try{
63+
conn.close();
64+
} catch (SQLExceptione){
65+
// TODO Auto-generated catch block
66+
e.printStackTrace();
67+
}
68+
}
69+
}

0 commit comments

Comments
(0)