5 Replies Latest reply on Dec 13, 2019 6:03 AM by yurine

    WildFly 18: IndexOutOfBoundsException on unknown fork stack id

    yurine

      I'm investigating a weird error related to JGroups that appeared while upgrading from WildFly 11.0.0.Final to WildFly 18.0.1.Final.

       

      I have ReplicatedHashMap instances bound to ForkChannel instances. ForkChannel instances are created by application code.

       

      The error happens in the following scenario:

      1. Node slave-1 is running. It's calling methods of ReplicatedHashMap and everything works fine.

      2. Node slave-2 is starting. The application is not deployed yet, but the node has already joined the JGroups cluster, and the view has changed to [slave1:slave-1|1] (2) [slave1:slave-1, slave2:slave-2].

      3. slave-1 calls a method of ReplicatedHashMap (with RequestOptions.timeout set to 30 seconds). Two responses arrive almost immediately: the one from slave-1 is fine, but the one from slave-2 contains an IndexOutOfBoundsException (returned by Rsp.getException). This step can repeat several times before step 4.

      4. The application is deployed on slave-2. It creates a ForkChannel and starts receiving remote calls for ReplicatedHashMap. From this moment everything works fine again, and the IndexOutOfBoundsException no longer occurs.

       

      The exception in step 3 is the following:

      Caused by: java.lang.IndexOutOfBoundsException: pos=0, limit=0
          at org.jgroups.util.ByteArrayDataInputStream.checkBounds(ByteArrayDataInputStream.java:279)
          at org.jgroups.util.ByteArrayDataInputStream.<init>(ByteArrayDataInputStream.java:28)
          at org.jgroups.blocks.RequestCorrelator.replyFromBuffer(RequestCorrelator.java:463)
          at org.jgroups.blocks.RequestCorrelator.handleResponse(RequestCorrelator.java:409)
          at org.jgroups.blocks.RequestCorrelator.dispatch(RequestCorrelator.java:364)
          at org.jgroups.blocks.RequestCorrelator.receiveMessageBatch(RequestCorrelator.java:327)
          at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:589)
          at org.jgroups.JChannel.up(JChannel.java:796)
          at org.jgroups.fork.ForkProtocolStack.up(ForkProtocolStack.java:162)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.protocols.FORK.up(FORK.java:169)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.protocols.FRAG3.up(FRAG3.java:191)
          at org.jgroups.protocols.FlowControl.up(FlowControl.java:411)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:294)
          at org.jgroups.protocols.UNICAST3.deliverBatch(UNICAST3.java:1041)
          at org.jgroups.protocols.UNICAST3.removeAndDeliver(UNICAST3.java:850)
          at org.jgroups.protocols.UNICAST3.handleBatchReceived(UNICAST3.java:816)
          at org.jgroups.protocols.UNICAST3.up(UNICAST3.java:487)
          at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:687)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:215)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.stack.Protocol.up(Protocol.java:338)
          at org.jgroups.protocols.TP.passBatchUp(TP.java:1301)
          at org.jgroups.util.MaxOneThreadPerSender$BatchHandlerLoop.passBatchUp(MaxOneThreadPerSender.java:284)
          at org.jgroups.util.SubmitToThreadPool$BatchHandler.run(SubmitToThreadPool.java:136)
          at org.jgroups.util.MaxOneThreadPerSender$BatchHandlerLoop.run(MaxOneThreadPerSender.java:273)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at org.jboss.as.clustering.jgroups.ClassLoaderThreadFactory.lambda$newThread$0(ClassLoaderThreadFactory.java:52)
          at java.lang.Thread.run(Thread.java:748)

       

      In WildFly 11 the same scenario differs in step 3: the call waits for the 30-second timeout and then returns a response for which Rsp.wasReceived returns false.

      That looks reasonable and logical to me. I log this event as a WARN, which is easy to understand when reading the logs alongside the cluster view change caused by the starting node.

       

       

      Unlike the original ReplicatedHashMap, I do process exceptions received in Rsp (because I once had to use a debugger to find an exception that was returned in an Rsp by the message recipient and never showed up anywhere else).

      That's why my logic treats this situation as an error: it considers the call failed and aborts or rolls back the higher-level operations.
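
      To make that policy concrete, here is a minimal sketch in plain Java. ReplyModel and ReplyPolicy are hypothetical stand-ins written for this illustration, not JGroups classes; in real code the same states would be read from Rsp (wasReceived, getException).

```java
import java.util.List;

// Hypothetical stand-in for one reply; this is NOT the JGroups Rsp class,
// only a model of the states discussed in this post.
final class ReplyModel {
    final String sender;
    final boolean received;    // false models the WildFly 11 timeout case
    final Throwable exception; // non-null models the WildFly 18 case

    ReplyModel(String sender, boolean received, Throwable exception) {
        this.sender = sender;
        this.received = received;
        this.exception = exception;
    }
}

final class ReplyPolicy {
    enum Outcome { OK, WARN_NO_REPLY, FAIL }

    // Any exception fails the whole call; a missing reply only produces a warning.
    static Outcome evaluate(List<ReplyModel> replies) {
        Outcome outcome = Outcome.OK;
        for (ReplyModel reply : replies) {
            if (reply.exception != null) {
                return Outcome.FAIL;
            }
            if (!reply.received) {
                outcome = Outcome.WARN_NO_REPLY;
            }
        }
        return outcome;
    }
}
```

      Under this policy the WildFly 11 behaviour (no reply from slave-2) ends in a warning, while the WildFly 18 behaviour (an exception attributed to slave-2) fails the call.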

       

      At first I thought something had changed or that I had been doing something wrong from the beginning, so I verified my configuration and code several times. I also searched the Internet, but found nothing similar, which made me keep looking for a mistake of my own.

       

      It was confusing that a local error appears in Rsp. I expected that only exceptions from the remote side would be delivered in the Rsp object. It wasn't easy to tell where the exception actually happened: on the caller side or on the remote side.

      Enabling TRACE for JGroups didn't help much, but it did show that slave-2 actually receives the message and sends a response, which slave-1 then fails to process. Unfortunately FORK doesn't write TRACE logs, so I couldn't see which layer actually generates the problematic response.

      Then I was confused that I didn't see the following warning in either WildFly 18 or WildFly 11:

          log.warn("%s: fork-stack for id=%s not found; discarding message", local_addr, forkStackId)  // it's in org.jgroups.protocols.FORK

      So I had to do some tough debugging and found that the problematic response is generated by WildFly's own UnknownForkHandler in JChannelFactory (which also logs nothing, although a log message here would have saved time).

       

       

      It doesn't look like the exception can be caused by my configuration or code. So far it rather looks like a bug in the WildFly JGroups extension, doesn't it?

       

      I hope I'm wrong, and it would be helpful if someone could explain how I can fix this in my configuration or code.

      And I wonder if there is any workaround for this except just ignoring exceptions in Rsp.

       

       

      Thanks in advance.

       

      /cc pferraro

        • 1. Re: WildFly 18: IndexOutOfBoundsException on unknown fork stack id
          pferraro

          If a message is received by a member where a response is expected, and no such fork channel exists, that member auto-replies with an empty response.  This is necessary to prevent the sender from hanging while it waits on a response from that member.

          Any user of a fork channel built on top of a server-managed channel must handle this type of response, i.e. where org.wildfly.clustering.jgroups.spi.ChannelFactory.isUnknownForkResponse(ByteBuffer.wrap(msg.rawBuffer(), msg.offset(), msg.length())) returns true.
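
          As a rough illustration of what such a check involves, the sketch below models it with the JDK only. It assumes the auto-reply is an empty payload, as described above; looksLikeUnknownForkResponse is a hypothetical stand-in, and real code should call ChannelFactory.isUnknownForkResponse rather than reimplement it.

```java
import java.nio.ByteBuffer;

final class UnknownForkCheckSketch {
    // Assumption for this sketch: the auto-reply carries an empty payload.
    private static final ByteBuffer EMPTY_REPLY = ByteBuffer.allocate(0);

    // Hypothetical stand-in for ChannelFactory.isUnknownForkResponse(ByteBuffer);
    // the real implementation may differ.
    static boolean looksLikeUnknownForkResponse(byte[] raw, int offset, int length) {
        if (raw == null) {
            return true; // no payload at all; ByteBuffer.wrap would reject a null array
        }
        // wrap(array, offset, length) exposes only the message's own slice of the
        // backing array, which matters when several messages share one buffer.
        ByteBuffer payload = ByteBuffer.wrap(raw, offset, length);
        // ByteBuffer.equals compares the remaining bytes, so any zero-length slice matches.
        return EMPTY_REPLY.equals(payload);
    }
}
```

          Passing the offset and length along with the raw array is the important detail: wrapping the whole backing array could make an empty message look non-empty.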

          • 2. Re: WildFly 18: IndexOutOfBoundsException on unknown fork stack id
            yurine

            Hi Paul,

            Yeah, I've seen the comment about sending UNKNOWN_FORK_RESPONSE in the JChannelFactory source.

            But unfortunately I can't see how I'm supposed to handle this response myself. I use ForkChannel for an RpcDispatcher (inside ReplicatedHashMap) and don't deal with the underlying messages at all.

             

            After your reply I got the idea of using WildFly's ChannelFactory.createChannel instead of the JGroups ForkChannel constructor (both worked for me in WildFly 11). I hoped that ChannelFactory would return a different ForkChannel implementation (or something different inside it) that is aware of WildFly's UNKNOWN_FORK_RESPONSE and can handle it in some way.

             

            But this didn't help. The error is the same.

            Moreover, it's not clear how to specify the fork stack id or the protocols for the fork stack when using ChannelFactory, as can be done with the ForkChannel constructor. Presumably fork channels are supposed to be declared in the configuration together with their additional protocols. Fortunately this isn't a blocker for me at the moment because I use the default stack anyway, but it could become a problem, since I don't declare fork channels and create them programmatically.

             

            So, is application code supposed to handle UNKNOWN_FORK_RESPONSE itself?

            The only idea I have so far is to set a custom UpHandler, RequestHandler or RequestCorrelator and try to intercept the processing of this UNKNOWN_FORK_RESPONSE.

            But frankly, I'd expect that if an underlying layer inserts a trick like UNKNOWN_FORK_RESPONSE, it should process it itself, or at least not fail to parse it. I wonder how this worked in WildFly 11; I can't see any relevant changes, at least in JChannelFactory.

             

            Thank you.

            • 3. Re: WildFly 18: IndexOutOfBoundsException on unknown fork stack id
              pferraro

              So, unfortunately, ReplicatedHashMap was written before fork channels existed.  That means you have 2 options:

              1. Ensure your ReplicatedHashMap uses a designated JChannel.
              2. You can still use ReplicatedHashMap using a fork of the default WF channel, but you need to extend it to handle the condition where a member does not (yet) have a ReplicatedHashMap started.

               

              e.g.

               

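              import java.io.ByteArrayInputStream;
              import java.io.ByteArrayOutputStream;
              import java.io.DataInput;
              import java.io.DataInputStream;
              import java.io.DataOutput;
              import java.io.DataOutputStream;
              import java.io.IOException;
              import java.nio.ByteBuffer;

              import org.jgroups.blocks.MethodCall;
              import org.jgroups.blocks.ReplicatedHashMap;
              import org.jgroups.util.Buffer;
              import org.wildfly.clustering.jgroups.spi.ChannelFactory;

              // ReplicatedHashMap variant that runs on a fork of the server-managed channel.
              public class ForkFriendlyReplicatedHashMap extends ReplicatedHashMap {
                  public ForkFriendlyReplicatedHashMap(ChannelFactory factory, String id) throws Exception {
                      super(factory.createChannel(id));
                      this.dispatcher.marshaller(new MethodCallMarshaller(factory));
                  }
              }
              
              // Marshaller that recognises the empty "unknown fork" auto-reply.
              public class MethodCallMarshaller implements org.jgroups.blocks.Marshaller {
                  private final ChannelFactory factory;
              
                  public MethodCallMarshaller(ChannelFactory factory) {
                      this.factory = factory;
                  }
              
                  @Override
                  public void objectToStream(Object object, DataOutput stream) throws IOException {
                      // Serialize into a temporary buffer first so the length can be written as a prefix.
                      BufferOutputStream bufferOutput = new BufferOutputStream();
                      try (DataOutputStream output = new DataOutputStream(bufferOutput)) {
                          ((MethodCall) object).writeTo(output);
                      }
                      Buffer buffer = bufferOutput.getBuffer();
                      stream.writeInt(buffer.getLength());
                      stream.write(buffer.getBuf(), buffer.getOffset(), buffer.getLength());
                  }
              
                  @Override
                  public Object objectFromStream(DataInput stream) throws IOException, ClassNotFoundException {
                      int size = stream.readInt();
                      byte[] buffer = new byte[size];
                      stream.readFully(buffer);
                      // A member without a matching fork channel replied; treat it as "no value".
                      if (this.factory.isUnknownForkResponse(ByteBuffer.wrap(buffer))) return null;
                      try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(buffer))) {
                          MethodCall call = new MethodCall();
                          call.readFrom(input);
                          return call;
                      }
                  }
              }
              
              // ByteArrayOutputStream that exposes its internal array without copying it.
              public final class BufferOutputStream extends ByteArrayOutputStream {
              
                  public BufferOutputStream() {
                      super();
                  }
              
                  public BufferOutputStream(int size) {
                      super(size);
                  }
              
                  public Buffer getBuffer() {
                      return new Buffer(this.buf, 0, this.count);
                  }
              }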
              • 4. Re: WildFly 18: IndexOutOfBoundsException on unknown fork stack id
                yurine

                I will try to solve the problem with a marshaller, especially since I already use a custom marshaller for passing my classloader (this wasn't working in WildFly 11, and now I can finally use application classes in 18).

                 

                Actually, the IndexOutOfBoundsException issue is not specific to ReplicatedHashMap; it affects at least RpcDispatcher in general. So anyone who doesn't ignore Rsp.hasException will have to implement custom processing for the empty response.

                 

                (I still wonder how this worked in WildFly 11, although knowing that won't help me in 18.)

                 

                Thank you very much for your help!

                • 5. Re: WildFly 18: IndexOutOfBoundsException on unknown fork stack id
                  yurine

                  OK, I've now managed to solve my issue. Here are the details, in case someone runs into the same problem.

                   

                  There's a problem in the code proposed above. The following line throws an EOFException:

                        stream.readFully(buffer);

                  I guess it's possible to fix this by looking into the original objectToStream implementation, but I don't want this kind of dependency on JGroups internals.

                   

                  So I tried a different approach: setting my own UpHandler on the channel to preprocess messages.
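
                  For anyone wondering how readFully ends up with an EOFException, here is a JDK-only sketch of the general mechanism. It is not taken from JGroups, and this thread does not establish which bytes actually arrive; it only shows what happens when a length-prefixed reader meets input that holds fewer bytes than the prefix claims. The class and method names are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

final class LengthPrefixSketch {
    // Reads a frame laid out as: 4-byte big-endian length, then that many payload bytes.
    static byte[] readFrame(byte[] wire) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            int size = in.readInt();        // EOFException if fewer than 4 bytes are available
            byte[] payload = new byte[size];
            in.readFully(payload);          // EOFException if fewer than 'size' bytes remain
            return payload;
        }
    }

    // Convenience wrapper: true when reading the frame runs off the end of the input.
    static boolean hitsEof(byte[] wire) {
        try {
            readFrame(wire);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

                  A frame such as {0, 0, 0, 2, 'h', 'i'} reads cleanly, whereas {0, 0, 0, 5, 'h', 'i'} or an empty array runs off the end of the stream.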

                   

                  The final implementation replaces the buffer of the problematic message:

                          @Override
                          public Object up(Event evt) {
                              return upperHandler.up(evt);
                          }
                  
                  
                          @Override
                          public Object up(Message msg) {
                              fixUnknownForkResponse(msg);
                              return upperHandler.up(msg);
                          }
                  
                  
                          @Override
                          public void up(MessageBatch batch) {
                              for (Message msg : batch) {
                                  fixUnknownForkResponse(msg);
                              }
                              upperHandler.up(batch);
                          }
                  
                  
                          private void fixUnknownForkResponse(Message msg) {
                              if (channelFactory.isUnknownForkResponse(ByteBuffer.wrap(msg.getRawBuffer(), msg.getOffset(), msg.getLength()))) {
                                  // msg.getSrc() is the member that sent the reply; msg.getDest() is the local node
                                  log.debug(String.format(
                                      "Remote method call failed: %s replied 'UNKNOWN FORK' for fork '%s'",
                                      msg.getSrc(), channelId
                                  ));
                                  try {
                                      // without this MessageDispatcher will fail parsing original buffer
                                      msg.setBuffer(Util.objectToBuffer(null));
                                  } catch (IOException e) {
                                      throw new RuntimeException(e);
                                  }
                              }
                          }
                  

                   

                  This prevents the code that sent the request from waiting for the response (if it waits).

                  It should also be possible to construct a buffer with a meaningful response, e.g. one containing some UnknownForkException, to move the processing to a higher level, but I don't need this.

                   

                  I've also tried the following implementation, which just drops problematic messages:

                          @Override
                          public Object up(Message msg) {
                              if (isUnknownForkResponse(msg)) {
                                  return null;
                              }
                              return upperHandler.up(msg);
                          }
                  
                  
                          @Override
                          public void up(MessageBatch batch) {
                              Iterator<Message> it = batch.iterator();
                              while (it.hasNext()) {
                                  Message msg = it.next();
                                  if (isUnknownForkResponse(msg)) {
                                      it.remove();
                                  }
                              }
                              upperHandler.up(batch);
                          }
                  
                  
                          private boolean isUnknownForkResponse(Message msg) {
                              if (channelFactory.isUnknownForkResponse(ByteBuffer.wrap(msg.getRawBuffer(), msg.getOffset(), msg.getLength()))) {
                                  // msg.getSrc() is the member that sent the reply; msg.getDest() is the local node
                                  log.debug(String.format(
                                      "Remote method call failed: %s replied 'UNKNOWN FORK' for fork '%s'",
                                      msg.getSrc(), channelId
                                  ));
                                  return true;
                              }
                              return false;
                          }
                  

                   

                  It also works, but the code that sent the request will keep waiting for the response.
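
                  The difference between the two variants can be modelled without JGroups. In the sketch below a CompletableFuture stands in for the pending request that the dispatcher keeps open; ReplyStrategySketch and its members are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyStrategySketch {
    enum Strategy { REPLACE_WITH_NULL, DROP }

    // Models a caller waiting for a single reply from a member that has no fork channel yet.
    static String waitForReply(Strategy strategy, long timeoutMillis) {
        CompletableFuture<Object> pending = new CompletableFuture<>();

        // Simulated arrival of the empty "unknown fork" reply.
        if (strategy == Strategy.REPLACE_WITH_NULL) {
            pending.complete(null); // the reply is delivered, carrying a null value
        }
        // DROP: the reply never reaches the pending request, so nothing completes it.

        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

                  With REPLACE_WITH_NULL the caller returns at once with a null value; with DROP it blocks for the whole timeout, which matches the behaviour described above.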

                   

                  (In general, replacing the UpHandler makes my code a little messy, because I have to modify the fork channel not at the beginning but after the ReplicatedHashMap is created and the real UpHandler is set on the channel. It's also not possible to get the id from the fork channel, so I have to keep it between channel creation and UpHandler creation. It can be obtained from a message header, but I don't like doing that.)

                   

                  Another detail to save someone's time: org.wildfly.clustering.jgroups.spi.ChannelFactory#isUnknownForkResponse is available from the following dependency

                          <dependency>
                              <groupId>org.wildfly</groupId>
                              <artifactId>wildfly-clustering-jgroups-spi</artifactId>
                          </dependency>
                  

                   

                  and by adding the "org.wildfly.clustering.jgroups.spi" module to the application's dependencies.
