
Python Script

LAST UPDATED: OCT 15, 2024

In D3, most system commands are written as Python scripts. These system Python scripts are similar to the custom Python scripts that you can write. This section therefore teaches you how to write a proper and useful custom Python script like a D3 expert.

Command & Script

Below are the facts that demonstrate how a D3 command is implemented and associated with its Python script.

  1. Each system/custom command is associated with a Python function whose name matches the command's internal name.

  2. All system/custom commands of an integration should be written in a system/custom Python script.

  3. Within the same integration, the custom script inherits the codebase of the system script, even though the system code does not appear in the UI. This means that you can reference any functions, classes, or variables defined in the system script.

  4. The functions, classes, variables, and modules in the custom scripts can override those defined in the system script if they share the same name.

  5. Input parameters are passed to the parameters of the corresponding command's function in order. To reference the parameters, you can use either of the following:

PY
def commandName(param1, param2, param3):
    ...

def commandName(*args):
    param1, param2, param3 = args[0], args[1], args[2]

  6. For integration commands, connection parameters can be referenced through the global variable `runtime`, using either of the following:

PY
runtime['connector']['serverurl'] 

or

PY
runtime.get('connector',{}).get('serverurl','<your default value>')

  7. Output data can be separated into result data, return data, raw data, error logs, and passdown data, and turned into the command output through the function `pb.returnOutputModel`. For details, refer to Return Output Model.

  8. One custom integration can only define one system-defined command, such as testconnection, fetchevent, or fetchincident.

Convention

  • Do not define classes named PlaybookRunTime, APIError, or HTTPAPIError. These names are reserved for the D3 playbook engine.

  • Do not define variables or objects named pb, args, runtime, or input. These names are reserved for the D3 playbook engine.

  • Do not use a function name that differs from the command name in the command script.

  • Do not import any library that is neither a Python standard library nor a D3 third-party library.

  • Use the pb.log function for standard output instead of the print function or any other function.

  • Data must be returned and output from the command through pb.returnOutputModel().

  • Do not include any OAuth 2.0 logic in your command, since D3 custom integrations do not currently support OAuth 2.0 authentication.

Return Output Model

The return output model is a D3 helper function that formats the return data of a command into D3 output model.

CODE
pb.returnOutputModel(result, returnData, keyFields, contextData, rawData, errors, passdownData)

READER NOTE

The returnData should contain the status of the command, which is either Successful, Partially Successful, or Failed. It is suggested to keep both the keyFields and contextData fields as an empty string "", as these fields are deprecated.

Error Handling

The success or failure of a command is determined solely by the errors field. If the errors field is not empty, indicating the presence of an error message, the command will fail.

READER NOTE

The errors field is a text field that should contain only one error message.

Although returnData contains the status of the command, it is not directly related to the success or failure of the command. This means that even if returnData is 'Failed', the command will still be considered successful as long as the errors field is empty.
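This rule can be illustrated with a small helper (the function name below is hypothetical, not part of the D3 API):

```python
def is_command_failed(errors):
    """A command fails only when the errors field is non-empty,
    regardless of the status string in returnData."""
    return bool(errors)

# Even if returnData says "Failed", an empty errors field means success
print(is_command_failed(""))                 # False: treated as successful
print(is_command_failed("timeout reached"))  # True: treated as failed
```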

Use Case I

In situations where a command may produce multiple error messages, the errors field, being a text field, cannot store more than one message. To address this limitation, a general message such as "There is more than one error inside the command. For more information, please refer to the errorMessages field inside the rawData field" can be placed in the errors field. All individual error messages can then be logged under the errorMessages field inside the rawData field. In this scenario, the rawData field should be formatted as a JSON object, with the error messages stored as an array. This approach allows all errors encountered during command execution to be logged while keeping the error data clear and organized.
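A minimal sketch of this approach (the helper name is hypothetical; only the field names mirror the recommended rawData format):

```python
def build_multi_error_output(responses, error_messages):
    """Places a general message in the errors field and stores the
    individual error messages inside rawData as a JSON array."""
    general_error = (
        "There is more than one error inside the command. For more "
        "information, please refer to the errorMessages field inside the rawData field"
    )
    raw_data = {
        "results": responses,
        "errorMessages": error_messages
    }
    errors = general_error if error_messages else ""
    return errors, raw_data

errors, raw_data = build_multi_error_output(
    [{"id": 1}], ["request 2 timed out", "request 3 returned 500"]
)
# errors holds the general message; raw_data["errorMessages"] holds the details
```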

Use Case II

In scenarios where a single command execution involves multiple requests, it's possible for any of these requests to contain errors within their responses. If you need to handle each error individually but prefer not to have a single error result in the failure of the entire command, there's a workaround. You can opt not to input the error message into the errors field and instead store it in the rawData field. By following this approach, you can mark the command as "Partially Successful" since it contains errors but still returns results. This ensures that each error is appropriately handled without compromising the overall execution of the command.

CODE
# This is the JSON format for the rawData field that D3 recommends using when handling multiple errors.

rawData = {
    "results": [<response rawData>],
    "errorMessages": [<error messages>]
}
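The status decision for this workaround can be sketched as a small helper (a hypothetical name, mirroring the logic used in the fetchEvent code sample further below):

```python
def decide_status(num_results, num_errors):
    """Derives the returnData status when per-request errors are kept
    in rawData instead of the errors field."""
    if num_errors == 0:
        return "Successful"
    if num_results > 0:
        return "Partially Successful"  # errors occurred, but some results were returned
    return "Failed"

print(decide_status(5, 2))  # Partially Successful
```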

READER NOTE

To retry a command in order to handle a specific error, please refer to Command Retry Mechanism.

Passdown

To enable the scheduled advancement of a timed command, passdown data must be configured to contain the next start time. This ensures that the command runs according to the specified schedule.

READER NOTE

For more information on how and when to set passdown data, please refer to the real case in the code sample - Fetch Event/Fetch Incident Passdown Data.

D3 Python Library

Utility Command & Integration Command Calls

In your custom Python script, you can use the D3 out-of-box System Utility Command or Integration Command.

To call a Utility Command within the Python Script

D3.Utility.{Command Name}<{Command Type}>(parameters)

CODE
def utilityCommandCalls():
    return D3.Utility.concatToRear<Text>("Join ", "Text")

WARNING

Please be aware that Command Name is the internal name of the command, and Command Type is the type of the command's first parameter.

Here are some useful and commonly used utility commands in custom Python scripts:

equals — Text Equals to

Checks if the two input texts are identical.

Call: D3.Utility.equals<Text>("SOAR", "SOAR")

Input: Input 1 (Text): SOAR; Input 2 (Text): SOAR

Output: Return Data (Boolean): true

contains — Contains Text

Checks if the input text contains the specified text.

Call: D3.Utility.contains<Text>("Welcome to use the SOAR product", "SOAR")

Input: Input (Text): Welcome to use the SOAR product; Search Value (Text): SOAR

Output: Return Data (Boolean): true

GetUTCTimeNow — Get Current UTC Time

Gets the current UTC time.

Call: D3.Utility.GetUTCTimeNow<>()

Input: Site (dropdown list)

Output: Return Data (Text): "2020-05-28 23:08:39"

ExtractArtifactsToJsonObjectArrayWithArrayKeyValueO — Extract Key/Value Pairs from JSON Object

Extracts the values of specified keys from a JSON object.

Call: D3.Utility.ExtractArtifactsToJsonObjectArrayWithArrayKeyValueO<JSON Object>(jsonObject, keys)

Input: Input (JSON Object) — the JSON object to extract key values from. Sample data:

{
    "IPAddress": "***.***.***.***",
    "RiskLevel": "Low",
    "Type": "Cyber"
}

Keys (Text Array) — the list of keys. Sample data:

[
    "IPAddress",
    "RiskLevel"
]

Output: Context Data (JSON Object):

{
    "IPAddress": "***.***.***.***",
    "RiskLevel": "Low"
}

greaterThan — Greater than

Checks if the first number is greater than the second number.

Call: D3.Utility.greaterThan<Number>(8000, 8000)

Input: Input 1 (Number): 8000; Input 2 (Number): 8000

Output: Return Data (Boolean): false

READER NOTE

For more information about the Utility Commands, please refer to the Utility Command

To call an Integration Command within the Python Script

D3.Integration.{Integration Name}.{Command Name}(parameters)

READER NOTE

The data type is not necessary when calling an Integration Command.

The limitations are:

  • Only custom Integration Commands can execute other Integration Commands that belong to the same Integration

  • Custom Utility Commands cannot execute any Integration Commands.
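For example, a custom command in a hypothetical integration named MyIntegration could call a sibling command like this (both the integration name and the command name are placeholders, not real D3 commands):

```
# Both the integration name and command name below are placeholders
events = D3.Integration.MyIntegration.fetchRelatedEvents(eventId)
```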

D3 Helper Functions

D3 provides users with pre-defined helper functions that help developers write cleaner, more modular, and more efficient code the D3 way.

Each helper function can be used with pb.{function name}

isJson(jsonString)

  • Returns True if the input is valid JSON, else returns False.

  • Parameters:

    • jsonString: the JSON string to validate

Sample Input: pb.isJson('{"Simple":"Simple JSON"}')

Sample Output: True
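A check like this can be approximated in plain Python (a sketch of equivalent behavior, not D3's actual implementation):

```python
import json

def is_json(json_string):
    """Returns True if the input string parses as valid JSON, else False."""
    try:
        json.loads(json_string)
        return True
    except (ValueError, TypeError):
        return False

print(is_json('{"Simple":"Simple JSON"}'))  # True
print(is_json('{not valid}'))               # False
```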

log(message)

  • Logs any message to be shown in the Custom Log tab. The tab only appears when testing a command.

Sample Input: pb.log("Debug Line")

Sample Output:

returnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData={})

  • Generates our D3 output model. Recommended to be used for writing custom commands.

  • Parameters:

    • result: HTML formatted data displayed in the Result tab

    • returnData: Simple data that can be directly used by subsequent commands

    • outputData: Should be left empty. This will be automatically generated using Key Fields configuration

    • contextData: Contextual data to be shared with other tasks

    • rawData: Raw data from the command

    • error: Error details

    • passdownData: An Event Intake and Incident Intake related field that can pass down parameter values to the next scheduled instance.

Sample Input:

PY
pb.returnOutputModel("<body><h1>Sample Result Data</h1></body>", "Sample Return Data", "", {"SampleContextData": "ContextData"}, {"SampleRawData": "Rawdata"}, "Sample Error", passdownData = {})

Sample Output:

JSON
{
  "result": {
    "description": "<body><h1>Sample Result Data</h1></body>",
    "references": [],
    "actions": []
  },
  "returnData": "Sample Return Data",
  "outputData": "",
  "contextData": {
    "SampleContextData": "ContextData"
  },
  "rawData": {
    "SampleRawData": "Rawdata"
  },
  "error": "Sample Error",
  "passdownData": {},
  "customLog": "",
  "others": {}
}

uploadFile(fileObject)

Uploads a file to D3 as Playbook File (PB_FILE). The function accepts a JSON object with file metadata, including the file name and content.

Parameter:

  • fileObject (JSON object): A dictionary representing the file to be uploaded. It must follow the structure:

    PY
    {
        "file": (fileName, fileContent)
    }
    • fileName (string): The name of the uploaded file.

    • fileContent (string): The file content in binary format.

Example of fileObject:

PY
{
    "file": (
        "example.txt",
        b"Sample binary content of the file"
    )
}

Return:

Upon successful upload, the function returns a JSON object containing the following fields:

  • fileId (string): A unique identifier assigned for the uploaded file.

  • fileName (string): The name of the uploaded file.

  • md5 (string): The MD5 checksum of the uploaded file, used to verify its integrity.

  • sha1 (string): The SHA-1 hash of the uploaded file, used for additional integrity verification.

  • sha256 (string): The SHA-256 hash of the uploaded file, used for enhanced integrity checks.

Example Usage:

PY
fileObject = {
    "file": ("example.txt", b"File content in binary")
}

response = uploadFile(fileObject)
# response = {'fileId': '115924', 'fileName': 'example.txt', 'md5': '517A6396037BE94D96EF2D00AB65C913', 'sha1': '2E46DD7BF55755FE5938181E4335252FE207B609', 'sha256': '9D5CFCA834F1F67EFF7A1A57B2FAD25FBB4544C4845B99F0238357315271E2C2'}


The uploadFile function is often used in conjunction with the formatDownloadFileResult function to generate an HTML table displaying the metadata of uploaded files and providing direct download links for each file.

PY
def Uploadfile(*args):
    rawData = {}
    resultData = {}
    returnData = "Successful"
    contextData = ""
    keyFields = {}
    error = ""
    fileObjects = [{"file": ("example1.txt", b"File example 1")}, {"file": ("example2.txt", b"File example 2")}]
    fileResults = []
    for fileObject in fileObjects:
        result = pb.uploadFile(fileObject)
        fileResults.append(result)
    resultData, reference = pb.formatDownloadFileResult(fileResults)
    return pb.returnOutputModel(resultData, returnData, keyFields, contextData, rawData, error, reference)

downloadFile(fileid, filesource)

Retrieves a file based on the provided fileID and fileSource. It returns the file name and content, enabling access to files from sources such as incident attachments, playbook files, or artifact files.

Parameters:

  • fileid (string): A unique identifier for the file to be retrieved.

  • filesource (string): The source of the file. The options are:

    • IR_ATCHMNT - Incident attachment files

    • PB_FILE - Playbook files

    • KC_AF_FILE - Artifact files

Return:

The function returns a tuple containing the file name and file content:

  • fileName (string): The name of the downloaded file.

  • fileContent (string): The actual content of the file in binary format.

Example Usage:

PY
response = pb.downloadFile("115924","PB_FILE")
# response = ("example.txt", b"File content in binary")

READER NOTE

Generally, when retrieving binary content from a file, if the file content is base64 encoded, the client needs to decode or normalize it for subsequent use. After the file content has been properly decoded, it can be sent to a third-party integration for analysis via an API request or processed through a built-in command.
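A sketch of that decoding step (pb.downloadFile is the D3 helper described above; the base64 check below is a heuristic and assumes the producer encoded the content):

```python
import base64
import binascii

def normalize_file_content(file_content):
    """Returns the decoded bytes if file_content is base64 encoded,
    otherwise returns the content unchanged."""
    try:
        # validate=True rejects content that is not strictly base64
        return base64.b64decode(file_content, validate=True)
    except (binascii.Error, ValueError):
        return file_content

# fileName, fileContent = pb.downloadFile("115924", "PB_FILE")
encoded = base64.b64encode(b"File content in binary")
print(normalize_file_content(encoded))  # b'File content in binary'
```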

Debugging & Testing

When coding in the D3 Python script, you can log any output during command execution by using our helper function pb.log().

  • Logged data will be displayed in command testing, and in playbook runtime while testing a playbook.

  • Logged data will be saved even if the command exits due to an exception.

Use the traceback library to get the full stack trace of exceptions.

  • Stack traces will match the line number of the command script if a command exits due to an exception.

  • Use traceback.format_exc() to get full stack trace during exception handling.
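For example (pb.log is the D3 logging helper; the sketch below wraps the pattern in a plain function so it can run anywhere):

```python
import traceback

def run_and_capture(fn):
    """Runs fn and returns (result, stack_trace); stack_trace is None on success."""
    try:
        return fn(), None
    except Exception:
        # traceback.format_exc() captures the full stack trace of the active exception
        return None, traceback.format_exc()

result, trace = run_and_capture(lambda: 1 / 0)
# In a D3 command you would forward the trace with pb.log(trace)
print("ZeroDivisionError" in trace)  # True
```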

Command Retry Mechanism

Sometimes a command may fail for unexpected reasons. You can return a special "__RETRY__" value as returnData in the D3 output model, and our system will rerun the command. The retry count can be retrieved with pb.runtime.get('retrycount', 0).

PY
def retrySample():
    errors = []
    rawData = []
    result = []
    returnData = "Failed"
    # get the retry count
    retrycount = pb.runtime.get("retrycount", 0)
    # retry if updateEvents throws an exception and the retry count is less than 10. It will stop at the 10th attempt and return "Failed"
    try:
        return updateEvents()
    except Exception:
        if retrycount < 10:
            retryOptions = {
                "__ACTION__": "__RETRY__",
                "__DELAY__": [10, 20, 30, 40]
            }
            return pb.returnOutputModel(result, retryOptions, "", "", rawData,
                                        errors)
    return pb.returnOutputModel(result, returnData, "", "", rawData, errors)

Code Sample

Fetch Event/Fetch Incident

Schedule Command Execution

During a data ingestion schedule, the End Time of each command execution is variable. Therefore, you need to adjust the End Time based on the Start Time given in the command's input parameters.

Here we offer an algorithm to determine the End Time of each command execution.

Case 1: If Current Time (UTCNowTime) - Start Time >= 1 hour, the schedule is far behind. Therefore, you need to catch up quickly to the Current Time in order to make the schedule responsive to the present timeline.

Solution: Since the maximum catch-up time D3 recommends is 1 hour, the End Time should be set to 1 hour after the Start Time.

EndTime = StartTime + 1 hour

Case 2: If Current Time (UTCNowTime) - Start Time < 1 hour, it means that the schedule is either a little behind or right on time.

Solution: Since the schedule is either on time or a little behind, the End Time should be set to the current time to make the schedule responsive to the present timeline.

EndTime = UTCNowTime
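The two cases above can be sketched as follows (the function name and the fixed "now" value are illustrative; the fetchEvent code sample below embeds the same logic):

```python
from datetime import datetime, timedelta

def calc_end_time(start_time, utc_now):
    """Case 1: more than 1 hour behind -> catch up one hour at a time.
    Case 2: less than 1 hour behind -> jump straight to the current time."""
    if utc_now - start_time >= timedelta(hours=1):
        return start_time + timedelta(hours=1)
    return utc_now

now = datetime(2024, 10, 15, 12, 0, 0)
print(calc_end_time(datetime(2024, 10, 15, 9, 0, 0), now))   # 2024-10-15 10:00:00 (Case 1)
print(calc_end_time(datetime(2024, 10, 15, 11, 30, 0), now)) # 2024-10-15 12:00:00 (Case 2)
```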

Tolerance Scope

Because there can be a gap between when data is generated in the product and when it becomes queryable through the REST API service, we often need to set a tolerance scope (in minutes) so that the command covers a little bit of past time and picks up data that was generated but not yet available to the REST call.

Calculate the Start Time by applying the Tolerance Scope: StartTime = StartTime - ToleranceScope

Passdown Data

Since fetchEvent and fetchIncident commands are usually time-sensitive, it is important to schedule the next start time for the next round of fetching events/incidents. In our system commands, this situation is handled as long as the start time of the command and the schedule interval are filled. If a custom fetchEvent/fetchIncident is needed, the next start time can be handled by the passdown data.

Calculate the next start time, which is basically the end time of the current task, and set it on the passdown data object under the key StartTime, using D3's datetime format "%Y-%m-%d %H:%M:%S".

After generating the passdown data, it must be set in the return model so it can be used for the next scheduled run.

CODE
Passdown = {
    "StartTime": EndTime.strftime("%Y-%m-%d %H:%M:%S")
}
return pb.returnOutputModel(result, returnData, "", "", rawData, errors, passdownData=Passdown)

Sample Code

CODE
def fetchEvent(*args):
    # args[0]: StartTime
    # args[1]: EndTime
    # args[2]: topRecentEventNumber
    # args[3]: SearchCondition
    # args[4]: ToleranceScope

    errors = []
    result = ""
    returnData = "Successful"
    caseItems = []

    passdown = {}
    passdownMinutes = int(args[4])

    # Only calculate the endTime when the Tolerance Scope has a value, which indicates the task is scheduled
    def _calcEndTime(start, endTime):
        now = datetime.utcnow().replace(tzinfo=pytz.utc)
        if start < now - timedelta(hours=1):
            # Case 1: the schedule is more than 1 hour behind; catch up 1 hour at a time
            if endTime < now - timedelta(hours=1):
                endTime = endTime + timedelta(hours=1)
            else:
                endTime = now
        else:
            # Case 2: the schedule is on time or a little behind
            if endTime >= now:
                endTime = now  # auto-adjust invalid input time

        passdown = {
            "StartTime": endTime.strftime("%Y-%m-%d %H:%M:%S")
        }

        return endTime, passdown

    try:
        startTime, endTime = args[0].replace(tzinfo=pytz.utc), args[1].replace(tzinfo=pytz.utc)
        topRecentEventNumber = int(args[2])

        if passdownMinutes > 0:
            endTime, passdown = _calcEndTime(startTime, endTime)
            startTime = startTime - timedelta(minutes=passdownMinutes)

        ## conn is a variable with the credential setup
        params = {
            "startDate": startTime,
            "endDate": endTime,
            "filter": args[3]
        }

        ## remove the datetime from the request fields if the year value is 1900
        if startTime.year == 1900:
            params.pop("startDate")
        if endTime.year == 1900:
            params.pop("endDate")

        returnData, error = conn.sendRequest("GET", "REST endpoint path", params=params)
        if not error:
            caseItems = returnData.get("results", [])
            if topRecentEventNumber > 0:
                caseItems = caseItems[:topRecentEventNumber]
                returnData["results"] = caseItems
            result = {
                "Start Time (UTC)": startTime.strftime("%Y-%m-%d %H:%M:%S"),
                "End Time (UTC)": endTime.strftime("%Y-%m-%d %H:%M:%S"),
                "Events Count": len(caseItems)
            }
        else:
            errors.append(error)

        if len(errors) == 0 and len(caseItems) == 0:
            returnData = "Successful with No Event Data"
        elif len(errors) > 0 and len(caseItems) > 0:
            returnData = "Partially Successful"
        elif len(errors) > 0 and len(caseItems) == 0:
            returnData = "Failed"

    except Exception as ex:
        errors += list(ex.args)
        returnData = "Failed"

    return pb.returnOutputModel(result, returnData, "", "", "", errors, passdownData=passdown)

Test Connection

To test your integration's connection, you can manually send a request to the third-party API and monitor the response. If the response indicates any connection issues, the test connection command should fail; otherwise, it should succeed.

READER NOTE

We offer the functionality to schedule your test connection command for periodic connection health checks. Therefore, when building your own test connection, D3 recommends choosing a request with minimal impact and low cost, such as a simple GET request.

Sample Code

CODE
def TestConnection():
    # Set output model initial value
    rawData = ""
    resultData = ""
    returnData = ""
    keyFields = ""
    contextData = ""
    error = ""

    # Process and set output model value
    try:
        r = requests.get(url="http://ip.jsontest.com/", verify=False)
        if r.ok:
            rawData = r.json()
            returnData = "Successful"
        else:
            error = "cannot connect to the site."
    except Exception as e:
        error = str(e)
        returnData = "Failed"
    return pb.returnOutputModel(resultData, returnData, keyFields, contextData, rawData, error)