Datameer is an extremely user friendly software platform for self-service end-to-end big data discovery and analytics. You can do just about anything in the realm of descriptive analytics using the Datameer GUI. You can automate a complex series of steps just by pointing and clicking with software that is as easy to use as Microsoft Office.

But there are some activities that require using the Datameer REST API. A prospect recently asked me “How can I run the same Datameer workbook on 50 different data sets that share the same column layout without doing the same thing 50 times manually using the GUI?” The answer is creating a script that invokes the Datameer REST API. The beauty of a REST API is that you can use the scripting language of your choice to tackle the problem. I ended up creating sample scripts in both Bash and python to illustrate that you can pick your language.

The first order of business is defining the problem. In this case, we needed a mechanism to iterate over a set of data sources all of which shared the same column layout. While doing that iteration, the script needs to update the data source of the Datameer Workbook in question. In fact, the script also had to create a new data source first, and then update the workbook data source. And finally, while iterating through the list, the script needs to execute the newly updated workbook. Further, I chose to implement a mechanism to check that the workbook had completed prior to starting the next workbook, rather than move on to the next cycle without waiting for the workbook job to complete. The data source I used in Datameer is called a Data Link. Data Links extract a sample data set from the source, without duplicating the entire source set. When you prefer to not only extract a sample but also get the entire data set available in Datameer’s HDFS folder, then use Import Jobs.

Pseudo code for my example looks like this:

WHILE LOOPING THROUGH LIST OF DATA SOURCES

CREATE A NEW DATA LINK for the CURRENT DATA SOURCE

UPDATE THE WORKBOOK DEFINITION TO USE THE NEW DATA LINK

RUN THE WORKBOOK

CHECK THAT THE WORKBOOK IS DONE

I ended up creating two functions for steps that got repeated a lot, which were 1) updating JSON files that contain the definitions of Data Links and Workbooks, and 2) checking on job status.

Notes on doing a REST API call:
In bash: I used the curl command to do the REST call. I also used back ticks to place the output of the curl command into a variable. The line of code looks like this:

# Note the use of variables to store username, password, host, port, etc.
# Use of the correct quotes is important.
RET_VAL=`curl -u $USER:$PASSWORD -X GET "http://$HOST:$PORT/rest/job-configuration/job-status/$CONFIGURATION_ID"`
# Unfortunately, I chose a pretty ugly method to parse the variable RET_VAL. A combo of grep, awk and tr.
STATUS=`echo "$RET_VAL" | grep "jobStatus" | awk -F: '{print $2}' | tr -d '[:space:]' | tr -d \"`

In python: I used a python module called requests to invoke Datameer’s REST API.

# The python implementation is cleaner. Run the request, using the auth parameter and the HTTPBasicAuth function to handle authentication.
ret_val = requests.get("http://" + host + ":" + port + "/rest/job-configuration/job-status/" + configuration_id, auth=HTTPBasicAuth(user, password))
# Then convert the return value to JSON.
params = ret_val.json()
# And finally pull out the parameter that you want.
status = params['jobStatus']

Below you can find the source code in both Bash and python.

The Bash example.


#!/bin/bash

# Example script for creating several datalinks on similar tables and then creating
# workbooks to run on those data links.

# Script assumes you have one working data link and one working workbook, and have
# modified the underlying JSON to include some tags that sed can look for and replace.

# Datameer environment parameters
USER=your_user
PASSWORD=your_password
HOST=localhost
PORT=7996

# Script parameters
# Parameter 1: DATALINK_TEMPLATE_FILE - Contains datalink JSON to be modified. Script does not alter
#              this file. Instead, it creates a temporary file and runs that.
# Parameter 2: WORKBOOK_TEMPLATE_FILE - Contains workbook JSON to be modified. Script does not alter
#              this file. Instead, it creates a temporary file and runs that.
# Parameter 3: LIST_OF_TABLES - Contains a list of the target tables to be processed.

