Topic.java

/*
 * Copyright 2016 The Lannister Project
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.anyflow.lannister.topic;

import java.io.IOException;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;

import io.netty.handler.codec.mqtt.MqttQoS;
import net.anyflow.lannister.message.Message;
import net.anyflow.lannister.message.OutboundMessageStatus;
import net.anyflow.lannister.serialization.SerializableFactory;
import net.anyflow.lannister.session.Session;

public class Topic implements com.hazelcast.nio.serialization.IdentifiedDataSerializable {
	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Topic.class);

	public static final Topics NEXUS = new Topics(Session.NEXUS);
	public static final int ID = 6;

	@JsonProperty
	private String name;
	@JsonProperty
	private Message retainedMessage; // [MQTT-3.1.2.7]

	public Topic() { // just for Serialization
	}

	public Topic(String name) {
		this.name = name;
		this.retainedMessage = null;
	}

	public String name() {
		return name;
	}

	public Message retainedMessage() {
		return retainedMessage;
	}

	public void setRetainedMessage(Message message) {
		if (message == null || message.message().length <= 0) { // [mqtt-3.3.1-10],[MQTT-3.3.1-11]
			this.retainedMessage = null;
		}
		else {
			this.retainedMessage = message.clone();
		}

		NEXUS.persist(this);
	}

	private static MqttQoS adjustQoS(MqttQoS subscriptionQos, MqttQoS publishQos) {
		return subscriptionQos.value() <= publishQos.value() ? subscriptionQos : publishQos;
	}

	public void publish(final Message message) {
		assert message != null;
		assert name.equals(message.topicName());

		if (message.qos() != MqttQoS.AT_MOST_ONCE) {
			Message.NEXUS.put(message.key(), message);
		}

		TopicSubscriber.NEXUS.clientIdsOf(name).forEach(id -> {
			Session session = Session.NEXUS.get(id);
			logger.debug("Before publish to [clientId={}, sessionsSize={}]", id, Session.NEXUS.keySet().size());
			assert session != null;

			publish(session, message);
		});
	}

	public void publish(final Session session, final Message message) {
		assert session != null;
		assert message != null;

		Message toSend = message.clone();

		TopicSubscription subscription = session.matches(name);
		assert subscription != null;

		toSend.qos(adjustQoS(subscription.qos(), toSend.qos()));

		if (!OutboundMessageStatus.NEXUS.containsKey(toSend.id(), session.clientId())) {
			toSend.id(session.nextMessageId()); // [MQTT-2.3.1-2]

			if (toSend.qos() != MqttQoS.AT_MOST_ONCE) {
				OutboundMessageStatus outboundMessageStatus = new OutboundMessageStatus(message.key(),
						session.clientId(), toSend.id(), toSend.topicName(), OutboundMessageStatus.Status.TO_PUBLISH,
						toSend.qos()); // [MQTT-3.1.2-5]

				OutboundMessageStatus.NEXUS.put(outboundMessageStatus);
			}
		}

		NEXUS.notifier().publish(new Notification(session.clientId(), this, toSend));
	}

	public void publish(Session session, String messageKey) {
		assert session != null;
		assert messageKey != null;

		Message message = Message.NEXUS.get(messageKey);
		assert message != null;
		assert OutboundMessageStatus.NEXUS.containsKey(message.id(), session.clientId());

		NEXUS.notifier().publish(new Notification(session.clientId(), this, message));
	}

	@JsonIgnore
	@Override
	public int getFactoryId() {
		return SerializableFactory.ID;
	}

	@JsonIgnore
	@Override
	public int getId() {
		return ID;
	}

	@Override
	public void writeData(ObjectDataOutput out) throws IOException {
		out.writeUTF(name);

		out.writeBoolean(retainedMessage != null);
		if (retainedMessage != null) {
			retainedMessage.writeData(out);
		}
	}

	@Override
	public void readData(ObjectDataInput in) throws IOException {
		name = in.readUTF();

		if (in.readBoolean()) {
			retainedMessage = new Message(in);
		}
	}
}