
Streams

Binary Data

The 'content' stream returned by job.get_data() is a file-like object that supports methods such as read(), seek() and tell().

```python
def execute(self, job: Job):
    data = job.get_data()
    # Check the first few bytes to decide whether this file should be processed at all.
    # This avoids pulling every file over the network (costly when processing 200 files/s).
    if data.read(5) != b"12345":
        return status.OPT_OUT
    # Reset the stream position to the start (important!)
    data.seek(0)
    # Read all bytes: read() with no argument reads to the end, whereas read(0) returns b""
    main = data.read()
```
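The header check above relies on ordinary Python file-object behaviour for read() and seek(). The sketch below uses io.BytesIO as a stand-in for job.get_data() (an assumption made so it runs outside Azul) to show why the seek(0) matters; check_and_read is a hypothetical helper, not part of Azul.

```python
import io


def check_and_read(stream, magic: bytes):
    """Return the stream's full content if it starts with `magic`, otherwise None.

    Only len(magic) bytes are read before deciding; the stream is then rewound
    so the full read starts again from offset 0.
    """
    if stream.read(len(magic)) != magic:
        return None  # in a plugin, this is where you would opt out
    stream.seek(0)  # without this, read() would skip the bytes already consumed
    return stream.read()  # no argument reads to EOF; read(0) would return b""


wanted = io.BytesIO(b"12345 rest of file")
print(check_and_read(wanted, b"12345"))  # b'12345 rest of file'
print(check_and_read(io.BytesIO(b"other data"), b"12345"))  # None

# What happens if the seek(0) is forgotten:
forgot = io.BytesIO(b"12345 rest of file")
forgot.read(5)
print(forgot.read())  # b' rest of file'
```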

Tags

Data streams have a tags property, which is a dict of key:value pairs describing the data stream.

Currently the tags include at least 'sha1', 'sha256', 'md5', 'mime', 'magic' (the output of the file command) and 'file_format_legacy' (a category for the file type, such as 'RAR' or 'Win32 DLL').
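Because the tags describe the stream, a plugin can often decide whether to process it from its tags alone. A minimal sketch, assuming the tags are available without reading the stream's bytes; FakeStream is a hypothetical stand-in (io.BytesIO plus a tags dict) so the example runs outside Azul. In a plugin you would inspect job.get_data().tags instead.

```python
import io


class FakeStream(io.BytesIO):
    """Hypothetical stand-in for an Azul data stream: a file-like object plus a tags dict.

    It exists only so this sketch runs outside Azul.
    """

    def __init__(self, data: bytes, tags: dict):
        super().__init__(data)
        self.tags = tags


def is_wanted(stream, wanted_formats: set) -> bool:
    """Decide from the stream's tags alone; no content bytes are read."""
    return stream.tags.get("file_format_legacy") in wanted_formats


dll = FakeStream(b"MZ\x90\x00", tags={"file_format_legacy": "Win32 DLL"})
rar = FakeStream(b"Rar!\x1a\x07\x00", tags={"file_format_legacy": "RAR"})

print(is_wanted(dll, {"Win32 DLL"}))  # True
print(is_wanted(rar, {"Win32 DLL"}))  # False
print(dll.tell())  # 0, the content was never touched
```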

Non-Content Streams

There may be situations where you want to process streams other than 'content', or want specific behaviour for different file types.

The Azul framework identifies the file type of data streams and allows you to filter on them.

The 'file_format_legacy' tag is intended to match the file types used by VirusTotal. Types are listed in identify.py in azul-bedrock.

```python
class MyPlugin(BinaryPlugin):
    SETTINGS = add_settings(
        filter_data_types={
            "content": ["executable/windows/dos"],
            "blob": ["archive/gzip", "archive/bzip2"],
        },
    )

    def execute(self, job: Job):
        # get_data returns None if no stream has a matching label
        single = job.get_data(label="invalid")
        # get_data cannot match on a specific file_format_legacy
        single = job.get_data(label=azm.DataLabel.CONTENT)
        # get_data raises an Exception if multiple streams have a matching label
        blobby = job.get_data(label=azm.DataLabel.TEXT)
        # get_all_data returns a list of streams with matching label and/or file_format_legacy
        files = job.get_all_data(file_format="executable/windows/dos")
        blobs = job.get_all_data(label=azm.DataLabel.TEXT)
        gzips = job.get_all_data(file_format="archive/gzip")
        bzips = job.get_all_data(file_format="archive/bzip2", label=azm.DataLabel.TEXT)


class FilteredLookForMZ(BinaryPlugin):
    SETTINGS = add_settings(
        filter_data_types={
            azm.DataLabel.CONTENT: [
                "executable/windows/pe32",
                "executable/windows/dos",
                "executable/windows/dll32",
            ],
        },
    )

    def execute(self, job: Job):
        mz_thing = job.get_data()
```
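The lookup rules stated in the comments of the MyPlugin example can be summarised in plain Python. This is a minimal illustration of those rules over a hypothetical Stream record, not Azul's implementation: the label strings stand in for azm.DataLabel values, raising ValueError is this sketch's choice for "raises Exception", and the default label of 'content' for get_data follows from get_data() returning the 'content' stream.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Stream:
    """Hypothetical stand-in for a data stream: only its label and identified file type."""

    label: str
    file_format: str


def get_all_data(streams: List[Stream], label: Optional[str] = None,
                 file_format: Optional[str] = None) -> List[Stream]:
    """Every stream matching the label and/or file format; an omitted criterion matches anything."""
    return [
        s for s in streams
        if (label is None or s.label == label)
        and (file_format is None or s.file_format == file_format)
    ]


def get_data(streams: List[Stream], label: str = "content") -> Optional[Stream]:
    """The single stream with this label, or None if there is none; raises if several match."""
    matches = get_all_data(streams, label=label)
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} streams have label {label!r}")
    return matches[0] if matches else None


streams = [
    Stream("content", "executable/windows/dos"),
    Stream("text", "archive/gzip"),
    Stream("text", "archive/bzip2"),
]
print(get_data(streams).file_format)             # executable/windows/dos
print(get_data(streams, label="invalid"))        # None
print(len(get_all_data(streams, label="text")))  # 2
# get_data(streams, label="text") would raise ValueError: two streams share that label
```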

Text reports

It is common to generate a text report or a tool output log. Text added with self.add_text(), as below, is shown in the Azul UI.

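```python
def execute(self, job: Job):
    data = job.get_data().read()
    # run_tool_subprocess stands in for however your plugin runs its tool
    raw_tool_result = run_tool_subprocess(
        data,
    )
    try:
        tool_result = raw_tool_result.decode("cp1252")
    except UnicodeDecodeError:
        # Do some error handling
        tool_result = "decoding error: ..."
    self.add_text(tool_result)
```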
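run_tool_subprocess in the example is a placeholder. One way to write such a helper with the standard library's subprocess module is sketched below; both function names are hypothetical. decode_report shows errors="replace", an alternative to discarding the whole report when a few bytes are undefined in cp1252.

```python
import subprocess
import sys


def run_tool_subprocess(data: bytes, cmd: list, timeout: float = 60.0) -> bytes:
    """Hypothetical helper: pipe `data` into `cmd` on stdin and return its stdout as bytes.

    check=True raises subprocess.CalledProcessError on a non-zero exit status;
    subprocess.TimeoutExpired is raised if the tool runs longer than `timeout` seconds.
    """
    result = subprocess.run(cmd, input=data, capture_output=True, timeout=timeout, check=True)
    return result.stdout


def decode_report(raw: bytes) -> str:
    """Decode as cp1252, replacing undefined bytes (e.g. 0x81) with U+FFFD instead of failing."""
    return raw.decode("cp1252", errors="replace")


# The current Python interpreter stands in for a real tool; it upper-cases its input.
tool = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"]
print(decode_report(run_tool_subprocess(b"hello", tool)))  # HELLO
```

Replacing undefined bytes keeps the rest of the report readable in the UI; keep the explicit error path if a partially decoded report would be misleading for your tool.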

You can also set the 'language' parameter if the output is source code of some kind. The value should be a language name supported by Prism.js.

Examples include 'html', 'js'/'javascript', 'bash'/'shell', 'c', 'dotnet', 'php', 'go', 'powershell', 'python', 'regex', 'vb'/'

```python
def execute(self, job: Job):
    self.add_text("eval(var5);", language="js")
```
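If a tool emits several kinds of source (for example decompiled scripts), one option is to pick the language from the output file's extension. A minimal sketch: the extension table and prism_language are hypothetical and only use language names from the examples above.

```python
from pathlib import PurePath
from typing import Optional

# Hypothetical mapping from file extension to a Prism.js language name, drawn from
# the example names above. Extend it to match the output your tool produces.
PRISM_LANGUAGES = {
    ".html": "html",
    ".js": "javascript",
    ".sh": "bash",
    ".c": "c",
    ".cs": "dotnet",
    ".php": "php",
    ".go": "go",
    ".ps1": "powershell",
    ".py": "python",
    ".vb": "vb",
}


def prism_language(filename: str) -> Optional[str]:
    """Prism.js language for `filename`, or None if the extension is not in the table."""
    return PRISM_LANGUAGES.get(PurePath(filename).suffix.lower())


print(prism_language("dropper.PS1"))  # powershell
print(prism_language("notes.txt"))    # None
```

When the result is None, call self.add_text() without the language parameter.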

Data

You may wish to add binary data, such as a PCAP or a JPEG image, as part of your output.

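```python
def execute(self, job: Job):
    # run_and_get_me_a_pcap stands in for your own tool wrapper that returns PCAP bytes
    pcap: bytes = run_and_get_me_a_pcap(job.get_data().read())
    self.add_data(label=azm.DataLabel.PCAP, tags={}, data=pcap)
```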
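Many tools write their output (a capture, an image) to a file rather than to stdout. A hypothetical helper like the one below, not part of Azul, runs such a tool inside a temporary directory and returns the output file's bytes, which could then be passed as data= to self.add_data(). The reverse_tool command is only a stand-in so the sketch runs anywhere Python does.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_tool_to_file(data: bytes, make_cmd, timeout: float = 60.0) -> bytes:
    """Hypothetical helper: run a tool that reads an input file and writes an output file.

    make_cmd(in_path, out_path) must return the argument list to execute. Both files
    live in a temporary directory that is removed once the output has been read.
    """
    with tempfile.TemporaryDirectory() as tmp:
        in_path = Path(tmp) / "input.bin"
        out_path = Path(tmp) / "output.bin"
        in_path.write_bytes(data)
        subprocess.run(make_cmd(str(in_path), str(out_path)), check=True, timeout=timeout)
        return out_path.read_bytes()


def reverse_tool(in_path: str, out_path: str) -> list:
    """Stand-in 'tool' using the current interpreter: writes the input bytes reversed."""
    code = ("import sys, pathlib; p = pathlib.Path; "
            "p(sys.argv[2]).write_bytes(p(sys.argv[1]).read_bytes()[::-1])")
    return [sys.executable, "-c", code, in_path, out_path]


print(run_tool_to_file(b"abc", reverse_tool))  # b'cba'
```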