1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.topic;
18
19 import java.io.IOException;
20
21 import com.fasterxml.jackson.annotation.JsonIgnore;
22 import com.fasterxml.jackson.annotation.JsonProperty;
23 import com.hazelcast.nio.ObjectDataInput;
24 import com.hazelcast.nio.ObjectDataOutput;
25
26 import io.netty.handler.codec.mqtt.MqttQoS;
27 import net.anyflow.lannister.message.Message;
28 import net.anyflow.lannister.message.OutboundMessageStatus;
29 import net.anyflow.lannister.serialization.SerializableFactory;
30 import net.anyflow.lannister.session.Session;
31
32 public class Topic implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
33 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Topic.class);
34
35 public static final Topics NEXUS = new Topics(Session.NEXUS);
36 public static final int ID = 6;
37
38 @JsonProperty
39 private String name;
40 @JsonProperty
41 private Message retainedMessage;
42
43 public Topic() {
44 }
45
46 public Topic(String name) {
47 this.name = name;
48 this.retainedMessage = null;
49 }
50
51 public String name() {
52 return name;
53 }
54
55 public Message retainedMessage() {
56 return retainedMessage;
57 }
58
59 public void setRetainedMessage(Message message) {
60 if (message == null || message.message().length <= 0) {
61 this.retainedMessage = null;
62 }
63 else {
64 this.retainedMessage = message.clone();
65 }
66
67 NEXUS.persist(this);
68 }
69
70 private static MqttQoS adjustQoS(MqttQoS subscriptionQos, MqttQoS publishQos) {
71 return subscriptionQos.value() <= publishQos.value() ? subscriptionQos : publishQos;
72 }
73
74 public void publish(final Message message) {
75 assert message != null;
76 assert name.equals(message.topicName());
77
78 if (message.qos() != MqttQoS.AT_MOST_ONCE) {
79 Message.NEXUS.put(message.key(), message);
80 }
81
82 TopicSubscriber.NEXUS.clientIdsOf(name).forEach(id -> {
83 Session session = Session.NEXUS.get(id);
84 logger.debug("Before publish to [clientId={}, sessionsSize={}]", id, Session.NEXUS.keySet().size());
85 assert session != null;
86
87 publish(session, message);
88 });
89 }
90
91 public void publish(final Session session, final Message message) {
92 assert session != null;
93 assert message != null;
94
95 Message toSend = message.clone();
96
97 TopicSubscription subscription = session.matches(name);
98 assert subscription != null;
99
100 toSend.qos(adjustQoS(subscription.qos(), toSend.qos()));
101
102 if (!OutboundMessageStatus.NEXUS.containsKey(toSend.id(), session.clientId())) {
103 toSend.id(session.nextMessageId());
104
105 if (toSend.qos() != MqttQoS.AT_MOST_ONCE) {
106 OutboundMessageStatus outboundMessageStatus = new OutboundMessageStatus(message.key(),
107 session.clientId(), toSend.id(), toSend.topicName(), OutboundMessageStatus.Status.TO_PUBLISH,
108 toSend.qos());
109
110 OutboundMessageStatus.NEXUS.put(outboundMessageStatus);
111 }
112 }
113
114 NEXUS.notifier().publish(new Notification(session.clientId(), this, toSend));
115 }
116
117 public void publish(Session session, String messageKey) {
118 assert session != null;
119 assert messageKey != null;
120
121 Message message = Message.NEXUS.get(messageKey);
122 assert message != null;
123 assert OutboundMessageStatus.NEXUS.containsKey(message.id(), session.clientId());
124
125 NEXUS.notifier().publish(new Notification(session.clientId(), this, message));
126 }
127
128 @JsonIgnore
129 @Override
130 public int getFactoryId() {
131 return SerializableFactory.ID;
132 }
133
134 @JsonIgnore
135 @Override
136 public int getId() {
137 return ID;
138 }
139
140 @Override
141 public void writeData(ObjectDataOutput out) throws IOException {
142 out.writeUTF(name);
143
144 out.writeBoolean(retainedMessage != null);
145 if (retainedMessage != null) {
146 retainedMessage.writeData(out);
147 }
148 }
149
150 @Override
151 public void readData(ObjectDataInput in) throws IOException {
152 name = in.readUTF();
153
154 if (in.readBoolean()) {
155 retainedMessage = new Message(in);
156 }
157 }
158 }