WildFly 11 and later: mdb hangs when calling "activate" on Resource Adapter
dgrittner Apr 5, 2018 11:41 AM
We are using a resource adapter with an inbound connection to open a port in WildFly; in this case it is the DICOM port 11112.
One peculiarity is that an EJB singleton is injected into the receiving MDB.
But the problem seems more related to load (so maybe a race condition) than any special kind of EJB, because I replaced the singleton with other EJBs and could reproduce the hang.
For reproducing the problem with WildFly 11 and 12 the following steps are executed:
- on a console "telnet localhost 11112", this triggers the resource adapter and calls and initializes the mdb
- on the jboss cli simply activate the resource adapter again "/subsystem=resource-adapters/resource-adapter=TestRar.rar/:activate"
- the resource adapter is shut down and restarted, the mdb is removed (ejbRemove is called)
- repeat step 1: "telnet localhost 11112"
- on WildFly 11 and 12 the application container hangs forever when calling "((AcceptedSocketListener) messageEndpoint).onMessage(socket);" in a class StartupAwaitInterceptor
- on WildFly 10 everything works, the mdb is initialized and called, etc.
I think that the behavior in WildFly 11 and later is not correct, there seems to be a bug.
Here is the code for the mdb, injected singleton, resource adapter, and worker for the resource adapter
example code for the mdb:
package de.sohard.sedi.dicom; import java.io.IOException; import java.net.Socket; import javax.ejb.EJB; import javax.ejb.EJBException; import javax.ejb.MessageDriven; import javax.ejb.MessageDrivenBean; import javax.ejb.MessageDrivenContext; import javax.inject.Inject; import org.jboss.ejb3.annotation.ResourceAdapter; import de.sohard.sedi.ra.inbound.AcceptedSocketListener; /** * Message driven bean to handle inbound connections * */ @MessageDriven(messageListenerInterface = AcceptedSocketListener.class) @ResourceAdapter("TestRar.rar") public class DicomProtocolListener implements MessageDrivenBean, AcceptedSocketListener { private static final long serialVersionUID = 1L; @EJB TestService testService; @Override public void ejbRemove() throws EJBException { } @Override public void setMessageDrivenContext(MessageDrivenContext arg0) throws EJBException { } /** * */ @Override public void onMessage(Socket socket) { try { testService.test(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } }
example code for the singleton:
package de.sohard.sedi.dicom;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.LocalBean;
import javax.ejb.Singleton;
import javax.ejb.Startup;

/**
 * Minimal singleton EJB, annotated for eager startup, that serves as the injection target
 * of the example message driven bean.
 */
@Singleton
@Startup
public class TestService {

    /** Value written by {@link #test()}. */
    public int a;

    /** Post-construct callback; intentionally does nothing. */
    @PostConstruct
    public void init() {
        // nothing to initialise
    }

    /** Pre-destroy callback; intentionally does nothing. */
    @PreDestroy
    public void destroy() {
        // nothing to tear down
    }

    /** Stores the constant 4711 in {@link #a}. */
    public void test() {
        this.a = 4711;
    }
}
example code for the resource adapter:
package de.sohard.sedi.ra.inbound; import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.BootstrapContext; import javax.resource.spi.Connector; import javax.resource.spi.ResourceAdapter; import javax.resource.spi.ResourceAdapterInternalException; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.WorkManager; import javax.transaction.xa.XAResource; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; /** * Implements an inbound JCA resource adapter that mediates communication from external DICOM AE devices (SCUs) to the * application-server-internal DICOM AE (SCP). */ @Connector(displayName = "SediDicomRA", vendorName = "SOHARD Software GmbH") public class SediDicomRA implements ResourceAdapter { private static final String RESOURCE_ADAPTER_INET_ADDRESS = "de.sohard.sedi.resourceAdapter.inetAddress"; private static final String RESOURCE_ADAPTER_PORT = "de.sohard.sedi.resourceAdapter.port"; //private static final Logger LOG = LoggerFactory.getLogger(SediDicomRA.class); private WorkManager workManager; private SediDicomWorker serverLoop; private int port = 11112; private String inetAddress = "127.0.0.1"; /** * Paramless constructor */ public SediDicomRA() { } /** * Sets the port the resource adapter should listen to. * * @param port * the port as int */ /** * Starts the server loop when the message-endpoint (MDB) was activated. * * {@inheritDoc} */ public void endpointActivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) throws ResourceException { DicomServerConfiguration configuration = new DicomServerConfiguration(); configuration.setInetAddress(this.inetAddress); configuration.setPort(this.port); serverLoop = new SediDicomWorker(workManager, configuration, messageEndpointFactory); serverLoop.start(); } /** * Stops the server loop when the message endpoint was deactivated. 
* * {@inheritDoc} */ public void endpointDeactivation(MessageEndpointFactory arg0, ActivationSpec arg1) { serverLoop.stop(); } public XAResource[] getXAResources(ActivationSpec[] arg0) throws ResourceException { return null; } public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException { //LOG.info("[SediDicomRA] start()"); workManager = bootstrapContext.getWorkManager(); } public void stop() { //LOG.info("[SediDicomRA] stop()"); } @Override public boolean equals(Object obj) { return super.equals(obj); } @Override public int hashCode() { return -1; } }
example code for the worker:
package de.sohard.sedi.ra.inbound; ; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.atomic.AtomicBoolean; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; import javax.resource.spi.ResourceAdapterInternalException; import javax.resource.spi.endpoint.MessageEndpoint; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.Work; import javax.resource.spi.work.WorkException; import javax.resource.spi.work.WorkManager; /** * Executes work on behalf of the resource adapter. * * Starts a server loop and accepts socket requests. * */ public class SediDicomWorker implements Work { private static final String SECURE_SOCKET_PROTOCOL = "TLSv1.2"; private static final String CERTIFICATE_STORE_ALGORITHM = "PKIX"; //private static final Logger LOG = LoggerFactory.getLogger(SediDicomWorker.class); private WorkManager workManager; private ServerSocket serverSocket; private DicomServerConfiguration configuration; private MessageEndpointFactory messageEndpointFactory; private boolean isTls = false; /** * Creates a SediDicomWorker and safes it's arguments for later use. * * @param workManager * receives the work * @param configuration * contains server configuration for the internal DICOM AE * @param messageEndpointFactory * used to obtain a message endpoint */ public SediDicomWorker(WorkManager workManager, DicomServerConfiguration configuration, MessageEndpointFactory messageEndpointFactory) { this.workManager = workManager; this.configuration = configuration; this.messageEndpointFactory = messageEndpointFactory; } /** * Creates a server socket and binds it to the address specified in the configuration. 
* * @throws ResourceAdapterInternalException * {@link ResourceAdapterInternalException} * */ public void start() throws ResourceAdapterInternalException { try { isTls = false; InetSocketAddress inetSocketAddress = getInetSocketAddress(); if (!isTls) { serverSocket = new ServerSocket(); } serverSocket.setReuseAddress(true); serverSocket.bind(inetSocketAddress); //LOG.info("Started listening on {} address: " + inetSocketAddress, isTls ? "TLS secured" : "unsecured"); workManager.startWork(this); } catch (IOException | WorkException e) { //LOG.error("Exception while starting resource adapter, rethrowing with ResourceAdapterInternalException"); throw new ResourceAdapterInternalException(e); } } @Override public void release() { try { if (serverSocket != null) { serverSocket.close(); } } catch (IOException e) { throw new RuntimeException(e); } } private AtomicBoolean isRunning = new AtomicBoolean(false); final boolean isRunning() { return isRunning.get(); } final void setRunning(Boolean running) { isRunning.set(running); } @Override public void run() { setRunning(true); try { while (isRunning()) { final Socket socket = serverSocket.accept(); if (socket != null) { MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null); if (messageEndpoint instanceof AcceptedSocketListener) { ((AcceptedSocketListener) messageEndpoint).onMessage(socket); } } } } catch (SocketException e) { if (isRunning()) { //LOG.error("Error while accepting a socket request and scheduling work on the request. " // + "See linked exception"); } // when isRunnging() == false the socket was closed by stop(), ie nothing bad happened } catch (Exception e) { if (isRunning()) { //LOG.error("Error while accepting a socket request and scheduling work on the request. " // + "See linked exception"); } setRunning(false); } } /** * Stops the server-socket from listening. 
*/ public void stop() { setRunning(false); try { serverSocket.close(); } catch (IOException e) { //LOG.error("Can not close server socket of DICOM adapter"); } } private InetSocketAddress getInetSocketAddress() { InetSocketAddress inetSocketAddress = new InetSocketAddress(configuration.getInetAddress(), configuration.getPort()); if (inetSocketAddress.isUnresolved()) { throw new IllegalArgumentException("Hostname cannot be resolved: " + configuration.getInetAddress()); } return inetSocketAddress; } }