MqttServer.java
- /*
- * Copyright 2016 The Lannister Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package net.anyflow.lannister.server;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.buffer.PooledByteBufAllocator;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.ServerChannel;
- import io.netty.channel.WriteBufferWaterMark;
- import io.netty.channel.epoll.EpollServerSocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import net.anyflow.lannister.Literals;
- import net.anyflow.lannister.Settings;
- public class MqttServer {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttServer.class);
- private final EventLoopGroup bossGroup;
- private final EventLoopGroup workerGroup;
- public MqttServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
- this.bossGroup = bossGroup;
- this.workerGroup = workerGroup;
- }
- public void start() throws Exception {
- if (Settings.INSTANCE.mqttPort() == null && Settings.INSTANCE.mqttsPort() == null
- && Settings.INSTANCE.websocketPort() == null && Settings.INSTANCE.websocketSslPort() == null) {
- logger.info("No MQTT port(s) arranged");
- throw new Exception("No MQTT port(s) arranged");
- }
- ScheduledExecutor scheduledExecutor = new ScheduledExecutor();
- if (Settings.INSTANCE.mqttPort() != null) {
- executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttPort(), false, false);
- scheduledExecutor = null;
- }
- if (Settings.INSTANCE.mqttsPort() != null) {
- executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttsPort(), false, true);
- scheduledExecutor = null;
- }
- if (Settings.INSTANCE.websocketPort() != null) {
- executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketPort(), true, false);
- scheduledExecutor = null;
- }
- if (Settings.INSTANCE.websocketSslPort() != null) {
- executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketSslPort(), true, true);
- scheduledExecutor = null;
- }
- logger.info(
- "Lannister MQTT server started [tcp.port={}, tcp.ssl.port={}, websocket.port={}, websocket.ssl.port={}]",
- Settings.INSTANCE.mqttPort(), Settings.INSTANCE.mqttsPort(), Settings.INSTANCE.websocketPort(),
- Settings.INSTANCE.websocketSslPort());
- }
- private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
- throws InterruptedException {
- ServerBootstrap bootstrap = new ServerBootstrap();
- Class<? extends ServerChannel> serverChannelClass;
- if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
- serverChannelClass = EpollServerSocketChannel.class;
- }
- else {
- serverChannelClass = NioServerSocketChannel.class;
- }
- bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
- if (scheduledExecutor != null) {
- bootstrap.handler(scheduledExecutor);
- }
- bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));
- bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
- // setting buffer size can improve I/O
- .childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
- // recommended in
- // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
- .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
- .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- bootstrap.bind(port).sync();
- }
- }