View Javadoc
1   /*
2    * Copyright 2016 The Lannister Project
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package net.anyflow.lannister.server;
18  
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.PooledByteBufAllocator;
21  import io.netty.channel.ChannelOption;
22  import io.netty.channel.EventLoopGroup;
23  import io.netty.channel.ServerChannel;
24  import io.netty.channel.WriteBufferWaterMark;
25  import io.netty.channel.epoll.EpollServerSocketChannel;
26  import io.netty.channel.socket.nio.NioServerSocketChannel;
27  import net.anyflow.lannister.Literals;
28  import net.anyflow.lannister.Settings;
29  
30  public class MqttServer {
31  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttServer.class);
32  
33  	private final EventLoopGroup bossGroup;
34  	private final EventLoopGroup workerGroup;
35  
36  	public MqttServer(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
37  		this.bossGroup = bossGroup;
38  		this.workerGroup = workerGroup;
39  	}
40  
41  	public void start() throws Exception {
42  		if (Settings.INSTANCE.mqttPort() == null && Settings.INSTANCE.mqttsPort() == null
43  				&& Settings.INSTANCE.websocketPort() == null && Settings.INSTANCE.websocketSslPort() == null) {
44  			logger.info("No MQTT port(s) arranged");
45  			throw new Exception("No MQTT port(s) arranged");
46  		}
47  
48  		ScheduledExecutor scheduledExecutor = new ScheduledExecutor();
49  
50  		if (Settings.INSTANCE.mqttPort() != null) {
51  			executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttPort(), false, false);
52  			scheduledExecutor = null;
53  		}
54  		if (Settings.INSTANCE.mqttsPort() != null) {
55  			executeBootstrap(scheduledExecutor, Settings.INSTANCE.mqttsPort(), false, true);
56  			scheduledExecutor = null;
57  		}
58  		if (Settings.INSTANCE.websocketPort() != null) {
59  			executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketPort(), true, false);
60  			scheduledExecutor = null;
61  		}
62  		if (Settings.INSTANCE.websocketSslPort() != null) {
63  			executeBootstrap(scheduledExecutor, Settings.INSTANCE.websocketSslPort(), true, true);
64  			scheduledExecutor = null;
65  		}
66  
67  		logger.info(
68  				"Lannister MQTT server started [tcp.port={}, tcp.ssl.port={}, websocket.port={}, websocket.ssl.port={}]",
69  				Settings.INSTANCE.mqttPort(), Settings.INSTANCE.mqttsPort(), Settings.INSTANCE.websocketPort(),
70  				Settings.INSTANCE.websocketSslPort());
71  	}
72  
73  	private void executeBootstrap(ScheduledExecutor scheduledExecutor, int port, boolean useWebSocket, boolean useSsl)
74  			throws InterruptedException {
75  		ServerBootstrap bootstrap = new ServerBootstrap();
76  
77  		Class<? extends ServerChannel> serverChannelClass;
78  
79  		if (Literals.NETTY_EPOLL.equals(Settings.INSTANCE.nettyTransportMode())) {
80  			serverChannelClass = EpollServerSocketChannel.class;
81  		}
82  		else {
83  			serverChannelClass = NioServerSocketChannel.class;
84  		}
85  
86  		bootstrap = bootstrap.group(bossGroup, workerGroup).channel(serverChannelClass);
87  		bootstrap.option(ChannelOption.TCP_NODELAY, true);
88  
89  		if (scheduledExecutor != null) {
90  			bootstrap.handler(scheduledExecutor);
91  		}
92  
93  		bootstrap.childHandler(new MqttChannelInitializer(useWebSocket, useSsl));
94  
95  		bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
96  				// setting buffer size can improve I/O
97  				.childOption(ChannelOption.SO_SNDBUF, 1048576).childOption(ChannelOption.SO_RCVBUF, 1048576)
98  				// recommended in
99  				// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#11.0
100 				.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
101 				.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
102 
103 		bootstrap.bind(port).sync();
104 	}
105 }