D3 Python Conventions
Overview
D3 supports custom Python scripts, which run inside the D3 Python custom image. As an admin user, you can write both Utility Commands and Integration Commands in Python.
For example, you can write a custom Integration Command, such as IP Reputation Check, that enriches your data from third-party sources. You can then write a custom Utility Command in Python that transforms the data into your preferred structure.
All Utility Commands can be created and viewed in the Utility Commands module. All Integration Commands can be created and viewed in their individual Integrations. Both types of commands can be implemented in Python or Codeless Playbook.
General Information
Python Version: 3.9.12
Available Third-party Libraries:
boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
dateparser: https://pypi.org/project/dateparser/
defang: https://pypi.org/project/defang/
eml_parser: https://eml-parser.readthedocs.io/en/latest/
exchangelib: https://pypi.org/project/exchangelib/
gitpython: https://gitpython.readthedocs.io/en/stable/
google-api-python-client: https://github.com/googleapis/google-api-python-client
google-auth-oauthlib: https://pypi.org/project/google-auth-oauthlib/
google-auth: https://pypi.org/project/google-auth/
google-cloud-pubsub: https://cloud.google.com/pubsub/docs/reference/libraries
google-cloud: https://cloud.google.com/python/docs/setup
html2text: https://pypi.org/project/html2text/
imap-tools==0.34.0: https://pypi.org/project/imap-tools/
inflect: https://pypi.org/project/inflect/
ioc-finder: https://pypi.org/project/ioc-finder/
iocextract: https://github.com/InQuest/python-iocextract
Jinja2: https://pypi.org/project/Jinja2/
jsonpath-ng: https://pypi.org/project/jsonpath-ng/
mail-parser==3.15.0: https://docs.python.org/3/library/email.parser.html
natsort: https://pypi.org/project/natsort/
nessrest: https://pypi.org/project/nessrest/0.31/
oauth2client: https://pypi.org/project/oauth2client/
pandas: https://pandas.pydata.org/
psutil: https://pypi.org/project/psutil/
pyeti-python3: https://pypi.org/project/pyeti-python3/
pymssql: https://pypi.org/project/pymssql/
pyodbc: https://pypi.org/project/pyodbc/
python-dateutil==2.8.0: https://dateutil.readthedocs.io/en/stable/
pywin32: platform_system == "Windows": https://pypi.org/project/pywin32/
selenium: https://selenium-python.readthedocs.io/
setuptools: https://pypi.org/project/setuptools/
taxii2-client==1.0.1: https://pypi.org/project/taxii2-client/
urllib3: https://pypi.org/project/urllib3/
validators: https://validators.readthedocs.io/en/latest/
xmltodict: https://pypi.org/project/xmltodict/
xmltojson: https://pypi.org/project/xmltojson/
sqlparse==0.4.2: https://pypi.org/project/sqlparse/
psutil==5.9.0: https://pypi.org/project/psutil/
google-auth==2.6.6: https://developers.google.com/identity/protocols/oauth2
oauth2client==4.1.3: https://pypi.org/project/oauth2client/
dict2xml==1.7.1: https://pypi.org/project/dict2xml/
luqum==0.11.0: https://pypi.org/project/luqum/
pyodbc==4.0.32: https://pypi.org/project/pyodbc/
filelock==3.7.1: https://pypi.org/project/filelock/
cymruwhois==1.6: https://pypi.org/project/cymruwhois/
extract-msg==0.37.0: https://pypi.org/project/extract-msg/
python-libnmap==0.7.3: https://pypi.org/project/python-libnmap/
Command Type
Utility Command
A Utility Command is used for:
custom data transformation or manipulation that cannot be achieved through D3’s out-of-the-box Utility Commands
setting up nested playbooks
A custom Utility Command can be assigned as a Basic Utility, System Utility, or Cyber Utility, and used in a Command Task or Conditional Task.
Integration Command
Many security products support REST API integration. If an integration or command is not covered by the current version of D3 SOAR, D3 offers do-it-yourself integration through custom Python scripts. Users can follow the third-party API documentation together with the D3 Python conventions in this document to set up Integration Commands.
Command Implementation
Python Command Setup
Command basic setup
New Utility Command
Step 1: Navigate to Configuration on the top menu.
Step 2: On the left sidebar, go to Utility Commands.
Step 3: Click the button +. A new Add Command window will pop up.
New Integration Command
Step 1: Navigate to Configuration on the top menu.
Step 2: On the left sidebar, go to Integration.
Step 3: To find the Integration quickly, search for the name of the Integration you want to add the command to.
Step 4: Under the search box, select the target Integration.
Step 5: On the right side, click the button + New Command. A new connection window will pop up.
Reader Note
Set the command name to be the same as the Python function name.
For custom Event Intake or Incident Intake commands, the function name should be fetchEvent/fetchIncident.
def fetchEvent():
    pass

def fetchIncident():
    pass
Command input setup
Add new input parameters for your command:
a. Parameter Name: The internal name of the parameter. It should match the parameter name in the function definition.
b. Display Name: The display name of the parameter.
c. Parameter Type: The data type of the parameter.
Parameter Type options: Text, Number, DateTime, Boolean, Text Array, Number Array, DateTime Array, Boolean Array, JSON Array, JSON Object, Unknown, File, File Array, JSON Array Array
d. Is Required?: Whether the parameter is required when running the command.
e. Parameter Index: The order of the parameter, starting from 0.
f. Input Type: The data type of this parameter. For more information regarding the Input type, please refer to this document.
g. Default Value: The default value of the parameter. It can be empty.
h. Description: The description of the parameter.
i. Sample Data: Sample data of the parameter.
Parameter Type
Custom Input
Command output setup
Each command should return data using pb.returnOutputModel. Users also need to configure additional fields, such as Key Fields, in the Output tab.
resultData: HTML-formatted data displayed in the Result tab.
Reader Note
An HTML string can be generated from a JSON object or JSON array with the helper Python functions ConvertJsonObjectArrayToHTMLString and ConvertJsonObjectToHTMLString. For more details, see the D3 Python Library section below.
returnData: Return Data is analogous to a function's return value in programming. It is the data passed directly to the next command or used in Conditional Tasks.
keyFields: Set up the Key Fields in the Output tab of the Python script editing screen. If the raw data contains too many fields, pick out the important ones.
Common cyber security identifiers, such as unique IDs, file hash values, CVE numbers, and IP addresses, are generally good candidates for Key Fields.
The JSON path of each Key Field should refer to a JSON path in the Context Data.
contextData: Context Data is a contextual subset of the Raw Data. Context Data should store information that can be used to derive Return Data.
Example of Recorded Future Check IP Reputation Context Data:
[
  {
    "risk": {
      "score": 0,
      "level": 0,
      "context": {
        "phishing": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 1
          }
        },
        "public": {
          "summary": [],
          "score": 0,
          "mostCriticalRule": "",
          "rule": {
            "maxCount": 52
          }
        },
        "c2": {
          "score": 0,
          "rule": {
            "count": 0,
            "maxCount": 2
          }
        }
      }
    },
    "entity": {
      "id": "ip:8.8.8.8",
      "name": "8.8.8.8",
      "type": "IpAddress"
    },
    "ipAddress": "8.8.8.8",
    "riskLevel": "Low"
  }
]
rawData: Raw Data should store the raw message returned from third-party API calls.
Example of Recorded Future Check IP Reputation:
{
  "data": {
    "results": [
      {
        "risk": {
          "score": 0,
          "level": 0,
          "context": {
            "phishing": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 1
              }
            },
            "public": {
              "summary": [],
              "score": 0,
              "mostCriticalRule": "",
              "rule": {
                "maxCount": 52
              }
            },
            "c2": {
              "score": 0,
              "rule": {
                "count": 0,
                "maxCount": 2
              }
            }
          }
        },
        "entity": {
          "id": "ip:8.8.8.8",
          "name": "8.8.8.8",
          "type": "IpAddress"
        },
        "ipAddress": "8.8.8.8",
        "riskLevel": "Low"
      }
    ]
  },
  "counts": {
    "returned": 1,
    "total": 1
  }
}
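Comparing the two Recorded Future examples, the Context Data is the `data.results` array of the Raw Data. A minimal sketch of that derivation follows. The function name `extractContextData` and the trimmed sample response are illustrative and not part of the D3 library; other integrations will need their own mapping.

```python
def extractContextData(rawData):
    """Return the list stored under data.results, or an empty list if it is missing."""
    if not isinstance(rawData, dict):
        return []
    data = rawData.get("data")
    if not isinstance(data, dict):
        return []
    results = data.get("results")
    return results if isinstance(results, list) else []


# Trimmed-down stand-in for the raw response shown above (hypothetical sample).
sampleRawData = {
    "data": {"results": [{"ipAddress": "8.8.8.8", "riskLevel": "Low"}]},
    "counts": {"returned": 1, "total": 1},
}
contextData = extractContextData(sampleRawData)
```

Returning an empty list for unexpected shapes keeps the Key Field JSON paths from failing on a malformed response.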
Error: Error information to show, if there is any.
OutputModel in playbook configuration:
For more information regarding Playbook Task Configuration, please refer to this document.
Integration fetchEvent/fetchIncident pass down data
fetchEvent and fetchIncident commands are used to ingest raw data from third-party integration sources into the D3 system and create Events and Incidents, respectively.
For more details about Integration Commands, please refer to this document.
For more details about how to schedule and ingest Events and Incidents, please refer to this document.
fetchEvent and fetchIncident commands should also use another field in the D3 OutputModel: passdown data.
Since fetchEvent and fetchIncident commands are usually time-sensitive, it is important to set the start time for the next round of fetching. System commands handle this automatically as long as the command's start time and the schedule interval are filled in. For a custom fetchEvent/fetchIncident command, the next start time can be handled with passdown data.
Reader Note
All parameters in a function can use the passdown data from the last fetchEvent/fetchIncident run.
Setup command input as mentioned above.
Add a timespan parameter. Note that this time span should be equal to or larger than your schedule interval in order to avoid any fetchEvent delay.
Time Span: the input used to calculate the start time of the next fetch.
Schedule Interval: the interval at which the fetch event command is actually executed.
Write your main function logic.
Define your passdown data.
passdown = {
    "startTime": startTime + timedelta(minutes=timeSpan) if startTime + timedelta(minutes=timeSpan) < datetime.datetime.utcnow() else startTime
}
In this example, the next startTime is startTime plus timeSpan minutes if that value is still earlier than the current UTC time; otherwise startTime is left unchanged.
Put your passdown data in the outputModel.
The next time the schedule executes this command, startTime will be the value in your passdown data.
passdownData = passdown if timeSpan else {}
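The passdown rule can be isolated into a small function so it can be checked outside D3. This is a sketch under stated assumptions: `computeNextStartTime` is a hypothetical helper name, `startTime` is assumed to be a naive UTC `datetime`, and the optional `now` argument exists only to make the logic testable.

```python
from datetime import datetime, timedelta


def computeNextStartTime(startTime, timeSpan, now=None):
    """Advance startTime by timeSpan minutes only if the result is still in the past."""
    if now is None:
        # Same clock the sample uses: naive UTC time.
        now = datetime.utcnow()
    candidate = startTime + timedelta(minutes=timeSpan)
    return candidate if candidate < now else startTime


start = datetime(2022, 1, 1, 0, 0)
fixedNow = datetime(2022, 1, 1, 1, 0)

# 00:00 + 30 minutes is before 01:00, so the window moves forward.
advanced = computeNextStartTime(start, 30, now=fixedNow)
# 00:00 + 90 minutes is after 01:00, so the start time stays where it is.
unchanged = computeNextStartTime(start, 90, now=fixedNow)

passdown = {"startTime": advanced}
```

Holding the start time when the candidate lies in the future means the next scheduled run retries the same window instead of skipping ahead of data that does not exist yet.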
Integration connection parameter setup
Because an Integration connects to third-party services, a connection has to be established for API calls.
In D3, you can create different types of connections with your desired parameters.
Configure your connection parameters
Add a custom parameter
Add a new connection and fill in the connection data
Reader Note
If the parameter is set to be sensitive, the field will be masked.
For integration commands, the global variable runtime['connector'] is the dictionary that contains the Connection Parameters. The keys for Connection Parameters are lowercase and contain no spaces,
e.g. runtime['connector']['serverurl']
or runtime.get("connector").get("serverurl", "")
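A small sketch of reading connection parameters defensively follows. It assumes, as described above, that a key is the parameter name lowercased with spaces removed. `toConnectorKey`, `getConnectorParam`, and `sampleRuntime` are hypothetical names used for illustration; inside D3 the dictionary is supplied by the playbook engine.

```python
def toConnectorKey(displayName):
    """Lowercase the name and drop all whitespace, e.g. "Server URL" -> "serverurl"."""
    return "".join(displayName.split()).lower()


def getConnectorParam(runtimeData, displayName, default=""):
    """Look up a connection parameter, tolerating a missing connector or key."""
    connector = runtimeData.get("connector") or {}
    value = connector.get(toConnectorKey(displayName), default)
    # Trim stray whitespace that is easy to paste into a connection form.
    return value.strip() if isinstance(value, str) else value


# Stand-in for the engine-provided dictionary (hypothetical values).
sampleRuntime = {"connector": {"serverurl": " https://example.invalid/ ", "apikey": "abc"}}
serverUrl = getConnectorParam(sampleRuntime, "Server URL")
```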
Command functionality definition
Utility command example
Use case: Convert datetime format
Method: Define the input datetime format (oldPattern) and convert it into another datetime format (newPattern)
Input: Datetime
Output: Datetime
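The use case above could be sketched as follows. This assumes the patterns are Python `strftime`/`strptime` directives and that the input arrives as a string. The function name `convertDatetimeFormat` is illustrative, and a real command would wrap the result in pb.returnOutputModel rather than return a bare string.

```python
from datetime import datetime


def convertDatetimeFormat(inputDatetime, oldPattern, newPattern):
    """Parse inputDatetime with oldPattern and render it again with newPattern."""
    parsed = datetime.strptime(inputDatetime, oldPattern)
    return parsed.strftime(newPattern)


converted = convertDatetimeFormat(
    "2022-05-01 13:45:00",   # value in the old format
    "%Y-%m-%d %H:%M:%S",     # oldPattern
    "%d/%m/%Y %H:%M",        # newPattern
)
# converted is "01/05/2022 13:45"
```

`strptime` raises `ValueError` when the input does not match oldPattern, so a production command would catch that and report it through the error field.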
Integration command example
Fetch event sample code
import datetime
import json
import traceback
from datetime import timedelta

import requests
import validators

# define a _sendRequest helper function to handle all requests in this integration
def _sendRequest(method, resource=None, params=None, body=None):
    try:
        # this integration needs a connector
        connector = pb.runtime.get("connector")
        if not connector:
            raise Exception("Connection is required")
        # get the connector server url
        serverurl = connector.get("serverurl", "").strip()
        if not validators.url(serverurl):
            raise Exception("Server Url is not valid in format")
        # get the rest of connector parameters
        serverurl = serverurl.rstrip("/")
        username = connector.get("username", "").strip()
        apikey = connector.get("apikey", "").strip()
        version = connector.get("version", "").strip()
        # build the URL from the server URL, API version and requested resource
        url = f"{serverurl}/{version}/{resource or ''}"
        headers = {"Accept": "application/json", "Content-Type": "application/json", "Accept-Encoding": None}
        # send request and error handling; json=body serializes the body for us
        r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, auth=(username, apikey))
        data = r.json() if "application/json" in r.headers.get("Content-Type", "").lower() else r.text
        if not r.ok:
            return (False, data, f"Status Code: {r.status_code}, Message: {r.reason}, {r.text}")
        else:
            return (True, data, None)
    # handle the ConnectionError exception
    except requests.ConnectionError:
        raise Exception("The remote server refused to connect, please check Server URL in connection or API service availability.")
    except Exception as ex:
        raise Exception(_fmtExeptionErrMsg(ex))

def fetchEvent(args):
    error = ""
    rawData = []
    result = ""
    keyFields = ""
    passdown = {}
    # initialized here so the return statement still works if parsing the inputs fails
    timeSpan = 0
    try:
        # get the command parameters. args index following the defined index.
        startTime = args[0]
        endTime = args[1]
        index = args[2].strip() if args[2] else ""
        limit = _getValidInputValue(args[3], 1, 100, 20)
        offset = int(args[4]) if int(args[4]) >= 0 else 0
        queryString = args[5]
        timeSpan = int(args[6]) if int(args[6]) > 0 else 0
        # prepare body and send request
        body = _prepareRequestBody(startTime, endTime, limit, offset, queryString, "timestamp", timeSpan)
        pb.log(json.dumps(body))
        success, rawData, error = _sendRequest("GET", f"data/{index}/_search", body=body)
        # structure the result output
        if not success:
            error = "Search events failed: " + error
        else:
            result = rawData.get("hits", {}).get("hits")
            if result:
                keyFields = {
                    "EventIDs": [sub.get("_id") for sub in result],
                    "EventIndexes": [sub.get("_index") for sub in result],
                    "EventTypes": [sub.get("_type") for sub in result]
                }
            # schedule the next start time through the passdown data
            passdown = {
                "startTime": startTime + timedelta(minutes=timeSpan) if startTime + timedelta(minutes=timeSpan) < datetime.datetime.utcnow() else startTime
            }
    except Exception as ex:
        error = "Search events failed: " + _fmtExeptionErrMsg(ex)
        pb.log(traceback.format_exc())
    # return the D3 outputmodel with the request result
    return pb.returnOutputModel(
        result,
        rawData if not error else "Failed",
        keyFields, "",
        rawData,
        error,
        passdownData=passdown if timeSpan else {}
    )
Reader Note
Please be aware that all custom commands within an integration are put into one Python script.
For advanced users, it is good practice to create helper functions such as _sendRequest to avoid code duplication.
Command retry
A command may sometimes fail for unexpected reasons. You can return the special value "__RETRY__" as returnData in the D3 output model, and the system will rerun the command. The retry count can be retrieved with pb.runtime.get('retrycount', 0).
Sample code:
def retrySample():
    errors = []
    rawData = []
    result = []
    returnData = "Failed"
    # get the retry count
    retrycount = pb.runtime.get("retrycount", 0)
    # retry if updateEvents throws an exception and the retry count is less than 10. It will stop at the 10th time and return "Failed"
    try:
        return updateEvents()
    except Exception:
        if retrycount < 10:
            return pb.returnOutputModel(result, "__RETRY__", "", "", rawData, errors)
    return pb.returnOutputModel(result, returnData, "", "", rawData, errors)
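The retry decision itself can be separated from the D3 calls, which makes the cutoff easy to verify. This is only a sketch: `chooseReturnData`, `RETRY_SIGNAL`, and the `maxRetries` default are illustrative names, and the "__RETRY__" value is taken from the description above.

```python
# Special returnData value that asks the system to rerun the command.
RETRY_SIGNAL = "__RETRY__"


def chooseReturnData(retryCount, maxRetries=10):
    """Return the retry signal while retries remain, otherwise "Failed"."""
    return RETRY_SIGNAL if retryCount < maxRetries else "Failed"


# Retry counts 0 through 9 request another run; the 10th gives up.
decisions = [chooseReturnData(count) for count in range(11)]
```

In a command, the value returned here would be passed as the returnData argument of pb.returnOutputModel inside the exception handler.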
D3 Python Coding Conventions
Use camelCase for method names.
Do not use the print function, or any other function that writes to standard output.
Do not define a class named PlaybookRunTime. It is reserved for D3 playbook engine.
Do not define a class named APIError. It is reserved for D3 playbook engine.
Do not define a class named HTTPAPIError. It is reserved for D3 playbook engine.
Do not define a variable or object named pb. It is reserved for D3 playbook engine.
Do not define a variable or object named args. It is reserved for D3 playbook engine.
Do not define a variable or object named runtime. It is reserved for D3 playbook engine.
Do not define a variable or object named input. It is reserved for D3 playbook engine.
Do not use a method name that differs from the command name in the command script.
Do not import a library that is neither a Python standard library nor one of the D3 third-party libraries.
Debugging and testing
Use pb.log() to log data during command execution.
Logged data will be displayed in command testing, and in playbook runtime while testing a playbook.
Logged data will be saved even if the command exits due to an exception.
Use the traceback library to get the full stack trace of exceptions.
Stack traces will match the line number of the command script if a command exits due to an exception.
Use traceback.format_exc() to get the full stack trace during exception handling.
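The logging pattern described above can be tried outside D3 by passing in any callable as the logger. In this sketch, `runWithTraceback` is a hypothetical wrapper and a plain list's `append` stands in for pb.log.

```python
import traceback


def runWithTraceback(func, logFunc):
    """Run func; on failure, send the full stack trace to logFunc and return None."""
    try:
        return func()
    except Exception:
        # format_exc() must be called while the exception is being handled.
        logFunc(traceback.format_exc())
        return None


loggedMessages = []  # stand-in for the Custom Log tab
result = runWithTraceback(lambda: 1 / 0, loggedMessages.append)
```

Inside a command, the second argument would simply be pb.log.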
D3 Python Library
Global Runtime Variables
Integration command only
runtime["connector"]
{
  "custom_passwordvaultconnection": "",
  "schedulehealthchecking": "False",
  "schedulehealthcheckinginterval": "5",
  -- any defined connection parameter will also be here --
}
runtime["retrycount"]
Returns the current retry count.
Command execution helper functions
In your custom Python script, you can also call D3 out-of-the-box System Utility Commands or Integration Commands.
To execute Utility Commands:
Follow this format:
D3.Utility.{Command Name}<{Command Type}>(parameters)
def Demo():
    return D3.Utility.concatToRear<Text>("Join ", "Text")
Please be aware that Command Name is the internal name of the command.
Command Type is the type of the command's first parameter. The possible parameter data types are:
Text
Number
DateTime
Boolean
Text Array
Number Array
DateTime Array
Boolean Array
JSON Array
JSON Object
Unknown
The call returns the D3 OutputModel.
To execute Integration Commands:
The format is similar to that of executing a Utility Command: D3.Integration.{Integration Name}.{Command Name}(parameters).
Note that the data type is not necessary.
The limitations are:
Custom Integration Commands can only execute other Integration Commands that belong to the same Integration.
Custom Utility Commands cannot execute any Integration Commands.
In the sample below, ipReputationCheck() is a system command and fetchEvent is a custom command:
def ipReputationCheck():
    return pb.returnOutputModel('', '', '', '', {"EventRawData": "rawData"}, '')

def fetchEvent():
    return D3.Integration.Demo_Integration.ipReputationCheck()
General helper functions:
Each helper function can be used with pb.{function name}
isJson(jsonString)
Returns True or False depending on whether the string is valid JSON.
Parameters:
jsonString: the JSON string to validate
Sample Input: pb.isJson('{"Simple":"Simple JSON"}')
Sample Output: True
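For readers who want to reason about this check outside D3, a comparable test can be written with the standard `json` module. This is not the implementation behind pb.isJson. `isJsonString` is a hypothetical name, and whether pb.isJson accepts bare scalars such as "1" the way `json.loads` does is not stated here.

```python
import json


def isJsonString(value):
    """Return True if value is a string (or bytes) that json.loads can parse."""
    if not isinstance(value, (str, bytes, bytearray)):
        return False
    try:
        json.loads(value)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        return False
    return True


validCheck = isJsonString('{"Simple":"Simple JSON"}')
invalidCheck = isJsonString("{bad json}")
```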
log(message)
Logs a message to show in the Custom Log tab. The tab is shown only when testing a command.
Sample Input: pb.log("Debug Line")
Sample Output:
returnOutputModel(result, returnData, outputData, contextData, rawData, error, passdownData={})
Generates the D3 output model. It is recommended for use in custom commands.
Parameters:
result: HTML formatted data displayed in the Result tab
returnData: Simple data that can be directly used by the next commands
outputData: Should be left empty. This will be automatically generated using the Key Fields configuration
contextData: Contextual data to be shared with other tasks
rawData: Raw data from the command
error: Error details
passdownData: An Event Intake and Incident Intake related field that can pass down parameter values to the next scheduled instance.
Sample Input:
pb.returnOutputModel("<body><h1>Sample Result Data</h1></body>", "Sample Return Data", "", {"SampleContextData": "ContextData"}, {"SampleRawData": "Rawdata"}, "Sample Error", passdownData = {})
Sample Output:
{
  "result": {
    "description": "<body><h1>Sample Result Data</h1></body>",
    "references": [],
    "actions": []
  },
  "returnData": "Sample Return Data",
  "outputData": "",
  "contextData": {
    "SampleContextData": "ContextData"
  },
  "rawData": {
    "SampleRawData": "Rawdata"
  },
  "error": "Sample Error",
  "passdownData": {},
  "customLog": "",
  "others": {}
}
uploadFile(file)
Uploads the file to the D3 playbook file table.
Parameters:
file: File JSON object to be uploaded to the file table, formatted as {"file": (filename, content)}, where content is the binary data of the file.
Sample Input: pb.uploadFile({"file": ("Sample File","U2FtcGxlIFRleHQ=")})
Sample Output:
{
  "fileId": "1993",
  "fileName": "Sample File",
  "md5": "91BF9C6701F7107E4010735EAE0ED2D5",
  "sha1": "07C6F613BD4398515A56C4B72C793967FD7B5F8C",
  "sha256": "3E1A4B0BC62756ED64CA6073F3A62FA6ADE4B513AE4E95ED6DA54754CE7C8E2A"
}
downloadFile(fileid, filesource)
Downloads a file from the D3 file tables.
Parameters:
fileid: the file ID in the file table
filesource: one of the three D3 file tables:
IR_ATCHMNT: Incident attachment files
PB_FILE: Playbook files
KC_AF_FILE: Artifact files
Sample Input: pb.downloadFile("1993", "PB_FILE")
Sample Output: ('Sample File', b'U2FtcGxlIFRleHQ=')
Note that it works best paired with the decode function.
Sample Input:
filename, filecontent = pb.downloadFile("1993", "PB_FILE")
filecontent.decode("utf-8")
Sample Output: Sample Text
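The sample bytes b'U2FtcGxlIFRleHQ=' are the Base64 encoding of "Sample Text". If the content you receive is Base64-encoded like that sample, it needs a Base64 decode before the UTF-8 decode. Whether pb.downloadFile returns raw or Base64-encoded bytes in your environment is not stated here, so treat the sketch below as an assumption to verify; `decodeBase64Text` is a hypothetical helper name.

```python
import base64


def decodeBase64Text(fileContent, encoding="utf-8"):
    """Base64-decode the content, then decode the resulting bytes as text."""
    return base64.b64decode(fileContent).decode(encoding)


# Bytes copied from the downloadFile sample output above.
decoded = decodeBase64Text(b"U2FtcGxlIFRleHQ=")
# decoded is "Sample Text"
```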
ConvertJsonObjectArrayToHTMLString(inputArray, showHeader=True, isCol="col")
Formats a JSON object array as an HTML table string to be put into the Result field of the D3 output model.
Parameters:
inputArray: the JSON object array to be converted
showHeader: whether to show the key names as headers in the table. Default value is True. Can be set to False.
isCol: whether to format the table in a column style. Default value is "col". Can be "row".
Sample Input:
pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }])
Sample Output:
<table class='cc-table horizontal-table'><tr><th>fieldName</th><th>value</th><th>displayName</th></tr><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>
Sample Input:
pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], False)
Sample Output:
<table class='cc-table horizontal-table'><tr><td>EventKey</td><td>9635</td><td>Unique Event Key</td></tr><tr><td>EventName</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Event name</td></tr><tr><td>Severity</td><td>Medium</td><td>Severity</td></tr><tr><td>Status</td><td>New</td><td>Status</td></tr></table>
Sample Input:
pb.ConvertJsonObjectArrayToHTMLString([{ "fieldName": "EventKey", "value": "9635", "displayName": "Unique Event Key" }, { "fieldName": "EventName", "value": "741fc933-0846-4f24-9cfb-6dc0d0432212", "displayName": "Event name" }, { "fieldName": "Severity", "value": "Medium", "displayName": "Severity" }, { "fieldName": "Status", "value": "New", "displayName": "Status" }], True, "row")
Sample Output:
<table class='cc-table vertical-table'><tr><th>fieldName</th><td>EventKey</td><td>EventName</td><td>Severity</td><td>Status</td></tr><tr><th>value</th><td>9635</td><td>741fc933-0846-4f24-9cfb-6dc0d0432212</td><td>Medium</td><td>New</td></tr><tr><th>displayName</th><td>Unique Event Key</td><td>Event name</td><td>Severity</td><td>Status</td></tr></table>
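To make the two layouts concrete, here is a rough re-creation of the table shapes shown in the samples above. It is not D3's implementation: `jsonObjectArrayToHtml` is a hypothetical name, the HTML escaping is an addition, the keys are taken from the first object only, and the behaviour of showHeader=False in "row" style is a guess.

```python
from html import escape


def jsonObjectArrayToHtml(inputArray, showHeader=True, isCol="col"):
    """Render a list of flat dicts as an HTML table in "col" or "row" layout."""
    if not inputArray:
        return ""
    keys = list(inputArray[0].keys())

    def cell(tag, value):
        return f"<{tag}>{escape(str(value))}</{tag}>"

    rows = []
    if isCol == "row":
        # One table row per key: key name first, then each object's value.
        cssClass = "vertical-table"
        for key in keys:
            header = cell("th", key) if showHeader else ""
            values = "".join(cell("td", item.get(key, "")) for item in inputArray)
            rows.append(f"<tr>{header}{values}</tr>")
    else:
        # One table row per object, with an optional header row of key names.
        cssClass = "horizontal-table"
        if showHeader:
            rows.append("<tr>" + "".join(cell("th", key) for key in keys) + "</tr>")
        for item in inputArray:
            rows.append("<tr>" + "".join(cell("td", item.get(key, "")) for key in keys) + "</tr>")
    return f"<table class='cc-table {cssClass}'>" + "".join(rows) + "</table>"


items = [{"fieldName": "Severity", "value": "Medium"}, {"fieldName": "Status", "value": "New"}]
colTable = jsonObjectArrayToHtml(items)
rowTable = jsonObjectArrayToHtml(items, True, "row")
```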