# Assign parameters to global variables.
DATALINK_TEMPLATE_FILE=$1
WORKBOOK_TEMPLATE_FILE=$2
LIST_OF_TABLES=$3

# Temporary file that contains edits when they happen.
TEMP_FILE=zzz_temp_file.json

# Sub Routine: do_change. Runs sed utility to do update.
# Positional arguments
# 1. Value to look for.
# 2. New value to put in.
do_change () {
# Get positional arguments for function.
CUR_VAL=$1
NEW_VAL=$2

	echo "Updating value $CUR_VAL with $NEW_VAL..."
	sed -e "s/$CUR_VAL/$NEW_VAL/g" -i bak $TEMP_FILE
}

# Sub Routine: check_job_status. Stay in loop until job completes.
# Positional arguments
# 1. Job ID.
check_job_status () {
# Get positional arguments for function.
CONFIGURATION_ID=$1

	echo "Checking job status for $JOB_ID"
	echo "'http://localhost:7996/rest/job-configuration/job-status/$JOB_ID'"

    # Run loop until status shows job is done.
    while [[ 1 == 1 ]]; do
	    RET_VAL=`curl -u $USER:$PASSWORD -X GET "http://$HOST:$PORT/rest/job-configuration/job-status/$CONFIGURATION_ID"`
    	STATUS=`echo "$RET_VAL" | grep "jobStatus" | awk -F: '{print $2}' | tr -d '[:space:]' | tr -d \"`
    	echo $STATUS

    	sleep .5

    	if [[ $STATUS = "COMPLETED" || $STATUS = "ERROR" || $STATUS = "COMPLETED_WITH_WARNINGS"  || $STATUS = "ABORTING" || $STATUS = "_NONE" ]]
    	then
    		break
    	fi
    done
}

# Main loop for running through all tables in the list.
while read table_name; do

	# Edit Datalink JSON
	cp $DATALINK_TEMPLATE_FILE $TEMP_FILE
	do_change "
<TABLE>" $table_name $DATALINK_TEMPLATE_FILE
	do_change "" $table_name $DATALINK_TEMPLATE_FILE

	# Issue command to create new Datalink. Capture result in variable.
    RET_VAL=`curl -u $USER:$PASSWORD -X POST -d @$TEMP_FILE "http://${HOST}:${PORT}/rest/import-job"`
	# Display result to screen.
    echo "$RET_VAL"
    # Parse result to get configuration id.
    CONFIGURATION_ID=`echo "$RET_VAL" | grep "configuration" | awk -F: '{print $2}' | tr -d '[:space:]'`

	#Display result to screen.
	echo $CONFIGURATION_ID

	# Now run the Datalink to get sample data.
	curl -u $USER:$PASSWORD -X POST "http://${HOST}:${PORT}/rest/job-execution?configuration=${CONFIGURATION_ID}"	

	# Pause until the data link finishes.
	check_job_status $CONFIGURATION_ID	

	# Edit Workbook JSON to include the new datalink
	cp $WORKBOOK_TEMPLATE_FILE $TEMP_FILE
	do_change "" $table_name $WORKBOOK_TEMPLATE_FILE	

	# Issue command to create new workbook
    RET_VAL=`curl -u $USER:$PASSWORD -X POST -d @$TEMP_FILE "http://$HOST:$PORT/rest/workbook"`
	# Display result to screen.
    echo "$RET_VAL"
    # Parse result to get configuration id.
    CONFIGURATION_ID=`echo "$RET_VAL" | grep "configuration" | awk -F: '{print $2}' | tr -d '[:space:]'`

	# Now run the new workbook.
	#Display result to screen.
	echo $CONFIGURATION_ID
	# Now run the Workbook to get sample data.
	curl -u $USER:$PASSWORD -X POST "http://${HOST}:${PORT}/rest/job-execution?configuration=${CONFIGURATION_ID}"

	# Pause until the workbook finishes.
	check_job_status $CONFIGURATION_ID	

done < $LIST_OF_TABLES

And the python example.

# Example script for creating several datalinks on similar tables and then creating
# workbooks to run on those data links.

