
D3 Python Conventions

Overview

D3 supports custom Python scripts, which are executed within the D3 Python custom image. As an admin user, you can write both Utility Commands and Integration Commands in Python.

For example, you can write a custom Integration Command, such as IP Reputation Check, that enriches your data from third-party sources. You can then write a custom Utility Command in Python that transforms the data into your preferred structure.

All Utility Commands can be created and viewed in the Utility Commands module. All Integration Commands can be created and viewed in their individual Integrations. Both types of commands can be implemented in Python or Codeless Playbook.

General Information

Python Version: 3.9.12

Available Third-party Libraries:

Command Type

Utility Command

Utility Commands are used for:

  • Custom data transformation or manipulation that cannot be achieved through D3’s out-of-box Utility commands

  • Setting up nested playbooks

The custom utility command can be assigned as a Basic Utility, System Utility or Cyber Utility, and be used in Command Task or Conditional Task.

Integration Command

Many security products support REST API integration, and D3 offers additional do-it-yourself (DIY) integration through custom Python scripts if an integration or command is not addressed by the current version of D3 SOAR. Users can follow the API documentation, along with D3’s Python code convention, to set up integration commands.

READER NOTE

When an integration or command is not covered within the D3 VSOC integration system, or the built-in command does not fully match your use case, you may write your own Python scripts and execute them within the D3 Python Docker Image to create Integration Custom Commands (ICCs). ICCs give users a high level of flexibility to tailor their integrations to precise operational needs.

WARNING

Contact D3 if the requirement changes are related to Python environment configuration. These include:

  • Python Version Management

  • Python Library Add/Delete

  • Python Library Version Management

  • Environment Variables Setup

  • File Path Management

  • Execute System Command-Lines for Image

Custom Command

Special Use Cases on Creating Custom Command

Case 1: Create Custom Command based on System Command

  • Clone Command

Case 2: Create Data Ingestion Commands (fetchEvent/fetchIncident)

  • Fields Setup

  • Fields Mapping Setup

  • Scheduled Time Passdown

Case 3: Create Codeless Playbook

Creating Your Custom Integration

Step 1: Navigate to Configuration on the top menu.

Step 2: Click on the Integrations icon on the vertical left sidebar.

Step 3: Click on the + button. A new Add Integration window will pop up.

Step 4: Enter an integration name in the Add Integration window and click on the Save button.


Command Implementation

Python Command Setup

Command Basic Setup

New Utility Command

Step 1: Navigate to Configuration on the top menu.

Step 2: On the left sidebar, go to Utility Commands.

Step 3: Click the + button. A new Add Command window will pop up.

New Integration Command

Step 1: Navigate to Configuration on the top menu.

Step 2: Click on the Integrations icon on the left sidebar.

Step 3: Search for the name of the integration to which you wish to add the command.

Step 4: Select the target integration under the search box.

Step 5: Click the + Custom Command button on the right panel. An Add Command window will pop up.


Step 6: Enter a name under the Custom Command section and click the + Add button.


Reader Note

A display name is required (under the Custom Command section); it must be unique. Set the display name to be the same as the Python function name.

  • Changing the display name changes the internal name and affects function references.

  • Changing the display name also changes the Webhook URL of the custom command.

  • For every new command added through the + Custom Command button, there should be a corresponding function in your Python script to handle it.

For custom Event Intake or Incident Intake commands, the function name should be fetchEvent/fetchIncident.

PY
def fetchEvent():
  pass
def fetchIncident():
  pass

Command Input Setup

Adding new input parameters for your command:

Step 1: Navigate to Configuration on the top menu.

Step 2: Click on the Integrations icon on the left sidebar.

Step 3: Enter the name of your integration and press the Enter key.

Step 4: Click on this integration.

Step 5: Click on the command of interest.


Step 6: Click on the + New Input Parameter button. A New Parameter window will appear.

a. Parameter Name: The internal name of the parameter. It should match the parameter name in the function definition.

b. Display Name: The display name of the parameter.

c. Parameter Type: The data type of the parameter.

  • Parameter Type options: Text, Number, DateTime, Boolean, Text Array, Number Array, DateTime Array, Boolean Array, JSON Array, JSON Object, Unknown, File, File Array, JSON Array Array

d. Is Required?: Whether or not the parameter is required when running the command.

e. Parameter Index: The order of the parameter, starting from 0.

f. Input Type: The input method for this parameter. For more information regarding the Input Type, please refer to this document.

g. Default Value: The default value of the parameter. It can be empty.

h. Description: The description of the parameter.

i. Sample Data: Sample data of the parameter.

Parameter Type

READER NOTE

Parameter Type Options:

  • Text - default value: ""

  • Number (float) - default value: 0(int)

  • DateTime - default value: datetime.datetime(1900, 1, 1, 0, 0)

  • Boolean - default value: []

  • JSON Object - default value: {}

  • Text Array, Number Array, DateTime Array, Boolean Array, JSON Array, Unknown, File, File Array, JSON Array Array - default value: []

Input Type Options:

  • Custom Input: match and support parameter type, change input method according to parameter type

  • Text Template: text input

  • HTML Template: HTML input

  • Drop Down Option: selection, only support Text parameter type

  • Multi Select Option: multiple selection, only support Text Array parameter type

Custom Input

Command Output Setup

Each command should return data using pb.returnOutputModel, and users need to add additional fields such as Key Field at the output tabs.

resultData — HTML formatted data displayed in Result tab; it is used for data visualization.


Reader Note

An HTML string can be generated from a JSON object or a JSON array with the helper Python functions:

  • ConvertJsonObjectArrayToHTMLString

  • ConvertJsonObjectToHTMLString

See the D3 Python Library section below for more details.

returnData — Determine the success/failure of the command.  

keyFields — Should be left empty.  This will be automatically generated using Key Fields configuration.

Set up the Key Fields from the Output configuration at the bottom of the Python script editing screen.

The Key Fields JSON path should reference the corresponding JSON path in Context Data.

contextData — Contextual data to be shared with other tasks.

Example of Recorded Future Check IP Reputation Context Data:

JSON
[
  {
    "risk": {
      "score": 0,
      "level": 0,
      "context": {
        "phishing": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 1
          }
        },
        "public": {
          "summary": [],
          "score": 0,
          "mostCriticalRule": "",
          "rule": {
            "maxCount": 52
          }
        },
        "c2": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 2
          }
        }
      }
    },
    "entity": {
      "id": "ip:***.***.***.***",
      "name": "***.***.***.***",
      "type": "IpAddress"
    },
    "ipAddress": "***.***.***.***",
    "riskLevel": "Low"
  }
]

rawData — Raw data from the command; this stores the raw message returned from 3rd-party API calls.

Example of Recorded Future Check IP Reputation:

JSON
{
  "data": {
    "results": [
      {
        "risk": {
          "score": 0,
          "level": 0,
          "context": {
            "phishing": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 1
              }
            },
            "public": {
              "summary": [],
              "score": 0,
              "mostCriticalRule": "",
              "rule": {
                "maxCount": 52
              }
            },
            "c2": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 2
              }
            }
          }
        },
        "entity": {
          "id": "ip:***.***.***.***",
          "name": "***.***.***.***",
          "type": "IpAddress"
        },
        "ipAddress": "***.***.***.***",
        "riskLevel": "Low"
      }
    ]
  },
  "counts": {
    "returned": 1,
    "total": 1
  }
}
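Comparing the two Recorded Future examples, the context data is the data.results array of the raw response. A minimal sketch of that extraction follows, assuming the raw response has the shape shown above. extractContextData is a hypothetical helper name, not part of the D3 library.

```python
def extractContextData(rawData):
    """Return the list under data.results, or an empty list if it is missing."""
    if not isinstance(rawData, dict):
        return []
    data = rawData.get("data")
    if not isinstance(data, dict):
        return []
    results = data.get("results", [])
    return results if isinstance(results, list) else []


# Trimmed version of the raw response shown above.
sampleRawData = {
    "data": {"results": [{"ipAddress": "***.***.***.***", "riskLevel": "Low"}]},
    "counts": {"returned": 1, "total": 1},
}
contextData = extractContextData(sampleRawData)
```

Returning an empty list for unexpected shapes keeps downstream tasks from failing on a missing key; whether that is the right fallback depends on the integration.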

Error — Error details.

OutputModel in playbook configuration:

For more information regarding Playbook Task Configuration, please refer to this document.

READER NOTE

The outcome of a command's execution depends directly on the contents of the error field. If the error field is not False (e.g. it contains an error message), the command will fail.
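The rule in this note can be sketched as a small check. This is only an illustration of one reading of the note (any empty, None, or False error value counts as success); commandSucceeded is a hypothetical name and the D3 engine's actual check may differ.

```python
def commandSucceeded(outputModel):
    """Treat an empty, None, or False error field as success (assumed reading of the note)."""
    return not outputModel.get("error")


# Output models reduced to the fields relevant here.
okModel = {"rawData": {"counts": {"returned": 1}}, "error": ""}
failedModel = {"rawData": {}, "error": "Status Code: 401, Message: Unauthorized"}
```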

Integration fetchEvent/fetchIncident pass down data

fetchEvent and fetchIncident commands are used to ingest raw data from third-party integration sources into the D3 system and create Events and Incidents respectively.

For more details about Integration Commands, please refer to this document.

For more details about how to schedule and ingest Events and Incidents, please refer to this document.

fetchEvent and fetchIncident commands should also utilize another field in our D3 OutputModel: Passdown data.

Since fetchEvent and fetchIncident commands are usually time-sensitive, it is important to set the start time for the next round of fetching events or incidents. System commands handle this automatically as long as the command's start time and the schedule interval are filled in. For a custom fetchEvent/fetchIncident command, the next start time can be handled through the passdown data.

Reader Note

All parameters in a function can utilize the Passdown data from the last fetchEvent/fetchIncident.

  1. Set up command input as mentioned above.

  2. Add a timespan parameter. Please note that this timespan should be equal to or larger than your schedule interval in order to avoid any fetchEvent delay.

    • Time Span: the input to calculate the next fetch event input start time

    • Schedule Interval: the interval time to actually execute the fetch event command.

  3. Write your main function logic.

  4. Define your passdown data.

CODE
nextStart = startTime + timedelta(minutes=timeSpan)
passdown = {
    "startTime": nextStart if nextStart < datetime.datetime.utcnow() else startTime
}

In this example, the next startTime advances by timeSpan minutes only when the advanced time is still earlier than the current UTC time; otherwise, startTime is left unchanged.

Put your passdown data in the outputModel.

The next time the schedule executes this command, startTime will be the value from your passdown data.

CODE
passdownData = passdown if timeSpan else {}
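The passdown logic can be tried outside D3 with a fixed clock. The sketch below is illustrative only: buildPassdown and its now parameter are hypothetical names introduced so the two branches can be exercised deterministically.

```python
import datetime
from datetime import timedelta


def buildPassdown(startTime, timeSpan, now=None):
    """Advance startTime by timeSpan minutes unless that would reach or pass the current time."""
    if not timeSpan:
        # Mirrors "passdown if timeSpan else {}" from the snippet above.
        return {}
    now = now or datetime.datetime.utcnow()
    nextStart = startTime + timedelta(minutes=timeSpan)
    return {"startTime": nextStart if nextStart < now else startTime}


fixedNow = datetime.datetime(2024, 1, 1, 12, 0)
# The 30-minute window ends in the past: the start time moves forward.
advanced = buildPassdown(datetime.datetime(2024, 1, 1, 11, 0), 30, fixedNow)
# The window would end in the future: the start time is kept.
kept = buildPassdown(datetime.datetime(2024, 1, 1, 11, 45), 30, fixedNow)
```

Keeping startTime unchanged when the window is not yet complete means the next scheduled run retries the same window rather than skipping ahead.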

Integration Connection Parameter Setup

Because an integration must connect to third-party services, a connection must be established for API calls. In D3, you can create different types of connections with your desired parameters.

  1. Configure your connection parameters.

    a. Add a custom parameter.

    b. Add a new connection and fill in the connection data.

Reader Note

  • If the parameter is set to be sensitive, the field will be masked.

  • For integration commands, the global variable runtime['connector'] is the dictionary that contains the Connection Parameters. The keys for Connection Parameters are lowercase and contain no spaces, e.g. runtime['connector']['serverurl'] or runtime.get("connector").get("serverurl", "").
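The key convention can be sketched as follows. This assumes the key is simply the parameter name lowercased with spaces removed, which is one reading of the note; connectorKey, getConnectionValue, and the sample dictionary are hypothetical and stand in for what D3 provides at run time.

```python
def connectorKey(displayName):
    """Lowercase the name and remove spaces (assumed mapping, see lead-in)."""
    return displayName.replace(" ", "").lower()


def getConnectionValue(runtimeData, displayName, default=""):
    """Read a connection parameter without raising if the connector or key is missing."""
    connector = runtimeData.get("connector") or {}
    return connector.get(connectorKey(displayName), default)


# Local stand-in for the runtime dictionary of an integration command.
sampleRuntime = {"connector": {"serverurl": "https://example.invalid", "apikey": "*****"}}
serverUrl = getConnectionValue(sampleRuntime, "Server URL")
```

Using .get with a default, as in the second form shown in the note, avoids a KeyError when a command runs without a connection.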

Command Functionality Definition

Utility command example

Use case: Convert datetime format

Method: Define the input datetime format (oldPattern) and convert it into another datetime format (newPattern)

  • Input — Datetime

  • Output — Datetime
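A minimal sketch of this use case with the standard library is shown below. The function name convertDatetimeFormat and the use of strptime/strftime patterns are assumptions; the pattern syntax of the actual D3 command may differ, and inside D3 the result would be wrapped in pb.returnOutputModel.

```python
from datetime import datetime


def convertDatetimeFormat(inputDatetime, oldPattern, newPattern):
    """Parse inputDatetime with oldPattern and re-emit it using newPattern."""
    parsed = datetime.strptime(inputDatetime, oldPattern)
    return parsed.strftime(newPattern)


converted = convertDatetimeFormat(
    "2024-03-21 17:41:45", "%Y-%m-%d %H:%M:%S", "%d/%m/%Y %H:%M"
)
```

strptime raises ValueError when the input does not match oldPattern, so a real command would catch that and report it through the error field.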

Integration Command Example

Fetch event sample code

PY
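# imports used by this sample; pb is provided by the D3 runtime
import datetime
import json
import traceback
from datetime import timedelta

import requests
import validators

# _fmtExeptionErrMsg, _getValidInputValue and _prepareRequestBody are
# integration-specific helpers that are not shown in this sample

# define a _sendRequest helper function to handle all requests in this integration
def _sendRequest(method, resource=None, params=None, body=None):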
    try:
        # this integration needs a connector
        connector = pb.runtime.get("connector")
        if not connector:
            raise Exception("Connection is required")

        # get the connector server url
        serverurl = connector.get("serverurl", "").strip()
        if not validators.url(serverurl):
            raise Exception("Server Url is not valid in format")
        # get the rest of connector parameters
        serverurl = serverurl.rstrip("/")
        username = connector.get("username", "").strip()
        apikey = connector.get("apikey", "").strip()
        version = connector.get("version", "").strip()

        # send request and error handling
        url = f"{serverurl}/{version}/{resource or ''}"
        headers = {"Accept": "application/json", "Content-Type": "application/json", "Accept-Encoding": None}
        r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, auth=(username, apikey))
        data = r.json() if "application/json" in r.headers.get("Content-Type", "").lower() else r.text
        if not r.ok:
            return (False, data, f"Status Code: {r.status_code}, Message: {r.reason}, {r.text}")
        else:
            return (True, data, None)
    # handle the ConnectionError exception
    except requests.ConnectionError:
        raise Exception("The remote server refused to connect, please check Server URL in connection or API service availability.")
    except Exception as ex:
        raise Exception(_fmtExeptionErrMsg(ex))

def fetchEvent(args):
    error = ""
    rawData = []
    result = ""
    keyFields = ""
    passdown = {}
    timeSpan = 0  # defined up front so the return statement works even if input parsing fails
    try:
        # get the command parameters. args index following the defined index.
        startTime = args[0]
        endTime = args[1]
        index = args[2].strip() if args[2] else ""
        limit = _getValidInputValue(args[3], 1, 100, 20)
        offset = int(args[4]) if int(args[4]) >= 0 else 0
        timeSpan = int(args[6]) if int(args[6]) > 0 else 0
        queryString = args[5]
        # prepare body and send request
        body = _prepareRequestBody(startTime, endTime, limit, offset, queryString, "timestamp", timeSpan)
        pb.log(json.dumps(body))
        success, rawData, error = _sendRequest("GET", f"data/{index}/_search", body=body)
        # structure the result output
        if not success:
            error = "Search events failed: " + error
        else:
            result = rawData.get("hits", {}).get("hits")
        if result:
            keyFields = {
                "EventIDs":  [sub.get("_id") for sub in result],
                "EventIndexes": [sub.get("_index") for sub in result],
                "EventTypes": [sub.get("_type") for sub in result]
            }
        passdown = {
            "startTime": startTime + timedelta( minutes = timeSpan) if startTime +timedelta(minutes=timeSpan) < datetime.datetime.utcnow() else startTime
        }
    except Exception as ex:
        error = "Search events failed: " + _fmtExeptionErrMsg(ex)
        pb.log(traceback.format_exc())
    # return the D3 outputmodel with the request result
    return pb.returnOutputModel(
        result,
        rawData if not error else "Failed",
        keyFields, "",
        rawData,
        error,
        passdownData = passdown if timeSpan else {}
        )

Reader Note

Please be aware that all custom commands within an integration are put into one Python script.

For advanced users, it is good practice to create helper functions such as _sendRequest to avoid code duplication.

Command Retry

Sometimes a command may fail for unexpected reasons. You can return the special value "__RETRY__" as returnData in the D3 output model, and the system will re-run the command. The retry count can be retrieved with pb.runtime.get('retrycount', 0).

Sample code:

PY
def retrySample():
    errors = []
    rawData = []
    result = []
    returnData = "Failed"
    # get the retry count
    retrycount = pb.runtime.get("retrycount", 0)
    # retry if updateEvents (defined elsewhere in the script) throws an exception and the retry count is less than 10.
    # At the 10th retry it stops and returns "Failed"
    try:
        return updateEvents()
    except Exception:
        if retrycount < 10:
            return pb.returnOutputModel(result, "__RETRY__", "", "", rawData, errors)
    return pb.returnOutputModel(result, returnData, "", "", rawData, errors)

D3 Python Coding Conventions

  • Use camelCase for method names.

  • Do not use print function, or any other function that outputs to the standard output.

  • Do not define a class named PlaybookRunTime. It is reserved for D3 playbook engine.

  • Do not define a class named APIError. It is reserved for D3 playbook engine.

  • Do not define a class named HTTPAPIError. It is reserved for D3 playbook engine.

  • Do not define a variable or object named pb. It is reserved for D3 playbook engine.

  • Do not define a variable or object named args. It is reserved for D3 playbook engine.

  • Do not define a variable or object named runtime. It is reserved for D3 playbook engine.

  • Do not define a variable or object named input. It is reserved for D3 playbook engine.

  • Do not use a method name in the command script that differs from the command name.

  • Do not import a library that is neither a Python standard library nor a D3 third-party library.

Debugging and Testing

Use pb.log() to log data during command execution.

  • Logged data will be displayed in command testing, and in playbook runtime while testing a playbook.

  • Logged data will be saved even if the command exits due to an exception.

Use the traceback library to get the full stack trace of exceptions.

  • Stack traces will match the line number of the command script if a command exits due to an exception.

  • Use traceback.format_exc() to get full stack trace during exception handling.
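The logging pattern above can be sketched locally as follows. runWithLogging is a hypothetical wrapper, and the messages list stands in for the Custom Log tab; inside D3 you would pass pb.log as the log argument instead.

```python
import traceback


def runWithLogging(action, log):
    """Run action(); on failure, log the full stack trace and return an error string."""
    try:
        return action(), ""
    except Exception as ex:
        # format_exc() returns the complete traceback of the exception being handled.
        log(traceback.format_exc())
        return None, f"{type(ex).__name__}: {ex}"


messages = []  # local stand-in for the Custom Log tab
value, error = runWithLogging(lambda: int("not a number"), messages.append)
```

Logging the trace while returning a short error string keeps the error field readable and leaves the line-level detail in the log.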

D3 Python Library

Global Runtime Variables

Integration Command Only

runtime["connector"]

CODE
{
  "custom_passwordvaultconnection": "",
  "schedulehealthchecking": "False",
  "schedulehealthcheckinginterval": "5"
  -- any defined connection parameter will also be here --
}

runtime["retrycount"]

Returns the number of times the command has been retried.

Command Execution Helper Functions

In your custom Python script, you can also use the D3 out-of-box System Utility Command or Integration Command.

To Execute Utility Commands:

  • Follow this format: D3.Utility.{Command Name}<{Command Type}>(parameters)

PY
def Demo():
    return D3.Utility.concatToRear <Text>("Join ", "Text")

  • Please be aware that Command Name is the internal name of the command.

Command Type is the type of the command's first parameter. The possible parameter data types are:

  • Text

  • Number

  • DateTime

  • Boolean

  • Text Array

  • Number Array

  • DateTime Array

  • Boolean Array

  • JSON Array

  • JSON Object

  • Unknown

The call returns the D3 output model.

To Execute Integration Commands:

The format is similar to that for executing a Utility Command: D3.Integration.{Integration Name}.{Command Name}(parameters).

  • Note that the data type is not required.

  • The limitations are:

    • Only custom Integration Commands can execute other Integration Commands that belong to the same Integration

    • Custom Utility Commands cannot execute any Integration Commands.

In the sample below, ipReputationCheck() is a system command and fetchEvent() is a custom command:

PY
def ipReputationCheck():
    return pb.returnOutputModel('', '', '', '', {"EventRawData": "rawData"}, '')

CODE
def fetchEvent():
    return D3.Integration.Demo_Integration.ipReputationCheck()

General Helper Functions:

Each helper function can be used with pb.{function name}

isJson(jsonString)

  • Returns True if the string is valid JSON; otherwise returns False.

  • Parameters:

    • jsonString: the JSON string to validate

Sample Input: pb.isJson('{"Simple":"Simple JSON"}')

Sample Output: True
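For testing scripts outside D3, a rough local equivalent can be written with the standard json module. This is an approximation under the assumption that pb.isJson accepts whatever json.loads accepts; edge cases such as bare numbers or non-string input may behave differently in D3. isJsonLocal is a hypothetical name.

```python
import json


def isJsonLocal(jsonString):
    """Return True if jsonString parses as JSON, otherwise False."""
    try:
        json.loads(jsonString)
        return True
    except (TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError;
        # TypeError covers non-string input such as None.
        return False
```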

log(message)

  • Logs a message to the Custom Log tab. The tab is only shown when testing a command.

Sample Input: pb.log("Debug Line")

Sample Output:

returnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData={})

  • Generates the D3 output model. Using it is recommended when writing custom commands.

  • Parameters:

    • result: HTML formatted data displayed in Result tab

    • returnData: Simple data that can be directly used by the next commands

    • outputData: Should be left empty. This will be automatically generated using Key Fields configuration

    • contextData: Contextual data to be shared with other tasks

    • rawData: Raw data from the command

    • error: Error details

    • passdownData: An Event Intake and Incident Intake related field that can pass down parameter values to the next scheduled instance.

Sample Input:

pb.returnOutputModel("<body><h1>Sample Result Data</h1></body>", "Sample Return Data", "", {"SampleContextData": "ContextData"}, {"SampleRawData": "Rawdata"}, "Sample Error", passdownData = {})

Sample Output:

PY
{
  "result": {
    "description": "<body><h1>Sample Result Data</h1></body>",
    "references": [],
    "actions": []
  },
  "returnData": "Sample Return Data",
  "outputData": "",
  "contextData": {
    "SampleContextData": "ContextData"
  },
  "rawData": {
    "SampleRawData": "Rawdata"
  },
  "error": "Sample Error",
  "passdownData": {},
  "customLog": "",
  "others": {}
}
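To unit-test command logic outside D3, the output model can be imitated with a plain dictionary. The sketch below only mirrors the shape of the sample output above; returnOutputModelLocal is a hypothetical stand-in, and the real pb.returnOutputModel may populate customLog, others, or the result sub-fields differently.

```python
def returnOutputModelLocal(result, returnData, outputData, contextData,
                           rawData, error, passdownData=None):
    """Build a dictionary shaped like the sample output (local testing only)."""
    return {
        "result": {"description": result, "references": [], "actions": []},
        "returnData": returnData,
        "outputData": outputData,
        "contextData": contextData,
        "rawData": rawData,
        "error": error,
        # A fresh dict per call avoids sharing one mutable default.
        "passdownData": passdownData if passdownData is not None else {},
        "customLog": "",
        "others": {},
    }


model = returnOutputModelLocal(
    "<body><h1>Sample Result Data</h1></body>",
    "Sample Return Data",
    "",
    {"SampleContextData": "ContextData"},
    {"SampleRawData": "Rawdata"},
    "Sample Error",
)
```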

uploadFile(file)

  • Upload the file to D3 playbook file table.

  • Parameters:

    • File JSON object to be uploaded to the file table. Format as follows: {"file": (filename,content)}. content is binary data of the file.

Sample Input: pb.uploadFile({"file": ("Sample File","*****=")})

Sample Output:

PY
{
  "fileId": "*****",
  "fileName": "Sample File",
  "md5": "*****",
  "sha1": "*****",
  "sha256": "*****"
}

downloadFile(fileid, filesource)

  • Download the file from D3 file tables

  • Parameters:

    • fileid: the file ID in the file table

    • filesource: one of the three D3 file tables:

      • IR_ATCHMNT: Incident attachment files

      • PB_FILE: Playbook files

      • KC_AF_FILE: Artifact files

Sample Input: pb.downloadFile("*****", "PB_FILE")

Sample Output: ('Sample File', b'*****=')

Note that it works best when paired with the decode function.

Sample Input:

filename, filecontent = pb.downloadFile("*****", "PB_FILE")
filecontent.decode("utf-8")

Sample Output: Sample Text
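The unpack-and-decode step can be wrapped in a small helper. This is a sketch using a hard-coded tuple in place of a real pb.downloadFile call; decodeFileContent is a hypothetical name, and replacing undecodable bytes is an assumption that suits text previews but not binary files.

```python
def decodeFileContent(downloaded, encoding="utf-8"):
    """Split the (filename, content) tuple and decode the bytes to text."""
    filename, filecontent = downloaded
    # errors="replace" substitutes undecodable bytes instead of raising.
    return filename, filecontent.decode(encoding, errors="replace")


# Stand-in for the tuple returned by pb.downloadFile("*****", "PB_FILE").
name, text = decodeFileContent(("Sample File", b"Sample Text"))
```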

ConvertJsonObjectArrayToHTMLString(inputArray, showHeader=True, isCol="col")

  • Format the JSON Object array to an HTML table string to be put into the Result field in the D3 outputmodel.

  • Parameters:

    • inputArray: the JSON object array to be converted

    • showHeader: whether to show the key name as header in the table. Default value is True. Can be set to False.

    • isCol: the table orientation. Default value is "col"; it can also be set to "row".

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "*****", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }])

Sample Output:

HTML
<table class='cc-table horizontal-table'><tr><th>fieldName</th><th>value</th><th>displayName</th></tr><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>*****</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "*****", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], False)

Sample Output:

HTML
<table class='cc-table horizontal-table'><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>*****</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>

Sample Input:

pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "*****", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], True, "row")

Sample Output:

HTML
<table class='cc-table vertical-table'><tr><th>fieldName</th><td>EventKey</td><td>EventName</td><td>Severity</td><td>Status</td></tr><tr><th>value</th><td>9635</td><td>*****</td><td>Medium</td><td>New</td></tr><tr><th>displayName</th><td>Unique Event Key</td><td>Event name</td><td>Severity</td><td>Status</td></tr></table>
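The three sample outputs suggest how the table is assembled: one row per object in "col" mode, and one row per key in "row" mode. The sketch below reproduces that layout for local experimentation. It is an approximation inferred from the samples, not the D3 implementation; jsonObjectArrayToHtml is a hypothetical name, and the HTML escaping is an added assumption.

```python
from html import escape


def jsonObjectArrayToHtml(inputArray, showHeader=True, isCol="col"):
    """Render a list of flat dicts as an HTML table string, mimicking the samples."""
    if not inputArray:
        return ""
    keys = list(inputArray[0].keys())
    rows = []
    if isCol == "col":
        cssClass = "horizontal-table"
        if showHeader:
            rows.append("".join(f"<th>{escape(str(k))}</th>" for k in keys))
        for item in inputArray:
            rows.append("".join(f"<td>{escape(str(item.get(k, '')))}</td>" for k in keys))
    else:
        cssClass = "vertical-table"
        for k in keys:
            head = f"<th>{escape(str(k))}</th>" if showHeader else ""
            cells = "".join(f"<td>{escape(str(item.get(k, '')))}</td>" for item in inputArray)
            rows.append(head + cells)
    body = "".join(f"<tr>{row}</tr>" for row in rows)
    return f"<table class='cc-table {cssClass}'>{body}</table>"


sample = [{"fieldName": "EventKey", "value": "9635"}]
colHtml = jsonObjectArrayToHtml(sample)
rowHtml = jsonObjectArrayToHtml(sample, True, "row")
```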
