1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package net.anyflow.lannister.server;
18
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.PooledByteBufAllocator;
21 import io.netty.channel.ChannelOption;
22 import io.netty.channel.EventLoopGroup;
23 import io.netty.channel.ServerChannel;
24 import io.netty.channel.WriteBufferWaterMark;
25 import io.netty.channel.epoll.EpollServerSocketChannel;
26 import io.netty.channel.socket.nio.NioServerSocketChannel;
27 import net.anyflow.lannister.Literals;
28 import net.anyflow.lannister.Settings;
29
30 public class MqttServer {
31 private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttServer.class);
32
33 private final EventLoopGroup bossGroup;
34 private final EventLoopGroup workerGroup;
35
36 public MqttServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
37 this.bossGroup = bossGroup;
38 this.workerGroup = workerGroup;
39 }
40
41 public void start() throws Exception {
42 if (Settings.INSTANCE.mqttPort() == null && Settings.INSTANCE.mqttsPort() == null
43 && Settings.INSTANCE.websocketPort() == null && Settings.INSTANCE.websocketSslPort() == null) {
44 logger.info("No MQTT port(s) arranged");
45 throw new Exception("No MQTT port(s) arranged");
46 }
47
48 ScheduledExecutor scheduledExecutor = new ScheduledExecutor();
49
50 if (Settings.INSTANCE.mqttPort() != null) {
51 executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttPort(), false, false);
52 scheduledExecutor = null;
53 }
54 if (Settings.INSTANCE.mqttsPort() != null) {
55 executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttsPort(), false, true);
56 scheduledExecutor = null;
57 }
58 if (Settings.INSTANCE.websocketPort() != null) {
59 executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketPort(), true, false);
60 scheduledExecutor = null;
61 }
62 if (Settings.INSTANCE.websocketSslPort() != null) {
63 executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketSslPort(), true, true);
64 scheduledExecutor = null;
65 }
66
67 logger.info(
68 "Lannister MQTT server started [tcp.port={}, tcp.ssl.port={}, websocket.port={}, websocket.ssl.port={}]",
69 Settings.INSTANCE.mqttPort(), Settings.INSTANCE.mqttsPort(), Settings.INSTANCE.websocketPort(),
70 Settings.INSTANCE.websocketSslPort());
71 }
72
73 private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
74 throws InterruptedException {
75 ServerBootstrap bootstrap = new ServerBootstrap();
76
77 Class<? extends ServerChannel> serverChannelClass;
78
79 if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
80 serverChannelClass = EpollServerSocketChannel.class;
81 }
82 else {
83 serverChannelClass = NioServerSocketChannel.class;
84 }
85
86 bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
87 bootstrap.option(ChannelOption.TCP_NODELAY, true);
88
89 if (scheduledExecutor != null) {
90 bootstrap.handler(scheduledExecutor);
91 }
92
93 bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));
94
95 bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
96
97 .childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
98
99
100 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
101 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
102
103 bootstrap.bind(port).sync();
104 }
105 }