# Script assumes you have one working data link and one working workbook, and have
# modified the underlying JSON to include some tags that sed can look for and replace.

import sys
import shutil
import requests
from requests.auth import HTTPBasicAuth
import json
import time

# Datameer environment parameters
user = "your_user"
password = "your_password"
host = "localhost"
port = "7996"

# Get input parameters
datalink_template_file = sys.argv[1]
workbook_template_file = sys.argv[2]
list_of_tables = sys.argv[3]

# Temp file to use for creating dynamic JSON to create Datameer objects.
temp_file = "./zzz_temp_file.json"
temp_file2 = "./zzz_temp_file.2.json"

# Function for updating JSON file contents
# Open one file for read, another for write. Make changes. At end, swap files.
def do_change(cur_val, new_val, file_name, file_name2):
    with open(file_name, "rt") as f_in:
        with open(file_name2, "wt") as f_out:
            for line in f_in:
                f_out.write(line.replace(cur_val, new_val))
    f_in.close()
    f_out.close()
    shutil.copy(file_name2, file_name)

def check_job_status(configuration_id):
    print ("Checking job status for" + configuration_id)
    print ("'http://localhost:7996/rest/job-configuration/job-status/" + configuration_id + "'")

        # Run loop until status shows job is done.
    while 1 == 1:
        ret_val = requests.get("http://" + host + ":" + port + "/rest/job-configuration/job-status/" + configuration_id, auth=HTTPBasicAuth(user, password))
        params = ret_val.json()
        print (params)
    	status = params['jobStatus']
        print (status)
    	time.sleep(1)

    	# Check status and break out of loop if when job is done.
    	if (status == "COMPLETED") or (status == "ERROR") or (status == "COMPLETED_WITH_WARNINGS") or (status == "ABORTING" ) or (status == "_NONE"):
    		break

# Main loop iterating through list of tables.
with open(list_of_tables,'r') as table_file:

    for table_name in table_file:

        table_name = table_name.rstrip()
        if not table_name: continue

        print ("Copying " + datalink_template_file + " for " + table_name)
        # Copy the template file to a temporary working file.
        shutil.copy(datalink_template_file, temp_file)

        do_change ("<TABLE_NAME>", table_name, temp_file, temp_file2)
        do_change ("<LINK_NAME>", table_name, temp_file, temp_file2)

        with open(temp_file, 'r') as content_file:
            payload = content_file.read()

        # Add new datalink to Datameer.
        ret_val = requests.post("http://" + host + ":" + port + "/rest/import-job", auth=HTTPBasicAuth(user, password), data=payload)
        print (ret_val.json())
        params = ret_val.json()
        configuration_id = str(params['configuration-id'])
        print (configuration_id)

        # Now start the datalink to get the sample data.
        ret_val = requests.post("http://" + host + ":" + port + "/rest/job-execution?configuration=" + configuration_id, auth=HTTPBasicAuth(user, password))

        # Pause until the data link finishes.
        check_job_status (configuration_id)

        # Edit Workbook JSON to include the new datalink
        shutil.copy(workbook_template_file, temp_file)
        do_change ("<LINK_NAME>", table_name, temp_file, temp_file2)
        with open(temp_file, 'r') as content_file:
            payload = content_file.read()

        # Issue command to create new workbook
        ret_val = requests.post("http://" + host + ":" + port + "/rest/workbook", auth=HTTPBasicAuth(user, password), data=payload)        

        # Display result to screen.
        print(ret_val)
        # Parse result to get configuration id.
        params = ret_val.json()
        configuration_id = str(params['configuration-id'])
        print (configuration_id)	

        # Now run the Workbook to do calculation.
        ret_val = requests.post("http://" + host + ":" + port + "/rest/job-execution?configuration=" + configuration_id, auth=HTTPBasicAuth(user, password))

        # Pause until the workbook finishes.
        check_job_status (configuration_id)

Back to the bash example.
Back to the python example.

Advertisements