Sample Data Streaming Clients
Below are sample clients that could be used to connect to an endpoint and consume data.
Sample Java Client for Streaming data
import java.io.*;
import java.lang.String;
import java.net.HttpURLConnection;
import java.net.URL;
/**
 * Sample streaming client that connects to a stream endpoint with HTTP Basic
 * authentication, prints every line it receives to standard output, and
 * reconnects after a fixed pause whenever the stream ends or an error occurs.
 */
public class testhttpbasicauth {

    /** Pause between reconnect attempts, in milliseconds. */
    private static final long RECONNECT_DELAY_MS = 10000L;

    /**
     * Runs the connect / read / reconnect loop forever.
     *
     * @param args unused
     * @throws IOException kept for signature compatibility; I/O failures are handled inside the loop
     * @throws InterruptedException if the thread is interrupted while pausing before a reconnect
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        String streamURL = "https://<customername>.<domain>.com/stream/<datasource>_<streamname>/subscription/part/1/data.json";
        String charset = "UTF-8";
        String userName = "myuser";
        String password = "mypassword";
        while (true) {
            // Declared per iteration so a failed attempt never reports on a stale connection.
            HttpURLConnection connection = null;
            try {
                connection = getConnection(streamURL, userName, password);
                // Check the status BEFORE asking for the input stream:
                // getInputStream() throws for 4xx/5xx responses.
                int responseCode = connection.getResponseCode();
                if (responseCode >= 200 && responseCode <= 299) {
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(connection.getInputStream(), charset))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            System.out.println(line);
                        }
                    }
                } else {
                    handleNonSuccessResponse(connection);
                }
            } catch (Exception e) {
                e.printStackTrace();
                if (connection != null) {
                    try {
                        handleNonSuccessResponse(connection);
                    } catch (IOException reportFailure) {
                        // Reporting is best effort; a failure here must not end the reconnect loop.
                        System.out.println("Could not read response status: " + reportFailure.getMessage());
                    }
                }
            } finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
            Thread.sleep(RECONNECT_DELAY_MS); // Pause 10 seconds before reconnect
        }
    }

    /**
     * Prints the HTTP status code and status message of a response.
     *
     * @param connection connection whose response is reported
     * @throws IOException if the status cannot be read from the connection
     */
    private static void handleNonSuccessResponse(HttpURLConnection connection) throws IOException {
        int responseCode = connection.getResponseCode();
        String responseMessage = connection.getResponseMessage();
        System.out.println("Non-success response: " + responseCode + " -- " + responseMessage);
    }

