D3 Python Conventions

Overview

D3 supports executing custom Python scripts within the D3 Python custom image. As an admin user, you can write both Utility Commands and Integration Commands in Python.

For example, you can write a custom Integration Command, such as IP Reputation Check, that enriches your data from third-party sources. You can then write a custom Utility Command in Python that transforms the data into your preferred structure.

All Utility Commands can be created and viewed in the Utility Commands module. All Integration Commands can be created and viewed in their individual Integrations. Both types of commands can be implemented in Python or as a Codeless Playbook.

General Information

Python Version: 3.9.12

Available Third-party Libraries:

Command Type

Utility Command

It is used for

  • custom data transformation or manipulation that cannot be achieved through D3’s out-of-the-box Utility Commands

  • setting up nested playbooks

The custom utility command can be assigned as a Basic Utility, System Utility or Cyber Utility, and used in Command Task or Conditional Task.

Integration Command

Many security products support REST API integration. If an integration or command is not covered by the current version of D3 SOAR, D3 offers do-it-yourself integration through custom Python scripts. Users can follow the product's API documentation together with these D3 Python code conventions to set up integration commands.

Command Implementation

Python Command Setup

Command basic setup

New Utility Command

Step 1: Navigate to Configuration on the top menu.

Step 2: On the left sidebar, go to Utility Commands.

Step 3: Click the button +. A new Add Command window will pop up.

New Integration Command

Step 1: Navigate to Configuration on the top menu.

Step 2: On the left sidebar, go to Integration.

Step 3: To find it quickly, search for the name of the Integration you want to add the command to.

Step 4: Under the search box, select the target integration.

Step 5: On the right side, click the button + New Command. A new connection window will pop up.

Reader Note

Set the command name to be the same as the Python function name.

For custom Event Intake or Incident Intake commands, the function name should be fetchEvent/fetchIncident.

PY
def fetchEvent():
    pass

def fetchIncident():
    pass

Command input setup

Add new input parameters for your command

a. Parameter Name: The internal name of the parameter. It should match the parameter name in the function definition

b. Display Name: The display name of the parameter.

c. Parameter Type: The data type of the parameter.

  • Parameter Type options: Text, Number, DateTime, Boolean, Text Array, Number Array, DateTime Array, Boolean Array, JSON Array, JSON Object, Unknown, File, File Array, JSON Array Array

d. Is Required?: Whether the parameter is required when running the command.

e. Parameter Index: The position of the parameter, starting from 0.

f. Input Type: The data type of this parameter. For more information regarding the Input type, please refer to this document.

g. Default Value: whether the parameter has a default value. It can be empty.

h. Description: The description of the parameter.

i. Sample Data: Sample data of the parameter.

Parameter Type

Custom Input

Command output setup

Each command should return data using pb.returnOutputModel. Additional fields, such as Key Fields, are configured on the Output tab.

resultData — HTML formatted data displayed in Result tab.

Reader Note

An HTML string can be generated from a JSON object or JSON array with the helper Python functions ConvertJsonObjectToHTMLString and ConvertJsonObjectArrayToHTMLString. (For more details, see the D3 Python Library section below.)

returnData — Return Data is analogous to the typical returns for function in programming. It is the data to be passed down directly to the next command or to be used in conditional tasks.

keyFields — Set up the Key Fields from the Output configuration at the Output tab of the Python script editing screen. If raw data have too much data, pick out important data fields.

Generally, common cybersecurity identifiers such as unique IDs, file hash values, CVE numbers, and IP addresses are good candidates for Key Fields.

The JSON path of each Key Field should refer to a JSON path in the Context Data.

contextData — Context Data is a contextual subset of the Raw Data. Context Data should store information that can be derived for Return Data.

Example of Recorded Future Check IP Reputation Context Data:

JSON
[
  {
    "risk": {
      "score": 0,
      "level": 0,
      "context": {
        "phishing": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 1
          }
        },
        "public": {
          "summary": [],
          "score": 0,
          "mostCriticalRule": "",
          "rule": {
            "maxCount": 52
          }
        },
        "c2": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 2
          }
        }
      }
    },
    "entity": {
      "id": "ip:8.8.8.8",
      "name": "8.8.8.8",
      "type": "IpAddress"
    },
    "ipAddress": "8.8.8.8",
    "riskLevel": "Low"
  }
]

rawData — Raw Data should store the raw message returned from 3rd-party API calls.

Example of Recorded Future Check IP Reputation:

JSON
{
  "data": {
    "results": [
      {
        "risk": {
          "score": 0,
          "level": 0,
          "context": {
            "phishing": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 1
              }
            },
            "public": {
              "summary": [],
              "score": 0,
              "mostCriticalRule": "",
              "rule": {
                "maxCount": 52
              }
            },
            "c2": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 2
              }
            }
          }
        },
        "entity": {
          "id": "ip:8.8.8.8",
          "name": "8.8.8.8",
          "type": "IpAddress"
        },
        "ipAddress": "8.8.8.8",
        "riskLevel": "Low"
      }
    ]
  },
  "counts": {
    "returned": 1,
    "total": 1
  }
}
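
As a rough illustration of how the Context Data example relates to the Raw Data example above, the sketch below pulls the results array out of a raw response. The function name buildContextData is hypothetical, and the data.results path is taken only from the sample shown here; other APIs will need a different path.

```python
def buildContextData(rawData):
    """Return the list under rawData["data"]["results"], or [] if it is missing."""
    if not isinstance(rawData, dict):
        return []
    data = rawData.get("data")
    if not isinstance(data, dict):
        return []
    results = data.get("results")
    return results if isinstance(results, list) else []


# Trimmed-down version of the raw response shown above.
sampleRawData = {
    "data": {"results": [{"ipAddress": "8.8.8.8", "riskLevel": "Low"}]},
    "counts": {"returned": 1, "total": 1},
}
sampleContextData = buildContextData(sampleRawData)
```

Returning an empty list for unexpected shapes keeps the command from raising when the third-party API answers with an error payload.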

Error — Error information you want to show if there is any.

OutputModel in playbook configuration:

For more information regarding Playbook Task Configuration, please refer to this document.

Integration fetchEvent/fetchIncident pass down data

fetchEvent and fetchIncident commands are used to ingest raw data from third-party integration sources into the D3 system and create Events and Incidents, respectively.

For more details about Integration Commands, please refer to this document.

For more details about how to schedule and ingest Events and Incidents, please refer to this document.

fetchEvent and fetchIncident commands should also utilize another field in our D3 OutputModel: Passdown data.

Since fetchEvent and fetchIncident commands are usually time-sensitive, it is important to set the start time for the next round of fetching. For system commands, this is handled automatically as long as the command's start time and the schedule interval are filled in. For a custom fetchEvent/fetchIncident command, the next start time can be handled through the passdown data.

Reader Note

All parameters in a function can utilize the Passdown data from last fetchEvent/fetchIncident.

  1. Setup command input as mentioned above.

  2. Add a timespan parameter. Please note that this timespan should be equal to or larger than your schedule interval in order to avoid any fetchEvent delay.

    • Time Span: the input to calculate the next fetch event input start time

    • Schedule Interval: the interval time to actually execute the fetch event command.

  3. Write your main function logic.

  4. Define your passdown data.

CODE
passdown = {
    "startTime": startTime + timedelta(minutes=timeSpan)
    if startTime + timedelta(minutes=timeSpan) < datetime.datetime.utcnow()
    else startTime
}

In this example, the next startTime is advanced by timeSpan minutes only if the advanced time is still earlier than the current UTC time; otherwise startTime is left unchanged.

Put your passdown data in the outputModel.

The next time the schedule executes this command, startTime will take the value from your passdown data.

CODE
passdownData = passdown if timeSpan else {}
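
The passdown logic above can be sketched as a pair of small functions that are easy to test outside D3. The names nextStartTime and buildPassdown are hypothetical, and the current time is passed in as an argument here (instead of calling datetime.datetime.utcnow() as in the fragment above) so that the outcome is reproducible.

```python
from datetime import datetime, timedelta


def nextStartTime(startTime, timeSpan, now):
    """Advance startTime by timeSpan minutes only if the result is still before now."""
    candidate = startTime + timedelta(minutes=timeSpan)
    return candidate if candidate < now else startTime


def buildPassdown(startTime, timeSpan, now):
    """Return the passdown dictionary, or an empty one when no time span is set."""
    if not timeSpan:
        return {}
    return {"startTime": nextStartTime(startTime, timeSpan, now)}


# Fixed timestamps so the example does not depend on the clock.
exampleNow = datetime(2024, 1, 11, 12, 0, 0)
advanced = buildPassdown(datetime(2024, 1, 11, 11, 0, 0), 30, exampleNow)
unchanged = buildPassdown(datetime(2024, 1, 11, 11, 45, 0), 30, exampleNow)
```

In the first call the window ending at 11:30 is already in the past, so the start time moves forward; in the second call the window would end at 12:15, after the current time, so the start time stays at 11:45.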

Integration connection parameter setup

Because an Integration has to connect to third-party services, a connection must be established for API calls.

In D3, you can create different types of connections with your desired parameters.

  1. Configure your connection parameters

  2. Add a custom parameter

  3. Add a new connection and fill in the connection data

Reader Note

  • If the parameter is set to be sensitive, the field will be masked.

  • For integration commands, the global variable runtime['connector'] is the dictionary that contains the Connection Parameters. The keys of the Connection Parameters are lowercase and contain no spaces,

e.g. runtime['connector']['serverurl'] or runtime.get("connector").get("serverurl", "")
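
A minimal sketch of reading connection parameters defensively is shown below. The helper getConnectionParam and the sampleRuntime dictionary are hypothetical stand-ins (the real runtime object only exists inside D3), and deriving the key from a display name by removing spaces and lowercasing is an assumption based on the key format described above.

```python
def getConnectionParam(runtimeDict, displayName, default=""):
    """Look up a connection parameter by normalizing its name to the stored key form."""
    key = "".join(displayName.split()).lower()  # "Server URL" -> "serverurl"
    connector = runtimeDict.get("connector") or {}
    value = connector.get(key, default)
    return value.strip() if isinstance(value, str) else value


# Stand-in for the runtime dictionary that D3 provides to integration commands.
sampleRuntime = {"connector": {"serverurl": " https://example.invalid/api ", "apikey": "secret"}}
serverUrl = getConnectionParam(sampleRuntime, "Server URL")
missing = getConnectionParam(sampleRuntime, "User Name", "n/a")
```

Using .get with a default avoids a KeyError when a command runs without a connection or when an optional parameter was never filled in.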

Command functionality definition

Utility command example

Use case: Convert datetime format

Method: Define the input datetime format (oldPattern) and convert it into another datetime format (newPattern)

  • Input — Datetime

  • Output — Datetime
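
The use case above could be sketched as follows. This is only an illustration: the function name convertDatetimeFormat and its parameter names are hypothetical, and the patterns are assumed to be Python strptime/strftime format strings rather than any D3-specific pattern syntax.

```python
from datetime import datetime


def convertDatetimeFormat(inputDatetime, oldPattern, newPattern):
    """Parse inputDatetime with oldPattern and re-render it with newPattern."""
    parsed = datetime.strptime(inputDatetime, oldPattern)
    return parsed.strftime(newPattern)


# Inside D3 the converted value would typically be passed on as returnData, e.g.
# pb.returnOutputModel("", converted, "", "", "", "")  (not called here).
converted = convertDatetimeFormat("2024-01-11 19:36:37", "%Y-%m-%d %H:%M:%S", "%d/%m/%Y %H:%M")
```

datetime.strptime raises ValueError when the input does not match oldPattern, so a real command would wrap the call in a try/except block and report the message through the error field.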

Integration command example

Fetch event sample code

PY
import datetime
import json
import traceback
from datetime import timedelta

import requests
import validators

# _fmtExeptionErrMsg, _getValidInputValue and _prepareRequestBody are helper
# functions assumed to be defined elsewhere in this integration script.

# define a _sendRequest helper function to handle all requests in this integration
def _sendRequest(method, resource=None, params=None, body=None):
    try:
        # this integration needs a connector
        connector = pb.runtime.get("connector")
        if not connector:
            raise Exception("Connection is required")

        # get the connector server url
        serverurl = connector.get("serverurl", "").strip()
        if not validators.url(serverurl):
            raise Exception("Server Url is not valid in format")
        # get the rest of connector parameters
        serverurl = serverurl.rstrip("/")
        username = connector.get("username", "").strip()
        apikey = connector.get("apikey", "").strip()
        version = connector.get("version", "").strip()

        # send request and error handling; the resource path is appended to the base URL
        url = f"{serverurl}/{version}/{resource or ''}"
        headers = {"Accept": "application/json", "Content-Type": "application/json", "Accept-Encoding": None}
        r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, auth=(username, apikey))
        data = r.json() if "application/json" in r.headers.get("Content-Type", "").lower() else r.text
        if not r.ok:
            return (False, data, f"Status Code: {r.status_code}, Message: {r.reason}, {r.text}")
        else:
            return (True, data, None)
    # handle the ConnectionError exception
    except requests.ConnectionError:
        raise Exception("The remote server refused to connect, please check Server URL in connection or API service availability.")
    except Exception as ex:
        raise Exception(_fmtExeptionErrMsg(ex))

def fetchEvent(args):
    error = ""
    rawData = []
    result = ""
    keyFields = ""
    passdown = {}
    timeSpan = 0  # initialized here so the return statement works even if parsing fails
    try:
        # get the command parameters. args index following the defined index.
        startTime = args[0]
        endTime = args[1]
        index = args[2].strip() if args[2] else ""
        limit = _getValidInputValue(args[3], 1, 100, 20)
        offset = int(args[4]) if int(args[4]) >= 0 else 0
        queryString = args[5]
        timeSpan = int(args[6]) if int(args[6]) > 0 else 0
        # prepare body and send request
        body = _prepareRequestBody(startTime, endTime, limit, offset, queryString, "timestamp", timeSpan)
        pb.log(json.dumps(body))
        # _sendRequest serializes the body itself (json=body), so pass the dictionary
        success, rawData, error = _sendRequest("GET", f"data/{index}/_search", body=body)
        # structure the result output
        if not success:
            error = "Search events failed: " + error
        else:
            result = rawData.get("hits", {}).get("hits")
        if result:
            keyFields = {
                "EventIDs": [sub.get("_id") for sub in result],
                "EventIndexes": [sub.get("_index") for sub in result],
                "EventTypes": [sub.get("_type") for sub in result]
            }
        # move the start time forward only if the next window is already in the past
        passdown = {
            "startTime": startTime + timedelta(minutes=timeSpan)
            if startTime + timedelta(minutes=timeSpan) < datetime.datetime.utcnow()
            else startTime
        }
    except Exception as ex:
        error = "Search events failed: " + _fmtExeptionErrMsg(ex)
        pb.log(traceback.format_exc())
    # return the D3 outputmodel with the request result
    return pb.returnOutputModel(
        result,
        rawData if not error else "Failed",
        keyFields, "",
        rawData,
        error,
        passdownData=passdown if timeSpan else {}
    )

Reader Note

Please be aware that all custom commands within an integration are combined into one Python script.

For advanced users, it is good practice to create helper functions such as _sendRequest to avoid code duplication.

Command retry

Sometimes a command may fail for unexpected reasons. You can return the special value "__RETRY__" as returnData in the D3 output model, and the system will rerun the command. The retry count can be retrieved with pb.runtime.get('retrycount', 0).

Sample code:

PY
def retrySample():
    errors = []
    rawData = []
    result = []
    returnData = "Failed"
    # get the retry count
    retrycount = pb.runtime.get("retrycount", 0)
    # Retry if updateEvents throws an exception and the retry count is less than 10.
    # Once the retry count reaches 10, it stops and returns "Failed".
    try:
        return updateEvents()
    except Exception:
        if retrycount < 10:
            return pb.returnOutputModel(result, "__RETRY__", "", "", rawData, errors)
    return pb.returnOutputModel(result, returnData, "", "", rawData, errors)
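
The retry decision can be isolated in a small function so that it is testable without the D3 runtime. This is only a sketch: chooseReturnData and RETRY_MARKER are hypothetical names, the marker value is the "__RETRY__" string named in the text above, and the limit of 10 mirrors the sample.

```python
RETRY_MARKER = "__RETRY__"


def chooseReturnData(retryCount, maxRetries=10):
    """Return the retry marker while retries remain, otherwise "Failed"."""
    return RETRY_MARKER if retryCount < maxRetries else "Failed"


firstAttempt = chooseReturnData(0)
lastAttempt = chooseReturnData(10)
```

Inside a command, the value returned here would be passed as the returnData argument of pb.returnOutputModel, with the count read from pb.runtime.get("retrycount", 0).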

D3 Python Coding Conventions

  • Use camelCase for method names.

  • Do not use print function, or any other function that outputs to the standard output.

  • Do not define a class named PlaybookRunTime. It is reserved for D3 playbook engine.

  • Do not define a class named APIError. It is reserved for D3 playbook engine.

  • Do not define a class named HTTPAPIError. It is reserved for D3 playbook engine.

  • Do not define a variable or object named pb. It is reserved for D3 playbook engine.

  • Do not define a variable or object named args. It is reserved for D3 playbook engine.

  • Do not define a variable or object named runtime. It is reserved for D3 playbook engine.

  • Do not define a variable or object named input. It is reserved for D3 playbook engine.

  • Do not give the method in the command script a name that differs from the command name.

  • Do not import a library that is neither part of the Python standard library nor a D3 third-party library.

Debugging and testing

Use pb.log() to log data during command execution.

  • Logged data will be displayed in command testing, and in playbook runtime while testing a playbook.

  • Logged data will be saved even if the command exits due to an exception.

Use the traceback library to get the full stack trace of exceptions.

  • Stack traces will match the line number of the command script if a command exits due to an exception.

  • Use traceback.format_exc() to get full stack trace during exception handling.
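
A minimal sketch of the logging pattern described above follows. Because pb.log only exists inside D3, a plain list named capturedLogs stands in for it here; safeDivide is a hypothetical function used only to trigger an exception.

```python
import traceback

capturedLogs = []  # stand-in for pb.log, which only exists inside D3


def safeDivide(numerator, denominator):
    """Return (result, error); record the full stack trace when the division fails."""
    try:
        return numerator / denominator, ""
    except Exception as ex:
        # In a D3 command this line would be: pb.log(traceback.format_exc())
        capturedLogs.append(traceback.format_exc())
        return None, f"Division failed: {ex}"


value, error = safeDivide(1, 0)
```

traceback.format_exc() must be called inside the except block, while the exception is still being handled; the short message goes to the error field and the full trace goes to the log.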

D3 Python Library

Global Runtime Variables

Integration command only

runtime["connector"]

CODE
{
  "custom_passwordvaultconnection": "",
  "schedulehealthchecking": "False",
  "schedulehealthcheckinginterval": "5"
  -- any defined connection parameter will also be here --
}

runtime["retrycount"]

Returns the number of times the command has been retried.

Command execution helper functions

In your custom Python script, you can also call D3's out-of-the-box System Utility Commands and Integration Commands.

To execute Utility Commands:

  • Follow this format: D3.Utility.{Command Name}<{Command Type}>(parameters)

PY
def Demo():
    return D3.Utility.concatToRear <Text>("Join ", "Text")

  • Please be aware that Command Name is the internal name of the command.

Command Type is the first parameter type of the command. The possible parameter data types are:

  • Text

  • Number

  • DateTime

  • Boolean

  • Text Array

  • Number Array

  • DateTime Array

  • Boolean Array

  • JSON Array

  • JSON Object

  • Unknown

  • The call returns a D3 output model.

To execute Integration Commands:

The format is similar to that for executing a Utility Command: D3.Integration.{Integration Name}.{Command Name}(parameters).

  • Note that the data type is not needed.

  • The limitations are:

    • Only custom Integration Commands can execute other Integration Commands that belong to the same Integration

    • Custom Utility Commands cannot execute any Integration Commands.

In the following sample, ipReputationCheck() is a system command and fetchEvent is a custom command that calls it:

PY
def ipReputationCheck():
    return pb.returnOutputModel('', '', '', '', {"EventRawData": "rawData"}, '')
CODE
def fetchEvent():
    return D3.Integration.Demo_Integration.ipReputationCheck()

General helper functions:

Each helper function can be used with pb.{function name}

isJson(jsonString)

  • Returns True if the string is valid JSON, otherwise False.

  • Parameters:

    • jsonString: the JSON string to validate

Sample Input: pb.isJson('{"Simple":"Simple JSON"}')

Sample Output: True

log(message)

  • Logs a message that is shown in the Custom Log tab. The tab only appears when testing a command.

Sample Input: pb.log("Debug Line")

Sample Output:

returnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData={})

  • Generates the D3 output model. Using it is recommended when writing custom commands.

  • Parameters:

    • result: HTML formatted data displayed in Result tab

    • returnData: Simple data that can be directly used by the next commands

    • outputData: Should be left empty. This will be automatically generated using Key Fields configuration

    • contextData: Contextual data to be shared with other tasks

    • rawData: Raw data from the command

    • error: Error details

    • passdownData: A field related to Event Intake and Incident Intake that passes parameter values down to the next scheduled instance.

Sample Input:

pb.returnOutputModel("<body><h1>Sample Result Data</h1></body>", "Sample Return Data", "", {"SampleContextData": "ContextData"}, {"SampleRawData": "Rawdata"}, "Sample Error", passdownData = {})

Sample Output:

PY
{
  "result": {
    "description": "<body><h1>Sample Result Data</h1></body>",
    "references": [],
    "actions": []
  },
  "returnData": "Sample Return Data",
  "outputData": "",
  "contextData": {
    "SampleContextData": "ContextData"
  },
  "rawData": {
    "SampleRawData": "Rawdata"
  },
  "error": "Sample Error",
  "passdownData": {},
  "customLog": "",
  "others": {}
}
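
For unit-testing command logic outside D3, a local stand-in that mimics the shape of the sample output above can be handy. The function fakeReturnOutputModel below is hypothetical and is not the D3 implementation; it only mirrors the keys visible in the sample, and the real pb.returnOutputModel may do more.

```python
def fakeReturnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData=None):
    """Build a dictionary shaped like the sample output above, for offline tests only."""
    return {
        "result": {"description": result, "references": [], "actions": []},
        "returnData": returnData,
        "outputData": outputData,
        "contextData": contextData,
        "rawData": rawData,
        "error": error,
        # None is used as the default to avoid sharing one mutable dict between calls.
        "passdownData": passdownData if passdownData is not None else {},
        "customLog": "",
        "others": {},
    }


model = fakeReturnOutputModel("<h1>ok</h1>", "Done", "", {"k": "v"}, {"raw": 1}, "")
```

A test can then assert on model["returnData"] or model["error"] without needing a running playbook engine.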

uploadFile(file)

  • Upload the file to D3 playbook file table

  • Parameters:

    • file: The file JSON object to be uploaded to the file table, in the format {"file": (filename, content)}, where content is the binary data of the file.

Sample Input: pb.uploadFile({"file": ("Sample File","U2FtcGxlIFRleHQ=")})

Sample Output:

PY
{
  "fileId": "1993",
  "fileName": "Sample File",
  "md5": "91BF9C6701F7107E4010735EAE0ED2D5",
  "sha1": "07C6F613BD4398515A56C4B72C793967FD7B5F8C",
  "sha256": "3E1A4B0BC62756ED64CA6073F3A62FA6ADE4B513AE4E95ED6DA54754CE7C8E2A"
}

downloadFile(fileid, filesource)

  • Download the file from D3 file tables

  • Parameters:

    • fileid: the file ID in the file table

    • filesource: one of the three D3 file tables:

      • IR_ATCHMNT: Incident attachment files

      • PB_FILE: Playbook files

      • KC_AF_FILE: Artifact files

Sample Input: pb.downloadFile("1993", "PB_FILE")

Sample Output: ('Sample File', b'U2FtcGxlIFRleHQ=')

Note that it works best paired with the decode function.

Sample Input: filename, filecontent = pb.downloadFile("1993", "PB_FILE"); filecontent.decode("utf-8")

Sample Output: Sample Text
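
The bytes in the sample above happen to be the Base64 encoding of the text "Sample Text". Whether pb.downloadFile returns Base64-encoded or already-decoded bytes is not stated here, so the following sketch only shows, in plain Python, what each decoding step yields for that literal value.

```python
import base64

fileContent = b"U2FtcGxlIFRleHQ="  # bytes as shown in the sample output above

# Decoding the bytes as UTF-8 only turns them into a string; the Base64 text is unchanged.
asUtf8 = fileContent.decode("utf-8")

# Base64-decoding first recovers the original file text.
asDecodedText = base64.b64decode(fileContent).decode("utf-8")
```

If the content you receive still looks like Base64 after decode("utf-8"), adding a base64.b64decode step as above is one thing to try.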

ConvertJsonObjectArrayToHTMLString(inputArray, showHeader=True, isCol="col")

  • Format the JSON Object array to an HTML table string to be put into the Result field in the D3 outputmodel.

  • Parameters:

    • inputArray: the JSON object array to be converted

    • showHeader: whether to show the key name as header in the table. Default value is True. Can be set to False.

    • isCol: whether to format the table in a column style. Default value is "col". Can be "row"

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }])

Sample Output:

HTML
<table class='cc-table horizontal-table'><tr><th>fieldName</th><th>value</th><th>displayName</th></tr><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], False)

Sample Output:

HTML
<table class='cc-table horizontal-table'><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], True, "row")

Sample Output:

HTML
<table class='cc-table vertical-table'><tr><th>fieldName</th><td>EventKey</td><td>EventName</td><td>Severity</td><td>Status</td></tr><tr><th>value</th><td>9635</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Medium</td><td>New</td></tr><tr><th>displayName</th><td>Unique Event Key</td><td>Event name</td><td>Severity</td><td>Status</td></tr></table>
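
To preview table output without running a command in D3, the layouts in the sample outputs above can be approximated locally. The function jsonArrayToHtmlTable is a hypothetical sketch inferred only from those samples, not the D3 helper itself: it takes the column keys from the first object, does not HTML-escape values, and its behavior for showHeader=False in "row" mode is a guess.

```python
def jsonArrayToHtmlTable(inputArray, showHeader=True, isCol="col"):
    """Approximate the sample outputs: "col" puts keys in a header row,
    "row" puts each key at the start of its own row."""
    keys = list(inputArray[0].keys()) if inputArray else []
    rows = []
    if isCol == "col":
        cssClass = "horizontal-table"
        if showHeader:
            rows.append("".join(f"<th>{k}</th>" for k in keys))
        for item in inputArray:
            rows.append("".join(f"<td>{item.get(k, '')}</td>" for k in keys))
    else:
        cssClass = "vertical-table"
        for k in keys:
            header = f"<th>{k}</th>" if showHeader else ""
            rows.append(header + "".join(f"<td>{item.get(k, '')}</td>" for item in inputArray))
    body = "".join(f"<tr>{row}</tr>" for row in rows)
    return f"<table class='cc-table {cssClass}'>{body}</table>"


sample = [{"fieldName": "Severity", "value": "Medium"}, {"fieldName": "Status", "value": "New"}]
colTable = jsonArrayToHtmlTable(sample)
rowTable = jsonArrayToHtmlTable(sample, True, "row")
```

Inside a command, prefer the real pb.ConvertJsonObjectArrayToHTMLString; a local approximation like this is only useful for checking the data you intend to pass in.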
