View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.session;
18  
19  import java.util.Date;
20  
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.handler.codec.mqtt.MqttMessage;
24  import io.netty.handler.codec.mqtt.MqttQoS;
25  import io.netty.util.concurrent.Future;
26  import io.netty.util.concurrent.GenericFutureListener;
27  import net.anyflow.lannister.Settings;
28  import net.anyflow.lannister.Statistics;
29  import net.anyflow.lannister.message.InboundMessageStatus;
30  import net.anyflow.lannister.message.Message;
31  import net.anyflow.lannister.message.OutboundMessageStatus;
32  import net.anyflow.lannister.packetreceiver.MqttMessageFactory;
33  import net.anyflow.lannister.topic.Topic;
34  
35  public class MessageSender {
36  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MessageSender.class);
37  
38  	private final static int RESPONSE_TIMEOUT_SECONDS = Settings.INSTANCE.getInt("mqttserver.responseTimeoutSeconds",
39  			60);
40  
41  	private final Session session;
42  
43  	protected MessageSender(Session session) {
44  		this.session = session;
45  	}
46  
47  	protected void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
48  		if (!session.isConnected(true)) {
49  			logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message);
50  			return;
51  		}
52  
53  		ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
54  
55  		String log = message.toString();
56  		ChannelFuture cf = ctx.writeAndFlush(message).addListener(f -> {
57  			if (f.isSuccess()) {
58  				logger.debug("packet outgoing [{}]", log);
59  			}
60  			else {
61  				logger.error("packet outgoing failed [{}] {}", log, f.cause());
62  			}
63  		});
64  
65  		if (completeListener != null) {
66  			cf.addListener(completeListener);
67  		}
68  	}
69  
70  	protected void sendPublish(Topic topic, Message message) {
71  		logger.debug("event arrived [clientId={}, message={}]", session.clientId(), message);
72  
73  		if (!session.isConnected(true)) { return; }
74  
75  		send(MqttMessageFactory.publish(message, false), f -> { // [MQTT-3.3.1-2]
76  			if (!f.isSuccess()) { return; }
77  
78  			Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);
79  
80  			switch (message.qos()) {
81  			case AT_MOST_ONCE:
82  				break;
83  
84  			case AT_LEAST_ONCE:
85  			case EXACTLY_ONCE:
86  				OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
87  						OutboundMessageStatus.Status.PUBLISHED);
88  				break;
89  
90  			default:
91  				logger.error("Invalid QoS [QoS={}, clientId={}, topic={}]", message.qos(), session.clientId(),
92  						message.topicName());
93  				break;
94  			}
95  		});
96  	}
97  
98  	protected void completeRemainedMessages() {
99  		// TODO should be executed in the middle of 'connected' state?
100 
101 		completeOutboundMessageStatuses();
102 		completeInboundMessageStatuses();
103 	}
104 
105 	private void completeInboundMessageStatuses() {
106 		ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
107 		if (ctx == null) { return; }
108 
109 		ctx.executor().submit(() -> {
110 			Date now = new Date();
111 
112 			InboundMessageStatus.NEXUS.keySet().stream().map(key -> InboundMessageStatus.NEXUS.get(key))
113 					.forEach(messageStatus -> {
114 						long intervalSeconds = (now.getTime() - messageStatus.updateTime().getTime()) * 1000;
115 						if (intervalSeconds < RESPONSE_TIMEOUT_SECONDS) { return; }
116 
117 						Message message = Message.NEXUS.get(messageStatus.key());
118 						MqttMessage toSend;
119 						String log;
120 
121 						switch (messageStatus.status()) {
122 						case RECEIVED:
123 						case PUBRECED:
124 							if (message.qos() == MqttQoS.AT_LEAST_ONCE) {
125 								toSend = MqttMessageFactory.puback(message.id());
126 								log = toSend.toString();
127 
128 								send(toSend, f -> { // [MQTT-2.3.1-6]
129 									if (!f.isSuccess()) {
130 										logger.error("packet outgoing failed [{}] {}", log, f.cause());
131 										return;
132 									}
133 
134 									InboundMessageStatus.NEXUS.removeByKey(message.id(), session.clientId());
135 									logger.debug("Inbound message status REMOVED [clientId={}, messageId={}]",
136 											session.clientId(), message.id());
137 								});
138 							}
139 							else {
140 								toSend = MqttMessageFactory.pubrec(message.id());
141 								log = toSend.toString();
142 
143 								send(toSend, f -> {
144 									if (!f.isSuccess()) {
145 										logger.error("packet outgoing failed [{}] {}", log, f.cause());
146 										return;
147 									}
148 
149 									InboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
150 											InboundMessageStatus.Status.PUBRECED);
151 								}); // [MQTT-2.3.1-6]
152 							}
153 							break;
154 
155 						default:
156 							logger.error(
157 									"Invalid Inbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
158 									messageStatus.status(), session.clientId(), message.topicName(), message.id());
159 							break;
160 						}
161 					});
162 		});
163 	}
164 
165 	private void completeOutboundMessageStatuses() {
166 		ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
167 		if (ctx == null) { return; }
168 
169 		ctx.executor().submit(() -> {
170 			Date now = new Date();
171 
172 			OutboundMessageStatus.NEXUS.messageIdsOf(session.clientId()).stream()
173 					.map(messageId -> OutboundMessageStatus.NEXUS.getBy(messageId, session.clientId()))
174 					.forEach(messageStatus -> {
175 						long intervalSeconds = (now.getTime() - messageStatus.updateTime().getTime()) * 1000;
176 						if (intervalSeconds < RESPONSE_TIMEOUT_SECONDS) { return; }
177 
178 						Message message = Message.NEXUS.get(messageStatus.messageKey());
179 						MqttMessage toSend;
180 						String log;
181 
182 						message.qos(messageStatus.qos());
183 						message.id(messageStatus.messageId()); // [MQTT-2.3.1-4]
184 						message.setRetain(false); // [MQTT-3.3.1-9]
185 
186 						switch (messageStatus.status()) {
187 						case TO_PUBLISH:
188 						case PUBLISHED:
189 							toSend = MqttMessageFactory.publish(message,
190 									messageStatus.status() == OutboundMessageStatus.Status.PUBLISHED);
191 							log = toSend.toString();
192 							send(toSend, f -> { // [MQTT-3.3.1-1]
193 								if (!f.isSuccess()) {
194 									logger.error("packet outgoing failed [{}] {}", log, f.cause());
195 									return;
196 								}
197 
198 								OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
199 										OutboundMessageStatus.Status.PUBLISHED);
200 								Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);
201 							});
202 							break;
203 
204 						case PUBRECED:
205 							send(MqttMessageFactory.pubrel(message.id()), null);
206 							break;
207 
208 						default:
209 							logger.error(
210 									"Invalid Outbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
211 									messageStatus.status(), session.clientId(), message.topicName(), message.id());
212 							break;
213 						}
214 					});
215 		});
216 	}
217 }