    /**
     * Builds the value of an HTTP Basic {@code Authorization} header.
     * Package-private so it can be checked without opening a connection.
     *
     * @param userName account name
     * @param password account password
     * @return {@code "Basic "} followed by the Base64 encoding of {@code userName:password} (UTF-8 bytes)
     */
    static String basicAuthHeader(String userName, String password) {
        String userpass = userName + ":" + password;
        // java.util.Base64 replaces javax.xml.bind.DatatypeConverter, which is no longer part of the JDK since Java 11.
        return "Basic " + java.util.Base64.getEncoder()
                .encodeToString(userpass.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Opens (but does not yet send) a request to the stream URL with timeouts
     * and Basic authentication configured.
     *
     * @param urlString stream endpoint URL
     * @param userName  account name
     * @param password  account password
     * @return the configured connection
     * @throws IOException if the URL is malformed or the connection cannot be created
     */
    private static HttpURLConnection getConnection(String urlString, String userName, String password) throws IOException {
        URL url = new URL(urlString);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setReadTimeout(1000 * 60 * 60); // 1 hour without data before the read times out
        connection.setConnectTimeout(1000 * 10);   // 10 seconds to establish the connection
        // NOTE(review): "UTF-8" is a charset, not a content coding; kept as-is because the
        // server's handling of this header is not visible here - confirm before changing.
        connection.setRequestProperty("Accept-Encoding", "UTF-8");
        connection.setRequestProperty("Authorization", basicAuthHeader(userName, password));
        return connection;
    }
}
Sample C# Client for Streaming data
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace EndPointReader
{
    /// <summary>
    /// Sample streaming client: requests the stream URL with network credentials,
    /// writes every received line to the console, and reconnects after a 10 second
    /// pause whenever the stream ends or an error occurs.
    /// </summary>
    class Program
    {
        /// <summary>Runs the connect / read / reconnect loop forever.</summary>
        /// <param name="args">Unused.</param>
        static void Main(string[] args)
        {
            String streamURL = "https://<customername>.<domain>.com/stream/<datasource>_<streamname>/subscription/part/1/data.json";
            while (true)
            {
                try
                {
                    //Create the HttpWebRequest object
                    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(streamURL);
                    //Set credentials
                    req.Credentials = new NetworkCredential("myuser", "mypassword");
                    //Set the connection timeout to 10 seconds
                    req.Timeout = 10000;
                    //Set the ReadWrite timeout to 60 seconds
                    req.ReadWriteTimeout = 60000;
                    req.Headers.Add("Accept-Encoding: UTF-8");

                    // Dispose the response, stream and reader on every iteration; otherwise each
                    // reconnect leaks a connection and later requests can block waiting for a free one.
                    using (HttpWebResponse webresponse = (HttpWebResponse)req.GetResponse())
                    using (Stream responseStream = webresponse.GetResponseStream())
                    using (StreamReader reader = new StreamReader(responseStream))
                    {
                        int responseCode = (int)webresponse.StatusCode;
                        if (responseCode >= 200 && responseCode <= 299)
                        {
                            Console.WriteLine("Successfully Retrived the response: " + responseCode);
                            string line = reader.ReadLine();
                            while (line != null)
                            {
                                Console.WriteLine(line);
                                line = reader.ReadLine();
                            }
                        }
                        else
                        {
                            string responseMessage = reader.ReadToEnd();
                            Console.WriteLine("Non-success response: " + responseCode + " \r\n" + responseMessage);
                        }
                    }
                }
                catch (WebException ex)
                {
                    // GetResponse() throws WebException for error status codes, so the
                    // status has to be read from the exception's response (when there is one).
                    HttpWebResponse errorResponse = ex.Response as HttpWebResponse;
                    if (errorResponse != null)
                    {
                        using (errorResponse)
                        {
                            Console.WriteLine("Non-success response: " + (int)errorResponse.StatusCode + " \r\n" + errorResponse.StatusDescription);
                        }
                    }
                    else
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                //Pause 10 seconds before reconnect
                Thread.Sleep(1000 * 10);
            }
        }
    }
}
Sample Python 2.7 Client for Streaming data
#!/usr/bin/env python
# Sample Python 2.7 streaming client: sends an authenticated GET to the stream
# endpoint, writes the received data to standard output, and reconnects after
# a pause whenever the stream ends or an error occurs.
#include the base64 library (Basic auth header encoding)
import base64
#include the httplib library
import httplib
#include the sys library (unbuffered output of stream data)
import sys
#include the time library
import time
#Read buffer size
BUFFERSIZE=1024
#Pause seconds
PAUSESECONDS=10
#User Name
user='myuser'
#Password
password = 'mypassword'
# base64.b64encode never inserts line breaks, unlike the 'Base64' string codec,
# which wraps long values and would corrupt the header.
auth_header = 'Basic %s' % base64.b64encode(":".join([user, password]))
conn = httplib.HTTPSConnection("<customername>.<domain>.com")
#main loop
while True:
    try:
        # Headers must be passed by keyword: the third positional argument of
        # request() is the request body, not the headers.
        conn.request("GET", "/stream/<datasource>_<streamname>/subscription/part/1/data.json", headers={'Authorization': auth_header})
        response = conn.getresponse()
        if 200 <= response.status <= 299:
            print("Start reading data:")
            #Read from the stream
            while True:
                data = response.read(BUFFERSIZE)
                # An empty read means the server closed the stream.
                if not data:
                    break
                #handle data
                sys.stdout.write(data)
                sys.stdout.flush()
        else:
            #handle non-success response
            print("Non-success response: " + str(response.status))
    except httplib.InvalidURL as e:
        print("Wrong URL used " + repr(e))
    except httplib.HTTPException as e:
        print(repr(e))
    except Exception as e:
        # Exception (not a bare except) so Ctrl-C still stops the client.
        print("Unknown exception: " + repr(e))
    finally:
        #make sure to close the connection before the next iteration
        conn.close()
    print("Pause for " + str(PAUSESECONDS) + " seconds before retry")
    #Pause 10 seconds before reconnect
    time.sleep(PAUSESECONDS)
|
Sample Python 3.7 Client for Streaming data
#!/usr/bin/env python
"""Sample Python 3.7 streaming client.

Sends an authenticated GET to the stream endpoint, prints every received line,
and reconnects after a pause whenever the stream ends or an error occurs.
"""
#include the http.client library
import http.client
import urllib.error
import urllib.request
import base64
import traceback
import json
#include the time library
import time
from urllib.request import Request, urlopen

#Pause seconds
PAUSESECONDS=10
#User Name
user='myuser'
#Password
password = 'mypassword'
#Stream endpoint
STREAM_URL = "https://<customername>.<domain>.com/stream/<datasource>_<streamname>/subscription/<subscription_name>/part/1/data.json"


def basic_auth_header(user, password):
    """Return the HTTP Basic Authorization header value for the given credentials."""
    token = base64.b64encode(":".join([user, password]).encode()).decode()
    return 'Basic %s' % token


userAndPass = base64.b64encode(":".join([user,password]).encode()).decode()


def read_stream():
    """Open the stream once and print each line until the server closes it."""
    request = Request(STREAM_URL)
    request.add_header('Authorization', basic_auth_header(user, password))
    # The with-block closes the response (and its socket) on every exit path.
    with urlopen(request) as response:
        if response.status == 200:
            print ("Start reading data:")
            for line in response:
                print (line.decode("utf-8",'ignore'))
        else:
            #handle non-success response
            print (response.status)


def main():
    """Run the connect / read / reconnect loop forever."""
    #main loop
    while True:
        #catch all errors
        try:
            read_stream()
        except urllib.error.HTTPError as e:
            # urlopen raises HTTPError for error status codes.
            print ("Non-success response: " + str(e.code))
        except http.client.InvalidURL as e:
            print (e)
        except http.client.HTTPException as e:
            print (e)
        except Exception as e:
            print (e)
        print ("Pause for " + str(PAUSESECONDS) + " seconds before retry")
        #Pause 10 seconds before reconnect
        time.sleep(PAUSESECONDS)


if __name__ == "__main__":
    main()
Sample Ruby 1.9 Client for Streaming data
# Sample Ruby streaming client: sends an authenticated GET to the stream
# endpoint, prints every received chunk, and reconnects after a pause whenever
# the stream ends or an error occurs.
require "net/https"
require 'net/http'
require "uri"

# NOTE(review): exposes the response's socket; nothing in this sample reads it - confirm whether it is still needed.
class Net::HTTPResponse
  attr_reader :socket
end

#Pause seconds
PAUSESECONDS=10
streamUrl ="https://<customername>.<domain>.com/stream/<datasource>_<streamname>/subscription/part/1/data.json"

while true do
  print "Connecting.."+"\n"
  http = nil
  begin
    uri = URI.parse(streamUrl)
    http = Net::HTTP.new(uri.host, uri.port)
    http.open_timeout=10
    http.read_timeout=3600
    http.use_ssl = true
    # Verify the server certificate: Basic credentials are sent over this connection.
    http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    request = Net::HTTP::Get.new(uri.request_uri,{"Accept-Encoding"=>"UTF-8"})
    request.basic_auth 'myusername', 'mypassword'
    http.request(request) do | response |
      resCode = response.code
      if resCode.to_i >= 200 and resCode.to_i <= 299
        print "connected, start reading data"+"\n"
        response.read_body do | chunk |
          puts chunk
        end
      else
        #handle non-success response
        print "status code :"+resCode+"\n"
      end
    end
  rescue StandardError => e
    # StandardError (not Exception) so Ctrl-C / exit still stop the client.
    puts "Exception while connecting..."
    puts e.message
  ensure
    # finish raises IOError on a session that is not started, so guard it.
    http.finish if http && http.started?
  end
  sleep PAUSESECONDS #Pause 10 seconds before reconnect
end