Python Script
LAST UPDATED: OCT 15, 2024
In D3, most system commands are written as Python scripts. These system scripts are similar to the custom Python scripts that you can write yourself, so this section teaches you how to write proper and useful custom Python scripts like a D3 expert.
Command & Script
The following facts describe how a D3 command is implemented and associated with a Python script.
Each system/custom command is associated with a Python function whose name matches the command's internal name.
All system/custom commands of an integration should be written in a system/custom Python script.
With the same integration, the custom script contains the codebase of the system script, even though it doesn’t appear within the UI. This means that you can reference any functions, classes, or variables within the system script.
The functions, classes, variables, and modules in the custom scripts can override those defined in the system script if they share the same name.
Input parameters are passed, in order, to the parameters of the function that corresponds to the command. You can reference them in either of these ways:
def commandName(param1, param2, param3):
or
def commandName(*args):
    param1, param2, param3 = args[0], args[1], args[2]
For an integration command, connection parameters can be referenced through the global variable `runtime` in either of these ways:
runtime['connector']['serverurl']
or
runtime.get('connector',{}).get('serverurl','<your default value>')
Output data is divided into result data, return data, raw data, error logs, and passdown data, and is turned into the command output by the function `pb.returnOutputModel`. For details, refer to Return Output Model.
One custom integration can only define one system-defined command, which includes testconnection, fetchevent, fetchincident, etc.
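As a rough illustration of the points above about positional inputs and connection parameters, the sketch below builds a request URL. The `runtime` dictionary here is a local stand-in with made-up values (in D3 it is provided as a global), and `buildRequestUrl` and its two inputs are hypothetical names chosen for the example.

```python
# Local stand-in for the global `runtime` that D3 provides; values are made up.
runtime = {"connector": {"serverurl": "https://api.example.com/"}}


def buildRequestUrl(*args):
    # Inputs arrive in the order they are configured on the command.
    path, limit = args[0], args[1]
    # The .get() chain falls back to a default instead of raising KeyError.
    base = runtime.get("connector", {}).get("serverurl", "https://localhost")
    return f"{base.rstrip('/')}/{path.lstrip('/')}?limit={int(limit)}"


print(buildRequestUrl("/events", 50))  # https://api.example.com/events?limit=50
```

The `.get()` form is the safer choice when a connection parameter is optional, because a missing key yields the default rather than an exception.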
Convention
Do not define a class named PlaybookRunTime. It is reserved for D3 playbook engine.
Do not define a class named APIError. It is reserved for D3 playbook engine.
Do not define a class named HTTPAPIError. It is reserved for D3 playbook engine.
Do not define a variable or object named pb. It is reserved for D3 playbook engine.
Do not define a variable or object named args. It is reserved for D3 playbook engine.
Do not define a variable or object named runtime. It is reserved for D3 playbook engine.
Do not define a variable or object named input. It is reserved for D3 playbook engine.
Do not give the function in the command script a name that differs from the command name.
Do not import a library that is neither part of the Python standard library nor a D3 third-party library.
Use the pb.log function for standard output instead of print or any other function.
Data must be returned and output from the command through pb.returnOutputModel().
Do not include any OAuth 2.0 logic in your command, because D3 custom integrations do not currently support OAuth 2.0 authentication.
Return Output Model
The return output model is a D3 helper function that formats the return data of a command into the D3 output model.
pb.returnOutputModel(result, returnData, keyFields, contextData, rawData, errors, passdownData)
READER NOTE
The returnData should return the status of the command, which is Successful, Partially Successful, or Failed. It is suggested to keep both the keyFields and contextData fields as an empty string "", as these fields are deprecated.
Error Handling
The success or failure of a command is determined solely by the errors field. If the errors field is not empty (that is, it evaluates to anything other than False), an error message is present and the command fails.
READER NOTE
The errors field is a text field and should contain only one error message.
Although the returnData contains the status of the command, it is not directly related to the success or failure of the command. This means that if returnData is ‘Failed’ and the errors field is empty, the command will still be considered successful.
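The rule can be pictured with a small check on the output model. This is only a sketch: `commandSucceeded` is a hypothetical helper, the dictionaries are hand-written samples, and treating any non-empty `error` value as a failure is an assumption based on the description above, not D3's actual implementation.

```python
def commandSucceeded(outputModel):
    """Hypothetical check: only the error field decides success."""
    # The key is "error" in the output model shown in the returnOutputModel sample.
    return not outputModel.get("error")


failedStatusNoError = {"returnData": "Failed", "error": ""}
successfulStatusWithError = {"returnData": "Successful", "error": "Request timed out"}

print(commandSucceeded(failedStatusNoError))        # True
print(commandSucceeded(successfulStatusWithError))  # False
```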
Use Case I
When a command may produce multiple error messages, the errors field, being a text field, cannot store more than one message. To work around this limitation, place a general message in the errors field, such as "There is more than one error in the command. For more information, refer to the errorMessages field inside the rawData field". Then log all individual error messages in the errorMessages field inside the rawData field. In this scenario, the rawData field should be formatted as a JSON object, with the error messages stored as an array. This approach logs every error encountered during command execution while keeping the error data clear and organized.
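A minimal sketch of this approach follows. `summarizeErrors` and `GENERAL_ERROR` are hypothetical names, and the wording of the general message is only an example; the resulting `rawData` and `errors` values would then be passed to pb.returnOutputModel.

```python
GENERAL_ERROR = ("The command produced multiple errors. "
                 "See the errorMessages field inside the rawData field.")


def summarizeErrors(responses, errorMessages):
    """Return (rawData, errors) with every message kept in rawData."""
    rawData = {"results": responses, "errorMessages": errorMessages}
    if not errorMessages:
        errors = ""                # no error: the command succeeds
    elif len(errorMessages) == 1:
        errors = errorMessages[0]  # a single message fits the text field
    else:
        errors = GENERAL_ERROR     # several messages: point to rawData
    return rawData, errors


rawData, errors = summarizeErrors([], ["Host A unreachable", "Host B timed out"])
print(errors)
print(rawData["errorMessages"])
```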
Use Case II
In scenarios where a single command execution involves multiple requests, it's possible for any of these requests to contain errors within their responses. If you need to handle each error individually but prefer not to have a single error result in the failure of the entire command, there's a workaround. You can opt not to input the error message into the errors field and instead store it in the rawData field. By following this approach, you can mark the command as "Partially Successful" since it contains errors but still returns results. This ensures that each error is appropriately handled without compromising the overall execution of the command.
# This is the JSON format for the rawData field that D3 recommends for handling multiple errors.
rawData = {
    "results": [<response rawData>],
    "errorMessages": [<error Messages>]
}
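The workaround in Use Case II can be sketched as a loop that records each failure and keeps going. Everything here is illustrative: `collectResults`, `okRequest`, and `badRequest` are hypothetical names, and failing the command when no request succeeds at all is an assumption added for the example, not a D3 requirement.

```python
def collectResults(requestFunctions):
    """Run each request; keep results and error messages side by side."""
    results, errorMessages = [], []
    for sendRequest in requestFunctions:
        try:
            results.append(sendRequest())
        except Exception as ex:
            # Record the error and continue with the remaining requests.
            errorMessages.append(str(ex))
    rawData = {"results": results, "errorMessages": errorMessages}
    if not errorMessages:
        returnData, errors = "Successful", ""
    elif results:
        # Errors stay out of the errors field so the command does not fail.
        returnData, errors = "Partially Successful", ""
    else:
        # Assumption: nothing succeeded, so let the command fail.
        returnData, errors = "Failed", "All requests failed. See errorMessages in rawData."
    return returnData, rawData, errors


def okRequest():
    return {"id": 1}


def badRequest():
    raise ValueError("HTTP 500 from endpoint")


returnData, rawData, errors = collectResults([okRequest, badRequest])
print(returnData)                # Partially Successful
print(rawData["errorMessages"])  # ['HTTP 500 from endpoint']
```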
READER NOTE
To handle a specific error by retrying a command, refer to Command Retry Mechanism.
Passdown
To let a scheduled command advance from one run to the next, passdown data must be configured to contain the next start time. This ensures that the command runs according to the specified schedule.
READER NOTE
For more information on how and when to set passdown data, refer to the real case in the code sample: Fetch Event/Fetch Incident Passdown Data.
D3 Python Library
Utility Command & Integration Command Calls
In your custom Python script, you can use D3's out-of-the-box system Utility Commands and Integration Commands.
To call a Utility Command within the Python Script
D3.Utility.{Command Name}<{Command Type}>(parameters)
def utilityCommandCalls():
    return D3.Utility.concatToRear<Text>("Join ", "Text")
WARNING
Command Name is the internal name of the command.
Command Type is the type of the command's first parameter.
Here are some useful and commonly used utility commands in the custom Python Script
Command Internal Name | Command Display Name | Description | Calls | Input | Output |
equals | Text Equals to | Checks if the two input texts are identical | D3.Utility.equals<Text>("SOAR","SOAR") | Input 1 - Text SOAR Input 2 - Text SOAR | Return Data - Data Type: Boolean true |
contains | Contains Text | Checks if the input text contains the specified text | D3.Utility.contains<Text>("Welcome to use the SOAR product","SOAR") | Input - Text Welcome to use the SOAR product Search Value - Text SOAR | Return Data - Data Type: Boolean true |
GetUTCTimeNow | Get Current UTC Time | Gets current UTC time | D3.Utility.GetUTCTimeNow<>() | Site: dropdown list | Return Data - Data Type: Text "2020-05-28 23:08:39" |
ExtractArtifactsToJsonObjectArrayWithArrayKeyValueO | Extract Key/Value Pairs from JSON Object | Extracts values of specified keys from a JSON Object | D3.Utility.GetUTCTimeNow<>() | Input - JSON Object The JSON Object to extract some key's value from Sample Data { "IPAddress": "***.***.***.***", "RiskLevel": "Low", "Type": "Cyber" } Keys - Text Array The keys list Sample Data [ "IPAddress", "RiskLevel" ] | Context Data - Data Type: JSON Object: { "IPAddress": "***.***.***.***", "RiskLevel": "Low" } |
greaterThan | Greater than | Checks if the first number is greater than the second number | D3.Utility.greaterThan<Number>(8000,8000) | Input 1 - Number 8000 Input 2 - Number 8000 | Return Data - Data Type: Boolean false |
READER NOTE
For more information about the Utility Commands, please refer to the Utility Command
To call an Integration Command within the Python Script
D3.Integration.{Integration Name}.{Command Name}(parameters)
READER NOTE
A data type is not required when calling an Integration Command.
The limitations are:
Only custom Integration Commands can execute other Integration Commands that belong to the same Integration
Custom Utility Commands cannot execute any Integration Commands.
D3 Helper Functions
D3 provides pre-defined helper functions that help developers write cleaner, more modular, and more efficient code the D3 way.
Each helper function is called as pb.{function name}.
isJson(jsonString)
Returns True if the input is valid JSON; otherwise returns False.
Parameters:
jsonString: the JSON string to validate
Sample Input: pb.isJson('{"Simple":"Simple JSON"}')
Sample Output: True
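For readers who want to picture what such a check involves, here is a sketch built on the standard `json` module. It is not D3's implementation of pb.isJson, and `isJsonSketch` is a hypothetical name; it only shows the usual parse-and-catch pattern.

```python
import json


def isJsonSketch(jsonString):
    """Return True if jsonString parses as JSON, else False."""
    try:
        json.loads(jsonString)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string input.
        return False
    return True


print(isJsonSketch('{"Simple":"Simple JSON"}'))  # True
print(isJsonSketch("{not json}"))                # False
```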
log(message)
Logs a message to the Custom Log tab. The tab is shown only when testing a command.
Sample Input: pb.log("Debug Line")
Sample Output:
returnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData={})
Generates the D3 output model. Recommended for writing custom commands.
Parameters:
result: HTML-formatted data displayed in the Result tab
returnData: Simple data that can be directly used by subsequent commands
outputData: Should be left empty. This is generated automatically from the Key Fields configuration
contextData: Contextual data to be shared with other tasks
rawData: Raw data from the command
error: Error details
passdownData: An Event Intake and Incident Intake related field that can pass down parameter values to the next scheduled instance
Sample Input:
pb.returnOutputModel("<body><h1>Sample Result Data</h1></body>", "Sample Return Data", "", {"SampleContextData": "ContextData"}, {"SampleRawData": "Rawdata"}, "Sample Error", passdownData = {})
Sample Output:
{
"result": {
"description": "<body><h1>Sample Result Data</h1></body>",
"references": [],
"actions": []
},
"returnData": "Sample Return Data",
"outputData": "",
"contextData": {
"SampleContextData": "ContextData"
},
"rawData": {
"SampleRawData": "Rawdata"
},
"error": "Sample Error",
"passdownData": {},
"customLog": "",
"others": {}
}
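When unit-testing a custom command outside D3, a local stand-in that reproduces the shape of the sample output can be handy. The function below is such a stand-in under that assumption: `returnOutputModelStub` is a hypothetical name, it only mirrors the fields visible in the sample above, and it does not reproduce anything else the real pb.returnOutputModel may do.

```python
def returnOutputModelStub(result, returnData, outputData, contextData,
                          rawData, error, passdownData=None):
    """Build a dictionary shaped like the sample output shown above."""
    return {
        "result": {"description": result, "references": [], "actions": []},
        "returnData": returnData,
        "outputData": outputData,
        "contextData": contextData,
        "rawData": rawData,
        "error": error,
        # Avoid a shared mutable default by creating the dict per call.
        "passdownData": passdownData if passdownData is not None else {},
        "customLog": "",
        "others": {},
    }


model = returnOutputModelStub("<body><h1>Sample Result Data</h1></body>",
                              "Sample Return Data", "",
                              {"SampleContextData": "ContextData"},
                              {"SampleRawData": "Rawdata"}, "Sample Error")
print(model["returnData"])  # Sample Return Data
```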
uploadFile(fileObject)
Uploads a file to D3 as Playbook File (PB_FILE). The function accepts a JSON object with file metadata, including the file name and content.
Parameter:
fileObject (JSON object): A dictionary representing the file to be uploaded. It must follow the structure:
{ "file": (fileName, fileContent) }
fileName (string): The name of the uploaded file.
fileContent (string): The file content in binary format.
Example of fileObject:
{
"file": (
"example.txt",
b"Sample binary content of the file"
)
}
Return:
Upon successful upload, the function returns a JSON object containing the following fields:
fileId (string): A unique identifier assigned to the uploaded file.
fileName (string): The name of the uploaded file.
md5 (string): The MD5 checksum of the uploaded file, used to verify its integrity.
sha1 (string): The SHA-1 hash of the uploaded file, used for additional integrity verification.
sha256 (string): The SHA-256 hash of the uploaded file, used for enhanced integrity checks.
Example Usage:
fileObject = {
"file": ("example.txt", b"File content in binary")
}
response = pb.uploadFile(fileObject)
# response = {'fileId': '115924', 'fileName': 'example.txt', 'md5': '517A6396037BE94D96EF2D00AB65C913', 'sha1': '2E46DD7BF55755FE5938181E4335252FE207B609', 'sha256': '9D5CFCA834F1F67EFF7A1A57B2FAD25FBB4544C4845B99F0238357315271E2C2'}
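Since the response carries three digests, one possible use is to confirm that local bytes match what was uploaded. The sketch below does this with the standard `hashlib` module. `matchesUploadResponse` is a hypothetical helper, and the response dictionary is built by hand for the demonstration rather than returned by pb.uploadFile.

```python
import hashlib


def matchesUploadResponse(fileContent, response):
    """Return True if all three digests in response match fileContent."""
    for field in ("md5", "sha1", "sha256"):
        actual = hashlib.new(field, fileContent).hexdigest()
        # The sample response shows uppercase hex, so compare case-insensitively.
        if response.get(field, "").lower() != actual:
            return False
    return True


content = b"File content in binary"
# Hand-built response for the demo; a real one would come from pb.uploadFile.
sampleResponse = {
    "md5": hashlib.md5(content).hexdigest().upper(),
    "sha1": hashlib.sha1(content).hexdigest().upper(),
    "sha256": hashlib.sha256(content).hexdigest().upper(),
}
print(matchesUploadResponse(content, sampleResponse))      # True
print(matchesUploadResponse(b"tampered", sampleResponse))  # False
```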
The uploadFile function is often used in conjunction with the formatDownloadFileResult function to generate an HTML table displaying the metadata of uploaded files and providing direct download links for each file.
def Uploadfile(*args):
    rawData = {}
    resultData = {}
    returnData = "Successful"
    contextData = ""
    keyFields = {}
    error = ""
    fileObjects = [{"file": ("example1.txt", b"File example 1")}, {"file": ("example2.txt", b"File example 2")}]
    fileResults = []
    for fileObject in fileObjects:
        result = pb.uploadFile(fileObject)
        fileResults.append(result)
    resultData, reference = pb.formatDownloadFileResult(fileResults)
    return pb.returnOutputModel(resultData, returnData, keyFields, contextData, rawData, error, reference)
downloadFile(fileid, filesource)
Retrieves a file based on the provided fileID and fileSource. It returns the file name and content, enabling access to files from sources such as incident attachments, playbook files, or artifact files.
Parameters:
fileid (string): A unique identifier for the file to be retrieved.
filesource (string): The source of the file. The options are:
IR_ATCHMNT - Incident attachment files
PB_FILE - Playbook files
KC_AF_FILE - Artifact files
Return:
The function returns a tuple containing the file name and file content:
fileName (string): The name of the downloaded file.
fileContent (string): The actual content of the file in binary format.
Example Usage:
response = pb.downloadFile("115924","PB_FILE")
# response = ("example.txt", b"File content in binary")
READER NOTE
Generally, when retrieving binary content through a file, if the file is base64 encoded, the client needs to decode or normalize the binary content for subsequent use. After the file content has been properly decoded, it can be sent to a third-party integration for analysis via an API request or processed through a built-in command.
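As an illustration of the decoding step, the sketch below uses the standard `base64` module. It assumes you already know the content is base64 encoded; `decodeBase64Content` is a hypothetical helper, and the encoded bytes are produced locally instead of coming from pb.downloadFile.

```python
import base64
import binascii


def decodeBase64Content(fileContent):
    """Decode base64 bytes; raise ValueError if the input is not valid base64."""
    try:
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(fileContent, validate=True)
    except binascii.Error as exc:
        raise ValueError("file content is not valid base64") from exc


encoded = base64.b64encode(b"File content in binary")
print(decodeBase64Content(encoded))  # b'File content in binary'
```

Strict validation is used so that malformed input fails loudly instead of being silently decoded into garbage before it is sent on for analysis.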
Debugging & Testing
When coding a D3 Python script, you can log any output during command execution by using the helper function pb.log().
Logged data will be displayed in command testing, and in playbook runtime while testing a playbook.
Logged data will be saved even if the command exits due to an exception.
Use the traceback library to get the full stack trace of exceptions.
Stack traces will match the line number of the command script if a command exits due to an exception.
Use traceback.format_exc() to get the full stack trace during exception handling.
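The pattern looks roughly like this. The `log` function and `customLog` list are local stand-ins for pb.log and the Custom Log tab so that the sketch runs outside D3, and `riskyCommand` is a hypothetical command body.

```python
import traceback

customLog = []  # stand-in for the Custom Log tab


def log(message):
    # Hypothetical stand-in for pb.log.
    customLog.append(str(message))


def riskyCommand():
    try:
        return 1 / 0
    except Exception:
        # format_exc() returns the full stack trace of the active exception.
        log(traceback.format_exc())
        return None


riskyCommand()
print(customLog[-1])
```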
Command Retry Mechanism
Sometimes a command may fail for unexpected reasons. You can return a special "__RETRY__" action as returnData in the D3 output model (see the retryOptions object in the sample below), and the system will rerun the command. The retry count can be retrieved with pb.runtime.get('retrycount', 0).
def retrySample():
    errors = []
    rawData = []
    result = []
    returnData = "Failed"
    # Get the retry count
    retrycount = pb.runtime.get("retrycount", 0)
    # Retry if updateEvents throws an exception and the retry count is less than 10.
    # It stops at the 10th time and returns "Failed".
    try:
        return updateEvents()
    except Exception:
        if retrycount < 10:
            retryOptions = {
                "__ACTION__": "__RETRY__",
                "__DELAY__": [10, 20, 30, 40]
            }
            return pb.returnOutputModel(result, retryOptions, "", "", rawData, errors)
        return pb.returnOutputModel(result, returnData, "", "", rawData, errors)
Code Sample
Fetch Event/Fetch Incident
Schedule Command Execution
During a data ingestion schedule, the End Time of each command execution is variable. Therefore, you need to adjust the End Time based on the Start Time of the command input parameter.
The following algorithm determines the End Time of each command execution.
Case 1: If Current Time (UTCNowTime) - Start Time >= 1 hour, the schedule is far behind. It needs to catch up to the Current Time quickly so that it stays responsive to the present timeline.
Solution: Since the maximum catch-up time D3 recommends is 1 hour, the End Time should be set to 1 hour after the Start Time.
EndTime = StartTime + 1 hour
Case 2: If Current Time (UTCNowTime) - Start Time < 1 hour, it means that the schedule is either a little behind or right on time.
Solution: Since the schedule is either on time or a little behind, the End Time should be set to the current time to make the schedule responsive to the present timeline.
EndTime = UTCNowTime
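The two cases can be sketched as a single function. `calcEndTime` and `MAX_CATCH_UP` are hypothetical names, and the current time is passed in as an optional argument only so that the example gives the same result every time it runs.

```python
from datetime import datetime, timedelta, timezone

MAX_CATCH_UP = timedelta(hours=1)  # maximum catch-up window per execution


def calcEndTime(startTime, utcNow=None):
    """Return the End Time for one execution, following Case 1 and Case 2."""
    if utcNow is None:
        utcNow = datetime.now(timezone.utc)
    if utcNow - startTime >= MAX_CATCH_UP:
        # Case 1: far behind, so advance by at most one hour.
        return startTime + MAX_CATCH_UP
    # Case 2: on time or slightly behind, so run up to the current time.
    return utcNow


now = datetime(2024, 10, 15, 12, 0, 0, tzinfo=timezone.utc)
print(calcEndTime(now - timedelta(hours=3), now))     # 2024-10-15 10:00:00+00:00
print(calcEndTime(now - timedelta(minutes=20), now))  # 2024-10-15 12:00:00+00:00
```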
Tolerance Scope
Because there can be a delay between the moment data is generated in the product and the moment it can be queried through the REST API service, we often set a tolerance scope, in minutes, so that the command reaches slightly further back in time and picks up data that had been generated but could not yet be retrieved by the earlier REST call.
Calculate the Start Time by applying the Tolerance Scope: StartTime = StartTime - ToleranceScope
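A worked instance of this formula, as a sketch: with a Start Time of 12:00:00 and a tolerance scope of 5 minutes, the query starts at 11:55:00. `applyToleranceScope` is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def applyToleranceScope(startTime, toleranceMinutes):
    """Move the Start Time back by the tolerance scope (in minutes)."""
    return startTime - timedelta(minutes=int(toleranceMinutes))


scheduledStart = datetime(2024, 10, 15, 12, 0, 0)
print(applyToleranceScope(scheduledStart, 5))  # 2024-10-15 11:55:00
```

Widening the window this way means consecutive runs can overlap, so the same record may be returned more than once.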
Passdown Data
Since fetchEvent and fetchIncident commands are usually time-sensitive, it is important to schedule the start time for the next round of fetching events/incidents. In our system commands, this is handled as long as the start time of the command and the schedule interval are filled in. If a custom fetchEvent/fetchIncident is needed, the next start time can be handled through the passdown data.
Calculate the next start time, which is normally the end time of the current task, and set it in the passdown data object under the key StartTime, using D3's datetime format "%Y-%m-%d %H:%M:%S".
After generating the passdown data, set it in the return model so that it is used for the next scheduled run.
Passdown = {
    "StartTime": EndTime.strftime("%Y-%m-%d %H:%M:%S")
}
return pb.returnOutputModel(result, returnData, "", "", rawData, errors, passdownData=Passdown)
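To see the datetime format in action, the sketch below formats an End Time into a passdown object and parses it back. `buildPassdown` and `readStartTime` are hypothetical helpers; how D3 delivers the value to the next scheduled run is not shown here, so the parsing half is only an illustration of the format round trip.

```python
from datetime import datetime

D3_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def buildPassdown(endTime):
    """Store the current End Time as the next run's StartTime."""
    return {"StartTime": endTime.strftime(D3_DATETIME_FORMAT)}


def readStartTime(passdown):
    """Parse the StartTime string back into a datetime."""
    return datetime.strptime(passdown["StartTime"], D3_DATETIME_FORMAT)


passdown = buildPassdown(datetime(2024, 10, 15, 12, 30, 0))
print(passdown)                 # {'StartTime': '2024-10-15 12:30:00'}
print(readStartTime(passdown))  # 2024-10-15 12:30:00
```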
Sample Code
from datetime import datetime, timedelta
import pytz

def fetchEvent(*args):
    # args[0]: StartTime
    # args[1]: EndTime
    # args[2]: topRecentEventNumber
    # args[3]: SearchCondition
    # args[4]: ToleranceScope
    errors = []
    rawData = ""
    result = ""
    returnData = "Successful"
    passdown = {}
    caseItems = []

    # Only calculate the end time when the tolerance scope has a value, which indicates that the task is scheduled
    def _calcEndTime(start):
        utcNow = datetime.now(pytz.utc)
        if utcNow - start >= timedelta(hours=1):
            # Case 1: the schedule is far behind, so catch up by at most 1 hour
            endTime = start + timedelta(hours=1)
        else:
            # Case 2: the schedule is on time or a little behind
            endTime = utcNow
        passdown = {
            "StartTime": endTime.strftime("%Y-%m-%d %H:%M:%S")
        }
        return endTime, passdown

    try:
        startTime, endTime = args[0].replace(tzinfo=pytz.utc), args[1].replace(tzinfo=pytz.utc)
        topRecentEventNumber = int(args[2])
        toleranceMinutes = int(args[4])
        if toleranceMinutes > 0:
            endTime, passdown = _calcEndTime(startTime)
            startTime = startTime - timedelta(minutes=toleranceMinutes)
        ## conn is a variable with the credential setup
        params = {
            "startDate": startTime,
            "endDate": endTime,
            "filter": args[3]
        }
        ## Remove the datetime from the request if the year value is 1900
        if startTime.year == 1900:
            params.pop("startDate")
        if endTime.year == 1900:
            params.pop("endDate")
        # Keep the API response as raw data so that returnData holds only the status
        rawData, error = conn.sendRequest("GET", "REST endpoint path", params=params)
        if not error:
            caseItems = rawData.get("results", [])
            if topRecentEventNumber > 0:
                caseItems = caseItems[:topRecentEventNumber]
            rawData["results"] = caseItems
            result = {
                "Start Time (UTC)": startTime.strftime("%Y-%m-%d %H:%M:%S"),
                "End Time (UTC)": endTime.strftime("%Y-%m-%d %H:%M:%S"),
                "Events Count": len(caseItems)
            }
        else:
            errors.append(error)
        if len(errors) == 0 and len(caseItems) == 0:
            returnData = "Successful with No Event Data"
        elif len(errors) > 0 and len(caseItems) > 0:
            returnData = "Partially Successful"
        elif len(errors) > 0 and len(caseItems) == 0:
            returnData = "Failed"
    except Exception as ex:
        errors += list(ex.args)
        returnData = "Failed"
    return pb.returnOutputModel(result, returnData, "", "", rawData, errors, passdownData=passdown)
Test Connection
To test your integration's connection, you can manually send a request to the third-party API and monitor the response. If the response indicates any connection issues, the test connection command should fail; otherwise, it should succeed.
READER NOTE
We offer the functionality to schedule your test connection command for periodic connection health checks. Therefore, when building your own test connection, D3 recommends choosing a request with minimal impact and low cost, such as a simple GET request.
Sample Code
import requests

def TestConnection():
    # Set output model initial value
    rawData = ""
    resultData = ""
    returnData = ""
    keyFields = ""
    contextData = ""
    error = ""
    # Process and set output model value
    try:
        r = requests.get(url="http://ip.jsontest.com/", verify=False)
        if r.ok:
            rawData = r.json()
            returnData = "Successful"
        else:
            error = "cannot connect to the site."
            returnData = "Failed"
    except Exception as e:
        error = str(e)
        returnData = "Failed"
    return pb.returnOutputModel(resultData, returnData, keyFields, contextData, rawData, error)