1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.util.Date;
20
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandler.Sharable;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.handler.codec.mqtt.MqttMessage;
26 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
27 import net.anyflow.lannister.AbnormalDisconnectEventArgs;
28 import net.anyflow.lannister.plugin.DisconnectEventListener;
29 import net.anyflow.lannister.plugin.Plugins;
30 import net.anyflow.lannister.session.Session;
31
32 @Sharable
33 public class GenericReceiver extends SimpleChannelInboundHandler<MqttMessage> {
34 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericReceiver.class);
35 public static final GenericReceiver INSTANCE = new GenericReceiver();
36
37 @Override
38 protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
39 if (!msg.decoderResult().isSuccess()) {
40 logger.error("decoding MQTT message failed : {}", msg.decoderResult().cause().getMessage());
41
42 Session session = Session.NEXUS.get(ctx.channel().id());
43 if (session != null) {
44 session.dispose(true);
45 }
46 else {
47 ctx.channel().writeAndFlush(msg).addListener(f -> {
48 logger.debug("packet outgoing [{}]", msg);
49
50 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
51 Plugins.INSTANCE.get(DisconnectEventListener.class)
52 .disconnected(new AbnormalDisconnectEventArgs()));
53 });
54 }
55 return;
56 }
57 else {
58 logger.debug("packet incoming [message={}]", msg.toString());
59
60 Session session = Session.NEXUS.get(ctx.channel().id());
61 if (session == null) {
62 logger.error("None exist session message : {}", msg.toString());
63
64 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
65 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
66 return;
67 }
68
69 session.setLastIncomingTime(new Date());
70
71 switch (msg.fixedHeader().messageType()) {
72 case DISCONNECT:
73 DisconnectReceiver.SHARED.handle(ctx, session);
74 return;
75
76 case PINGREQ:
77 PingReqReceiver.SHARED.handle(ctx, session);
78 return;
79
80 case PUBREC:
81 PubRecReceiver.SHARED.handle(ctx, session,
82 ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
83 return;
84
85 case PUBREL:
86 PubRelReceiver.SHARED.handle(ctx, session,
87 ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
88 return;
89
90 case PUBCOMP:
91 PubCompReceiver.SHARED.handle(ctx, session,
92 ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
93 return;
94
95 default:
96 session.dispose(true);
97 return;
98 }
99 }
100 }
101
102 @Override
103 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
104 Session session = Session.NEXUS.get(ctx.channel().id());
105 if (session == null) {
106 logger.debug("session does not exist : [channelId={}]", ctx.channel().id());
107 return;
108 }
109 else {
110 session.dispose(true);
111
112 }
113 }
114
115 @Override
116 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
117 logger.error(cause.getMessage(), cause);
118
119 Session session = Session.NEXUS.get(ctx.channel().id());
120 if (session != null) {
121 session.dispose(true);
122 }
123 else {
124 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
125 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
126 }
127 }
128 }