MqttServer.java

  1. /*
  2.  * Copyright 2016 The Lannister Project
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */

  16. package net.anyflow.lannister.server;

  17. import io.netty.bootstrap.ServerBootstrap;
  18. import io.netty.buffer.PooledByteBufAllocator;
  19. import io.netty.channel.ChannelOption;
  20. import io.netty.channel.EventLoopGroup;
  21. import io.netty.channel.ServerChannel;
  22. import io.netty.channel.WriteBufferWaterMark;
  23. import io.netty.channel.epoll.EpollServerSocketChannel;
  24. import io.netty.channel.socket.nio.NioServerSocketChannel;
  25. import net.anyflow.lannister.Literals;
  26. import net.anyflow.lannister.Settings;

  27. public class MqttServer {
  28.     private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttServer.class);

  29.     private final EventLoopGroup bossGroup;
  30.     private final EventLoopGroup workerGroup;

  31.     public MqttServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
  32.         this.bossGroup = bossGroup;
  33.         this.workerGroup = workerGroup;
  34.     }

  35.     public void start() throws Exception {
  36.         if (Settings.INSTANCE.mqttPort() == null && Settings.INSTANCE.mqttsPort() == null
  37.                 && Settings.INSTANCE.websocketPort() == null && Settings.INSTANCE.websocketSslPort() == null) {
  38.             logger.info("No MQTT port(s) arranged");
  39.             throw new Exception("No MQTT port(s) arranged");
  40.         }

  41.         ScheduledExecutor scheduledExecutor = new ScheduledExecutor();

  42.         if (Settings.INSTANCE.mqttPort() != null) {
  43.             executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttPort(), false, false);
  44.             scheduledExecutor = null;
  45.         }
  46.         if (Settings.INSTANCE.mqttsPort() != null) {
  47.             executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttsPort(), false, true);
  48.             scheduledExecutor = null;
  49.         }
  50.         if (Settings.INSTANCE.websocketPort() != null) {
  51.             executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketPort(), true, false);
  52.             scheduledExecutor = null;
  53.         }
  54.         if (Settings.INSTANCE.websocketSslPort() != null) {
  55.             executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketSslPort(), true, true);
  56.             scheduledExecutor = null;
  57.         }

  58.         logger.info(
  59.                 "Lannister MQTT server started [tcp.port={}, tcp.ssl.port={}, websocket.port={}, websocket.ssl.port={}]",
  60.                 Settings.INSTANCE.mqttPort(), Settings.INSTANCE.mqttsPort(), Settings.INSTANCE.websocketPort(),
  61.                 Settings.INSTANCE.websocketSslPort());
  62.     }

  63.     private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
  64.             throws InterruptedException {
  65.         ServerBootstrap bootstrap = new ServerBootstrap();

  66.         Class<? extends ServerChannel> serverChannelClass;

  67.         if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
  68.             serverChannelClass = EpollServerSocketChannel.class;
  69.         }
  70.         else {
  71.             serverChannelClass = NioServerSocketChannel.class;
  72.         }

  73.         bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
  74.         bootstrap.option(ChannelOption.TCP_NODELAY, true);

  75.         if (scheduledExecutor != null) {
  76.             bootstrap.handler(scheduledExecutor);
  77.         }

  78.         bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));

  79.         bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
  80.                 // setting buffer size can improve I/O
  81.                 .childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
  82.                 // recommended in
  83.                 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
  84.                 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
  85.                 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

  86.         bootstrap.bind(port).sync();
  87.     }
  88. }