View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.packetreceiver;
18  
19  import java.util.Date;
20  import java.util.List;
21  
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler.Sharable;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.SimpleChannelInboundHandler;
26  import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
27  import net.anyflow.lannister.AbnormalDisconnectEventArgs;
28  import net.anyflow.lannister.plugin.DisconnectEventListener;
29  import net.anyflow.lannister.plugin.Plugins;
30  import net.anyflow.lannister.plugin.UnsubscribeEventArgs;
31  import net.anyflow.lannister.plugin.UnsubscribeEventListener;
32  import net.anyflow.lannister.session.Session;
33  import net.anyflow.lannister.topic.TopicSubscriber;
34  import net.anyflow.lannister.topic.TopicSubscription;
35  
36  @Sharable
37  public class UnsubscribeReceiver extends SimpleChannelInboundHandler<MqttUnsubscribeMessage> {
38  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnsubscribeReceiver.class);
39  	public static final UnsubscribeReceiver INSTANCE = new UnsubscribeReceiver();
40  
41  	@Override
42  	protected void channelRead0(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
43  		logger.debug("packet incoming [message={}]", msg.toString());
44  
45  		Session session = Session.NEXUS.get(ctx.channel().id());
46  		if (session == null) {
47  			logger.error("None exist session message [message={}]", msg.toString());
48  
49  			ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-4.8.0-1]
50  			Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
51  			return;
52  		}
53  
54  		session.setLastIncomingTime(new Date());
55  
56  		List<String> topicFilters = msg.payload().topics();
57  
58  		if (topicFilters == null || topicFilters.isEmpty()) {
59  			session.dispose(true); // [MQTT-4.8.0-1]
60  			return;
61  		}
62  
63  		topicFilters.stream().forEach(tf -> {
64  			TopicSubscription.NEXUS.removeByKey(tf, session.clientId());
65  			TopicSubscriber.NEXUS.removeByTopicFilter(session.clientId(), tf);
66  		});
67  
68  		Plugins.INSTANCE.get(UnsubscribeEventListener.class).unsubscribed(new UnsubscribeEventArgs() {
69  			@Override
70  			public String clientId() {
71  				return session.clientId();
72  			}
73  
74  			@Override
75  			public List<String> topicFilters() {
76  				return topicFilters;
77  			}
78  		});
79  
80  		session.send(MqttMessageFactory.unsuback(msg.variableHeader().messageId()), null); // [MQTT-2.3.1-7],[MQTT-3.10.4-4],[MQTT-3.10.4-5]
81  	}
82  }