首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

检测实时数据流中的复杂事件(3)

检测实时数据流中的复杂事件(3)

第 5 步. 检查 Node.js        代码EventDetection 应用程序是一个完整但简单的应用程序,不需要任何自定义工作即可运行。要理解该应用程序,可查看它的代码:
  • 打开 app.js 文件,查看应用程序逻辑。app.js 中的代码围绕 6 个主要步骤进行组织:
    • 提取在使用 Streaming Analytics REST API 时所需的环境信息。
    • 检查 Streams 实例是否正在运行,如有必要,通过 Streaming Analytics REST API 启动该实例。
    • 如果该实例已在运行,可检查 Streams 事件检测作业是否已在运行。如果一个作业正在运行,则取消它。
    • 使用 Streaming Analytics REST API 将一个 Streams 应用程序包部署到 Streaming Analytics              服务。该应用程序包中包含的 Streams 应用程序可分析天气数据并执行事件检测。
    • 处理 Streams 应用程序检测到的事件并将它们显示在这个网页上。
    • 处理 3,000 个事件后取消与 Streams 应用程序对应的作业。
去除无用代码,找出在何处执行这些步骤。本节的剩余部分将更详细地分析两个步骤。
  • 查看步骤 1(d) — 将 Streams 应用程序包部署到 Streaming Analytics            服务的代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    ...
    // -----New form-----
    var form = new FormData();

    // -----File part-----
    form.append('file', fs.createReadStream('EventDetection.sab'), {
      contentType: 'application/octet-stream'
    });

    // -----JSON Part-----

    jsonObject = JSON.stringify({
      "jobName" :  "EventDetectionSample",
      "submissionParameters" :
      {
          "route" : app_uri,
      },
    });
    console.info('JSON object part: ' + jsonObject);

    var buffer = new Buffer(jsonObject);
    form.append('my_buffer', buffer, {
      contentType: 'application/json',
      // The line below is not an actual file.  The name with the .json
      // extension is needed for the data in the buffer to be recognized
      // as json.
      "filename": "jparams.json"
    });

    // -----POST Params-----
    var uri_string = sa_props.jobs_path + '?bundle_id=EventDetection.sab';

    // -----SUBMIT POST-----
    var jsonPostRes = {};

    form.submit({
      protocol: 'https:',
      host: sa_props.rest_host,
      path: uri_string,
      headers: {'Authorization' : authbuf}
    }, function(err, res) {
       ...





    上述代码对          Streaming Analytics REST API 执行一次 HTTP POST,使用 form-data 包提交一个多部分表单:
    • 该表单的一部分是一个名为 EventDetection.sab 的文件。.sab 文件是一个 Streams 应用程序包,是在 Streams 开发环境中编译一个              Streams 应用程序的结果。EventDetection.sab 包含了执行事件检测的 Streams 应用程序。
    • 另一部分是一个 JSON 对象,它包含应用程序包部署方面的其他信息,比如为 Streaming Analytics 实例中运行的作业所提供的名称,以及此应用程序的              submission-time 参数。Streams 应用程序仅有一个 submission-time 参数。您将该路径传递给 Node.js 应用程序,以便              Streams 应用程序可在检测到事件时将其发回到 Node.js 应用程序。
  • 现在查看步骤 1(e) 的代码,该代码处理 Streams 应用程序检测到的事件。此代码在 app.js 文件的开头附近,而不是在步骤序列中,因为只要从 Streams          应用程序中发送了一个事件,就会异步调用它。Streams 应用程序向 Node.js 应用程序执行一次 HTTP POST,将事件发送到应用程序。下面的代码处理事件的 JSON            载荷并执行适当的操作:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // POST handler for the events being sent back from the Streams application
    app.post('/', function(req, res){
        status_step[4] = "Processing Events";

        if (!cancelling) {
          console.info("In POST function");
          var jsonString = req.body.jsonString;
          console.info("POST message is: " + jsonString);
          var payload = JSON.parse(jsonString);
         
          if (payload.eventType == 'MaxMin Temp') {
            // Max or min temperature change
            maxmin = payload;
          }
          else {
            // Regular event
            eventCount++;
            console.info("Event total = " + eventCount);

            // Add event to the array used by the web user interface
            events.push(new Event(eventCount, payload));
          
            // Cancel the Streams job if we've reached the event target
            if (eventCount == eventTarget) {
              cancelling = true;
              console.info("EVENT TARGET REACHED...");
              console.info("STREAMS JOB WILL BE CANCELLED.");
              finalCancel(jobNumber.toString());
            }
          }
        }
        res.send();
    });




第 6 步. 检查 Streams        应用程序代码使用的 Streams 应用程序是一个完整的 Streams 应用程序,不需要任何自定义工作即可运行。您下载(或克隆或分解)的源代码中包含了该应用程序的源代码及其预先构建的 .sab        文件。要理解 Streams 应用程序,可查看它的代码:
  • 打开 EventDetection.spl 文件(位于项目的 spl 子目录中)。该应用程序的源代码是用 SPL 编写的,这是一种面向数据流和数据处理操作符的语言。
  • 跳过用于定位操作符声明的代码,并将其与您在 Streaming Analytic 控制台中看到的流程图相对比。本节的剩余部分将更详细地分析两个操作符。
  • 检查可检测复杂事件的操作符的代码。下面的代码段给出了一个名为 MatchRegex          的操作符,它可检测一个流中的一系列数据元组模式。代码注释说明了该操作符将检测的 M            形状模式的性质:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    //
    // The first complex event is called "M-shape".  It triggers when the graph of the
    // temperature for a weather station form's an M shape over a period of time.
    //
    // Detecting M shape patterns in weather data is not that useful, but recognizing an M shape in
    // financial trading is valuable and is referred to as a "double-top" stock pattern.
    //
    // See http://hirzels.com/martin/papers/debs12-cep.pdf for more information on this complex event
    // detection method, the double-top pattern and other patterns.
    //
    stream<WeatherSummary weatherValues, rstring event> TempMEvent = MatchRegex(WeatherSummary)
    {
      param
        pattern     : ". rise+ drop+ rise+ drop* deep";
        partitionBy : stationCode;   
        predicates  : {
      rise = tempInF>First(tempInF)  && tempInF>=Last(tempInF),
      drop = tempInF>=First(tempInF) && tempInF<Last(tempInF),
      deep = tempInF<First(tempInF)    && tempInF<Last(tempInF) };
      output
        TempMEvent : weatherValues=WeatherSummary, event="M-Shape Temp";
    }




    该操作符的声明使用正则表达式语法以及一组也在操作符声明中定义的谓词来定义您尝试检测的模式。该操作符根据气象站报告的值集来查找气象站温度的            M 形状。这个 MatchRegex 操作符使用之前在 SPL 代码中定义的 WeatherSummary            流,并生成一个名为 TempMEvent 的流。该操作符按气象站的 ID            将气象站的读数数据划为不同的分组,并维护检测每个气象站事件所必要的状态。
  • 查看可将事件发回给 Node.js            应用程序的操作符序列(包含两个操作符):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //
    // Send events to the application user interface by converting them to json and HTTPPost-ing
    // to the Node.js app
    //
    stream <rstring jsonString> JSONOutput = com.ibm.streamsx.json::TupleToJSON(OutputEvents)
    {
    }

    () as HttpEvents = HTTPPost(JSONOutput) {
      param
        headerContentType : "application/json";
        url : ((rstring) getSubmissionTimeValue("route"));
    }





    前面代码段中的第一个操作符将一个流中的一个元组转换为          JSON 字符串。这个操作符使用之前在 SPL 代码中定义的一个名为 OutputEvents 的流并生成一个名为            JSONOutput 的流。下个操作符(名为 HTTPPost)使用 JSONOutput          流并通过 HTTP POST 将该 JSON 字符串发送到 Node.js 应用程序的路径。
返回列表