/* * Copyright 2009 Red Hat, Inc. * Red Hat licenses this file to you under the Apache License, version * 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing * permissions and limitations under the License. */ package org.hornetq.core.remoting.impl.netty; import static org.hornetq.utils.Base64.encodeBytes; import java.io.IOException; import java.net.ConnectException; import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.security.AccessController; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedAction; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.base64.Base64; import io.netty.handler.codec.http.ClientCookieEncoder; import io.netty.handler.codec.http.Cookie; import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequestEncoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.core.client.HornetQClientLogger; import org.hornetq.core.client.HornetQClientMessageBundle; import org.hornetq.core.client.impl.ClientSessionFactoryImpl; import org.hornetq.core.remoting.impl.ssl.SSLSupport; import org.hornetq.core.server.HornetQComponent; import org.hornetq.spi.core.remoting.AbstractConnector; import org.hornetq.spi.core.remoting.BufferHandler; import org.hornetq.spi.core.remoting.Connection; import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener; import org.hornetq.utils.ConfigurationHelper; import org.hornetq.utils.FutureLatch; import org.hornetq.utils.HornetQThreadFactory; /** * A NettyConnector * * @author Tim Fox * @author Trustin Lee * @author Norman Maurer */ public class NettyConnector extends AbstractConnector { // Constants ----------------------------------------------------- public static final String JAVAX_KEYSTORE_PATH_PROP_NAME = "javax.net.ssl.keyStore"; public static final String JAVAX_KEYSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.keyStorePassword"; public static final String JAVAX_TRUSTSTORE_PATH_PROP_NAME = "javax.net.ssl.trustStore"; public static final String JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME = "javax.net.ssl.trustStorePassword"; public static final String HORNETQ_KEYSTORE_PATH_PROP_NAME = "org.hornetq.ssl.keyStore"; public static final String HORNETQ_KEYSTORE_PASSWORD_PROP_NAME = "org.hornetq.ssl.keyStorePassword"; public static final String HORNETQ_TRUSTSTORE_PATH_PROP_NAME = "org.hornetq.ssl.trustStore"; public static final String HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME = "org.hornetq.ssl.trustStorePassword"; // Constants for HTTP upgrade // These constants are exposed publicly as they are used on the server-side to fetch // headers from the HTTP request, compute some values and fill the HTTP response public static final String MAGIC_NUMBER = "CF70DEB8-70F9-4FBA-8B4F-DFC3E723B4CD"; public static final String SEC_HORNETQ_REMOTING_KEY = "Sec-HornetQRemoting-Key"; public static final String SEC_HORNETQ_REMOTING_ACCEPT= "Sec-HornetQRemoting-Accept"; public static final String HORNETQ_REMOTING = "hornetq-remoting"; private static final AttributeKey REMOTING_KEY = AttributeKey.valueOf(SEC_HORNETQ_REMOTING_KEY); static { // Disable resource leak detection for performance reasons by default ResourceLeakDetector.setEnabled(false); } // Attributes ---------------------------------------------------- private Class channelClazz; private Bootstrap bootstrap; private ChannelGroup channelGroup; private final BufferHandler handler; private final ConnectionLifeCycleListener listener; private final boolean sslEnabled; private final boolean httpEnabled; private final long httpMaxClientIdleTime; private final long httpClientIdleScanPeriod; private final boolean httpRequiresSessionId; // if true, after the connection, the connector will send // a HTTP GET request (+ Upgrade: hornetq-remoting) that // will be handled by the server's http server. private final boolean httpUpgradeEnabled; private final boolean useServlet; private final String host; private final int port; private final String localAddress; private final int localPort; private final String keyStorePath; private final String keyStorePassword; private final String trustStorePath; private final String trustStorePassword; private final String enabledCipherSuites; private final String enabledProtocols; private final boolean tcpNoDelay; private final int tcpSendBufferSize; private final int tcpReceiveBufferSize; private final long batchDelay; private final ConcurrentMap connections = new ConcurrentHashMap(); private final String servletPath; private final int nioRemotingThreads; private final boolean useNioGlobalWorkerPool; private final ScheduledExecutorService scheduledThreadPool; private final Executor closeExecutor; private BatchFlusher flusher; private ScheduledFuture batchFlusherFuture; private static EventLoopGroup nioEventLoopGroup; private EventLoopGroup group; private static final Object nioWorkerPoolGuard = new Object(); private static final AtomicInteger nioChannelFactoryCount = new AtomicInteger(0); private int connectTimeoutMillis; // Static -------------------------------------------------------- // Constructors -------------------------------------------------- // Public -------------------------------------------------------- public NettyConnector(final Map configuration, final BufferHandler handler, final ConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool) { super(configuration); if (listener == null) { throw HornetQClientMessageBundle.BUNDLE.nullListener(); } if (handler == null) { throw HornetQClientMessageBundle.BUNDLE.nullHandler(); } this.listener = listener; this.handler = handler; sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration); httpEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration); servletPath = ConfigurationHelper.getStringProperty(TransportConstants.SERVLET_PATH, TransportConstants.DEFAULT_SERVLET_PATH, configuration); if (httpEnabled) { httpMaxClientIdleTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, TransportConstants.DEFAULT_HTTP_CLIENT_IDLE_TIME, configuration); httpClientIdleScanPeriod = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, TransportConstants.DEFAULT_HTTP_CLIENT_SCAN_PERIOD, configuration); httpRequiresSessionId = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_REQUIRES_SESSION_ID, TransportConstants.DEFAULT_HTTP_REQUIRES_SESSION_ID, configuration); } else { httpMaxClientIdleTime = 0; httpClientIdleScanPeriod = -1; httpRequiresSessionId = false; } httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration); nioRemotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration); useNioGlobalWorkerPool = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME, TransportConstants.DEFAULT_USE_NIO_GLOBAL_WORKER_POOL, configuration); useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration); host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration); localAddress = ConfigurationHelper.getStringProperty(TransportConstants.LOCAL_ADDRESS_PROP_NAME, TransportConstants.DEFAULT_LOCAL_ADDRESS, configuration); localPort = ConfigurationHelper.getIntProperty(TransportConstants.LOCAL_PORT_PROP_NAME, TransportConstants.DEFAULT_LOCAL_PORT, configuration); if (sslEnabled) { keyStorePath = ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration); keyStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration, HornetQDefaultConfiguration.getPropMaskPassword(), HornetQDefaultConfiguration.getPropMaskPassword()); trustStorePath = ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration); trustStorePassword = ConfigurationHelper.getPasswordProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration, HornetQDefaultConfiguration.getPropMaskPassword(), HornetQDefaultConfiguration.getPropMaskPassword()); enabledCipherSuites = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME, TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES, configuration); enabledProtocols = ConfigurationHelper.getStringProperty(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME, TransportConstants.DEFAULT_ENABLED_PROTOCOLS, configuration); } else { keyStorePath = TransportConstants.DEFAULT_KEYSTORE_PATH; keyStorePassword = TransportConstants.DEFAULT_KEYSTORE_PASSWORD; trustStorePath = TransportConstants.DEFAULT_TRUSTSTORE_PATH; trustStorePassword = TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD; enabledCipherSuites = TransportConstants.DEFAULT_ENABLED_CIPHER_SUITES; enabledProtocols = TransportConstants.DEFAULT_ENABLED_PROTOCOLS; } tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration); tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration); tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration); batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration); connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT, TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, configuration); this.closeExecutor = closeExecutor; this.scheduledThreadPool = scheduledThreadPool; } @Override public String toString() { return "NettyConnector [host=" + host + ", port=" + port + ", httpEnabled=" + httpEnabled + ", httpUpgradeEnabled=" + httpUpgradeEnabled + ", useServlet=" + useServlet + ", servletPath=" + servletPath + ", sslEnabled=" + sslEnabled + ", useNio=" + true + "]"; } public synchronized void start() { if (channelClazz != null) { return; } int threadsToUse; if (nioRemotingThreads == -1) { // Default to number of cores * 3 threadsToUse = Runtime.getRuntime().availableProcessors() * 3; } else { threadsToUse = this.nioRemotingThreads; } if(useNioGlobalWorkerPool) { synchronized (nioWorkerPoolGuard) { if (nioEventLoopGroup == null) { nioEventLoopGroup = new NioEventLoopGroup(threadsToUse, new HornetQThreadFactory("HornetQ-client-netty-threads", true, getThisClassLoader())); } channelClazz = NioSocketChannel.class; group = nioEventLoopGroup; nioChannelFactoryCount.incrementAndGet(); } } else { channelClazz = NioSocketChannel.class; group = new NioEventLoopGroup(threadsToUse); } // if we are a servlet wrap the socketChannelFactory if (useServlet) { // TODO: This will be replaced by allow upgrade HTTP connection from Undertow.; } bootstrap = new Bootstrap(); bootstrap.channel(channelClazz); bootstrap.group(group); bootstrap.option(ChannelOption.TCP_NODELAY, tcpNoDelay); if (connectTimeoutMillis != -1) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis); } if (tcpReceiveBufferSize != -1) { bootstrap.option(ChannelOption.SO_RCVBUF, tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize); } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false)); channelGroup = new DefaultChannelGroup("hornetq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; if (sslEnabled) { try { // HORNETQ-680 - override the server-side config if client-side system properties are set String realKeyStorePath = keyStorePath; String realKeyStorePassword = keyStorePassword; if (System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME) != null) { realKeyStorePath = System.getProperty(JAVAX_KEYSTORE_PATH_PROP_NAME); } if (System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME) != null) { realKeyStorePassword = System.getProperty(JAVAX_KEYSTORE_PASSWORD_PROP_NAME); } if (System.getProperty(HORNETQ_KEYSTORE_PATH_PROP_NAME) != null) { realKeyStorePath = System.getProperty(HORNETQ_KEYSTORE_PATH_PROP_NAME); } if (System.getProperty(HORNETQ_KEYSTORE_PASSWORD_PROP_NAME) != null) { realKeyStorePassword = System.getProperty(HORNETQ_KEYSTORE_PASSWORD_PROP_NAME); } String realTrustStorePath = trustStorePath; String realTrustStorePassword = trustStorePassword; if (System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME) != null) { realTrustStorePath = System.getProperty(JAVAX_TRUSTSTORE_PATH_PROP_NAME); } if (System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME) != null) { realTrustStorePassword = System.getProperty(JAVAX_TRUSTSTORE_PASSWORD_PROP_NAME); } if (System.getProperty(HORNETQ_TRUSTSTORE_PATH_PROP_NAME) != null) { realTrustStorePath = System.getProperty(HORNETQ_TRUSTSTORE_PATH_PROP_NAME); } if (System.getProperty(HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME) != null) { realTrustStorePassword = System.getProperty(HORNETQ_TRUSTSTORE_PASSWORD_PROP_NAME); } context = SSLSupport.createContext(realKeyStorePath, realKeyStorePassword, realTrustStorePath, realTrustStorePassword); } catch (Exception e) { close(); IllegalStateException ise = new IllegalStateException("Unable to create NettyConnector for " + host + ":" + port); ise.initCause(e); throw ise; } } else { context = null; // Unused } if (context != null && useServlet) { // TODO: Fix me //bootstrap.setOption("sslContext", context); } bootstrap.handler(new ChannelInitializer() { public void initChannel(Channel channel) throws Exception { final ChannelPipeline pipeline = channel.pipeline(); if (sslEnabled && !useServlet) { SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(true); engine.setWantClientAuth(true); // setting the enabled cipher suites resets the enabled protocols so we need // to save the enabled protocols so that after the customer cipher suite is enabled // we can reset the enabled protocols if a customer protocol isn't specified String[] originalProtocols = engine.getEnabledProtocols(); if (enabledCipherSuites != null) { try { engine.setEnabledCipherSuites(SSLSupport.parseCommaSeparatedListIntoArray(enabledCipherSuites)); } catch (IllegalArgumentException e) { HornetQClientLogger.LOGGER.invalidCipherSuite(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedCipherSuites())); throw e; } } if (enabledProtocols != null) { try { engine.setEnabledProtocols(SSLSupport.parseCommaSeparatedListIntoArray(enabledProtocols)); } catch (IllegalArgumentException e) { HornetQClientLogger.LOGGER.invalidProtocol(SSLSupport.parseArrayIntoCommandSeparatedList(engine.getSupportedProtocols())); throw e; } } else { engine.setEnabledProtocols(originalProtocols); } SslHandler handler = new SslHandler(engine); pipeline.addLast(handler); } if (httpEnabled) { pipeline.addLast(new HttpRequestEncoder()); pipeline.addLast(new HttpResponseDecoder()); pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); pipeline.addLast(new HttpHandler()); } if (httpUpgradeEnabled) { // prepare to handle a HTTP 101 response to upgrade the protocol. final HttpClientCodec httpClientCodec = new HttpClientCodec(); pipeline.addLast(httpClientCodec); pipeline.addLast("http-upgrade", new HttpUpgradeHandler(pipeline, httpClientCodec)); } pipeline.addLast(new HornetQFrameDecoder2()); pipeline.addLast(new HornetQClientChannelHandler(channelGroup, handler, new Listener())); } }); if (batchDelay > 0) { flusher = new BatchFlusher(); batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS); } HornetQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION); } public synchronized void close() { if (channelClazz == null) { return; } if (batchFlusherFuture != null) { batchFlusherFuture.cancel(false); flusher.cancel(); flusher = null; batchFlusherFuture = null; } bootstrap = null; channelGroup.close().awaitUninterruptibly(); // if using shared pools only release them once there are no references if(useNioGlobalWorkerPool) { synchronized (nioWorkerPoolGuard) { if(nioChannelFactoryCount.decrementAndGet() == 0) { closePools(); } } } else { group.shutdown(); } channelClazz = null; for (Connection connection : connections.values()) { listener.connectionDestroyed(connection.getID()); } connections.clear(); } public boolean isStarted() { return channelClazz != null; } public Connection createConnection() { if (channelClazz == null) { return null; } // HORNETQ-907 - strip off IPv6 scope-id (if necessary) SocketAddress remoteDestination = new InetSocketAddress(host, port); InetAddress inetAddress = ((InetSocketAddress) remoteDestination).getAddress(); if (inetAddress instanceof Inet6Address) { Inet6Address inet6Address = (Inet6Address) inetAddress; if (inet6Address.getScopeId() != 0) { try { remoteDestination = new InetSocketAddress(InetAddress.getByAddress(inet6Address.getAddress()), ((InetSocketAddress) remoteDestination).getPort()); } catch (UnknownHostException e) { throw new IllegalArgumentException(e.getMessage()); } } } HornetQClientLogger.LOGGER.debug("Remote destination: " + remoteDestination); ChannelFuture future; //port 0 does not work so only use local address if set if(localPort != 0) { SocketAddress localDestination; if(localAddress != null) { localDestination = new InetSocketAddress(localAddress, localPort); } else { localDestination = new InetSocketAddress(localPort); } future = bootstrap.connect(remoteDestination, localDestination); } else { future = bootstrap.connect(remoteDestination); } future.awaitUninterruptibly(); if (future.isSuccess()) { final Channel ch = future.channel(); SslHandler sslHandler = ch.pipeline().get(SslHandler.class); if (sslHandler != null) { Future handshakeFuture = sslHandler.handshakeFuture(); if (handshakeFuture.awaitUninterruptibly(30000)) { if (!handshakeFuture.isSuccess()) { ch.close().awaitUninterruptibly(); HornetQClientLogger.LOGGER.errorCreatingNettyConnection(handshakeFuture.cause()); return null; } } else { //handshakeFuture.setFailure(new SSLException("Handshake was not completed in 30 seconds")); ch.close().awaitUninterruptibly(); return null; } } if (httpUpgradeEnabled) { // Send a HTTP GET + Upgrade request that will be handled by the http-upgrade handler. try { //get this first incase it removes itself HttpUpgradeHandler httpUpgradeHandler = (HttpUpgradeHandler) ch.pipeline().get("http-upgrade"); URI uri = new URI("http", null, host, port, null, null, null); HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); request.headers().set(HttpHeaders.Names.HOST, host); request.headers().set(HttpHeaders.Names.UPGRADE, HORNETQ_REMOTING); request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE); // Get 16 bit nonce and base 64 encode it byte[] nonce = randomBytes(16); String key = base64(nonce); request.headers().set(SEC_HORNETQ_REMOTING_KEY, key); ch.attr(REMOTING_KEY).set(key); HornetQClientLogger.LOGGER.debugf("Sending HTTP request %s", request); // Send the HTTP request. ch.writeAndFlush(request); if(!httpUpgradeHandler.awaitHandshake()) { return null; } } catch (URISyntaxException e) { HornetQClientLogger.LOGGER.errorCreatingNettyConnection(e); return null; } } else { ch.pipeline().get(HornetQChannelHandler.class).active = true; } // No acceptor on a client connection Listener connectionListener = new Listener(); NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false); connectionListener.connectionCreated(null, conn, HornetQClient.DEFAULT_CORE_PROTOCOL); return conn; } else { Throwable t = future.cause(); if (t != null && !(t instanceof ConnectException)) { HornetQClientLogger.LOGGER.errorCreatingNettyConnection(future.cause()); } return null; } } // Public -------------------------------------------------------- // Package protected --------------------------------------------- // Protected ----------------------------------------------------- // Private ------------------------------------------------------- // Inner classes ------------------------------------------------- private static final class HornetQClientChannelHandler extends HornetQChannelHandler { HornetQClientChannelHandler(final ChannelGroup group, final BufferHandler handler, final ConnectionLifeCycleListener listener) { super(group, handler, listener); } } private static class HttpUpgradeHandler extends SimpleChannelInboundHandler { private final ChannelPipeline pipeline; private final HttpClientCodec httpClientCodec; private final CountDownLatch latch = new CountDownLatch(1); private boolean handshakeComplete = false; public HttpUpgradeHandler(ChannelPipeline pipeline, HttpClientCodec httpClientCodec) { this.pipeline = pipeline; this.httpClientCodec = httpClientCodec; } @Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(HORNETQ_REMOTING)) { String accept = response.headers().get(SEC_HORNETQ_REMOTING_ACCEPT); String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); if (expectedResponse.equals(accept)) { // remove the http handlers and flag the hornetq channel handler as active pipeline.remove(httpClientCodec); pipeline.remove(this); handshakeComplete = true; pipeline.get(HornetQChannelHandler.class).active = true; } else { HornetQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse); ctx.close(); } } latch.countDown(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { HornetQClientLogger.LOGGER.errorCreatingNettyConnection(cause); ctx.close(); } public boolean awaitHandshake() { try { if(!latch.await(30000, TimeUnit.MILLISECONDS)) { return false; } } catch (InterruptedException e) { return false; } return handshakeComplete; } } class HttpHandler extends ChannelDuplexHandler { private Channel channel; private long lastSendTime = 0; private boolean waitingGet = false; private HttpIdleTimer task; private final String url; private final FutureLatch handShakeFuture = new FutureLatch(); private boolean active = false; private boolean handshaking = false; private String cookie; public HttpHandler() throws Exception { url = new URI("http", null, host, port, servletPath, null, null).toString(); } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); channel = ctx.channel(); if (httpClientIdleScanPeriod > 0) { task = new HttpIdleTimer(); java.util.concurrent.Future future = scheduledThreadPool.scheduleAtFixedRate(task, httpClientIdleScanPeriod, httpClientIdleScanPeriod, TimeUnit.MILLISECONDS); task.setFuture(future); } } @Override public void channelInactive(final ChannelHandlerContext ctx) throws Exception { if (task != null) { task.close(); } super.channelInactive(ctx); } @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { FullHttpResponse response = (FullHttpResponse) msg; if (httpRequiresSessionId && !active) { Set cookieMap = CookieDecoder.decode(response.headers().get(HttpHeaders.Names.SET_COOKIE)); for (Cookie cookie : cookieMap) { if (cookie.getName().equals("JSESSIONID")) { this.cookie = ClientCookieEncoder.encode(cookie); } } active = true; handShakeFuture.run(); } waitingGet = false; ctx.fireChannelRead(response.content()); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception { if (msg instanceof ByteBuf) { if (httpRequiresSessionId && !active) { if (handshaking) { handshaking = true; } else { if (!handShakeFuture.await(5000)) { throw new RuntimeException("Handshake failed after timeout"); } } } ByteBuf buf = (ByteBuf) msg; FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf); httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); if (cookie != null) { httpRequest.headers().add(HttpHeaders.Names.COOKIE, cookie); } httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes())); ctx.write(httpRequest, promise); lastSendTime = System.currentTimeMillis(); } else { ctx.write(msg, promise); lastSendTime = System.currentTimeMillis(); } } private class HttpIdleTimer implements Runnable { private boolean closed = false; private java.util.concurrent.Future future; public synchronized void run() { if (closed) { return; } if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) { FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); waitingGet = true; channel.writeAndFlush(httpRequest); } } public synchronized void setFuture(final java.util.concurrent.Future future) { this.future = future; } public void close() { if (future != null) { future.cancel(false); } closed = true; } } } private class Listener implements ConnectionLifeCycleListener { public void connectionCreated(final HornetQComponent component, final Connection connection, final String protocol) { if (connections.putIfAbsent(connection.getID(), connection) != null) { throw HornetQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); } String handshake = "HORNETQ"; HornetQBuffer buffer = connection.createBuffer(handshake.length()); buffer.writeBytes(handshake.getBytes()); connection.write(buffer); } public void connectionDestroyed(final Object connectionID) { if (connections.remove(connectionID) != null) { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { public void run() { listener.connectionDestroyed(connectionID); } }); } } public void connectionException(final Object connectionID, final HornetQException me) { // Execute on different thread to avoid deadlocks closeExecutor.execute(new Runnable() { public void run() { listener.connectionException(connectionID, me); } }); } public void connectionReadyForWrites(Object connectionID, boolean ready) { } } private class BatchFlusher implements Runnable { private boolean cancelled; public synchronized void run() { if (!cancelled) { for (Connection connection : connections.values()) { connection.checkFlushBatchBuffer(); } } } public synchronized void cancel() { cancelled = true; } } public boolean isEquivalent(Map configuration) { //here we only check host and port because these two parameters //is sufficient to determine the target host String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration); Integer port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration); if (!port.equals(this.port)) return false; if (host.equals(this.host)) return true; //The host may be an alias. We need to compare raw IP address. boolean result = false; try { InetAddress inetAddr1 = InetAddress.getByName(host); InetAddress inetAddr2 = InetAddress.getByName(this.host); String ip1 = inetAddr1.getHostAddress(); String ip2 = inetAddr2.getHostAddress(); HornetQClientLogger.LOGGER.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2); result = ip1.equals(ip2); } catch (UnknownHostException e) { HornetQClientLogger.LOGGER.error("Cannot resolve host", e); } return result; } private void closePools() { if (nioEventLoopGroup != null) { nioEventLoopGroup.shutdown(); nioEventLoopGroup = null; } } public void finalize() throws Throwable { close(); super.finalize(); } //for test purpose only public Bootstrap getBootStrap() { return bootstrap; } public static void clearThreadPools() { if (nioEventLoopGroup != null) { nioEventLoopGroup.shutdown(); nioEventLoopGroup = null; } } private static ClassLoader getThisClassLoader() { return AccessController.doPrivileged(new PrivilegedAction() { public ClassLoader run() { return ClientSessionFactoryImpl.class.getClassLoader(); } }); } private static String base64(byte[] data) { ByteBuf encodedData = Unpooled.wrappedBuffer(data); ByteBuf encoded = Base64.encode(encodedData); String encodedString = encoded.toString(CharsetUtil.UTF_8); encoded.release(); return encodedString; } /** * Creates an arbitrary number of random bytes * * @param size the number of random bytes to create * @return An array of random bytes */ private static byte[] randomBytes(int size) { byte[] bytes = new byte[size]; for (int index = 0; index < size; index++) { bytes[index] = (byte) randomNumber(0, 255); } return bytes; } private static int randomNumber(int minimum, int maximum) { return (int) (Math.random() * maximum + minimum); } public static String createExpectedResponse(final String magicNumber, final String secretKey) throws IOException { try { final String concat = secretKey + magicNumber; final MessageDigest digest = MessageDigest.getInstance("SHA1"); digest.update(concat.getBytes("UTF-8")); final byte[] bytes = digest.digest(); return encodeBytes(bytes); } catch (NoSuchAlgorithmException e) { throw new IOException(e); } } }