1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.client;
18
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.SimpleChannelInboundHandler;
21 import io.netty.handler.codec.mqtt.MqttMessage;
22 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
23 import io.netty.handler.codec.mqtt.MqttPublishMessage;
24 import io.netty.handler.codec.mqtt.MqttQoS;
25 import net.anyflow.lannister.message.Message;
26 import net.anyflow.lannister.packetreceiver.MqttMessageFactory;
27
28 public class MqttPacketReceiver extends SimpleChannelInboundHandler<MqttMessage> {
29 @SuppressWarnings("unused")
30 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttPacketReceiver.class);
31
32 private final MqttClient client;
33 private final MessageReceiver receiver;
34 private final SharedObject sharedObject;
35
36 protected MqttPacketReceiver(MqttClient client, MessageReceiver receiver, SharedObject sharedObject) {
37 this.client = client;
38 this.receiver = receiver;
39 this.sharedObject = sharedObject;
40 }
41
42 @Override
43 protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
44 switch (msg.fixedHeader().messageType()) {
45 case PUBLISH:
46 if (receiver != null) {
47 receiver.messageReceived(Message.newMessage(client.clientId(), (MqttPublishMessage) msg));
48 }
49
50 int messageId = ((MqttPublishMessage) msg).variableHeader().messageId();
51 if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.AT_LEAST_ONCE) {
52 client.send(MqttMessageFactory.puback(messageId));
53 }
54 else if (((MqttPublishMessage) msg).fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
55 client.send(MqttMessageFactory.pubrec(messageId));
56 }
57 break;
58
59 case CONNACK:
60 sharedObject.receivedMessage(msg);
61
62 synchronized (sharedObject.locker()) {
63 sharedObject.locker().notify();
64 }
65 break;
66
67 case PUBREC:
68 client.send(MqttMessageFactory.pubrel(((MqttMessageIdVariableHeader) msg.variableHeader()).messageId()));
69 break;
70
71 case SUBACK:
72 case PUBACK:
73 case PUBCOMP:
74 default:
75 break;
76 }
77 }
78 }