1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.net.InetSocketAddress;
20
21 import com.google.common.base.Strings;
22
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandler.Sharable;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.SimpleChannelInboundHandler;
27 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
28 import io.netty.handler.codec.mqtt.MqttConnectMessage;
29 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
30 import io.netty.handler.codec.mqtt.MqttQoS;
31 import io.netty.util.CharsetUtil;
32 import net.anyflow.lannister.AbnormalDisconnectEventArgs;
33 import net.anyflow.lannister.Settings;
34 import net.anyflow.lannister.cluster.ClusterDataFactory;
35 import net.anyflow.lannister.message.Message;
36 import net.anyflow.lannister.plugin.Authenticator;
37 import net.anyflow.lannister.plugin.Authorizer;
38 import net.anyflow.lannister.plugin.ConnectEventArgs;
39 import net.anyflow.lannister.plugin.ConnectEventListener;
40 import net.anyflow.lannister.plugin.DisconnectEventListener;
41 import net.anyflow.lannister.plugin.IMessage;
42 import net.anyflow.lannister.plugin.Plugins;
43 import net.anyflow.lannister.plugin.ServiceChecker;
44 import net.anyflow.lannister.session.Session;
45 import net.anyflow.lannister.topic.Topic;
46
47 @Sharable
48 public class ConnectReceiver extends SimpleChannelInboundHandler<MqttConnectMessage> {
49 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectReceiver.class);
50 public static final ConnectReceiver INSTANCE = new ConnectReceiver();
51
52 @Override
53 protected void channelRead0(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
54 logger.debug("packet incoming [message={}]", msg.toString());
55
56 Session session = Session.NEXUS.get(ctx.channel().id());
57 if (session != null) {
58 session.dispose(true);
59 return;
60 }
61
62 boolean cleanSession = msg.variableHeader().isCleanSession();
63 String clientId = msg.payload().clientIdentifier();
64
65 if (Strings.isNullOrEmpty(clientId)) {
66 clientId = generateClientId(ctx, cleanSession);
67
68 if (clientId == null) { return; }
69 }
70
71 if (!filterPlugins(ctx, msg)) { return; }
72
73 session = Session.NEXUS.get(clientId);
74 boolean sessionPresent = !cleanSession && session != null;
75
76 String clientIp = ctx.channel().remoteAddress() instanceof InetSocketAddress
77 ? ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() : "0.0.0.0";
78 int clientPort = ctx.channel().remoteAddress() instanceof InetSocketAddress
79 ? ((InetSocketAddress) ctx.channel().remoteAddress()).getPort() : -1;
80
81 if (cleanSession) {
82 if (session != null) {
83 session.dispose(false);
84 }
85 session = newSession(msg, cleanSession, clientId, clientIp, clientPort);
86 }
87 else if (session == null) {
88 session = newSession(msg, cleanSession, clientId, clientIp, clientPort);
89 }
90
91 Session.NEXUS.put(session, ctx);
92
93 processRetainedWill(session);
94
95 final Session sessionFinal = session;
96 final MqttConnAckMessage acceptMsg = MqttMessageFactory.connack(MqttConnectReturnCode.CONNECTION_ACCEPTED,
97 sessionPresent);
98 final String log = acceptMsg.toString();
99
100 session.send(acceptMsg, f -> {
101 if (!f.isSuccess()) {
102 logger.error("packet outgoing failed [{}] {}", log, f.cause());
103 return;
104 }
105
106 ctx.channel().eventLoop().execute(
107 () -> Plugins.INSTANCE.get(ConnectEventListener.class).connectHandled(new ConnectEventArgs() {
108 @Override
109 public String clientId() {
110 return sessionFinal.clientId();
111 }
112
113 @Override
114 public IMessage will() {
115 return sessionFinal.will();
116 }
117
118 @Override
119 public Boolean cleanSession() {
120 return sessionFinal.cleanSession();
121 }
122
123 @Override
124 public MqttConnectReturnCode returnCode() {
125 return MqttConnectReturnCode.CONNECTION_ACCEPTED;
126 }
127 }));
128
129 if (!sessionFinal.cleanSession()) {
130 sessionFinal.completeRemainedMessages();
131 }
132 });
133 }
134
135 private void processRetainedWill(Session session) {
136 if (session.will() == null || !session.will().isRetain()) { return; }
137
138
139 Topic topic = Topic.NEXUS.get(session.will().topicName());
140 if (topic == null) {
141 topic = new Topic(session.will().topicName());
142 Topic.NEXUS.insert(topic);
143 }
144
145 topic.setRetainedMessage(session.will());
146 }
147
148 private String generateClientId(ChannelHandlerContext ctx, boolean cleanSession) {
149 if (cleanSession) {
150 if (Settings.INSTANCE.getBoolean("mqttserver.acceptEmptyClientId", true)) {
151 return "Lannister_"
152 + Long.toString(ClusterDataFactory.INSTANCE.createIdGenerator("clientIdGenerator").newId());
153 }
154 else {
155 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
156 return null;
157 }
158 }
159 else {
160 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
161 return null;
162 }
163 }
164
165 private Session newSession(MqttConnectMessage msg, boolean cleanSession, String clientId, String clientIp,
166 int clientPort) {
167 return new Session(clientId, clientIp, clientPort, msg.variableHeader().keepAliveTimeSeconds(), cleanSession,
168 newWill(clientId, msg));
169 }
170
171 private Message newWill(String clientId, MqttConnectMessage conn) {
172 if (!conn.variableHeader().isWillFlag()) { return null; }
173
174 return new Message(-1, conn.payload().willTopic(), clientId,
175 conn.payload().willMessage().getBytes(CharsetUtil.UTF_8),
176 MqttQoS.valueOf(conn.variableHeader().willQos()), conn.variableHeader().isWillRetain());
177 }
178
179 private boolean filterPlugins(ChannelHandlerContext ctx, MqttConnectMessage msg) {
180 String clientId = msg.payload().clientIdentifier();
181 String userName = msg.variableHeader().hasUserName() ? msg.payload().userName() : null;
182 String password = msg.variableHeader().hasPassword() ? msg.payload().password() : null;
183
184 if (!Plugins.INSTANCE.get(ServiceChecker.class).isServiceAvailable()) {
185 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
186 return false;
187 }
188
189 if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId)) {
190 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
191 return false;
192 }
193
194 if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId, userName, password)) {
195 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
196 return false;
197 }
198
199 if (!Plugins.INSTANCE.get(Authorizer.class).isAuthorized(clientId, userName)) {
200 sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
201 return false;
202 }
203
204 return true;
205 }
206
207 private void sendNoneAcceptMessage(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) {
208 assert returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED;
209
210 MqttConnAckMessage msg = MqttMessageFactory.connack(returnCode, false);
211
212 ctx.channel().writeAndFlush(msg).addListener(f -> {
213 Plugins.INSTANCE.get(ConnectEventListener.class).connectHandled(new ConnectEventArgs() {
214 @Override
215 public String clientId() {
216 return null;
217 }
218
219 @Override
220 public IMessage will() {
221 return null;
222 }
223
224 @Override
225 public Boolean cleanSession() {
226 return null;
227 }
228
229 @Override
230 public MqttConnectReturnCode returnCode() {
231 return returnCode;
232 }
233 });
234
235 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
236 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
237 });
238 }
239 }