 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.ForeachPartitionFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
+import scala.Tuple2;
+

 import java.io.IOException;
 import java.io.Serializable;
@@ -29,7 +37,8 @@ public class LoadToHBase implements Serializable {
     public LoadToHBase() {
         conf = new SparkConf()
                 .setAppName(getClass().getName())
-                .setIfMissing("spark.master", "local[*]");
+                .setIfMissing("spark.master", "local[*]")
+                .setIfMissing("spark.driver.memory", "4g");
         spark = SparkSession.builder().config(conf).getOrCreate();
     }
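Note: spark.driver.memory is only honored if it is read before the driver JVM starts. With an in-process driver, as in this local[*] setup, setIfMissing runs after the heap is already fixed, so in practice the value has to come from spark-submit --driver-memory 4g or spark-defaults.conf.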
@@ -52,13 +61,19 @@ private void saveStockRecords(Iterator<Stock> rows) {
         conn = ConnectionFactory.createConnection(configuration);
         table = conn.getTable(TableName.valueOf("ns1:stocks"));
         List<Put> puts = new ArrayList<>();
+        int batchSize = 2000;
+        int count = 0;
         while (rows.hasNext()) {
             Stock stock = rows.next();
             puts.add(stock.toPut());
+            // flush a full batch and start collecting the next one
+            if (puts.size() == batchSize) {
+                table.put(puts);
+                puts.clear();
+            }
+            ++count;
         }
-        System.out.println(String.format("Saving %d records", puts.size()));
-        Object[] results = new Object[puts.size()];
+        // write the final, possibly partial batch
         table.put(puts);
+        System.out.println(String.format("Saving %d records", count));
         table.close();
     } catch (IOException ex) {
         ex.printStackTrace();
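The hand-rolled batching above works, but HBase also ships BufferedMutator, which buffers mutations and flushes them for you. A minimal sketch of the same partition writer on top of it, assuming imports of org.apache.hadoop.hbase.client.BufferedMutator and BufferedMutatorParams; the 2 MB write buffer is an arbitrary choice, not something from the original code:

private void saveStockRecordsBuffered(Iterator<Stock> rows) {
    Configuration configuration = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(configuration);
         BufferedMutator mutator = conn.getBufferedMutator(
                 new BufferedMutatorParams(TableName.valueOf("ns1:stocks"))
                         .writeBufferSize(2L * 1024 * 1024))) { // flush roughly every 2 MB
        while (rows.hasNext()) {
            mutator.mutate(rows.next().toPut()); // buffered; sent to HBase in batches
        }
        // anything still buffered is flushed when the mutator closes
    } catch (IOException ex) {
        ex.printStackTrace();
    }
}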
@@ -71,32 +86,63 @@ private void saveStockRecords(Iterator<Stock> rows) {
         }
     }

+    private Stock rowToStock(Row row) {
+        Stock stock = new Stock();
+
+        stock.setDate(row.getAs("date"));
+        stock.setOpen(row.getAs("open"));
+        stock.setClose(row.getAs("close"));
+        stock.setHigh(row.getAs("high"));
+        stock.setLow(row.getAs("low"));
+        stock.setAdjclose(row.getAs("adjclose"));
+        stock.setVolume(row.getAs("volume"));
+        stock.setSymbol(row.getAs("symbol"));
+
+        return stock;
+    }
+
     public void saveToHBase(String path) {
         Dataset<Row> dataset = loadCsv(path).withColumn("date"
                 , functions.expr("cast(`date` as date) as `date`"));

-        Dataset<Stock> stockRows = dataset.map((MapFunction<Row, Stock>) row -> {
-            Stock stock = new Stock();
+        Dataset<Stock> stockRows = dataset.map((MapFunction<Row, Stock>) row -> rowToStock(row), Encoders.bean(Stock.class));

-            stock.setDate(row.getAs("date"));
+        stockRows.show();

-            stock.setOpen(row.getAs("open"));
-            stock.setClose(row.getAs("close"));
-            stock.setHigh(row.getAs("high"));
-            stock.setLow(row.getAs("low"));
-            stock.setClose(row.getAs("close"));
-            stock.setAdjclose(row.getAs("adjclose"));
-            stock.setVolume(row.getAs("volume"));
-            stock.setSymbol(row.getAs("symbol"));
+        stockRows.foreachPartition((ForeachPartitionFunction<Stock>) rows -> saveStockRecords(rows));
+    }

-            return stock;
-        }, Encoders.bean(Stock.class));
+    public void createHFiles(String path, String outputPath) {
+        Dataset<Row> dataset = loadCsv(path).withColumn("date"
+                , functions.expr("cast(`date` as date) as `date`"));
+
+        Dataset<Stock> stockRows = dataset.map((MapFunction<Row, Stock>) row -> rowToStock(row), Encoders.bean(Stock.class));

         stockRows.show();

-        stockRows.foreachPartition((ForeachPartitionFunction<Stock>) rows -> saveStockRecords(rows));
+        // pair each Stock with its HBase row key so it can go through an OutputFormat
+        JavaPairRDD<ImmutableBytesWritable, Put> pairRdd = stockRows.javaRDD().mapToPair(r ->
+                new Tuple2<>(r.toKey(), r.toPut()));
+
+        Configuration configuration = HBaseConfiguration.create();
+        String resourcePath = LoadToHBase.class
+                .getClassLoader()
+                .getResource("hbase-site.xml")
+                .getPath();
+        configuration.addResource(new Path(resourcePath));
+
+        configuration.set(TableOutputFormat.OUTPUT_TABLE, "ns1:stocks");
+        pairRdd.saveAsNewAPIHadoopFile(outputPath
+                , ImmutableBytesWritable.class
+                , Put.class
+                , TableOutputFormat.class
+                , configuration);
     }
+
     public void close() {
         spark.close();
     }
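A caveat on createHFiles(): despite the name, TableOutputFormat does not produce HFiles; it pushes the Puts to ns1:stocks through the normal client write path, and the outputPath passed to saveAsNewAPIHadoopFile is effectively ignored. A genuine bulk load would stage HFiles with HFileOutputFormat2 and then hand them to the region servers. A rough sketch under stated assumptions: HBase 2.x; imports for org.apache.hadoop.mapreduce.Job, RegionLocator, HFileOutputFormat2 and LoadIncrementalHFiles; and a hypothetical sortedKvRdd, a JavaPairRDD<ImmutableBytesWritable, KeyValue> sorted by row key, which HFileOutputFormat2 requires. Exceptions are left to the caller:

Job job = Job.getInstance(configuration); // carries the HFile output settings
try (Connection conn = ConnectionFactory.createConnection(configuration);
     Table table = conn.getTable(TableName.valueOf("ns1:stocks"));
     RegionLocator locator = conn.getRegionLocator(TableName.valueOf("ns1:stocks"))) {
    // derive compression, bloom filter and region split settings from the live table
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    sortedKvRdd.saveAsNewAPIHadoopFile(outputPath
            , ImmutableBytesWritable.class
            , KeyValue.class
            , HFileOutputFormat2.class
            , job.getConfiguration());
    // move the staged HFiles into the regions (LoadIncrementalHFiles is
    // deprecated in HBase 2.x in favour of BulkLoadHFiles, but still works)
    new LoadIncrementalHFiles(configuration)
            .doBulkLoad(new Path(outputPath), conn.getAdmin(), table, locator);
}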
@@ -105,6 +151,7 @@ public static void main(String[] args) {
         String path = "/data/stocks.csv";
         LoadToHBase loadToHBase = new LoadToHBase();
         loadToHBase.saveToHBase(path);
+        // loadToHBase.createHFiles(path, "/tmp/stocks_hfile");
         loadToHBase.close();

     }