Categories

Versions

Using the Streaming extension

TheStreamingextension allows to build streaming analytic processes and deploy them on an Apache Flink or Spark streaming cluster. The design follows the known operator based approach inside a special nested operator calledStreaming Nest.

The data flow works with Apache Kafka topics as data source and sink for the streaming process. To see how to establish Kafka connections see the entry for theKafka Connectorextension.

Install the Streaming extension

To install the extension, go to theExtensionsmenu, open theRapidMinerMarketplace (Updates and Extensions), and search forStreaming Extension. For more detail, seeAdding extensions.

Connect to a Streaming Cluster

The extension requires a connection to a streaming cluster, where the process can be deployed. For this the connection uses RapidMiner's connection framework. This allows managing connections centrally and to reuse connections between operators. The processes are technology independent, so the same process can be either executed on a Flink or Spark cluster with changing anything, except the connection object.

Connect to Flink

To create a working FlinkConnection objectin the repository, the necessary properties must be added. These are the host and port information, the applied parallelism level and the address of the remote dashboard.

连接到火花

To create a working SparkConnection object, the necessary properties must be added. TheSpark Settingstab handles the basic connection and requires the host address and port number and the link to the remote dashboard.

TheSpark Propertiestab and the HDFS tabs can hold specific properties of the cluster and depend on the used spark version and server settings.

TheHDFS Settingstab takes optionally the URL and path to the HDFS file system.

Building a Streaming Process

The starting point for any streaming process is theStreaming Nestoperator. Inside this process, the streaming process is configured and its content is then deployed on a streaming cluster, which is defined via the connection input port.

Inside the streaming nest, only operators from the Streaming extension will be translated into Flink or Spark operations and deployed on the streaming server. Other operators like Multiply can help to organize the workflow. Kafka Connection objects point to the input and output data streams and RapidMiner models can be used in combination with the Apply Model on Streams (seebelow).

Build an ETL Process

This process merges two incoming data streams from two separate Kafka topics and then filters for a specific key in the data. The results are then written back to a new Kafka topic, where they can for example be used for training a model.

Example use cases:

  • The two incoming Kafka streams are events from two separate production plant monitoring services. From the millions of incoming events, only those tagged as "warning" are important for an early warning system.
  • The streams could also be click events from two web-shop sites and only events with the key "cancel" should be analyzed, for example to trigger a retention event.

The process below shows how two data streams from different assets are first merged together by aUnionoperator. The resulting stream is filtered to contain only events with the "warning" tag. The filtered result is written back to a new Kafka topic.

Apply Model on Streams

Based on the example above, the second example shows how to train and apply any RapidMiner model on a streaming cluster.

The first step is to retrieve the data and train a model on historic data. For example, theRead Kafkaoperator from theKafka Connectorextension can be used to retrieve past events with a "warning" tag from the plant monitoring. Ak-Meansclustering model can detect sub-groups of alarms, that help to automatically distinguish between different types of problems. If there are labeled data available, a supervised learning model could also be trained; for example a model to predict the severity level of an alarm.

Now the trained model is placed in inside a Streaming workflow. This model is then applied on the filtered alarm events stream and the results with a prediction are pushed to yet another topic.

Monitoring Processes

The Streaming extension adds a new panel to RapidMiner Studio, the “Streaming Dashboard”. It can be added to the User Interface underView -> Show Panel -> Streaming Dashboard.

It lists all deployed streaming processes and allows to monitor and manage them. The execution of a Streaming Nest operator creates an entry in the Streaming Dashboard (called workflow). The name of the workflow, the status, the location of the defining RapidMiner process and the start time are listed on the dashboard.

Also all deployed streaming jobs are listed as well. Individual jobs, as well as the whole workflow can be stopped through the dashboard. The entry of the workflow can be removed from the dashboard by using the corresponding button.

The Streaming Dashboard also allows to open the platform specific (Flink or Spark) remote dashboard.

Apply RapidMiner Models on Streams

In order to use the Apply Model on Stream operator, some changes to the streaming cluster are needed. The streaming engine (Flink or Spark) needs an extra plug-in so it knows how to handle RapidMiner models and of course the RapidMiner execution engine is needed to actually run the models.

Install RapidMiner Studio on the Cluster

The installation guide for RapidMiner Studio can be foundhere. If special models (for example from theDeep Learningextension) are used, the extension*.jarfile is needed as well and needs to be placed in the.RapidMiner/extensions在集群上的文件夹。可以扩展文件downloaded from the RapidMiner marketplace or copied from the.RapidMinerof the local installation.

Install RapidMiner Plug-Ins

The RapidMiner plug-in file is shipped with the extension and can be found in the RapidMiner Home folder under this path.RapidMiner/extensions/workspace/rmx_streamingafter the Streaming extension is installed from the marketplace.

There are two files in this folder calledrapidminer-as-plugin-common.jarandrapidminer-as-plugin.jar, with the version number of the extension at the end.

Place therapidminer-as-plugin-common.jarfile into the/libfolder of your cluster installation (for example/opt/flink/lib/).

Preparing therapidminer-as-plugin-common.jarrequires a few more steps: - Create an emptylibfolder. - Copy into this folder the content of thelibfolder from the fresh RapidMiner installation (without thejdbcandpluginssubfolders). - Copy therapidminer-as-plugin.jarin this folder as well. - Create aplugin.propertiesfile and write the following into it:

plugin.class=com.rapidminer.extension.streaming.raap.wrapper.RapidMinerPlugIn plugin.id=rm-as-plugin plugin.version=9.9.0 plugin.description=RapidMiner as Plugin plugin.license=Apache Licents 2.0

Create a zip-archive with thelibfolder and the plugin.properties inside and name itrm-as-plugin.zip(make sure the name is correct, as the code will look for exactly this file).

This zip file then placed in a new folder%RM_INSTALLATION%/lib/rm-as-pluginthe RapidMiner installation (so it's a subfolder from where the*.jar文件的复制)。

Afterwards the cluster instance should be restarted, so that Flink or Spark can load the new plugin.

Executing a RapidMiner Model

Now theApply Model on Streamoperator can be used. The two required parameters are the locations of the RapidMiner installation folder (where therm-as-plugin) was installed and the RapidMiner Home folder, where the extension and user data are stored.

On a typical linux cluster the paths would be something like/opt/rapidminer-9-9-0and/home/$UserName/.RapidMiner