WildFly 11 and later: mdb hangs when calling "activate" on Resource Adapter
dgrittner, Apr 5, 2018 11:41 AM: We are using a resource adapter with an inbound connection to open a port in WildFly; in this case it is the DICOM port 11112.
One peculiarity is that an EJB singleton is injected into the receiving MDB.
But the problem seems more related to load (so maybe a race condition) than any special kind of EJB, because I replaced the singleton with other EJBs and could reproduce the hang.
To reproduce the problem with WildFly 11 and 12, execute the following steps:
- on a console "telnet localhost 11112", this triggers the resource adapter and calls and initializes the mdb
- on the jboss cli simply activate the resource adapter again "/subsystem=resource-adapters/resource-adapter=TestRar.rar/:activate"
- the resource adapter is shut down and restarted, the mdb is removed (ejbRemove is called)
- repeat step 1: "telnet localhost 11112"
- on WildFly 11 and 12 the application container hangs forever inside the class StartupAwaitInterceptor when the worker calls "((AcceptedSocketListener) messageEndpoint).onMessage(socket);"
- on WildFly 10 everything works, the mdb is initialized and called, etc.
I think that the behavior in WildFly 11 and later is not correct; there seems to be a bug.
Here is the code for the MDB, the injected singleton, the resource adapter, and the resource adapter's worker.
example code for the mdb:
package de.sohard.sedi.dicom;
import java.io.IOException;
import java.net.Socket;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.inject.Inject;
import org.jboss.ejb3.annotation.ResourceAdapter;
import de.sohard.sedi.ra.inbound.AcceptedSocketListener;
/**
 * Message driven bean to handle inbound connections.
 *
 * The resource adapter "TestRar.rar" delivers each accepted socket to
 * {@link #onMessage(Socket)}. The bean calls the injected {@link TestService}
 * and then closes the socket.
 */
@MessageDriven(messageListenerInterface = AcceptedSocketListener.class)
@ResourceAdapter("TestRar.rar")
public class DicomProtocolListener implements MessageDrivenBean, AcceptedSocketListener {
    private static final long serialVersionUID = 1L;

    /** Service invoked once for every accepted connection. */
    @EJB
    TestService testService;

    /** Lifecycle callback; this bean holds no resources that need releasing. */
    @Override
    public void ejbRemove() throws EJBException {
    }

    /** Lifecycle callback; the context is not needed by this bean. */
    @Override
    public void setMessageDrivenContext(MessageDrivenContext arg0) throws EJBException {
    }

    /**
     * Handles one accepted connection: invokes {@code testService.test()} and
     * closes the socket.
     *
     * The socket is closed even when the service call throws, so a failing
     * delivery does not leak the connection.
     *
     * @param socket
     *            the connection accepted by the resource adapter
     */
    @Override
    public void onMessage(Socket socket) {
        // try-with-resources guarantees close() on every exit path; an
        // exception thrown by testService.test() still propagates to the caller.
        try (Socket accepted = socket) {
            testService.test();
        } catch (IOException e) {
            // Only close() can raise IOException here; report it as before.
            e.printStackTrace();
        }
    }
}
example code for the singleton:
package de.sohard.sedi.dicom;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.LocalBean;
import javax.ejb.Singleton;
import javax.ejb.Startup;
/**
 * Minimal singleton bean, annotated {@code @Startup}, that is injected into the
 * message driven bean. Its only behaviour is to store a marker value in
 * {@link #a} when {@link #test()} is called.
 */
@Startup
@Singleton
public class TestService {

    /** Marker value written by {@link #test()}. */
    public int a;

    /** Post-construct callback; intentionally does nothing. */
    @PostConstruct
    public void init() {
        // no initialisation required
    }

    /** Pre-destroy callback; intentionally does nothing. */
    @PreDestroy
    public void destroy() {
        // nothing to clean up
    }

