1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.util.Date;
20 import java.util.List;
21
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler.Sharable;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
27 import net.anyflow.lannister.AbnormalDisconnectEventArgs;
28 import net.anyflow.lannister.plugin.DisconnectEventListener;
29 import net.anyflow.lannister.plugin.Plugins;
30 import net.anyflow.lannister.plugin.UnsubscribeEventArgs;
31 import net.anyflow.lannister.plugin.UnsubscribeEventListener;
32 import net.anyflow.lannister.session.Session;
33 import net.anyflow.lannister.topic.TopicSubscriber;
34 import net.anyflow.lannister.topic.TopicSubscription;
35
36 @Sharable
37 public class UnsubscribeReceiver extends SimpleChannelInboundHandler<MqttUnsubscribeMessage> {
38 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnsubscribeReceiver.class);
39 public static final UnsubscribeReceiver INSTANCE = new UnsubscribeReceiver();
40
41 @Override
42 protected void channelRead0(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
43 logger.debug("packet incoming [message={}]", msg.toString());
44
45 Session session = Session.NEXUS.get(ctx.channel().id());
46 if (session == null) {
47 logger.error("None exist session message [message={}]", msg.toString());
48
49 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
50 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
51 return;
52 }
53
54 session.setLastIncomingTime(new Date());
55
56 List<String> topicFilters = msg.payload().topics();
57
58 if (topicFilters == null || topicFilters.isEmpty()) {
59 session.dispose(true);
60 return;
61 }
62
63 topicFilters.stream().forEach(tf -> {
64 TopicSubscription.NEXUS.removeByKey(tf, session.clientId());
65 TopicSubscriber.NEXUS.removeByTopicFilter(session.clientId(), tf);
66 });
67
68 Plugins.INSTANCE.get(UnsubscribeEventListener.class).unsubscribed(new UnsubscribeEventArgs() {
69 @Override
70 public String clientId() {
71 return session.clientId();
72 }
73
74 @Override
75 public List<String> topicFilters() {
76 return topicFilters;
77 }
78 });
79
80 session.send(MqttMessageFactory.unsuback(msg.variableHeader().messageId()), null);
81 }
82 }