1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import java.util.Date;
20
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelHandler.Sharable;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.SimpleChannelInboundHandler;
25 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
26 import net.anyflow.lannister.AbnormalDisconnectEventArgs;
27 import net.anyflow.lannister.message.OutboundMessageStatus;
28 import net.anyflow.lannister.plugin.DeliveredEventArgs;
29 import net.anyflow.lannister.plugin.DeliveredEventListener;
30 import net.anyflow.lannister.plugin.DisconnectEventListener;
31 import net.anyflow.lannister.plugin.Plugins;
32 import net.anyflow.lannister.session.Session;
33
34 @Sharable
35 public class PubAckReceiver extends SimpleChannelInboundHandler<MqttPubAckMessage> {
36 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PubAckReceiver.class);
37 public static final PubAckReceiver INSTANCE = new PubAckReceiver();
38
39 @Override
40 protected void channelRead0(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
41 logger.debug("packet incoming [message={}]", msg.toString());
42
43 Session session = Session.NEXUS.get(ctx.channel().id());
44 if (session == null) {
45 logger.error("None exist session message [message={}]", msg.toString());
46
47 ctx.channel().disconnect().addListener(ChannelFutureListener.CLOSE).addListener(fs ->
48 Plugins.INSTANCE.get(DisconnectEventListener.class).disconnected(new AbnormalDisconnectEventArgs()));
49 return;
50 }
51
52 session.setLastIncomingTime(new Date());
53
54 String clientId = session.clientId();
55 int messageId = msg.variableHeader().messageId();
56
57 OutboundMessageStatus status = OutboundMessageStatus.NEXUS.removeByKey(messageId, clientId);
58 if (status == null) {
59 logger.error("PUBACK target does not exist [clientId={}, messageId={}]", clientId, messageId);
60 session.dispose(true);
61 return;
62 }
63
64 ctx.channel().eventLoop()
65 .execute(() -> Plugins.INSTANCE.get(DeliveredEventListener.class).delivered(new DeliveredEventArgs() {
66 @Override
67 public String clientId() {
68 return clientId;
69 }
70
71 @Override
72 public int messageId() {
73 return messageId;
74 }
75 }));
76
77 logger.debug("Outbound message status REMOVED [clientId={}, messageId={}]", clientId, messageId);
78 }
79 }