View Javadoc
1   package net.anyflow.lannister.server;
2   
3   import io.netty.channel.ChannelInitializer;
4   import io.netty.channel.socket.SocketChannel;
5   import io.netty.handler.codec.http.HttpContentCompressor;
6   import io.netty.handler.codec.http.HttpObjectAggregator;
7   import io.netty.handler.codec.http.HttpServerCodec;
8   import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
9   import io.netty.handler.codec.mqtt.MqttDecoder;
10  import io.netty.handler.codec.mqtt.MqttEncoder;
11  import io.netty.handler.logging.LogLevel;
12  import io.netty.handler.logging.LoggingHandler;
13  import io.netty.handler.ssl.SslContext;
14  import io.netty.handler.ssl.SslContextBuilder;
15  import net.anyflow.lannister.Settings;
16  import net.anyflow.lannister.packetreceiver.ConnectReceiver;
17  import net.anyflow.lannister.packetreceiver.GenericReceiver;
18  import net.anyflow.lannister.packetreceiver.PubAckReceiver;
19  import net.anyflow.lannister.packetreceiver.PublishReceiver;
20  import net.anyflow.lannister.packetreceiver.SubscribeReceiver;
21  import net.anyflow.lannister.packetreceiver.UnsubscribeReceiver;
22  
23  public class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {
24  
25  	private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MqttChannelInitializer.class);
26  
27  	private final boolean useWebSocket;
28  	private final boolean useSsl;
29  
30  	public MqttChannelInitializer(boolean useWebSocket, boolean useSsl) {
31  		this.useWebSocket = useWebSocket;
32  		this.useSsl = useSsl;
33  	}
34  
35  	@Override
36  	protected void initChannel(SocketChannel ch) throws Exception {
37  		logger.debug("Initializaing channels...");
38  
39  		ch.pipeline().addLast(ByteCounterCodec.class.getName(), new ByteCounterCodec());
40  
41  		if ("true".equalsIgnoreCase(Settings.INSTANCE.getProperty("netty.logger"))) {
42  			ch.pipeline().addLast(LoggingHandler.class.getName(), new LoggingHandler(LogLevel.DEBUG));
43  		}
44  
45  		if (useSsl) {
46  			SslContext sslCtx = SslContextBuilder
47  					.forServer(Settings.INSTANCE.certChainFile(), Settings.INSTANCE.privateKeyFile()).build();
48  
49  			logger.debug("SSL Provider : {}", SslContext.defaultServerProvider());
50  
51  			ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
52  		}
53  
54  		if (useWebSocket) {
55  			String websocketPath = Settings.INSTANCE.getProperty("mqttserver.websocket.path", "/");
56  
57  			ch.pipeline().addLast(HttpServerCodec.class.getName(), new HttpServerCodec());
58  			ch.pipeline().addLast(HttpObjectAggregator.class.getName(), new HttpObjectAggregator(1048576));
59  			ch.pipeline().addLast(HttpContentCompressor.class.getName(), new HttpContentCompressor());
60  			ch.pipeline().addLast(WebSocketServerProtocolHandler.class.getName(),
61  					new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536)); // [MQTT-6.0.0-3]
62  			ch.pipeline().addLast(new MqttWebSocketCodec());
63  		}
64  
65  		int maxBytesInMessage = Settings.INSTANCE.getInt("mqttserver.maxBytesInMessage", 8092);
66  
67  		ch.pipeline().addLast(MqttDecoder.class.getName(), new MqttDecoder(maxBytesInMessage));
68  		ch.pipeline().addLast(MqttEncoder.class.getName(), MqttEncoder.INSTANCE);
69  
70  		ch.pipeline().addLast(ConnectReceiver.class.getName(), ConnectReceiver.INSTANCE);
71  		ch.pipeline().addLast(PubAckReceiver.class.getName(), PubAckReceiver.INSTANCE);
72  		ch.pipeline().addLast(PublishReceiver.class.getName(), PublishReceiver.INSTANCE);
73  		ch.pipeline().addLast(SubscribeReceiver.class.getName(), SubscribeReceiver.INSTANCE);
74  		ch.pipeline().addLast(UnsubscribeReceiver.class.getName(), UnsubscribeReceiver.INSTANCE);
75  		ch.pipeline().addLast(GenericReceiver.class.getName(), GenericReceiver.INSTANCE);
76  	}
77  }