1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.util.Collection;
20 import java.util.Date;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24
25 import com.google.common.collect.Lists;
26 import com.google.common.collect.Maps;
27
28 import io.netty.channel.ChannelFutureListener;
29 import io.netty.channel.ChannelHandler.Sharable;
30 import io.netty.channel.ChannelHandlerContext;
31 import io.netty.channel.SimpleChannelInboundHandler;
32 import io.netty.handler.codec.mqtt.MqttQoS;
33 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
34 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
35 import net.anyflow.lannister.AbnormalDisconnectEventArgs;
36 import net.anyflow.lannister.plugin.DefaultSubscribeEventListener;
37 import net.anyflow.lannister.plugin.DisconnectEventListener;
38 import net.anyflow.lannister.plugin.ITopicSubscription;
39 import net.anyflow.lannister.plugin.Plugins;
40 import net.anyflow.lannister.plugin.SubscribeEventArgs;
41 import net.anyflow.lannister.plugin.SubscribeEventListener;
42 import net.anyflow.lannister.session.Session;
43 import net.anyflow.lannister.topic.Topic;
44 import net.anyflow.lannister.topic.TopicMatcher;
45 import net.anyflow.lannister.topic.TopicSubscription;
46
47 @Sharable
48 public class SubscribeReceiver extends SimpleChannelInboundHandler<MqttSubscribeMessage> {
49 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubscribeReceiver.class);
50 public static final SubscribeReceiver INSTANCE = new SubscribeReceiver();
51
52 @Override
53 protected void channelRead0(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
54 logger.debug("packet incoming [message={}]", msg.toString());
55
56 Session session = Session.NEXUS.get(ctx.channel().id());
57 if (session == null) {
58 logger.error("None exist session message [message={}]", msg.toString());
59
60 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
61 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
62 return;
63 }
64
65 session.setLastIncomingTime(new Date());
66
67 List<MqttTopicSubscription> topicSubs = msg.payload().topicSubscriptions();
68
69 if (topicSubs == null || topicSubs.isEmpty()) {
70 session.dispose(true);
71 return;
72 }
73
74
75 Map.Entry<List<Integer>, Map<String, TopicSubscription>> returns = generateReturns(session.clientId(),
76 topicSubs);
77 List<Integer> grantedQoss = returns.getKey();
78 Map<String, TopicSubscription> topicSubscriptions = returns.getValue();
79
80 if (!executePlugins(session, topicSubscriptions.values())) { return; }
81
82 topicSubscriptions.values().forEach(topicSubscription -> TopicSubscription.NEXUS.put(topicSubscription));
83
84 session.send(MqttMessageFactory.suback(msg.variableHeader().messageId(), grantedQoss), null);
85
86 sendRetainedMessage(session, topicSubscriptions.keySet());
87 }
88
89 private void sendRetainedMessage(Session session, Set<String> topicFilters) {
90 Collection<Topic> topics = Topic.NEXUS.matches(topicFilters);
91
92 topics.forEach(topic -> {
93 if (topic.retainedMessage() == null) { return; }
94
95 topic.publish(session, topic.retainedMessage());
96 });
97 }
98
99 private Map.Entry<List<Integer>, Map<String, TopicSubscription>> generateReturns(String clientId,
100 List<MqttTopicSubscription> topicSubs) {
101 List<Integer> grantedQoss = Lists.newArrayList();
102 Map<String, TopicSubscription> topicSubscriptions = Maps.newHashMap();
103
104 topicSubs.stream().forEach(topicSub -> {
105 if (TopicMatcher.isValid(topicSub.topicName(), true)) {
106 TopicSubscription topicSubscription = new TopicSubscription(clientId, topicSub.topicName(),
107 topicSub.qualityOfService());
108
109 grantedQoss.add(topicSubscription.qos().value());
110 topicSubscriptions.put(topicSubscription.topicFilter(), topicSubscription);
111 }
112 else {
113 grantedQoss.add(MqttQoS.FAILURE.value());
114 }
115 });
116
117 return new Map.Entry<List<Integer>, Map<String, TopicSubscription>>() {
118 @Override
119 public List<Integer> getKey() {
120 return grantedQoss;
121 }
122
123 @Override
124 public Map<String, TopicSubscription> getValue() {
125 return topicSubscriptions;
126 }
127
128 @Override
129 public Map<String, TopicSubscription> setValue(Map<String, TopicSubscription> value) {
130 return null;
131 }
132 };
133 }
134
135 private boolean executePlugins(Session session, Collection<TopicSubscription> topicSubscriptions) {
136 SubscribeEventArgs args = new SubscribeEventArgs() {
137 @Override
138 public List<ITopicSubscription> topicSubscriptions() {
139 return Lists.newArrayList(topicSubscriptions);
140 }
141
142 @Override
143 public String clientId() {
144 return session.clientId();
145 }
146
147 @Override
148 public boolean cleanSession() {
149 return session.cleanSession();
150 }
151 };
152
153 if (!DefaultSubscribeEventListener.SHARED.allowSubscribe(args)) {
154 session.dispose(true);
155 return false;
156 }
157
158 if (!Plugins.INSTANCE.get(SubscribeEventListener.class).allowSubscribe(args)) {
159 session.dispose(true);
160 return false;
161 }
162
163 return true;
164 }
165 }