MqttServer.java

/*
 * Copyright 2016 The Lannister Project
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.anyflow.lannister.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import net.anyflow.lannister.Literals;
import net.anyflow.lannister.Settings;

public class MqttServer {
	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttServer.class);

	private final EventLoopGroup bossGroup;
	private final EventLoopGroup workerGroup;

	public MqttServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
		this.bossGroup = bossGroup;
		this.workerGroup = workerGroup;
	}

	public void start() throws Exception {
		if (Settings.INSTANCE.mqttPort() == null && Settings.INSTANCE.mqttsPort() == null
				&& Settings.INSTANCE.websocketPort() == null && Settings.INSTANCE.websocketSslPort() == null) {
			logger.info("No MQTT port(s) arranged");
			throw new Exception("No MQTT port(s) arranged");
		}

		ScheduledExecutor scheduledExecutor = new ScheduledExecutor();

		if (Settings.INSTANCE.mqttPort() != null) {
			executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttPort(), false, false);
			scheduledExecutor = null;
		}
		if (Settings.INSTANCE.mqttsPort() != null) {
			executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttsPort(), false, true);
			scheduledExecutor = null;
		}
		if (Settings.INSTANCE.websocketPort() != null) {
			executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketPort(), true, false);
			scheduledExecutor = null;
		}
		if (Settings.INSTANCE.websocketSslPort() != null) {
			executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketSslPort(), true, true);
			scheduledExecutor = null;
		}

		logger.info(
				"Lannister MQTT server started [tcp.port={}, tcp.ssl.port={}, websocket.port={}, websocket.ssl.port={}]",
				Settings.INSTANCE.mqttPort(), Settings.INSTANCE.mqttsPort(), Settings.INSTANCE.websocketPort(),
				Settings.INSTANCE.websocketSslPort());
	}

	private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
			throws InterruptedException {
		ServerBootstrap bootstrap = new ServerBootstrap();

		Class<? extends ServerChannel> serverChannelClass;

		if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
			serverChannelClass = EpollServerSocketChannel.class;
		}
		else {
			serverChannelClass = NioServerSocketChannel.class;
		}

		bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);

		if (scheduledExecutor != null) {
			bootstrap.handler(scheduledExecutor);
		}

		bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));

		bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
				// setting buffer size can improve I/O
				.childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
				// recommended in
				// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
				.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
				.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

		bootstrap.bind(port).sync();
	}
}