View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.packetreceiver;
18  
19  import java.util.Date;
20  
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler.Sharable;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.SimpleChannelInboundHandler;
25  import io.netty.handler.codec.mqtt.MqttMessage;
26  import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
27  import net.anyflow.lannister.AbnormalDisconnectEventArgs;
28  import net.anyflow.lannister.plugin.DisconnectEventListener;
29  import net.anyflow.lannister.plugin.Plugins;
30  import net.anyflow.lannister.session.Session;
31  
32  @Sharable
33  public class GenericReceiver extends SimpleChannelInboundHandler<MqttMessage> {
34  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(GenericReceiver.class);
35  	public static final GenericReceiver INSTANCE = new GenericReceiver();
36  
37  	@Override
38  	protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
39  		if (!msg.decoderResult().isSuccess()) {
40  			logger.error("decoding MQTT message failed : {}", msg.decoderResult().cause().getMessage());
41  
42  			Session session = Session.NEXUS.get(ctx.channel().id());
43  			if (session != null) {
44  				session.dispose(true); // [MQTT-4.8.0-1]
45  			}
46  			else {
47  				ctx.channel().writeAndFlush(msg).addListener(f -> {
48  					logger.debug("packet outgoing [{}]", msg);
49  
50  					ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-3.2.2-5]
51  					Plugins.INSTANCE.get(DisconnectEventListener.class)
52  							.disconnected(new AbnormalDisconnectEventArgs()));
53  				});
54  			}
55  			return;
56  		}
57  		else {
58  			logger.debug("packet incoming [message={}]", msg.toString());
59  
60  			Session session = Session.NEXUS.get(ctx.channel().id());
61  			if (session == null) {
62  				logger.error("None exist session message : {}", msg.toString());
63  
64  				ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-4.8.0-1]
65  				Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
66  				return;
67  			}
68  
69  			session.setLastIncomingTime(new Date());
70  
71  			switch (msg.fixedHeader().messageType()) {
72  			case DISCONNECT:
73  				DisconnectReceiver.SHARED.handle(ctx, session);
74  				return;
75  
76  			case PINGREQ:
77  				PingReqReceiver.SHARED.handle(ctx, session);
78  				return;
79  
80  			case PUBREC:
81  				PubRecReceiver.SHARED.handle(ctx, session,
82  						((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
83  				return;
84  
85  			case PUBREL:
86  				PubRelReceiver.SHARED.handle(ctx, session,
87  						((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
88  				return;
89  
90  			case PUBCOMP:
91  				PubCompReceiver.SHARED.handle(ctx, session,
92  						((MqttMessageIdVariableHeader) msg.variableHeader()).messageId());
93  				return;
94  
95  			default:
96  				session.dispose(true); // [MQTT-4.8.0-1]
97  				return;
98  			}
99  		}
100 	}
101 
102 	@Override
103 	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
104 		Session session = Session.NEXUS.get(ctx.channel().id());
105 		if (session == null) {
106 			logger.debug("session does not exist : [channelId={}]", ctx.channel().id());
107 			return;
108 		}
109 		else {
110 			session.dispose(true); // abnormal disconnection without DISCONNECT
111 									// [MQTT-4.8.0-1]
112 		}
113 	}
114 
115 	@Override
116 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
117 		logger.error(cause.getMessage(), cause);
118 
119 		Session session = Session.NEXUS.get(ctx.channel().id());
120 		if (session != null) {
121 			session.dispose(true);
122 		}
123 		else {
124 			ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-3.2.2-5]
125 			Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
126 		}
127 	}
128 }