1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.packetreceiver;
18
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.mqtt.MqttMessage;
21 import net.anyflow.lannister.message.InboundMessageStatus;
22 import net.anyflow.lannister.session.Session;
23 import net.anyflow.lannister.topic.Topic;
24 import net.anyflow.lannister.topic.Topics.ClientType;
25
26 public class PubRelReceiver {
27 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PubRelReceiver.class);
28
29 public static final PubRelReceiver SHARED = new PubRelReceiver();
30
31 private PubRelReceiver() {
32 }
33
34 protected void handle(ChannelHandlerContext ctx, Session session, int messageId) {
35
36
37
38 Topic topic = Topic.NEXUS.get(session.clientId(), messageId, ClientType.PUBLISHER);
39 if (topic == null) {
40 logger.error("PUBREL target does not exist [clientId={}, messageId={}]", session.clientId(), messageId);
41 session.dispose(true);
42 return;
43 }
44
45 MqttMessage toSend = MqttMessageFactory.pubcomp(messageId);
46 final String log = toSend.toString();
47
48 session.send(toSend, f -> {
49 if (!f.isSuccess()) {
50 logger.error("packet outgoing failed [{}] {}", log, f.cause());
51 return;
52 }
53
54 InboundMessageStatus.NEXUS.removeByKey(messageId, session.clientId());
55 logger.debug("Inbound message status REMOVED [clientId={}, messageId={}]", session.clientId(), messageId);
56 });
57 }
58 }