Create a Custom Application
This example shows the steps involved in creating a custom application that sends a data stream to the Demoyard Cloud using the Demoyard Agent gRPC API. For simple applications, you do not need a configuration file because the stream name is added to the Demoyard Cloud dynamically.
See the example source code: Example Source
Step 1. Create Your Custom Application
Import the following Python modules:

    import base64
    import json
    import sys
    import time

    import grpc

    import agent
    import agent_grpc

The grpc module is the RPC framework used by the Demoyard Agent API. The agent and agent_grpc modules should already be in your example directory, where they were copied from the Demoyard repository.
The example first connects to the Demoyard Agent, which listens for gRPC messages on port 7501.

    channel = grpc.insecure_channel("localhost:7501")
    agent_stub = agent_grpc.AgentStub(channel)

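Note that a gRPC channel is created lazily, so constructing it does not by itself prove that the Demoyard Agent is running. As a minimal sketch, a plain TCP probe can confirm that something is listening on the port before you start streaming. The helper name `agent_port_open` and its default timeout are hypothetical and not part of the Demoyard API.

```python
import socket


def agent_port_open(host="localhost", port=7501, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds (hypothetical helper)."""
    try:
        # create_connection raises OSError if nothing accepts the connection
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `agent_port_open()` before creating the stub lets the application print a clear error and exit instead of failing later inside the streaming call. The probe only shows that the port accepts connections, not that the listener is the Demoyard Agent.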
After you successfully connect, send data to the Demoyard Agent by calling the gRPC API SendData() function defined in the agent_grpc.py file.

    agent_stub.SendData(write_data_points())

The following functions show the steps to create and write a data point.
Create a Datapoint
All data points have the following definition:

    {
      "stream": "<streamName>",
      "timestamp": <time>,
      "<data_type>": { <dataObject> }
    }

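As an illustration of this layout, the sketch below builds the same structure as a plain Python dictionary and prints it as JSON. It assumes that the Text data type is represented by the key `text` with a single `value` field, mirroring the `agent.Text` message used in this example. The helper name `build_text_data_point` is hypothetical.

```python
import json
import time


def build_text_data_point(stream, value, timestamp_ms=None):
    """Build a dict shaped like the data point definition (illustrative only)."""
    if timestamp_ms is None:
        # Milliseconds since the Unix epoch, as computed in create_data_point()
        timestamp_ms = int(time.time() * 1000)
    return {
        "stream": stream,
        "timestamp": timestamp_ms,
        "text": {"value": value},  # assumed shape of the Text data object
    }


print(json.dumps(build_text_data_point("testStream_01", "hello", 1700000000000), indent=2))
```

The dictionary is only a way to visualize the fields. The example itself builds the data point with `agent.Datapoint()`.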
The create_data_point() function populates this structure with:
- A stream name. If the stream is not defined in the config.toml file, the Demoyard Cloud dynamically creates the stream.
- A timestamp, in milliseconds since the Unix epoch.
- The data, specified as a type-value pair having a Text data type and a string value.
The function then calls agent.Datapoint() to create the data point.

    def create_data_point(point):
        # Timestamp in milliseconds since the Unix epoch
        timestamp = int(time.time() * 1000)

        # Text payload carrying a string value
        text_msg = agent.Text()
        text_msg.value = 'this is a %s data point' % point

        data_point = agent.Datapoint(
            stream="testStream_01",
            text=text_msg,
            timestamp=timestamp)
        return data_point

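Because the timestamp is an integer number of milliseconds, it is easy to misread as seconds. As a small sanity check, the sketch below converts a millisecond timestamp back to a UTC datetime using only the standard library. The function name `ms_to_utc` is hypothetical.

```python
from datetime import datetime, timezone


def ms_to_utc(timestamp_ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


print(ms_to_utc(1700000000000).isoformat())  # 2023-11-14T22:13:20+00:00
```

If the converted value lands in 1970 or far in the future, the timestamp was probably produced in the wrong unit.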
Write the Datapoints
The write_data_points() function is a generator that continuously creates data points until you end the program with Ctrl+c. The function pauses for 10 seconds between transmissions.

    def write_data_points():
        # Keep yielding data points for as long as the stub exists
        while agent_stub is not None:
            yield create_data_point("stream")
            time.sleep(10)

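The generator pattern above can be tried without a running Agent. The sketch below is a stand-in, not the Demoyard API: `make_point` replaces `create_data_point()` with a plain dictionary, and the hypothetical `max_points` and `interval` parameters bound the loop so that it finishes on its own. The consumer (here a plain `for` loop, in the example the `SendData()` call) pulls one item at a time, so each pause runs only when the next point is requested.

```python
import time


def make_point(label):
    """Stand-in for create_data_point(): returns a plain dict."""
    return {
        "stream": "testStream_01",
        "timestamp": int(time.time() * 1000),
        "text": {"value": "this is a %s data point" % label},
    }


def bounded_data_points(max_points=3, interval=0.0):
    """Yield up to max_points data points, pausing interval seconds after each."""
    for _ in range(max_points):
        yield make_point("stream")
        time.sleep(interval)


for point in bounded_data_points():
    print(point["text"]["value"])
```

A bounded variant like this is convenient for quick local checks. The unbounded loop in the example is the better fit for a long-running sender.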
In the next step, you can run this example and see the results on the Demoyard web application.
Step 2. Run the Example and Visualize Your Data
Run the example using the following command:
$ python custom.py
The example displays:

    Demoyard Agent communication established.
    Streaming data points (use Ctrl+c to exit) ...

Next, log in to the Demoyard Cloud using your Demoyard credentials. Scroll to the panel labeled testStream_01.
In this example run, the Demoyard Cloud received four data points. The panel shows the data in the following columns:
| Column | Description |
|---|---|
| VALUE | Shows the data string sent by the example: this is a stream data point. |
| TIME | Shows the data timestamp, at approximately 10-second intervals. |
| TAGS | This column is empty because a tag is not associated with this stream. |
Summary
This tutorial introduced you to the steps needed to create a simple, custom application that sends a data stream to the Demoyard Cloud.
Try associating a tag with the data by defining the tag and the stream in the /home/demoyard/config.toml file:

    [demoyard]
    agent-ip = "localhost"
    agent-port-grpc = "7501"
    agent-port-http = "7502"

    [tags]
    site = "test_lab"

    [[streams]]
    name = "testStream_01"

Restart the Demoyard Agent, then run the example again and view the results in the Demoyard web application.
Example Source

    #!/usr/bin/env python

    import base64
    import json
    import sys
    import time

    import grpc

    import agent
    import agent_grpc


    #### gRPC Implementation ####

    def create_data_point(point):
        # Timestamp in milliseconds since the Unix epoch
        timestamp = int(time.time() * 1000)

        # Text payload carrying a string value
        text_msg = agent.Text()
        text_msg.value = 'this is a %s data point' % point

        data_point = agent.Datapoint(
            stream="testStream_01",
            text=text_msg,
            timestamp=timestamp)
        return data_point


    def write_data_points():
        # Yield one data point every 10 seconds until the program is interrupted
        while agent_stub is not None:
            yield create_data_point("stream")
            time.sleep(10)


    #### Entry point ####

    # The Demoyard Agent listens for gRPC messages on port 7501
    channel = grpc.insecure_channel("localhost:7501")
    agent_stub = agent_grpc.AgentStub(channel)
    print('\nDemoyard Agent communication established.')


    #### Datapoint Streaming ####

    print('Streaming data points (use Ctrl+c to exit) ...')
    agent_stub.SendData(write_data_points())