    /** Records that the service was invoked by setting {@link #a} to 4711. */
    public void test() {
        this.a = 4711;
    }
}
example code for the resource adapter:
package de.sohard.sedi.ra.inbound;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.Connector;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
/**
* Implements an inbound JCA resource adapter that mediates communication from external DICOM AE devices (SCUs) to the
* application-server-internal DICOM AE (SCP).
*/
@Connector(displayName = "SediDicomRA", vendorName = "SOHARD Software GmbH")
public class SediDicomRA implements ResourceAdapter {
private static final String RESOURCE_ADAPTER_INET_ADDRESS = "de.sohard.sedi.resourceAdapter.inetAddress";
private static final String RESOURCE_ADAPTER_PORT = "de.sohard.sedi.resourceAdapter.port";
//private static final Logger LOG = LoggerFactory.getLogger(SediDicomRA.class);
private WorkManager workManager;
private SediDicomWorker serverLoop;
private int port = 11112;
private String inetAddress = "127.0.0.1";
/**
* Paramless constructor
*/
public SediDicomRA() {
}
/**
* Sets the port the resource adapter should listen to.
*
* @param port
* the port as int
*/
/**
* Starts the server loop when the message-endpoint (MDB) was activated.
*
* {@inheritDoc}
*/
public void endpointActivation(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec)
throws ResourceException {
DicomServerConfiguration configuration = new DicomServerConfiguration();
configuration.setInetAddress(this.inetAddress);
configuration.setPort(this.port);
serverLoop = new SediDicomWorker(workManager, configuration, messageEndpointFactory);
serverLoop.start();
}
/**
* Stops the server loop when the message endpoint was deactivated.
*
* {@inheritDoc}
*/
public void endpointDeactivation(MessageEndpointFactory arg0, ActivationSpec arg1) {
serverLoop.stop();
}
public XAResource[] getXAResources(ActivationSpec[] arg0) throws ResourceException {
return null;
}
public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
//LOG.info("[SediDicomRA] start()");
workManager = bootstrapContext.getWorkManager();
}
public void stop() {
//LOG.info("[SediDicomRA] stop()");
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public int hashCode() {
return -1;
}
}
example code for the worker:
package de.sohard.sedi.ra.inbound;
;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
/**
* Executes work on behalf of the resource adapter.
*
* Starts a server loop and accepts socket requests.
*
*/
public class SediDicomWorker implements Work {
private static final String SECURE_SOCKET_PROTOCOL = "TLSv1.2";
private static final String CERTIFICATE_STORE_ALGORITHM = "PKIX";
//private static final Logger LOG = LoggerFactory.getLogger(SediDicomWorker.class);
private WorkManager workManager;
private ServerSocket serverSocket;
private DicomServerConfiguration configuration;
private MessageEndpointFactory messageEndpointFactory;
private boolean isTls = false;
/**
* Creates a SediDicomWorker and safes it's arguments for later use.
*
* @param workManager
* receives the work
* @param configuration
* contains server configuration for the internal DICOM AE
* @param messageEndpointFactory
* used to obtain a message endpoint
*/
public SediDicomWorker(WorkManager workManager, DicomServerConfiguration configuration,
MessageEndpointFactory messageEndpointFactory) {
this.workManager = workManager;
this.configuration = configuration;
this.messageEndpointFactory = messageEndpointFactory;
}
/**
* Creates a server socket and binds it to the address specified in the configuration.
*
* @throws ResourceAdapterInternalException
* {@link ResourceAdapterInternalException}
*
*/
public void start() throws ResourceAdapterInternalException {
try {
isTls = false;
InetSocketAddress inetSocketAddress = getInetSocketAddress();
if (!isTls) {
serverSocket = new ServerSocket();
}
serverSocket.setReuseAddress(true);
serverSocket.bind(inetSocketAddress);
//LOG.info("Started listening on {} address: " + inetSocketAddress, isTls ? "TLS secured" : "unsecured");
workManager.startWork(this);
} catch (IOException | WorkException e) {
//LOG.error("Exception while starting resource adapter, rethrowing with ResourceAdapterInternalException");
throw new ResourceAdapterInternalException(e);
}
}
@Override
public void release() {
try {
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private AtomicBoolean isRunning = new AtomicBoolean(false);
final boolean isRunning() {
return isRunning.get();
}
final void setRunning(Boolean running) {
isRunning.set(running);
}
@Override
public void run() {
setRunning(true);
try {
while (isRunning()) {
final Socket socket = serverSocket.accept();
if (socket != null) {
MessageEndpoint messageEndpoint = messageEndpointFactory.createEndpoint(null);
if (messageEndpoint instanceof AcceptedSocketListener) {
((AcceptedSocketListener) messageEndpoint).onMessage(socket);
}
}
}
} catch (SocketException e) {
if (isRunning()) {
//LOG.error("Error while accepting a socket request and scheduling work on the request. "
// + "See linked exception");
}
// when isRunnging() == false the socket was closed by stop(), ie nothing bad happened
} catch (Exception e) {
if (isRunning()) {
//LOG.error("Error while accepting a socket request and scheduling work on the request. "
// + "See linked exception");
}
setRunning(false);
}
}
/**
* Stops the server-socket from listening.
*/
public void stop() {
setRunning(false);
try {
serverSocket.close();
} catch (IOException e) {
//LOG.error("Can not close server socket of DICOM adapter");
}
}
private InetSocketAddress getInetSocketAddress() {
InetSocketAddress inetSocketAddress = new InetSocketAddress(configuration.getInetAddress(),
configuration.getPort());
if (inetSocketAddress.isUnresolved()) {
throw new IllegalArgumentException("Hostname cannot be resolved: " + configuration.getInetAddress());
}
return inetSocketAddress;
}
}