11 Replies Latest reply on Nov 1, 2011 4:37 PM by markaddleman

    Using Teiid for Async Event Processing?

    markaddleman

      We're using Teiid for typical JDBC request-response processing across many varied data sources using custom translators.  Many of our data sources also support establishing subscriptions for async events.  Superficially, the async and sync request problems are similar:

      1. Desire a common language across all data sources
      2. Desire to join across data sources
      3. Data sources have different capabilities
      4. Performance concerns dictate pushing-down as much of the processing as possible

      Given these similarities and the desire for a common code-base across async & sync data source connectors, I'd love to use Teiid as much as possible for async processing.

       

      It seems that Teiid at least partially supports on-going query operations through DataNotAvailableException, which appears to set up an internal Teiid polling mechanism.  Although not truly async, I'm pretty sure polling will satisfy our needs (at least initially).

       

      My questions:

      1. If a translator throws a DataNotAvailableException, what happens to the originating JDBC client thread?  Is it notified?  Does it block until the result set is complete? 
      2. If the originating JDBC client blocks until the result set is complete (which, of course, may never happen in an on-going query), is there some other Teiid layer that we can plug into?  I don't particularly care if the layer is a public (or even stable) API.
        • 1. Re: Using Teiid for Async Event Processing?
          rareddy
          1. If a translator throws a DataNotAvailableException, what happens to the originating JDBC client thread?  Is it notified?  Does it block until the result set is complete? 

          This message is only between the query engine and the translator. When this exception is thrown, the engine simply re-queues the work; this is a way to release the connector thread. The JDBC client still blocks and is not notified.
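To make the re-queue behavior concrete, here is a JDK-only model of it. Nothing below is Teiid code: `NotReadyException`, `PollingSource` and `PollingEngine` are hypothetical stand-ins for DataNotAvailableException, a translator execution and the engine's work queue. The point it illustrates is that the worker thread is handed back between polls, while whoever waits for the complete result (like the JDBC client) keeps waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for DataNotAvailableException: "no data yet, ask again later". */
class NotReadyException extends Exception {
    final long retryDelayMillis;

    NotReadyException(long retryDelayMillis) {
        this.retryDelayMillis = retryDelayMillis;
    }
}

/** Hypothetical translator execution: next() returns null at end of rows. */
interface PollingSource {
    Integer next() throws NotReadyException;
}

/** Hypothetical engine: re-queues a pull whenever the source is not ready yet. */
class PollingEngine {
    private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();

    /** The returned future completes only at end of rows, like a blocked JDBC caller. */
    CompletableFuture<List<Integer>> run(PollingSource source) {
        CompletableFuture<List<Integer>> result = new CompletableFuture<>();
        List<Integer> rows = new ArrayList<>();
        worker.execute(() -> pull(source, rows, result));
        return result;
    }

    private void pull(PollingSource source, List<Integer> rows, CompletableFuture<List<Integer>> result) {
        try {
            for (Integer row = source.next(); row != null; row = source.next()) {
                rows.add(row);
            }
            result.complete(rows);
        } catch (NotReadyException e) {
            // Give the worker thread back and retry the same work after the suggested delay.
            worker.schedule(() -> pull(source, rows, result), e.retryDelayMillis, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
    }

    void shutdown() {
        worker.shutdown();
    }
}
```

The retry delay here is whatever the hypothetical source asks for; in a real translator it would presumably reflect how long the subscription is expected to stay quiet.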

           

          2. If the originating JDBC client blocks until the result set is complete (which, of course, may never happen in an on-going query), is there some other Teiid layer that we can plug into?  I don't particularly care if the layer is a public (or even stable) API.

          If you do not want the JDBC client to block, throw a "TranslatorException" when your threshold for waiting for data is reached, and turn on "partial results" mode. The JDBC client will then be given the results gathered so far from all the other sources, and the failures will be available from the "getWarnings" call on the ResultSet object.
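The following JDK-only sketch models what partial results mode gives the client: the rows from the sources that succeeded, plus one chained java.sql.SQLWarning per failed source. `PartialResults`, `federate` and the `Supplier`-based sources are hypothetical, and a RuntimeException stands in for the TranslatorException; in Teiid itself the mode is a connection/session setting and the warnings come from the ResultSet's getWarnings().

```java
import java.sql.SQLWarning;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical result holder: rows that were gathered plus a warning chain for failed sources. */
class PartialResults {
    final List<String> rows = new ArrayList<>();
    SQLWarning warnings; // head of the chain, null if every source succeeded

    void addWarning(SQLWarning w) {
        if (warnings == null) {
            warnings = w;
        } else {
            warnings.setNextWarning(w); // appends to the end of the chain
        }
    }

    int warningCount() {
        int n = 0;
        for (SQLWarning w = warnings; w != null; w = w.getNextWarning()) {
            n++;
        }
        return n;
    }

    /** Unions the rows of every source; a failing source becomes a warning instead of an error. */
    static PartialResults federate(Map<String, Supplier<List<String>>> sources) {
        PartialResults result = new PartialResults();
        for (Map.Entry<String, Supplier<List<String>>> source : sources.entrySet()) {
            try {
                result.rows.addAll(source.getValue().get());
            } catch (RuntimeException e) {
                result.addWarning(new SQLWarning("source " + source.getKey() + " failed: " + e.getMessage()));
            }
        }
        return result;
    }
}
```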

           

          I am not sure what you mean by plugging into a Teiid layer. What is the intended behavior you are trying to achieve in this situation?

           

          Ramesh..

          • 2. Re: Using Teiid for Async Event Processing?
            shawkins

            Mark,

             

            There is a special-purpose non-blocking JDBC API used by our ODBC layer.  It is only for embedded clients, as remote clients still use old (blocking) IO and rely on the calling thread to do socket polling.

             

            See StatementImpl and PreparedStatementImpl for submitExecute calls that return futures.  From there you will get a ResultSetImpl that has a non-blocking submitNext call.  See ODBCServerRemoteImpl for an example of how the logic is used.  Typically, once a future is obtained, a callback (CompletionListener) is registered.  From there, the listener will be processed by the executing thread and the calling thread is free.  In the case of ODBC, the calling thread is an IO worker that we cannot block for long-running queries.
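The threading idea (the callback runs on whichever thread completes the future, so the thread that registered it is free to return) can be seen with plain java.util.concurrent. This is an analogy using CompletableFuture, not Teiid's ResultsFuture; the single-thread executor named "engine-worker" is a hypothetical stand-in for the engine's processing thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadDemo {
    /** Returns the name of the thread that ran the completion callback. */
    static String callbackThread() {
        ExecutorService engine = Executors.newSingleThreadExecutor(r -> new Thread(r, "engine-worker"));
        CompletableFuture<Boolean> execution = new CompletableFuture<>();
        CompletableFuture<String> ranOn = new CompletableFuture<>();

        // Register the callback first, as one would with addCompletionListener on submitExecute's future.
        execution.whenComplete((ok, error) -> ranOn.complete(Thread.currentThread().getName()));

        // The "engine" finishes the work later on its own thread. The calling thread has
        // nothing to wait for; it only joins here so the demo can report the result.
        engine.execute(() -> execution.complete(true));

        String name = ranOn.join();
        engine.shutdown();
        return name;
    }
}
```

Because the callback is registered before the engine task is submitted, it is always run by the completing thread; a callback registered on an already-completed future would instead run on the registering thread.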

             

            The original intention was to further refine the non-blocking layer and make it public, https://issues.jboss.org/browse/TEIID-1176, but we haven't gone beyond the embedded/odbc usage yet.

             

            If the DataNotAvailableException polling mechanism is insufficient, then log an enhancement request and we can add a callback on the ExecutionContext that would allow you to wake up processing on demand.

             

            Steve

            • 3. Re: Using Teiid for Async Event Processing?
              markaddleman

              Thanks, Steve.  The non-blocking JDBC API sounds exactly like what we need for now.  That it's embedded-only doesn't concern me, as we will simply be republishing the results over a message bus anyway.

               

              I'm pretty sure the DataNotAvailableException will suffice for our next release but having a callback on the ExecutionContext sounds like an ideal longer term solution.  I'm sure we'll be submitting an enhancement request in the near future.

              • 4. Re: Using Teiid for Async Event Processing?
                markaddleman

                (disclaimer:  I'm basing this message on a quick perusal of 7.3 source code and 7.5 javadoc)

                It looks like the existing non-blocking JDBC API won't quite fit our needs.  I'd like to use Teiid for a streaming result set use case.  In particular, our result sets may never end (except by client request).  A concrete example (here, I assume a translator that provides live data similar to what "top" on Unix provides):

                1. Give me a list of processes and their CPU consumption:  SELECT SECOND(timestamp), process, AVG(cpu_percent) FROM top PARTITION BY SECOND(timestamp)

                 

                Through our application, we would ensure only queries that make sense get through (such as no aggregate queries without windowing on time).  After scanning the code, I fear Teiid isn't set up to support returning partial result sets.  Is this so?  As near as I can piece together, it all hinges on whether ResultsMessage and related logic deal in streams/batches of result sets or only in complete result sets.

                 

                From an API point of view, I'd like an addResultsReceiver(ResultsReceiver<ResultsMessage?> rr) method in either PreparedStatementImpl or ResultSetImpl (not sure which one makes more sense). 

                 

                If Teiid doesn't handle the async streaming case, then we would probably emulate the approach using a polling method but take advantage of the non-blocking JDBC API to better scale across many dozens (possibly hundreds) of on-going queries. 

                • 5. Re: Using Teiid for Async Event Processing?
                  shawkins

                  Mark,

                   

                  >> After scanning the code, I fear Teiid isn't set up to support returning partial result sets.  Is this so?

                   

                  Teiid will always return batches to the client as soon as possible.  Exactly when and what is delivered depends on a lot of factors.  The client has to have actually requested the results; if not, the engine will buffer them until requested.  The next issue is that the plan should be able to deliver incremental results.  For example, anything that performs a full sort will need to wait until all values have been received by the engine before the first result is sent back. 
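Why a full sort delays the first row while other operators do not can be shown by counting how many source rows each operator must pull before it emits its first output. This is a hypothetical JDK-only illustration (`FirstRowLatency`, `distinct` and `sort` are illustrative names, not Teiid plan nodes); it uses streaming duplicate removal as the incremental operator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;

class FirstRowLatency {
    /** Counts how many rows have been pulled from the wrapped source. */
    static final class CountingSource implements Iterator<Integer> {
        private final Iterator<Integer> rows;
        int pulled = 0;

        CountingSource(List<Integer> rows) { this.rows = rows.iterator(); }
        public boolean hasNext() { return rows.hasNext(); }
        public Integer next() { pulled++; return rows.next(); }
    }

    /** Streaming duplicate removal: emits a row as soon as it is first seen. */
    static Iterator<Integer> distinct(Iterator<Integer> in) {
        Set<Integer> seen = new HashSet<>();
        return new Iterator<Integer>() {
            Integer pending;

            public boolean hasNext() {
                while (pending == null && in.hasNext()) {
                    Integer row = in.next();
                    if (seen.add(row)) pending = row;
                }
                return pending != null;
            }

            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                Integer row = pending;
                pending = null;
                return row;
            }
        };
    }

    /** Blocking sort: must consume every input row before it can emit any. */
    static Iterator<Integer> sort(Iterator<Integer> in) {
        List<Integer> all = new ArrayList<>();
        in.forEachRemaining(all::add);
        all.sort(null); // natural ordering
        return all.iterator();
    }

    /** Rows pulled from the source by the time the operator produced its first row. */
    static int pulledBeforeFirstRow(List<Integer> data, Function<Iterator<Integer>, Iterator<Integer>> operator) {
        CountingSource source = new CountingSource(data);
        operator.apply(source).next();
        return source.pulled;
    }
}
```

For the input 3, 1, 3, 2, 1 the streaming operator needs one source row before its first output, while the sort needs all five.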

                   

                  It seems like you're looking to just set up a single receiver at a lower level of the API, correct?  It's a little more convoluted than that, since we were trying to tie in at the JDBC level and the engine still expects a request/response model.  The pattern used by ODBC looks like:

                   

                  import java.util.concurrent.ExecutionException;
                  import org.teiid.client.RequestMessage.ResultsMode;
                  import org.teiid.client.util.ResultsFuture;
                  import org.teiid.jdbc.PreparedStatementImpl;
                  import org.teiid.jdbc.ResultSetImpl;
                  
                  final PreparedStatementImpl stmt = (PreparedStatementImpl) connection.prepareStatement(sql);
                  ResultsFuture<Boolean> executionFuture = stmt.submitExecute(ResultsMode.RESULTSET);
                  executionFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
                  
                       @Override
                       public void onCompletion(ResultsFuture<Boolean> future) {
                          try {
                              future.get(); //will return true, this just checks for an exception
                              final ResultSetImpl resultSet = stmt.getResultSet();
                              Runnable rowProcessor = new Runnable() {
                                 @Override
                                 public void run() {
                                      final Runnable self = this; //a local variable cannot be referenced in its own initializer
                                      while (true) {
                                          ResultsFuture<Boolean> hasNext;
                                          try {
                                              hasNext = resultSet.submitNext();
                                          } catch (Exception e) {
                                              //could not request the next row
                                              return;
                                          }
                                          synchronized (hasNext) {
                                              if (!hasNext.isDone()) {
                                                  hasNext.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
                  
                                                      @Override
                                                      public void onCompletion(ResultsFuture<Boolean> future) {
                                                          if (processRow(future)) {
                                                              self.run();
                                                          }
                                                      }
                                                  });
                                                  break; //will be resumed by onCompletion above
                                              }
                                          }
                                          if (!processRow(hasNext)) {
                                              break;
                                          }
                                      }
                                 }
                              };
                              rowProcessor.run();
                          } catch (Exception e) {
                              //statement failed (ExecutionException, InterruptedException, SQLException)
                          }
                       }
                  
                       /**
                        * return true to continue processing
                        */   
                       boolean processRow(ResultsFuture<Boolean> hasNext) {
                          try {
                              if (!hasNext.get()) {
                                  //no more results
                                  //...
                                  return false;
                              }
                  
                              //process the row using any of the normal getXXX calls
                              //...
                  
                              return true;
                          } catch (ExecutionException t) {
                              //exception while building the results
                              return false;
                          } catch (InterruptedException t) {
                              Thread.currentThread().interrupt();
                              return false;
                          }
                       }
                  });
                  

                   

                  You can abstract the above and add method hooks for row completion, error handling, etc.  That would give you a more convenient starting point.
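As a JDK-only sketch of that abstraction (not Teiid's API): `RowSource.submitNext()` is a hypothetical stand-in for ResultSetImpl.submitNext(), `NonBlockingRowProcessor` with its `onRow`/`onComplete`/`onException` hooks is an illustrative name, and `DelayedListSource` exists only to make some rows arrive on another thread. As in the pattern above, rows are handled in a loop while futures are already done; otherwise a callback is registered and the thread returns, so nothing blocks waiting for a row.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a result set with a non-blocking advance call. */
interface RowSource<T> {
    /** Completes with true when a row is current, false at the end of the results. */
    CompletableFuture<Boolean> submitNext();

    T current();
}

/** Hook-based row processor in the spirit of the listener above; not a Teiid class. */
abstract class NonBlockingRowProcessor<T> {
    private final RowSource<T> source;

    NonBlockingRowProcessor(RowSource<T> source) { this.source = source; }

    protected abstract void onRow(T row);
    protected abstract void onComplete();
    protected abstract void onException(Throwable t);

    /** Handles rows while futures are already done; otherwise resumes from a callback. */
    final void start() {
        while (true) {
            CompletableFuture<Boolean> hasNext = source.submitNext();
            if (!hasNext.isDone()) {
                // If it completes meanwhile, whenComplete simply runs the callback right here.
                hasNext.whenComplete((m, err) -> {
                    if (handle(m, err)) start();
                });
                return; // nobody blocks: the completing thread continues the loop
            }
            Boolean more = null;
            Throwable error = null;
            try {
                more = hasNext.join(); // already done, so this returns immediately
            } catch (RuntimeException e) { // CompletionException or CancellationException
                error = e;
            }
            if (!handle(more, error)) return;
        }
    }

    /** Dispatches one advance result to the hooks; returns true to keep going. */
    private boolean handle(Boolean more, Throwable error) {
        if (error != null) {
            onException(error instanceof CompletionException && error.getCause() != null ? error.getCause() : error);
            return false;
        }
        if (!more) {
            onComplete();
            return false;
        }
        try {
            onRow(source.current());
            return true;
        } catch (RuntimeException e) {
            onException(e);
            return false;
        }
    }
}

/** Demo source: every second advance completes later on a separate thread. */
class DelayedListSource implements RowSource<String> {
    private final Iterator<String> rows;
    private final Executor otherThread = task -> new Thread(task).start();
    private String current;
    private int calls;

    DelayedListSource(List<String> rows) { this.rows = rows.iterator(); }

    @Override
    public CompletableFuture<Boolean> submitNext() {
        boolean more = rows.hasNext();
        current = more ? rows.next() : null;
        return calls++ % 2 == 0
                ? CompletableFuture.completedFuture(more)
                : CompletableFuture.supplyAsync(() -> more, otherThread);
    }

    @Override
    public String current() { return current; }
}
```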

                   

                  It sounds, however, like you may also be expecting a fundamentally different processing model for streaming when using a plan that needs all rows before returning a single row (such as one that logically needs a full sort, which includes aggregation; note that duplicate removal has a streaming variant).  Is that correct? 

                   

                  If so, there is no built-in handling for that mode of operation.  Unless a translator indicates that its rows have ended, the engine will potentially keep waiting to consume them, and will treat any future rows as part of the same result.  The closest approximation is, as you say, a hybrid of the non-blocking API with polling. 

                   

                  Something like that could be added; we would just need to know how to correlate incoming results, and we would probably assume that the plan/results need to be flushed and reset before processing the next set of incoming results.

                   

                  Steve

                  • 6. Re: Using Teiid for Async Event Processing?
                    markaddleman

                    Teiid will always return batches to the client as soon as possible.  Exactly when and what is delivered depends on a lot of factors.  The client has to have actually requested the results; if not, the engine will buffer them until requested.  The next issue is that the plan should be able to deliver incremental results.  For example, anything that performs a full sort will need to wait until all values have been received by the engine before the first result is sent back. 

                    Understood.  We would write our application such that clients of data streams must not sort or aggregate unless those operations are executed within a window.  I haven't thought this all the way through, but I suspect the window must be time-based as well.

                     

                    It seems like you're looking to just set up a single receiver at a lower level of the API, correct? 

                    Yes, I think so.  While we could have many dozens (if not hundreds) of on-going streaming queries, each query would have a single client.  That client should be notified asynchronously as parts of the result set become available from the engine.

                     

                    It sounds, however, like you may also be expecting a fundamentally different processing model for streaming when using a plan that needs all rows before returning a single row (such as one that logically needs a full sort, which includes aggregation; note that duplicate removal has a streaming variant).  Is that correct?

                    I don't think I need this use case covered at all.  I'm pretty sure that we can construct every query we want under the restriction that any aggregation, sorting and de-duping be done within a window partition so there is no need for a fundamental change in the model.  Simply a callback from a result set as data becomes available seems like it would be sufficient.  For example:

                    // proposed API: executeQueryAsync and ResultSetCallback do not exist in Teiid today
                    PreparedStatement dailyTempStmt = c.prepareStatement("SELECT city, avg(temperature) OVER (PARTITION BY day(time)) FROM hourly_temperatures");
                    dailyTempStmt.executeQueryAsync(new ResultSetCallback() {
                        public void onResultSet(ResultSet rs) {
                            //  Semantics of ResultSet.next() change slightly:
                            //  return false when no more rows are available for this callback
                            while (rs.next()) {
                            }
                        }

                        public void onComplete() {}
                        public void onException(Exception e) {}
                    });
                    

                     

                    What do you think?

                    • 7. Re: Using Teiid for Async Event Processing?
                      shawkins

                      Mark,

                       

                      If Teiid is processing the window function, then we would still wait for all source rows.  There is a possibility of blocking on rs.next(), so to be fully non-blocking you must process a single row at a time (or be aware of the precise number of rows sent in each results batch).  So for the callback, you would want to take the CompletionListener class above, make it abstract and pass in the statement:

                       

                      import java.sql.ResultSet;
                      import org.teiid.client.util.ResultsFuture;
                      import org.teiid.jdbc.ResultSetImpl;
                      import org.teiid.jdbc.StatementImpl;
                      
                      public abstract class BaseCompletionListener implements ResultsFuture.CompletionListener<Boolean> {
                      
                           protected StatementImpl stmt;
                      
                           public BaseCompletionListener(StatementImpl stmt) {
                               this.stmt = stmt;
                           }
                      
                           @Override
                           public void onCompletion(ResultsFuture<Boolean> future) {
                              try {
                                  future.get(); //will return true, this just checks for an exception
                                  final ResultSetImpl resultSet = stmt.getResultSet();
                                  Runnable rowProcessor = new Runnable() {
                                     @Override
                                     public void run() {
                                          final Runnable self = this; //a local variable cannot be referenced in its own initializer
                                          while (true) {
                                              ResultsFuture<Boolean> hasNext;
                                              try {
                                                  hasNext = resultSet.submitNext();
                                              } catch (Exception e) {
                                                  onException(e);
                                                  return;
                                              }
                                              synchronized (hasNext) {
                                                  if (!hasNext.isDone()) {
                                                      hasNext.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
                      
                                                          @Override
                                                          public void onCompletion(ResultsFuture<Boolean> future) {
                                                              if (processRow(future)) {
                                                                  self.run();
                                                              }
                                                          }
                                                      });
                                                      break; //will be resumed by onCompletion above
                                                  }
                                              }
                                              if (!processRow(hasNext)) {
                                                  break;
                                              }
                                          }
                                     }
                                  };
                                  rowProcessor.run();
                              } catch (Exception e) {
                                  onException(e);
                              }
                           }
                      
                           /**
                            * return true to continue processing
                            */  
                           boolean processRow(ResultsFuture<Boolean> hasNext) {
                              try {
                                  if (!hasNext.get()) {
                                      onComplete(stmt.getResultSet());
                                      return false;
                                  }
                      
                                  processRow(stmt.getResultSet());
                      
                                  return true;
                              } catch (Exception t) {
                                  onException(t);
                                  return false;
                              }
                           }
                      
                           protected abstract void processRow(ResultSet rs) throws Exception;
                      
                           protected abstract void onException(Exception e);
                      
                           protected abstract void onComplete(ResultSet rs);
                      
                      }
                      

                       

                      Then the usage would look similar:

                       

                      final PreparedStatementImpl stmt = (PreparedStatementImpl) connection.prepareStatement(sql);
                      ResultsFuture<Boolean> executionFuture = stmt.submitExecute(ResultsMode.RESULTSET);
                      executionFuture.addCompletionListener(new BaseCompletionListener(stmt) {
                      
                           protected void processRow(ResultSet rs) {
                              //process the row using any of the normal getXXX calls
                              //...
                              //calls to rs.next() may block
                           }
                      
                           protected void onException(Exception t) {}
                      
                           protected void onComplete(ResultSet rs) {}
                      
                      });

                      • 8. Re: Using Teiid for Async Event Processing?
                        markaddleman

                        I'm beginning to see the problem with using windowing in a streaming operation.  If I understand, a plan for a PARTITION would require the entire result set.  Is that correct?

                         

                        Would it be possible to introduce a new partition operation to support streaming: STREAMING_PARTITION?  The semantics of STREAMING_PARTITION would be to group on an expression for as long as the expression returns the same value.  For example, if the expression were a simple column with ordered values "a,a,a,b,b,b,a,a", STREAMING_PARTITION would create three partitions (a, b, a) while PARTITION would create two (a, b).
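The proposed semantics amount to run-length grouping over an ordered stream: a partition closes as soon as the key changes, so it can be emitted without waiting for the end of input. A hypothetical JDK-only sketch contrasting it with ordinary partitioning (`Partitioning`, `streamingPartitions` and `partitions` are illustrative names, not Teiid functions; rows are reduced to their key for brevity):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

class Partitioning {
    /** Proposed STREAMING_PARTITION: a new partition starts whenever the key changes. */
    static void streamingPartitions(Iterable<String> orderedKeys, Consumer<List<String>> emit) {
        List<String> current = new ArrayList<>();
        for (String key : orderedKeys) {
            if (!current.isEmpty() && !Objects.equals(current.get(0), key)) {
                emit.accept(current); // this partition can never grow again
                current = new ArrayList<>();
            }
            current.add(key);
        }
        if (!current.isEmpty()) {
            emit.accept(current); // only reached if the stream actually ends
        }
    }

    /** Ordinary PARTITION BY: equal keys share a partition, so all input is needed first. */
    static List<List<String>> partitions(Iterable<String> keys) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (String key : keys) {
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(key);
        }
        return new ArrayList<>(byKey.values());
    }
}
```

With the example values, the streaming version emits [a, a, a] as soon as the first b arrives, which is what makes it usable on a result set that never ends.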

                         

                        I'm sure there is some complication in a federated case...

                        • 9. Re: Using Teiid for Async Event Processing?
                          shawkins

                          Mark,

                           

                          Yes, the partitioned window would require all source rows.  You would probably not want a new partitioning construct; rather, as I was alluding to earlier, you would want some way of correlating source results.  For example, a plan opened in "streaming mode" would:

                          - expect forward only consumption of results

                          - pull/process all incoming results until the translator indicates end of rows, but executions would not be closed.

                          - the plan would be reset and processing reinitiated.  Translators would be expected to return the next set of incoming values.

                          - each iteration of execution would maintain its own results buffer; otherwise we could eventually run into the max row number of 2^31-1.
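A rough JDK-only model of that proposed streaming mode might look like the following. `StreamingPlan` is hypothetical, `Optional.empty()` stands in for the translator's end-of-rows signal, and the plan is any function over one set of rows. Each iteration drains the still-open source up to the marker, evaluates the plan with a fresh buffer, emits the result, and then starts over on the next set of incoming values.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical "streaming mode": the same plan is re-run for each set of incoming rows.
 * The source stays open across iterations; Optional.empty() marks the end of one set.
 */
class StreamingPlan<R> {
    private final Function<List<Integer>, R> plan; // e.g. an aggregate over one set

    StreamingPlan(Function<List<Integer>, R> plan) {
        this.plan = plan;
    }

    /** Runs at most maxIterations sets; each set gets its own fresh buffer. */
    void run(Iterator<Optional<Integer>> openSource, int maxIterations, Consumer<R> emit) {
        for (int i = 0; i < maxIterations && openSource.hasNext(); i++) {
            List<Integer> buffer = new ArrayList<>(); // per-iteration buffer keeps row counts bounded
            while (openSource.hasNext()) {
                Optional<Integer> row = openSource.next();
                if (!row.isPresent()) {
                    break; // end of rows for this set, but the source is not closed
                }
                buffer.add(row.get());
            }
            emit.accept(plan.apply(buffer)); // flush this set's results, then reset and go again
        }
    }
}
```

Summing each set is only an example plan; a time-windowed aggregate would have the same shape.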

                           

                          Some issues with that approach are:

                          - requires some notion of creating a streaming plan (a hint?)

                          - code changes to handle the repeated processing

                          - you may have resource issues with executions holding connections (but that would be up to the design of your translators)

                           

                          On the other hand, you get nearly the same processing model using a client poll or by using the onComplete hook above to reissue the query.  The key difference would be that now you are dealing with multiple statements and multiple translator executions.
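For comparison, the client-side approach of re-issuing the query from an onComplete hook can be sketched with plain CompletableFuture. `ReissuingPoller` and the `submitQuery` supplier are hypothetical stand-ins for a statement execution; each round is a separate execution, which is the multiple-statements cost mentioned above. A real poller would also wait between rounds and stop on client request rather than after a fixed count.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Re-issues a query each time the previous execution completes, for a bounded number of rounds. */
class ReissuingPoller<T> {
    private final Supplier<CompletableFuture<List<T>>> submitQuery; // hypothetical async execution
    private final Consumer<List<T>> onResults;

    ReissuingPoller(Supplier<CompletableFuture<List<T>>> submitQuery, Consumer<List<T>> onResults) {
        this.submitQuery = submitQuery;
        this.onResults = onResults;
    }

    /** Completes after `rounds` executions, or exceptionally if any execution fails. */
    CompletableFuture<Void> poll(int rounds) {
        if (rounds <= 0) {
            return CompletableFuture.completedFuture(null);
        }
        // A fresh execution per round: the completion step simply submits the next one.
        return submitQuery.get().thenCompose(rows -> {
            onResults.accept(rows);
            return poll(rounds - 1);
        });
    }
}
```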

                           

                          Steve

                          • 10. Re: Using Teiid for Async Event Processing?
                            shawkins

                            Mark,

                             

                            For https://issues.jboss.org/browse/TEIID-1800 the above logic was corrected (I really shouldn't code in the forum editor...) and adapted so that callers just provide a simple callback implementation.  This is part of 7.6 for general use and shouldn't be too hard for you to back port if desired.

                             

                            Steve

                            • 11. Re: Using Teiid for Async Event Processing?
                              markaddleman

                              Great.  Thanks.  We intend to move to 7.6+ asap