1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import io.netty.channel.ChannelHandlerContext;
20 import net.anyflow.lannister.message.OutboundMessageStatus;
21 import net.anyflow.lannister.plugin.DeliveredEventArgs;
22 import net.anyflow.lannister.plugin.DeliveredEventListener;
23 import net.anyflow.lannister.plugin.Plugins;
24 import net.anyflow.lannister.session.Session;
25
26 public class PubRecReceiver {
27 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PubRecReceiver.class);
28
29 public static final PubRecReceiver SHARED = new PubRecReceiver();
30
31 private PubRecReceiver() {
32 }
33
34 protected void handle(ChannelHandlerContext ctx, Session session, int messageId) {
35 OutboundMessageStatus status = OutboundMessageStatus.NEXUS.getBy(messageId, session.clientId());
36 if (status == null || status.status() == OutboundMessageStatus.Status.TO_PUBLISH) {
37 logger.error("PUBREC target does not exist or Invalid status [clientId={}, messageId={}]",
38 session.clientId(), messageId);
39 session.dispose(true);
40 return;
41 }
42
43 if (status.status() == OutboundMessageStatus.Status.PUBLISHED) {
44 ctx.channel().eventLoop().execute(() -> {
45 Plugins.INSTANCE.get(DeliveredEventListener.class).delivered(new DeliveredEventArgs() {
46 @Override
47 public String clientId() {
48 return session.clientId();
49 }
50
51 @Override
52 public int messageId() {
53 return messageId;
54 }
55 });
56 });
57 }
58
59 OutboundMessageStatus.NEXUS.update(messageId, session.clientId(), OutboundMessageStatus.Status.PUBRECED);
60
61 session.send(MqttMessageFactory.pubrel(messageId), null);
62 }
63 }