1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.session;
18
19 import java.util.Date;
20
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.handler.codec.mqtt.MqttMessage;
24 import io.netty.handler.codec.mqtt.MqttQoS;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.GenericFutureListener;
27 import net.anyflow.lannister.Settings;
28 import net.anyflow.lannister.Statistics;
29 import net.anyflow.lannister.message.InboundMessageStatus;
30 import net.anyflow.lannister.message.Message;
31 import net.anyflow.lannister.message.OutboundMessageStatus;
32 import net.anyflow.lannister.packetreceiver.MqttMessageFactory;
33 import net.anyflow.lannister.topic.Topic;
34
35 public class MessageSender {
36 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MessageSender.class);
37
38 private final static int RESPONSE_TIMEOUT_SECONDS = Settings.INSTANCE.getInt("mqttserver.responseTimeoutSeconds",
39 60);
40
41 private final Session session;
42
43 protected MessageSender(Session session) {
44 this.session = session;
45 }
46
47 protected void send(MqttMessage message, GenericFutureListener<? extends Future<? super Void>> completeListener) {
48 if (!session.isConnected(true)) {
49 logger.error("Message is not sent - Channel is inactive or out of the node. [{}]", message);
50 return;
51 }
52
53 ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
54
55 String log = message.toString();
56 ChannelFuture cf = ctx.writeAndFlush(message).addListener(f -> {
57 if (f.isSuccess()) {
58 logger.debug("packet outgoing [{}]", log);
59 }
60 else {
61 logger.error("packet outgoing failed [{}] {}", log, f.cause());
62 }
63 });
64
65 if (completeListener != null) {
66 cf.addListener(completeListener);
67 }
68 }
69
70 protected void sendPublish(Topic topic, Message message) {
71 logger.debug("event arrived [clientId={}, message={}]", session.clientId(), message);
72
73 if (!session.isConnected(true)) { return; }
74
75 send(MqttMessageFactory.publish(message, false), f -> {
76 if (!f.isSuccess()) { return; }
77
78 Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);
79
80 switch (message.qos()) {
81 case AT_MOST_ONCE:
82 break;
83
84 case AT_LEAST_ONCE:
85 case EXACTLY_ONCE:
86 OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
87 OutboundMessageStatus.Status.PUBLISHED);
88 break;
89
90 default:
91 logger.error("Invalid QoS [QoS={}, clientId={}, topic={}]", message.qos(), session.clientId(),
92 message.topicName());
93 break;
94 }
95 });
96 }
97
98 protected void completeRemainedMessages() {
99
100
101 completeOutboundMessageStatuses();
102 completeInboundMessageStatuses();
103 }
104
105 private void completeInboundMessageStatuses() {
106 ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
107 if (ctx == null) { return; }
108
109 ctx.executor().submit(() -> {
110 Date now = new Date();
111
112 InboundMessageStatus.NEXUS.keySet().stream().map(key -> InboundMessageStatus.NEXUS.get(key))
113 .forEach(messageStatus -> {
114 long intervalSeconds = (now.getTime() - messageStatus.updateTime().getTime()) * 1000;
115 if (intervalSeconds < RESPONSE_TIMEOUT_SECONDS) { return; }
116
117 Message message = Message.NEXUS.get(messageStatus.key());
118 MqttMessage toSend;
119 String log;
120
121 switch (messageStatus.status()) {
122 case RECEIVED:
123 case PUBRECED:
124 if (message.qos() == MqttQoS.AT_LEAST_ONCE) {
125 toSend = MqttMessageFactory.puback(message.id());
126 log = toSend.toString();
127
128 send(toSend, f -> {
129 if (!f.isSuccess()) {
130 logger.error("packet outgoing failed [{}] {}", log, f.cause());
131 return;
132 }
133
134 InboundMessageStatus.NEXUS.removeByKey(message.id(), session.clientId());
135 logger.debug("Inbound message status REMOVED [clientId={}, messageId={}]",
136 session.clientId(), message.id());
137 });
138 }
139 else {
140 toSend = MqttMessageFactory.pubrec(message.id());
141 log = toSend.toString();
142
143 send(toSend, f -> {
144 if (!f.isSuccess()) {
145 logger.error("packet outgoing failed [{}] {}", log, f.cause());
146 return;
147 }
148
149 InboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
150 InboundMessageStatus.Status.PUBRECED);
151 });
152 }
153 break;
154
155 default:
156 logger.error(
157 "Invalid Inbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
158 messageStatus.status(), session.clientId(), message.topicName(), message.id());
159 break;
160 }
161 });
162 });
163 }
164
165 private void completeOutboundMessageStatuses() {
166 ChannelHandlerContext ctx = Session.NEXUS.channelHandlerContext(session.clientId());
167 if (ctx == null) { return; }
168
169 ctx.executor().submit(() -> {
170 Date now = new Date();
171
172 OutboundMessageStatus.NEXUS.messageIdsOf(session.clientId()).stream()
173 .map(messageId -> OutboundMessageStatus.NEXUS.getBy(messageId, session.clientId()))
174 .forEach(messageStatus -> {
175 long intervalSeconds = (now.getTime() - messageStatus.updateTime().getTime()) * 1000;
176 if (intervalSeconds < RESPONSE_TIMEOUT_SECONDS) { return; }
177
178 Message message = Message.NEXUS.get(messageStatus.messageKey());
179 MqttMessage toSend;
180 String log;
181
182 message.qos(messageStatus.qos());
183 message.id(messageStatus.messageId());
184 message.setRetain(false);
185
186 switch (messageStatus.status()) {
187 case TO_PUBLISH:
188 case PUBLISHED:
189 toSend = MqttMessageFactory.publish(message,
190 messageStatus.status() == OutboundMessageStatus.Status.PUBLISHED);
191 log = toSend.toString();
192 send(toSend, f -> {
193 if (!f.isSuccess()) {
194 logger.error("packet outgoing failed [{}] {}", log, f.cause());
195 return;
196 }
197
198 OutboundMessageStatus.NEXUS.update(message.id(), session.clientId(),
199 OutboundMessageStatus.Status.PUBLISHED);
200 Statistics.INSTANCE.add(Statistics.Criterion.MESSAGES_PUBLISH_SENT, 1);
201 });
202 break;
203
204 case PUBRECED:
205 send(MqttMessageFactory.pubrel(message.id()), null);
206 break;
207
208 default:
209 logger.error(
210 "Invalid Outbound Message Status [status={}, clientId={}, topic={}, messageId={}]",
211 messageStatus.status(), session.clientId(), message.topicName(), message.id());
212 break;
213 }
214 });
215 });
216 }
217 }