View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.packetreceiver;
18  
19  import java.net.InetSocketAddress;
20  
21  import com.google.common.base.Strings;
22  
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler.Sharable;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.SimpleChannelInboundHandler;
27  import io.netty.handler.codec.mqtt.MqttConnAckMessage;
28  import io.netty.handler.codec.mqtt.MqttConnectMessage;
29  import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
30  import io.netty.handler.codec.mqtt.MqttQoS;
31  import io.netty.util.CharsetUtil;
32  import net.anyflow.lannister.AbnormalDisconnectEventArgs;
33  import net.anyflow.lannister.Settings;
34  import net.anyflow.lannister.cluster.ClusterDataFactory;
35  import net.anyflow.lannister.message.Message;
36  import net.anyflow.lannister.plugin.Authenticator;
37  import net.anyflow.lannister.plugin.Authorizer;
38  import net.anyflow.lannister.plugin.ConnectEventArgs;
39  import net.anyflow.lannister.plugin.ConnectEventListener;
40  import net.anyflow.lannister.plugin.DisconnectEventListener;
41  import net.anyflow.lannister.plugin.IMessage;
42  import net.anyflow.lannister.plugin.Plugins;
43  import net.anyflow.lannister.plugin.ServiceChecker;
44  import net.anyflow.lannister.session.Session;
45  import net.anyflow.lannister.topic.Topic;
46  
47  @Sharable
48  public class ConnectReceiver extends SimpleChannelInboundHandler<MqttConnectMessage> {
49  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectReceiver.class);
50  	public static final ConnectReceiver INSTANCE = new ConnectReceiver();
51  
52  	@Override
53  	protected void channelRead0(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
54  		logger.debug("packet incoming [message={}]", msg.toString());
55  
56  		Session session = Session.NEXUS.get(ctx.channel().id());
57  		if (session != null) {
58  			session.dispose(true); // [MQTT-3.1.0-2]
59  			return;
60  		}
61  
62  		boolean cleanSession = msg.variableHeader().isCleanSession();
63  		String clientId = msg.payload().clientIdentifier();
64  
65  		if (Strings.isNullOrEmpty(clientId)) {
66  			clientId = generateClientId(ctx, cleanSession);
67  
68  			if (clientId == null) { return; }
69  		}
70  
71  		if (!filterPlugins(ctx, msg)) { return; }
72  
73  		session = Session.NEXUS.get(clientId); // [MQTT-3.1.2-4]
74  		boolean sessionPresent = !cleanSession && session != null; // [MQTT-3.2.2-1],[MQTT-3.2.2-2],[MQTT-3.2.2-3]
75  
76  		String clientIp = ctx.channel().remoteAddress() instanceof InetSocketAddress
77  				? ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() : "0.0.0.0";
78  		int clientPort = ctx.channel().remoteAddress() instanceof InetSocketAddress
79  				? ((InetSocketAddress) ctx.channel().remoteAddress()).getPort() : -1;
80  
81  		if (cleanSession) {
82  			if (session != null) {
83  				session.dispose(false); // [MQTT-3.1.4-2]
84  			}
85  			session = newSession(msg, cleanSession, clientId, clientIp, clientPort); // [MQTT-3.1.2-6]
86  		}
87  		else if (session == null) { // [MQTT-3.1.2-4]
88  			session = newSession(msg, cleanSession, clientId, clientIp, clientPort);
89  		}
90  
91  		Session.NEXUS.put(session, ctx);
92  
93  		processRetainedWill(session);
94  
95  		final Session sessionFinal = session;
96  		final MqttConnAckMessage acceptMsg = MqttMessageFactory.connack(MqttConnectReturnCode.CONNECTION_ACCEPTED,
97  				sessionPresent); // [MQTT-3.1.4-4]
98  		final String log = acceptMsg.toString();
99  
100 		session.send(acceptMsg, f -> { // [MQTT-3.2.0-1]
101 			if (!f.isSuccess()) {
102 				logger.error("packet outgoing failed [{}] {}", log, f.cause());
103 				return;
104 			}
105 
106 			ctx.channel().eventLoop().execute(
107 					() -> Plugins.INSTANCE.get(ConnectEventListener.class).connectHandled(new ConnectEventArgs() {
108 						@Override
109 						public String clientId() {
110 							return sessionFinal.clientId();
111 						}
112 
113 						@Override
114 						public IMessage will() {
115 							return sessionFinal.will();
116 						}
117 
118 						@Override
119 						public Boolean cleanSession() {
120 							return sessionFinal.cleanSession();
121 						}
122 
123 						@Override
124 						public MqttConnectReturnCode returnCode() {
125 							return MqttConnectReturnCode.CONNECTION_ACCEPTED;
126 						}
127 					}));
128 
129 			if (!sessionFinal.cleanSession()) {
130 				sessionFinal.completeRemainedMessages(); // [MQTT-4.4.0-1]
131 			}
132 		});
133 	}
134 
135 	private void processRetainedWill(Session session) {
136 		if (session.will() == null || !session.will().isRetain()) { return; }
137 
138 		// [MQTT-3.1.2-16],[MQTT-3.1.2-17]
139 		Topic topic = Topic.NEXUS.get(session.will().topicName());
140 		if (topic == null) {
141 			topic = new Topic(session.will().topicName());
142 			Topic.NEXUS.insert(topic);
143 		}
144 
145 		topic.setRetainedMessage(session.will());
146 	}
147 
148 	private String generateClientId(ChannelHandlerContext ctx, boolean cleanSession) {
149 		if (cleanSession) {
150 			if (Settings.INSTANCE.getBoolean("mqttserver.acceptEmptyClientId", true)) {
151 				return "Lannister_"
152 						+ Long.toString(ClusterDataFactory.INSTANCE.createIdGenerator("clientIdGenerator").newId()); // [MQTT-3.1.3-6],[MQTT-3.1.3-7]
153 			}
154 			else {
155 				sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
156 				return null;
157 			}
158 		}
159 		else {
160 			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); // [MQTT-3.1.3-8]
161 			return null;
162 		}
163 	}
164 
165 	private Session newSession(MqttConnectMessage msg, boolean cleanSession, String clientId, String clientIp,
166 			int clientPort) {
167 		return new Session(clientId, clientIp, clientPort, msg.variableHeader().keepAliveTimeSeconds(), cleanSession,
168 				newWill(clientId, msg));
169 	}
170 
171 	private Message newWill(String clientId, MqttConnectMessage conn) {
172 		if (!conn.variableHeader().isWillFlag()) { return null; } // [MQTT-3.1.2-12]
173 
174 		return new Message(-1, conn.payload().willTopic(), clientId,
175 				conn.payload().willMessage().getBytes(CharsetUtil.UTF_8),
176 				MqttQoS.valueOf(conn.variableHeader().willQos()), conn.variableHeader().isWillRetain());
177 	}
178 
179 	private boolean filterPlugins(ChannelHandlerContext ctx, MqttConnectMessage msg) {
180 		String clientId = msg.payload().clientIdentifier();
181 		String userName = msg.variableHeader().hasUserName() ? msg.payload().userName() : null;
182 		String password = msg.variableHeader().hasPassword() ? msg.payload().password() : null;
183 
184 		if (!Plugins.INSTANCE.get(ServiceChecker.class).isServiceAvailable()) {
185 			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
186 			return false;
187 		}
188 
189 		if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId)) {
190 			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); // [MQTT-3.1.3-9]
191 			return false;
192 		}
193 
194 		if (!Plugins.INSTANCE.get(Authenticator.class).isValid(clientId, userName, password)) {
195 			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
196 			return false;
197 		}
198 
199 		if (!Plugins.INSTANCE.get(Authorizer.class).isAuthorized(clientId, userName)) {
200 			sendNoneAcceptMessage(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
201 			return false;
202 		}
203 
204 		return true;
205 	}
206 
207 	private void sendNoneAcceptMessage(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) {
208 		assert returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED;
209 
210 		MqttConnAckMessage msg = MqttMessageFactory.connack(returnCode, false); // [MQTT-3.2.2-4]
211 
212 		ctx.channel().writeAndFlush(msg).addListener(f -> {
213 			Plugins.INSTANCE.get(ConnectEventListener.class).connectHandled(new ConnectEventArgs() {
214 				@Override
215 				public String clientId() {
216 					return null;
217 				}
218 
219 				@Override
220 				public IMessage will() {
221 					return null;
222 				}
223 
224 				@Override
225 				public Boolean cleanSession() {
226 					return null;
227 				}
228 
229 				@Override
230 				public MqttConnectReturnCode returnCode() {
231 					return returnCode;
232 				}
233 			});
234 
235 			ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs -> // [MQTT-3.2.2-5],[MQTT-3.1.4-5]
236 			Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
237 		});
238 	}
239 }