Tuesday, November 17, 2015

Publish Data From WSO2 ESB to WSO2 DAS.

Prerequisites

  • WSO2 ESB (Enterprise Service Bus) 4.9.0
  • WSO2 DAS (Data Analytics Server) 3.0.0
The purpose of writing this blog post is to demonstrate how to publish a data stream from WSO2 ESB to WSO2 DAS and persist the data in a MySQL database.
  
First, log in to the WSO2 ESB Management Console and add an event sink to the ESB. To do that, go to the Configure tab and click Event Sinks. You will now see the Event Sinks Configuration console. Click Add Event Sink, and you will get a form for entering the details of the event sink, as shown in figure 2.

Figure 1


Enter a proper name for the event sink, then enter the username and password of the DAS server.
Receiver URL : tcp://<DAS IP address>:<Thrift port of DAS>
Authenticator URL : ssl://<DAS IP address>:<authentication port of DAS>

By default the Thrift port and the authentication port are 7611 and 7711 respectively. However, since the DAS server here was started with a port offset of 2 (a common practice when it runs on the same machine as the ESB), those ports open on 7613 and 7713 on the DAS server.
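If you need to check or change the offset, it is set in repository/conf/carbon.xml on the DAS server. With an offset of 2 and DAS running locally, the two URLs above become tcp://localhost:7613 and ssl://localhost:7713:

<!-- repository/conf/carbon.xml on the DAS server -->
<Ports>
    <!-- the offset is added to every default port: 7611 -> 7613, 7711 -> 7713 -->
    <Offset>2</Offset>
</Ports>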





Create a custom proxy service and edit it as shown in figure 3.

How to create a custom proxy service to access the Redmine REST API through the Redmine Connector? Click here.

Figure 3


Open the design view of the proxy service you created (figure 4) and add a Publish Event mediator to it, as shown in figure 5.

Figure 4



Figure 5



Enter the stream definition in the Publish Event mediator, as shown in figure 6.
Stream Name : name of the data stream.
Stream Version : version of the data stream.
Event Sink : select the event sink you created earlier from the drop-down list.

Then add the attributes of the data stream. Here I added two attributes called id and name. For each attribute you have to choose how its value is supplied from the drop-down list: select the Value option if the attribute carries a static value, or the Expression option if it carries a dynamic value.
In the Value/Expression field, give an XPath expression if the attribute is dynamic; otherwise give the literal value. Then select the data type of the attribute. A sketch of the resulting mediator configuration is shown below.
Now save all the settings of the proxy service and go back to the proxy service list. Then click Try this service, as shown in figure 7.
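For reference, here is a rough sketch of how the Publish Event mediator ends up looking in the proxy service's source view. The event sink name, stream name, and XPath expression are placeholders for this example rather than values taken from the figures:

<publishEvent>
    <eventSink>esb_event_sink</eventSink>
    <streamName>esb_stream</streamName>
    <streamVersion>1.0.0</streamVersion>
    <attributes>
        <meta/>
        <correlation/>
        <payload>
            <!-- static attribute: a literal value -->
            <attribute name="id" type="STRING" defaultValue="" value="001"/>
            <!-- dynamic attribute: the value is extracted from the message with an XPath expression -->
            <attribute name="name" type="STRING" defaultValue="" expression="//name"/>
        </payload>
    </attributes>
</publishEvent>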


Figure 6


Figure 7

  
Now log in to the DAS Management Console, go to the Main tab and select Streams. You should be able to see the data stream that was published from the ESB in the stream list. Next you need to persist this data stream. To do that, click Edit, then click the Next [Persist Event] button at the bottom of the Edit Event Stream console, as shown in figure 9.

Figure 8
                                                                         


Figure 9 


Then tick the Persist Event Stream option and the Persist Attribute option for each attribute you want to persist, as highlighted in figure 10, and then click Save Event Stream.

Figure 10
Now you need to add an event receiver to listen to this data stream. To add one, go to the Main tab, click Receivers, then click Add Event Receiver. Enter the details of the event receiver as shown in figure 12.



Figure 11

Enter a proper name for the event receiver. Select wso2event from the drop-down list for the input event adapter type. Select the data stream name from the drop-down list for the event stream. Select wso2event as the message format. Then click Add Event Receiver.



Figure 12
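DAS saves the receiver as an XML file under repository/deployment/server/eventreceivers/. Here is a minimal sketch, using placeholder receiver and stream names:

<eventReceiver name="esb_data_receiver" statistics="disable" trace="disable"
               xmlns="http://wso2.org/carbon/eventreceiver">
    <!-- accept wso2event (Thrift) events sent by the ESB's event sink -->
    <from eventAdapterType="wso2event">
        <property name="events.duplicated.in.cluster">false</property>
    </from>
    <mapping customMapping="disable" type="wso2event"/>
    <!-- bind the receiver to the stream published from the ESB -->
    <to streamName="esb_stream" version="1.0.0"/>
</eventReceiver>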
Now you are done with persisting the data stream definition and adding the listener to the data stream. Run the ESB proxy service again to publish the data.
Then go to the Main tab and click Data Explorer. Select a table from the drop-down list and click Search. You should be able to view the data published from the ESB, as shown in figure 14.

Figure 13

                                                          

Figure 14


How to store the published data in a MySQL database?


Create a MySQL database, and create a table in it to hold the stream data, as in the example below.
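For example, the statements below create the database and a table whose columns mirror the two stream attributes (id and name). The database name and column sizes are placeholders, while the table name newtable matches the query used at the end of this post:

CREATE DATABASE esbdata;
USE esbdata;

-- columns mirror the id and name attributes of the published stream
CREATE TABLE newtable (
    id   VARCHAR(50),
    name VARCHAR(100)
);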




Download the MySQL JDBC connector .jar and put it in <wso2 DAS Server>/repository/components/lib.
Create a datasource in DAS. Go to the Configure tab, click Datasources, and then click Add Datasource, as shown in figure 15.


Figure 15
   

Figure 16
                                

  • Datasource Type : RDBMS
  • Name : a proper name for the datasource
  • Driver : com.mysql.jdbc.Driver
  • URL : jdbc:mysql://localhost:3306/<database name>
  • User Name : user name of the MySQL server
  • Password : password of the MySQL server
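The same datasource can also be declared in XML, for example in <wso2 DAS Server>/repository/conf/datasources/master-datasources.xml. Here is a minimal sketch with the placeholder names used in this post; the credentials are assumptions:

<datasource>
    <name>mysql_datasource</name>
    <definition type="RDBMS">
        <configuration>
            <url>jdbc:mysql://localhost:3306/esbdata</url>
            <username>root</username>
            <password>root</password>
            <driverClassName>com.mysql.jdbc.Driver</driverClassName>
            <!-- validate pooled connections before handing them out -->
            <testOnBorrow>true</testOnBorrow>
            <validationQuery>SELECT 1</validationQuery>
        </configuration>
    </definition>
</datasource>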



Then click Test Connection; you should receive a success message as shown in figure 17.

Figure 17

Now go to the Main tab and click Scripts. You will see the Available Analytics Scripts console, as shown in figure 18. Click Add New Analytics Script to add a new script.

Figure 18

Now you will get the Add New Analytics Script console. Enter a proper name for the script, and enter the Spark SQL queries in the text editor.
Here is an example format for the Spark SQL queries:

create temporary table xxx using CarbonAnalytics options (tableName "<data stream name>", schema "<attribute name> <data type>, ...");
select * from xxx limit 20;
create temporary table yyy using CarbonJDBC options (datasource "<data source name>", tableName "<table name>");
insert into table yyy select * from xxx;
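For instance, assuming the stream is named esb_stream with attributes id and name, the datasource is named mysql_datasource, and the MySQL table is the newtable created earlier, the whole script would be:

create temporary table esbstream using CarbonAnalytics options (tableName "esb_stream", schema "id STRING, name STRING");
create temporary table mysqltable using CarbonJDBC options (datasource "mysql_datasource", tableName "newtable");
insert into table mysqltable select * from esbstream;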

Figure 19
Then click Execute Script. You can see the resulting data table under Execution Results, as shown in figure 20.

Figure 20

Now go to the MySQL command prompt and run the following query:
Select * from newtable;
Then you should be able to see the data that was published from the ESB stored in the MySQL table, as shown in figure 21.

Figure 21

15 comments:

  1. Thanx Lakmini.
    Is there any way to publish multiple services' data from WSO2 ESB to WSO2 DAS at one time?

    Replies
    1. Hi Lakshitha,
      You can create multiple data streams, then add an event receiver for each data stream in DAS, as described in the blog post above.

  2. This comment has been removed by the author.

  3. I have a few questions on this topic. It would greatly help me understand if you could answer them.

    1) What is the purpose of persisting data in MySQL (at the end of your blog) using a Spark SQL script when you have already persisted the event stream (stored in the underlying H2 database in the EVENT_STORE record store) using Home > Main > Manage > Streams?
    2) Are you suggesting that "persisting the event stream" is different from "persisting data"? If so, is persisting the event stream acting as a pass-through database (the underlying H2 database in the EVENT_STORE record store) before we eventually store the data permanently in an external database table (say a MySQL table)?
    3) "Persisted event stream" data can be purged periodically, so these data are meant to be purged at some point in time. So in order to keep data permanently and use it for analytics and reporting purposes, we push the data to some external database (say MySQL, which will have the data model that suits our business requirements). Please correct me if my assumption/understanding is wrong.
    4) Your example shows MySQL RDBMS as the database for persisting data. Can we use NoSQL Cassandra or any other NoSQL database for this instead of an RDBMS?

  4. 1) The purpose of persisting data in MySQL is to keep the records externally (in an external database). But if you want, you can store the data permanently in the H2 database in DAS.

    2) When you persist an event stream, the data is stored in the DAS database. You can view those tables from "Data Explorer" in DAS.

    3) If you add a schedule for data purging, the data will be purged from the DAS database. But if you want, you can keep records permanently in the DAS database by not adding a data purging schedule.

    4) You can use another database instead of MySQL.
    [1]. https://docs.wso2.com/display/DAS301/Configuring+Data+Persistence+With+Cassandra

    Replies
    1. Now my questions -
      1) Even though, by persisting the event stream, the data can be stored permanently in the underlying DB in DAS, it can ONLY be viewed from Data Explorer in DAS. These tables cannot be viewed from an external application (say some reporting tool that wants to create reports by accessing tables in the DAS DB). The reason is the way those event streams/tables are stored in the DB - all event streams (tables) and data are stored in the form of ANX___*_ tables as shown below -

      See this example: I have created 2 event streams in DAS - TEST_STREAM1 and TEST_STREAM2 - which you can access as tables from Data Explorer in DAS. But tables with those names don't exist in the DB (here, instead of H2, I am using MySQL as the underlying DB for persisting events). Instead, they are stored in ANX___*_ tables as shown below -

      mysql> show tables;
      +---------------------------------+
      | Tables_in_ANALYTICS_EVENT_STORE |
      +---------------------------------+
      | ANX___7LkGv7Go_                 |
      | ANX___7LpgNZQg_                 |
      | ANX___8GECxAtQ_                 |
      | ANX___8GMmoCp8_                 |
      | ANX___8GO8JC2Q_                 |
      +---------------------------------+

      If you query the ANX___8GECxAtQ_ table, you can see those event streams -
      mysql> select * from ANX___8GECxAtQ_;
      +--------------+---------------+------+---------------+
      | record_id    | timestamp     | data | partition_key |
      +--------------+---------------+------+---------------+
      | TEST_STREAM1 | 1455645182271 |      | 32            |
      | TEST_STREAM2 | 1455645182377 |      | 62            |
      +--------------+---------------+------+---------------+

      This way we cannot access tables (event streams) externally. So having external database would be a good choice, right?
      Do you agree?

      Q2) When you create Dashboards, gadgets and charts, you only have option to access event stream data, there is no option to access tables from external database. Could you please guide me on this?

      Q3) Is there any Reports Building functionality besides creating dashboards, gadgets and charts?

      Q4) Using CASSANDRA (NoSQL) as the external database: I have already read the document (available on WSO2) you sent me. I have tried several times following the instructions, but I was not able to configure CASSANDRA successfully. If I configure it using XML, the Management Console won't let me test whether the connection is healthy or not. If I configure it using the Management Console itself (going to Datasources and creating a new datasource), it says "Invalid Datasource Type" for CASSANDRA. I don't see any practical example of how to configure CASSANDRA as an external database in DAS.

      Is it possible for you to show me how to configure this with screenshots like the way you did for MySQL in your blog?

    2. Hi,
      You can create gadgets using external databases. Please refer to [1].
      [1]. http://lcbolgger.blogspot.com/2016/04/analytics-dashboard-how-to-create.html

  5. Hi Lakmini Chathurika,

    I am working with WSO2 DAS. I see that I can purge data older than 1 day, but how do I purge data less than a day old for the execution plan streams, as I am working on DAS real-time analytics?

    Replies
    1. Hi,
      You can purge data for a given time range. Please refer to [1].

      [1]. https://docs.wso2.com/pages/viewpage.action?pageId=48288726

    2. Hi Lakmini,

      I tried to purge data within a time interval as mentioned in the DAS documentation, but the data is not deleted in that time interval. Is there anything that needs to be added?

  6. This comment has been removed by the author.

  7. Hello Lakmini,
    I get an error in the DAS console when I try a proxy service with an event stream/event sink in ESB. The event stream is not listed in the DAS web dashboard.
    How do I link the event stream between WSO2 ESB and WSO2 DAS?

    Thank you in advance. - Peter

    The error on the DAS command console is:

    [2017-02-15 21:21:12,841] ERROR {org.wso2.carbon.databridge.core.internal.queue.QueueWorker} - Dropping wrongly formatted event sent for -1234
    org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting salesforcestream:1.0.0 of event bundle with events 2
    at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:181)
    at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.toEventList(ThriftEventConverter.java:90)
    at org.wso2.carbon.databridge.core.internal.queue.QueueWorker.run(QueueWorker.java:73)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: org.wso2.carbon.databridge.core.exception.EventConversionException: No StreamDefinition for streamId salesforcestream:1.0.0 present in cache
    at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:166)
    ... 7 more
    [2017-02-15 21:21:12,850] ERROR {org.wso2.carbon.databridge.core.internal.queue.QueueWorker} - Dropping wrongly formatted event sent for -1234
    org.wso2.carbon.databridge.core.exception.EventConversionException: Error when converting salesforcestream:1.0.0 of event bundle with events 1
    at org.wso2.carbon.databridge.receiver.thrift.converter.ThriftEventConverter.createEventList(ThriftEventConverter.java:181)
