workbench.workers package
Submodules
workbench.workers.evel_knievel_all module
EvelKnievelAll worker
-
class workbench.workers.evel_knievel_all.EvelKnievelAll[source]
Bases: object
This worker depends on two workers that throw TypeError and
KeyError Exceptions. Good test case, as the dependencies will
sometimes both work, randomly fail individually, and sometimes
both of them will fail; it’s a fail fest!
Initialization
-
dependencies = ['evel_knievel_key', 'evel_knievel_type']
-
execute(input_data)[source]
This worker depends on two workers that throw TypeError and KeyError Exceptions
workbench.workers.evel_knievel_key module
EvelKnievelKey worker
-
class workbench.workers.evel_knievel_key.EvelKnievelKey[source]
Bases: object
This worker pseudo-randomly throws a KeyError Exception. The
pseudo-random part is that the logic is deterministic: given a pile
of md5s, about 8% will fail, but it will always be the same ones.
Initialization
-
dependencies = ['meta']
-
execute(input_data)[source]
This worker pseudo-randomly throws a KeyError Exception.
workbench.workers.evel_knievel_type module
EvelKnievelType worker
-
class workbench.workers.evel_knievel_type.EvelKnievelType[source]
Bases: object
This worker pseudo-randomly throws a TypeError Exception. The
pseudo-random part is that the logic is deterministic: given a pile
of md5s, about 7% will fail, but it will always be the same ones.
Initialization
-
dependencies = ['meta']
-
execute(input_data)[source]
This worker pseudo-randomly throws a TypeError Exception.
workbench.workers.help_base module
HelpBase worker
-
class workbench.workers.help_base.HelpBase[source]
Bases: object
This worker computes help for any ‘info’ object
-
dependencies = ['info']
-
execute(input_data)[source]
Info objects all have a type_tag of (‘help’,’worker’,’command’, or ‘other’)
-
workbench.workers.help_base.test()[source]
help.py: Unit test
workbench.workers.mem_connscan module
workbench.workers.mem_dlllist module
workbench.workers.mem_meta module
workbench.workers.mem_procdump module
workbench.workers.mem_pslist module
workbench.workers.pcap_bro module
PcapBro worker
-
workbench.workers.pcap_bro.gsleep()[source]
Convenience method for gevent.sleep
-
class workbench.workers.pcap_bro.PcapBro[source]
Bases: object
This worker runs Bro scripts on a pcap file
-
dependencies = ['sample']
-
sample_set_input = True
-
setup_pcap_inputs(input_data)[source]
Write the PCAPs to disk for Bro to process and return the pcap filenames
-
execute(input_data)[source]
Execute
-
subprocess_manager(exec_args)[source]
Bro subprocess manager
-
goto_temp_directory(*args, **kwds)[source]
-
__del__()[source]
Class Cleanup
-
workbench.workers.pcap_bro.test()[source]
pcap_bro.py: Unit test
workbench.workers.pcap_graph module
pcap_graph worker
-
workbench.workers.pcap_graph.gsleep()[source]
Convenience method for gevent.sleep
-
class workbench.workers.pcap_graph.PcapGraph[source]
Bases: object
This worker generates a graph from a PCAP (depends on Bro)
Initialization
-
dependencies = ['pcap_bro']
-
add_node(node_id, name, labels)[source]
Cache aware add_node
-
add_rel(source_id, target_id, rel)[source]
Cache aware add_rel
-
execute(input_data)[source]
Okay, this worker is going to build graphs from PCAP Bro output logs
-
conn_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro conn.log)
-
http_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro http.log)
-
dns_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro dns.log)
-
weird_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro weird.log)
-
files_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro files.log)
-
__del__()[source]
Class Cleanup
-
workbench.workers.pcap_graph.test()[source]
pcap_graph.py: Unit test
workbench.workers.pcap_http_graph module
pcap_http_graph worker
-
workbench.workers.pcap_http_graph.gsleep()[source]
Convenience method for gevent.sleep
-
class workbench.workers.pcap_http_graph.PcapHTTPGraph[source]
Bases: object
This worker generates a graph from a PCAP (depends on Bro)
Initialization
-
dependencies = ['pcap_bro']
-
add_node(node_id, name, labels)[source]
Cache aware add_node
-
add_rel(source_id, target_id, rel)[source]
Cache aware add_rel
-
execute(input_data)[source]
Okay, this worker is going to build graphs from PCAP Bro output logs
-
http_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro http.log)
-
weird_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro weird.log)
-
files_log_graph(stream)[source]
Build up a graph (nodes and edges from a Bro files.log)
-
__del__()[source]
Class Cleanup
-
workbench.workers.pcap_http_graph.test()[source]
pcap_http_graph.py: Unit test
workbench.workers.pe_classifier module
PEClassifier worker (just a placeholder, not a real classifier at this point)
-
class workbench.workers.pe_classifier.PEClassifier[source]
Bases: object
This worker classifies PEFiles as Evil or AOK (TOY, not a real classifier at this point)
Initialization
-
dependencies = ['pe_features', 'pe_indicators']
-
execute(input_data)[source]
This worker classifies PEFiles as Evil or AOK (TOY, not a real classifier at this point)
-
workbench.workers.pe_classifier.test()[source]
pe_classifier.py: Unit test
workbench.workers.pe_deep_sim module
PE SSDeep Similarity worker
-
class workbench.workers.pe_deep_sim.PEDeepSim[source]
Bases: object
This worker computes fuzzy matches between samples with ssdeep
-
dependencies = ['meta_deep']
-
execute(input_data)[source]
Execute method
-
__del__()[source]
Class Cleanup
-
workbench.workers.pe_deep_sim.test()[source]
pe_deep_sim.py: Unit test
workbench.workers.pe_features module
PE Features worker. This class pulls static features
out of a PE file using the python pefile module.
-
class workbench.workers.pe_features.PEFileWorker(verbose=False)[source]
Bases: object
Create instance of PEFileWorker class. This class pulls static
features out of a PE file using the python pefile module.
Init method
-
dependencies = ['sample', 'tags']
-
execute(input_data)[source]
Process the input bytes with pefile
-
set_dense_features(dense_feature_list)[source]
Set the dense feature list that the Python pefile module should extract.
This is really just sanity check functionality, meaning that these
are the features you are expecting to get, and a warning will spit
out if you don’t get some of these.
-
get_dense_features()[source]
Get the dense feature list that the Python pefile module should extract.
-
set_sparse_features(sparse_feature_list)[source]
Set the sparse feature list that the Python pefile module should extract.
This is really just sanity check functionality, meaning that these
are the features you are expecting to get, and a warning will spit
out if you don’t get some of these.
-
get_sparse_features()[source]
Get the sparse feature list that the Python pefile module should extract.
-
static open_using_pefile(input_name, input_bytes)[source]
Open the PE File using the Python pefile module.
Process the PE File using the Python pefile module.
-
workbench.workers.pe_features.convert_to_utf8(string)[source]
Convert string to UTF-8
-
workbench.workers.pe_features.convert_to_ascii_null_term(string)[source]
Convert string to null-terminated ASCII
-
workbench.workers.pe_features.test()[source]
pe_features.py: Test
workbench.workers.pe_indicators module
This python class codifies a bunch of rules around suspicious static
features in a PE File. The rules don’t indicate malicious behavior;
they simply flag things that may be used by a malicious binary.
Many of the indicators used were inspired by the material in the
‘Practical Malware Analysis’ book by Sikorski and Honig,
ISBN-13: 978-1593272906 (available on Amazon :)
Description:
PE_WARNINGS = PE module warnings verbatim
MALFORMED = the PE file is malformed
COMMUNICATION = network activities
CREDENTIALS = activities associated with elevating or attaining new privileges
KEYLOGGING = activities associated with keylogging
SYSTEM_STATE = file system or registry activities
SYSTEM_PROBE = getting information from the local system (file system, OS config)
SYSTEM_INTEGRITY = compromises the security state of the local system
PROCESS_MANIPULATION = indicators associated with process manipulation/injection
PROCESS_SPAWN = indicators associated with creating a new process
STEALTH_LOAD = indicators associated with loading libraries, resources, etc in a sneaky way
ENCRYPTION = any indicators related to encryption
COM_SERVICES = COM functionality or running as a service
ANTI_DEBUG = anti-debugging indicators
-
class workbench.workers.pe_indicators.PEIndicators[source]
Bases: object
Create instance of Indicators class. This class uses the
static features from the pefile module to look for weird stuff.
Note: All methods that start with ‘check’ will be automatically
included as part of the checks that happen when ‘execute’ is called.
Init method of the Indicators class.
-
dependencies = ['sample']
-
execute(input_data)[source]
Execute the PEIndicators worker
-
check_corrupted_imports()[source]
Various ways the imports table might be corrupted.
-
check_checksum_is_zero()[source]
Checking for a checksum of zero
-
check_checksum_mismatch()[source]
Checking for a checksum that doesn’t match the generated checksum
-
check_empty_section_name()[source]
Checking for an empty section name
-
check_nonstandard_section_name()[source]
Checking for a non-standard section name
-
check_image_size_incorrect()[source]
Checking if the reported image size matches the actual image size
Checking if pefile module reported overlapping header
-
check_section_unaligned()[source]
Checking if any of the sections are unaligned
-
check_section_oversized()[source]
Checking if any of the sections go past the total size of the image
-
check_dll_with_no_exports()[source]
Checking if the PE is a DLL with no exports
-
check_communication_imports()[source]
Checking if the PE imports known communication methods
-
check_elevating_privs_imports()[source]
Checking if the PE imports known methods associated with elevating or attaining new privileges
-
check_keylogging_imports()[source]
Checking if the PE imports known methods associated with keylogging
-
check_system_state_imports()[source]
Checking if the PE imports known methods associated with changing system state
-
check_system_probe_imports()[source]
Checking if the PE imports known methods associated with probing the system
-
check_system_integrity_imports()[source]
Checking if the PE imports known methods associated with system security or integrity
-
check_crypto_imports()[source]
Checking if the PE imports known methods associated with encryption
-
check_anti_debug_imports()[source]
Checking if the PE imports known methods associated with anti-debug
-
check_com_service_imports()[source]
Checking if the PE imports known methods associated with COM or services
-
check_process_manipulation()[source]
Checking if the PE imports known methods associated with process manipulation/injection
-
check_process_spawn()[source]
Checking if the PE imports known methods associated with spawning a new process
-
check_stealth_load()[source]
Checking if the PE imports known methods associated with loading libraries, resources, etc in a sneaky way
-
check_invalid_entry_point()[source]
Checking the PE File warning for an invalid entry point
-
check_exports()[source]
This is just a stub function right now, might be useful later
-
workbench.workers.pe_indicators.convert_to_ascii_null_term(string)[source]
Convert string to null-terminated ASCII string
-
workbench.workers.pe_indicators.test()[source]
pe_indicators.py: Unit test
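The PEIndicators note above says every method whose name starts with ‘check’ is picked up automatically when ‘execute’ runs. The following is an added example of that pattern, not the workbench project’s code: the `Indicators` class, the plain-dict stand-in for a pefile object, the `IMPORT_CATEGORIES` name lists and `SAMPLE_PE` are all constructed for illustration, and the category names follow the module description above.

```python
import inspect

# Constructed per-category lookup of Windows API import names (illustrative,
# not the project's own lists); categories follow the module description above.
IMPORT_CATEGORIES = {
    "COMMUNICATION": {"WSAStartup", "connect", "InternetOpenA", "URLDownloadToFileA"},
    "KEYLOGGING": {"SetWindowsHookExA", "GetAsyncKeyState", "GetKeyState"},
    "ENCRYPTION": {"CryptEncrypt", "CryptDecrypt", "CryptAcquireContextA"},
    "ANTI_DEBUG": {"IsDebuggerPresent", "CheckRemoteDebuggerPresent"},
}
STANDARD_SECTIONS = {".text", ".data", ".rdata", ".idata", ".rsrc", ".reloc", ".bss", ".tls"}

class Indicators:
    """Flags suspicious (not conclusive) static features of a parsed PE.
    `pe` is a plain dict standing in for a pefile object (constructed shape):
    {'checksum': int, 'computed_checksum': int, 'sections': [str], 'imports': [str]}"""

    def __init__(self, pe):
        self.pe = pe

    def execute(self):
        """Discover every bound method named check* and collect what it flags."""
        indicators = []
        for name, method in inspect.getmembers(self, inspect.ismethod):
            if name.startswith("check"):
                indicators.extend(method())
        return indicators

    def check_checksum_is_zero(self):
        return [("MALFORMED", "checksum is zero")] if self.pe["checksum"] == 0 else []

    def check_checksum_mismatch(self):
        declared, computed = self.pe["checksum"], self.pe["computed_checksum"]
        if declared != 0 and declared != computed:
            return [("MALFORMED", "checksum does not match computed checksum")]
        return []

    def check_empty_section_name(self):
        return [("MALFORMED", "empty section name")] if "" in self.pe["sections"] else []

    def check_nonstandard_section_name(self):
        odd = [s for s in self.pe["sections"] if s and s not in STANDARD_SECTIONS]
        return [("MALFORMED", "non-standard section name: " + s) for s in odd]

    def check_import_categories(self):
        """One indicator per category with at least one matching import."""
        present = set(self.pe["imports"])
        hits = []
        for category, names in sorted(IMPORT_CATEGORIES.items()):
            matched = sorted(names & present)
            if matched:
                hits.append((category, "imports " + ", ".join(matched)))
        return hits

def indicator_categories(pe):
    """Sorted, de-duplicated category names flagged for `pe`."""
    return sorted({category for category, _ in Indicators(pe).execute()})

# Constructed sample: zero checksum, an empty and a packer-style section name, mixed imports.
SAMPLE_PE = {"checksum": 0, "computed_checksum": 0x1A2B, "sections": [".text", "", "UPX1"],
             "imports": ["GetProcAddress", "WSAStartup", "GetAsyncKeyState", "IsDebuggerPresent"]}
```

Adding a new rule is just adding another `check_*` method; nothing else has to be registered.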
workbench.workers.pe_peid module
PE peid worker, uses the peid_userdb.txt database of signatures
-
workbench.workers.pe_peid.get_peid_db()[source]
Grab the peid_userdb.txt file from local disk
-
class workbench.workers.pe_peid.PEIDWorker[source]
Bases: object
This worker looks up pe_id signatures for a PE file.
-
dependencies = ['sample']
-
execute(input_data)[source]
Execute the PEIDWorker
-
peid_features(pefile_handle)[source]
Get features from PEid signature database
-
workbench.workers.pe_peid.test()[source]
pe_peid.py: Unit test
workbench.workers.strings module
Strings worker
-
class workbench.workers.strings.Strings[source]
Bases: object
This worker extracts all the strings from any type of file
Initialize the Strings worker
-
dependencies = ['sample']
-
execute(input_data)[source]
Execute the Strings worker
-
workbench.workers.strings.test()[source]
strings.py: Unit test
workbench.workers.unzip module
Unzip worker
-
class workbench.workers.unzip.Unzip[source]
Bases: object
This worker unzips a zipped file
-
dependencies = ['sample']
-
execute(input_data)[source]
Execute the Unzip worker
-
__del__()[source]
Class Cleanup
-
workbench.workers.unzip.test()[source]
unzip.py: Unit test
workbench.workers.url module
URLS worker: Tries to extract URLs from strings output
-
class workbench.workers.url.URLS[source]
Bases: object
This worker looks for url patterns in strings output
Initialize the URL worker
-
dependencies = ['strings']
-
execute(input_data)[source]
Execute the URL worker
-
workbench.workers.url.test()[source]
url.py: Unit test
workbench.workers.view module
view worker
-
class workbench.workers.view.View[source]
Bases: object
View: Generates a view for any file type
-
dependencies = ['meta']
-
execute(input_data)[source]
-
__del__()[source]
Class Cleanup
-
workbench.workers.view.test()[source]
view.py: Unit test
workbench.workers.view_customer module
view_customer worker
-
class workbench.workers.view_customer.ViewCustomer[source]
Bases: object
ViewCustomer: Generates a customer usage view.
-
dependencies = ['meta']
-
execute(input_data)[source]
Execute Method
-
workbench.workers.view_customer.test()[source]
view_customer.py: Unit test
workbench.workers.view_deep module
view_deep worker
-
class workbench.workers.view_deep.ViewDeep[source]
Bases: object
ViewDeep: Generates a view_deep for any file type
-
dependencies = ['meta']
-
execute(input_data)[source]
-
__del__()[source]
Class Cleanup
-
workbench.workers.view_deep.test()[source]
view_deep.py: Unit test
workbench.workers.view_memory module
view_memory worker
-
class workbench.workers.view_memory.ViewMemory[source]
Bases: object
ViewMemory: Generates a view for meta data on the sample
-
dependencies = ['mem_connscan', 'mem_meta', 'mem_procdump', 'mem_pslist']
-
execute(input_data)[source]
Execute the ViewMemory worker
-
static file_to_pid(filename)[source]
-
workbench.workers.view_memory.test()[source]
view_memory.py: Unit test
workbench.workers.view_memory_deep module
view_memory_deep worker
-
class workbench.workers.view_memory_deep.ViewMemoryDeep[source]
Bases: object
ViewMemoryDeep: Generates a view for meta data on the sample
-
dependencies = ['view_memory', 'mem_connscan', 'mem_meta', 'mem_procdump', 'mem_pslist']
-
execute(input_data)[source]
Execute the ViewMemoryDeep worker
-
workbench.workers.view_memory_deep.test()[source]
view_memory_deep.py: Unit test
workbench.workers.view_pcap module
view_pcap worker
-
class workbench.workers.view_pcap.ViewPcap[source]
Bases: object
ViewPcap: Generates a view for a pcap sample (depends on Bro)
-
dependencies = ['pcap_bro']
-
execute(input_data)[source]
Execute
-
__del__()[source]
Class Cleanup
-
workbench.workers.view_pcap.test()[source]
view_pcap.py: Unit test
workbench.workers.view_pcap_deep module
view_pcap_deep worker
-
class workbench.workers.view_pcap_deep.ViewPcapDeep[source]
Bases: object
ViewPcapDeep: Generates a view for a pcap sample (depends on Bro)
Initialization of ViewPcapDeep
-
dependencies = ['view_pcap']
-
execute(input_data)[source]
ViewPcapDeep execute method
-
__del__()[source]
Class Cleanup
-
workbench.workers.view_pcap_deep.test()[source]
view_pcap_deep.py: Unit test
workbench.workers.view_pdf module
view_pdf worker
-
class workbench.workers.view_pdf.ViewPDF[source]
Bases: object
ViewPDF: Generates a view for PDF files
-
dependencies = ['meta', 'strings']
-
execute(input_data)[source]
Execute the ViewPDF worker
-
workbench.workers.view_pdf.test()[source]
view_pdf.py: Unit test
workbench.workers.view_pdf_deep module
view_pdf_deep worker
-
class workbench.workers.view_pdf_deep.ViewPDFDeep[source]
Bases: object
ViewPDFDeep: Generates a view for PDF files
-
dependencies = ['meta', 'strings']
-
execute(input_data)[source]
Execute the ViewPDFDeep worker
-
workbench.workers.view_pdf_deep.test()[source]
view_pdf_deep.py: Unit test
workbench.workers.view_pe module
view_pe worker
-
class workbench.workers.view_pe.ViewPE[source]
Bases: object
Generates a high level summary view for PE files that incorporates a large set of workers
-
dependencies = ['meta', 'strings', 'pe_peid', 'pe_indicators', 'pe_classifier', 'yara_sigs']
-
execute(input_data)[source]
Execute the ViewPE worker
-
static safe_get(data, key_list)[source]
Safely access dictionary keys when plugin may have failed
-
workbench.workers.view_pe.test()[source]
view_pe.py: Unit test
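Two ideas from this page fit together: the EvelKnievel workers near the top fail deterministically for a fixed subset of md5s, and `safe_get` here lets a view survive a dependency that failed. The block below is an added example, not the workbench project’s code; the md5-slice bucketing rule, the two stand-in worker functions, `run_workers`, the result-dict shape and the constructed md5 values are all assumptions.

```python
import hashlib

def fails_for_sample(md5_hex, percent, offset):
    """Deterministic 'pseudo-random' failure: the same md5 always gives the same
    answer, and about `percent` of a large pile of md5s land in the failing bucket.
    Constructed rule: take 8 hex digits of the md5 starting at `offset`, mod 100."""
    return int(md5_hex[offset:offset + 8], 16) % 100 < percent

def evel_knievel_key(sample):
    """Constructed stand-in for the KeyError worker (~8% of samples fail)."""
    if fails_for_sample(sample["md5"], 8, 0):
        raise KeyError("simulated KeyError for " + sample["md5"])
    return {"key_ok": True}

def evel_knievel_type(sample):
    """Constructed stand-in for the TypeError worker (~7% of samples fail); it reads
    a different slice of the md5 so the two workers fail on different samples."""
    if fails_for_sample(sample["md5"], 7, 24):
        raise TypeError("simulated TypeError for " + sample["md5"])
    return {"type_ok": True}

def run_workers(sample, workers):
    """Run each dependency; a failure is recorded under the worker's name
    instead of aborting, so a downstream view can still be built."""
    results = {}
    for name, worker in workers.items():
        try:
            results[name] = worker(sample)
        except (KeyError, TypeError) as exc:
            results[name] = {"error": type(exc).__name__}
    return results

def safe_get(data, key_list, default=None):
    """Walk nested dict keys; return `default` if any hop is missing."""
    for key in key_list:
        if not isinstance(data, dict) or key not in data:
            return default
        data = data[key]
    return data

WORKERS = {"evel_knievel_key": evel_knievel_key, "evel_knievel_type": evel_knievel_type}

def view_for(sample):
    """View built from possibly-failed dependencies via safe_get()."""
    results = run_workers(sample, WORKERS)
    return {"key_ok": safe_get(results, ["evel_knievel_key", "key_ok"], False),
            "type_ok": safe_get(results, ["evel_knievel_type", "type_ok"], False),
            "errors": sorted(n for n, r in results.items() if "error" in r)}

def observed_failure_rate(percent, offset, n=20000):
    """Fraction of n constructed md5s (hashes of the integers 0..n-1) that fail."""
    md5s = [hashlib.md5(str(i).encode()).hexdigest() for i in range(n)]
    return sum(fails_for_sample(m, percent, offset) for m in md5s) / n
```

Re-running `view_for` on the same sample always reproduces the same failures, which is what makes a failing dependency a usable test case rather than a flaky one.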
workbench.workers.view_pe_deep module
view_pe_deep worker
-
class workbench.workers.view_pe_deep.ViewPEDeep[source]
Bases: object
Generates a high level summary view for PE files that incorporates a large set of workers
-
dependencies = ['view_pe', 'pe_indicators']
-
execute(input_data)[source]
Execute the ViewPEDeep worker
-
workbench.workers.view_pe_deep.test()[source]
view_pe_deep.py: Unit test
workbench.workers.view_swf module
view_swf worker
-
class workbench.workers.view_swf.ViewSWF[source]
Bases: object
ViewSWF: Generates a view for SWF files
-
dependencies = ['swf_meta', 'strings']
-
execute(input_data)[source]
Execute the ViewSWF worker
-
workbench.workers.view_swf.test()[source]
view_swf.py: Unit test
workbench.workers.view_swf_deep module
view_swf_deep worker
-
class workbench.workers.view_swf_deep.ViewSWFDeep[source]
Bases: object
ViewSWFDeep: Generates a view for SWF files
-
dependencies = ['view_swf']
-
execute(input_data)[source]
Execute the ViewSWFDeep worker
-
workbench.workers.view_swf_deep.test()[source]
view_swf_deep.py: Unit test
workbench.workers.view_zip module
view_zip worker
-
class workbench.workers.view_zip.ViewZip[source]
Bases: object
ViewZip: Generates a view for Zip files
-
dependencies = ['meta', 'unzip', 'yara_sigs']
-
execute(input_data)[source]
Execute the ViewZip worker
-
__del__()[source]
Class Cleanup
-
workbench.workers.view_zip.test()[source]
-- view_zip.py test --
workbench.workers.view_zip_deep module
view_zip_deep worker
-
class workbench.workers.view_zip_deep.ViewZipDeep[source]
Bases: object
ViewZipDeep: Generates a view for Zip files
-
dependencies = ['view_zip']
-
execute(input_data)[source]
Execute the ViewZipDeep worker
-
__del__()[source]
Class Cleanup
-
workbench.workers.view_zip_deep.test()[source]
-- view_zip_deep.py test --
workbench.workers.vt_query module
VTQuery worker
-
class workbench.workers.vt_query.VTQuery[source]
Bases: object
This worker queries Virus Total; an apikey needs to be provided
VTQuery Init
-
dependencies = ['meta']
-
execute(input_data)[source]
Execute the VTQuery worker
-
workbench.workers.vt_query.test()[source]
-- vt_query.py test --
workbench.workers.yara_sigs module
Yara worker
-
workbench.workers.yara_sigs.get_rules_from_disk()[source]
Recursively traverse the yara/rules directory for rules
-
class workbench.workers.yara_sigs.YaraSigs[source]
Bases: object
This worker checks for matches against yara sigs.
Output keys: [matches:list of matches]
-
dependencies = ['sample']
-
execute(input_data)[source]
yara worker execute method
-
workbench.workers.yara_sigs.test()[source]
yara_sigs.py: Unit test
Module contents
Workbench Workers