View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.topic;
18  
19  import java.io.IOException;
20  
21  import com.fasterxml.jackson.annotation.JsonIgnore;
22  import com.fasterxml.jackson.annotation.JsonProperty;
23  import com.hazelcast.nio.ObjectDataInput;
24  import com.hazelcast.nio.ObjectDataOutput;
25  
26  import io.netty.handler.codec.mqtt.MqttQoS;
27  import net.anyflow.lannister.message.Message;
28  import net.anyflow.lannister.message.OutboundMessageStatus;
29  import net.anyflow.lannister.serialization.SerializableFactory;
30  import net.anyflow.lannister.session.Session;
31  
32  public class Topic implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
33  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Topic.class);
34  
35  	public static final Topics NEXUS = new Topics(Session.NEXUS);
36  	public static final int ID = 6;
37  
38  	@JsonProperty
39  	private String name;
40  	@JsonProperty
41  	private Message retainedMessage; // [MQTT-3.1.2.7]
42  
43  	public Topic() { // just for Serialization
44  	}
45  
46  	public Topic(String name) {
47  		this.name = name;
48  		this.retainedMessage = null;
49  	}
50  
51  	public String name() {
52  		return name;
53  	}
54  
55  	public Message retainedMessage() {
56  		return retainedMessage;
57  	}
58  
59  	public void setRetainedMessage(Message message) {
60  		if (message == null || message.message().length <= 0) { // [mqtt-3.3.1-10],[MQTT-3.3.1-11]
61  			this.retainedMessage = null;
62  		}
63  		else {
64  			this.retainedMessage = message.clone();
65  		}
66  
67  		NEXUS.persist(this);
68  	}
69  
70  	private static MqttQoS adjustQoS(MqttQoS subscriptionQos, MqttQoS publishQos) {
71  		return subscriptionQos.value() <= publishQos.value() ? subscriptionQos : publishQos;
72  	}
73  
74  	public void publish(final Message message) {
75  		assert message != null;
76  		assert name.equals(message.topicName());
77  
78  		if (message.qos() != MqttQoS.AT_MOST_ONCE) {
79  			Message.NEXUS.put(message.key(), message);
80  		}
81  
82  		TopicSubscriber.NEXUS.clientIdsOf(name).forEach(id -> {
83  			Session session = Session.NEXUS.get(id);
84  			logger.debug("Before publish to [clientId={}, sessionsSize={}]", id, Session.NEXUS.keySet().size());
85  			assert session != null;
86  
87  			publish(session, message);
88  		});
89  	}
90  
91  	public void publish(final Session session, final Message message) {
92  		assert session != null;
93  		assert message != null;
94  
95  		Message toSend = message.clone();
96  
97  		TopicSubscription subscription = session.matches(name);
98  		assert subscription != null;
99  
100 		toSend.qos(adjustQoS(subscription.qos(), toSend.qos()));
101 
102 		if (!OutboundMessageStatus.NEXUS.containsKey(toSend.id(), session.clientId())) {
103 			toSend.id(session.nextMessageId()); // [MQTT-2.3.1-2]
104 
105 			if (toSend.qos() != MqttQoS.AT_MOST_ONCE) {
106 				OutboundMessageStatus outboundMessageStatus = new OutboundMessageStatus(message.key(),
107 						session.clientId(), toSend.id(), toSend.topicName(), OutboundMessageStatus.Status.TO_PUBLISH,
108 						toSend.qos()); // [MQTT-3.1.2-5]
109 
110 				OutboundMessageStatus.NEXUS.put(outboundMessageStatus);
111 			}
112 		}
113 
114 		NEXUS.notifier().publish(new Notification(session.clientId(), this, toSend));
115 	}
116 
117 	public void publish(Session session, String messageKey) {
118 		assert session != null;
119 		assert messageKey != null;
120 
121 		Message message = Message.NEXUS.get(messageKey);
122 		assert message != null;
123 		assert OutboundMessageStatus.NEXUS.containsKey(message.id(), session.clientId());
124 
125 		NEXUS.notifier().publish(new Notification(session.clientId(), this, message));
126 	}
127 
128 	@JsonIgnore
129 	@Override
130 	public int getFactoryId() {
131 		return SerializableFactory.ID;
132 	}
133 
134 	@JsonIgnore
135 	@Override
136 	public int getId() {
137 		return ID;
138 	}
139 
140 	@Override
141 	public void writeData(ObjectDataOutput out) throws IOException {
142 		out.writeUTF(name);
143 
144 		out.writeBoolean(retainedMessage != null);
145 		if (retainedMessage != null) {
146 			retainedMessage.writeData(out);
147 		}
148 	}
149 
150 	@Override
151 	public void readData(ObjectDataInput in) throws IOException {
152 		name = in.readUTF();
153 
154 		if (in.readBoolean()) {
155 			retainedMessage = new Message(in);
156 		}
157 	}
158 }