3 Replies Latest reply on Sep 30, 2015 7:29 AM by Andrew Dinn

    Instrumenting Spark Streaming with Byteman

    Saleh Mohamed Newbie

      Hi,

       

      I am trying to modify spark streaming program using byteman (see the Java and Byteman rule snippets below) .

       

      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
            );
      
      
      JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());
      JavaDStream<String> words = lines.flatMap(str -> Arrays.asList(str.split(" ")));
      
      
      

       

       

      RULE accessing the parameters of "createDirectStream" method
      CLASS org.apache.spark.streaming.kafka.KafkaUtils
      METHOD createDirectStream
      AT ENTRY
      IF true
      DO
        traceln("**** topic = " + $7);
      ENDRULE
      
      
      RULE accessing the parameters of "flatMap" method
      CLASS org.apache.spark.streaming.api.java.JavaDStream
      METHOD flatMap
      AT ENTRY
      BIND value = $1
      IF (value.contains("trace")
      DO
        $1 = $1 + "|" + System.currentTimeMillis();
        traceln("new value = " + $1);
      ENDRULE
      
      

       

      The first rule which is applied to KafkaUtils.createDirectStream method (line 1) works fine. But the second rule which is applied to JavaDStream.flatMap  method (line 13)

      doesn't seem to work at all, and byteman does not through any error or exception. Any insight please. One thing I know is that Spark is written with Scala,

      but Scala source code is compiled to Java bytecode and the resulting executable code runs on a JVM. And this shouldn't be the problem as the first rule works fine. Any help please.

       

      regards

        • 1. Re: Instrumenting Spark Streaming with Byteman
          Andrew Dinn Master

          Hi Saleh,

           

          Can you confirrm that this is the correct text for your rule? if it is then I believe you have a typo in trhe pocndtion (missing bracket at end). Try fixing that and then let me know what happens.

           

          If this is the case ten run your JVM with -Dorg.jboss.byteman.verbose on the java command line. You ought to see a parse error message.

          Better is to check your rules offline efore runnign th eprogram. If you have downloaded a Byteman zip release then you can use the bmcheck script in th ebin directory to check your rules as follows

           

            bmcheck.sh -cp /path/to/appcode.jar -cp /path/to/otherappcode.jar myscript.btm

           

          (On windows youcan  use bmcheck.bat)

           

          If you are using maven to run your tests then you can use the Byteman maven rulecheck plugin to check you rules. See the Byteman docs page for the tutorial on how to use this plugin.

           

            http://www.jboss.org/byteman/documentation

           

          regards,

           

           

          Andrew Dinn

          • 2. Re: Instrumenting Spark Streaming with Byteman
            Saleh Mohamed Newbie

            Hi Andrew,

            Thank you for the response. As for the missing bracket, it was not the main problem. But I finally figured out how to do it. The method "flatMap" accepts a function as parameter and in my case I passed lambda expression "str -> Arrays.asList(str.split(" "))". With this, byteman returns a reference to that expression. But after changing the rule to as shown below, I was able to access the string object "str".

             

            ##Add timestamp to the trace packets inside the Arrays.asList method
            RULE inside "JavaDStream.flatMap" method
            CLASS Arrays
            METHOD asList
            AT ENTRY
            BIND value = $1
            IF (value.get(0).toString().contains("trace"))
            DO
                      str = value.get(0);
                      str = str + "||at flatMap:" + System.currentTimeMillis();
                      $1.set(0, str);
                      traceln("byteman at flatMap: " + str)
            ENDRULE
            
            • 3. Re: Instrumenting Spark Streaming with Byteman
              Andrew Dinn Master

              Hi Saleh,

               

              Thanks for following this up andI am glad you sorted out the problem. I would still recommend using bmcheck or the maven rulecheck plugin if you can. Both of those should at lest have told you that the rule could not be injected into the target class.

               

              regards,

               

               

              Andrew Dinn