Skip to content

Commit 6da62e9

Browse files
committed
update
1 parent 8c2e70a commit 6da62e9

File tree

4 files changed

+37
-30
lines changed

4 files changed

+37
-30
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
packageio.kimmking.javacourse.kafka;
2+
3+
importio.kimmking.javacourse.kafka.kimmking.ConsumerImpl;
4+
importio.kimmking.javacourse.kafka.kimmking.ProducerImpl;
5+
6+
publicclassKafkaConsumerDemo{
7+
8+
publicstaticvoidmain(String[] args){
9+
testConsumer();
10+
}
11+
12+
privatestaticvoidtestConsumer(){
13+
ConsumerImplconsumer = newConsumerImpl();
14+
consumer.consumeOrder();
15+
16+
}
17+
}

09mq/kafka-demo/src/main/java/io/kimmking/javacourse/kafka/KafkaDemo.java renamed to 09mq/kafka-demo/src/main/java/io/kimmking/javacourse/kafka/KafkaProducerDemo.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,10 @@
33
importio.kimmking.javacourse.kafka.kimmking.ConsumerImpl;
44
importio.kimmking.javacourse.kafka.kimmking.ProducerImpl;
55

6-
publicclassKafkaDemo{
6+
publicclassKafkaProducerDemo{
77

88
publicstaticvoidmain(String[] args){
9-
//testProducer();
10-
testConsumer();
11-
}
12-
13-
privatestaticvoidtestConsumer(){
14-
ConsumerImplconsumer = newConsumerImpl();
15-
consumer.consumeOrder();
16-
9+
testProducer();
1710
}
1811

1912
privatestaticvoidtestProducer(){
@@ -22,7 +15,5 @@ private static void testProducer(){
2215
producer.send(newOrder(1000L + i,System.currentTimeMillis(),"USD2CNY", 6.5d));
2316
producer.send(newOrder(2000L + i,System.currentTimeMillis(),"USD2CNY", 6.51d));
2417
}
25-
2618
}
27-
2819
}

‎09mq/kafka-demo/src/main/java/io/kimmking/javacourse/kafka/kimmking/ConsumerImpl.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public ConsumerImpl(){
2121
// properties.put("enable.auto.commit", false);
2222
// properties.put("isolation.level", "read_committed");
2323
// properties.put("auto.offset.reset", "latest");
24-
properties.put("group.id", "java0-kimmking");
24+
properties.put("group.id", "java1-kimmking");
2525
properties.put("bootstrap.servers", "localhost:9092");
2626
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
2727
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@@ -58,7 +58,7 @@ public void consumeOrder(){
5858
e.printStackTrace();
5959
} finally{
6060
try{
61-
consumer.commitSync(currentOffsets);
61+
consumer.commitSync();//currentOffsets);
6262
} catch (Exceptione){
6363
consumer.close();
6464
}

‎09mq/kafka-demo/src/main/java/io/kimmking/javacourse/kafka/kimmking/ProducerImpl.java‎

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,37 +16,36 @@ public class ProducerImpl implements Producer{
1616

1717
publicProducerImpl(){
1818
properties = newProperties();
19-
properties.put("queue.enqueue.timeout.ms", -1);
20-
properties.put("enable.idempotence", true);
2119
properties.put("bootstrap.servers", "localhost:9092");
22-
properties.put("transactional.id", "transactional_1");
23-
properties.put("acks", "all");
24-
properties.put("retries", "3");
25-
properties.put("max.in.flight.requests.per.connection", 1);
20+
// properties.put("queue.enqueue.timeout.ms", -1);
21+
// properties.put("enable.idempotence", true);
22+
// properties.put("transactional.id", "transactional_1");
23+
// properties.put("acks", "all");
24+
// properties.put("retries", "3");
25+
// properties.put("max.in.flight.requests.per.connection", 1);
2626
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
2727
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
2828
producer = newKafkaProducer<String, String>(properties);
29-
producer.initTransactions();
29+
//producer.initTransactions();
3030
}
3131

3232
@Override
3333
publicvoidsend(Orderorder){
34-
Objectjson = JSON.toJSON(order);
3534
try{
36-
producer.beginTransaction();
37-
ProducerRecordrecord = newProducerRecord(topic, order.getId().toString(), json.toString());
35+
//producer.beginTransaction();
36+
ProducerRecordrecord = newProducerRecord(topic, order.getId().toString(), JSON.toJSONString(order));
3837
producer.send(record, (metadata, exception) ->{
39-
if (exception != null){
40-
producer.abortTransaction();
41-
thrownewKafkaException(exception.getMessage() + " , data: " + record);
42-
}
38+
// if (exception != null){
39+
// producer.abortTransaction();
40+
// throw new KafkaException(exception.getMessage() + " , data: " + record);
41+
// }
4342
});
44-
producer.commitTransaction();
43+
//producer.commitTransaction();
4544

4645
} catch (Throwablee){
47-
producer.abortTransaction();
46+
//producer.abortTransaction();
4847
}
49-
System.out.println("************" + json + "************");
48+
//System.out.println("************" + json + "************");
5049
}
5150

5251
@Override

0 commit comments

Comments
(0)