cbeed6396f
Merge commit '6fa8d51479e9a5542c67bec715a1f68e7ed057ba'
309 lines
12 KiB
Python
309 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
import re
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import functools
|
|
from struct import pack, unpack
|
|
from binary_reader import BinaryReader
|
|
from binary_writer import BinaryWriter
|
|
from subprocess import Popen, PIPE
|
|
from zydis_encoder_types import *
|
|
|
|
|
|
def to_json(obj):
|
|
return json.dumps(obj, indent=4)
|
|
|
|
|
|
def get_width_from_enum(enum_value):
|
|
return enum_value.name[enum_value.name.rfind('_') + 1:]
|
|
|
|
|
|
def get_enum_max(enum_class):
|
|
if issubclass(enum_class, IntEnum):
|
|
return max([v.value for v in enum_class])
|
|
elif issubclass(enum_class, IntFlag):
|
|
return functools.reduce(lambda x, y: x | y, [v.value for v in enum_class])
|
|
else:
|
|
raise RuntimeError('Invalid type passed to get_enum_max: ' + enum_class.__name__)
|
|
|
|
|
|
def get_sanitized_enum(reader, enum_class):
|
|
raw_value = reader.read_uint32()
|
|
if issubclass(enum_class, IntEnum):
|
|
return enum_class(raw_value % (get_enum_max(enum_class) + 1))
|
|
elif issubclass(enum_class, IntFlag):
|
|
return enum_class(raw_value & get_enum_max(enum_class))
|
|
else:
|
|
raise RuntimeError('Invalid type passed to get_sanitized_enum: ' + enum_class.__name__)
|
|
|
|
|
|
def get_decomposed_flags(combined_flags):
|
|
enum_class = type(combined_flags)
|
|
flag_str = '|'.join([v.name for v in enum_class if v in combined_flags and v.value != 0])
|
|
if flag_str == '':
|
|
return enum_class(0).name
|
|
return flag_str
|
|
|
|
|
|
def get_combined_flags(flag_str, enum_class):
|
|
return functools.reduce(lambda x, y: x | y, [enum_class[v] for v in flag_str.split('|')])
|
|
|
|
|
|
def get_disasm(zydis_info, machine_mode, stack_width, payload):
|
|
if not zydis_info:
|
|
return ''
|
|
arg_machine_mode = '-' + get_width_from_enum(machine_mode)
|
|
arg_stack_width = '-' + get_width_from_enum(stack_width)
|
|
proc = Popen([zydis_info, arg_machine_mode, arg_stack_width, payload[:30]], stdout=PIPE, stderr=PIPE)
|
|
out = proc.communicate()[0].decode('utf-8')
|
|
if proc.returncode != 0:
|
|
return ''
|
|
match = re.search(r"INTEL[^A]+ABSOLUTE: ([^\r\n]+)", out)
|
|
return match.group(1)
|
|
|
|
|
|
def convert_enc_crash_to_json(crash, return_dict=False):
|
|
reader = BinaryReader(crash)
|
|
machine_mode = get_sanitized_enum(reader, ZydisMachineMode)
|
|
allowed_encoding = get_sanitized_enum(reader, ZydisEncodableEncoding)
|
|
mnemonic = get_sanitized_enum(reader, ZydisMnemonic)
|
|
reader.read_bytes(4)
|
|
prefixes = ZydisInstructionAttributes(reader.read_uint64() & ZYDIS_ENCODABLE_PREFIXES)
|
|
branch_type = get_sanitized_enum(reader, ZydisBranchType)
|
|
branch_width = get_sanitized_enum(reader, ZydisBranchWidth)
|
|
address_size_hint = get_sanitized_enum(reader, ZydisAddressSizeHint)
|
|
operand_size_hint = get_sanitized_enum(reader, ZydisOperandSizeHint)
|
|
operand_count = reader.read_uint8() % (ZYDIS_ENCODER_MAX_OPERANDS + 1)
|
|
reader.read_bytes(7)
|
|
operands = []
|
|
for i in range(ZYDIS_ENCODER_MAX_OPERANDS):
|
|
if i >= operand_count:
|
|
reader.read_bytes(SIZE_OF_ZYDIS_ENCODER_OPERAND)
|
|
continue
|
|
|
|
op_type = ZydisOperandType(ZydisOperandType.ZYDIS_OPERAND_TYPE_REGISTER.value +
|
|
(reader.read_uint32() % get_enum_max(ZydisOperandType)))
|
|
reg_value = get_sanitized_enum(reader, ZydisRegister)
|
|
reg_is4 = bool(reader.read_uint8())
|
|
reader.read_bytes(7)
|
|
mem_base = get_sanitized_enum(reader, ZydisRegister)
|
|
mem_index = get_sanitized_enum(reader, ZydisRegister)
|
|
mem_scale = reader.read_uint8()
|
|
reader.read_bytes(7)
|
|
mem_displacement = reader.read_int64()
|
|
mem_size = reader.read_uint16()
|
|
reader.read_bytes(6)
|
|
ptr_segment = reader.read_uint16()
|
|
reader.read_bytes(2)
|
|
ptr_offset = reader.read_uint32()
|
|
imm = reader.read_uint64()
|
|
op = {'operand_type': op_type.name}
|
|
if op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_REGISTER:
|
|
op['reg'] = {
|
|
'value': reg_value.name,
|
|
'is4': reg_is4,
|
|
}
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_MEMORY:
|
|
op['mem'] = {
|
|
'base': mem_base.name,
|
|
'index': mem_index.name,
|
|
'scale': mem_scale,
|
|
'displacement': str(mem_displacement),
|
|
'size': mem_size,
|
|
}
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_POINTER:
|
|
op['ptr'] = {
|
|
'segment': ptr_segment,
|
|
'offset': ptr_offset,
|
|
}
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_IMMEDIATE:
|
|
op['imm'] = {
|
|
'value': str(imm)
|
|
}
|
|
else:
|
|
raise RuntimeError('Invalid operand type: ' + op_type.name)
|
|
operands.append(op)
|
|
evex_broadcast = get_sanitized_enum(reader, ZydisBroadcastMode)
|
|
evex_rounding = get_sanitized_enum(reader, ZydisRoundingMode)
|
|
evex_sae = bool(reader.read_uint8())
|
|
evex_zeroing_mask = bool(reader.read_uint8())
|
|
reader.read_bytes(2)
|
|
mvex_broadcast = get_sanitized_enum(reader, ZydisBroadcastMode)
|
|
mvex_conversion = get_sanitized_enum(reader, ZydisConversionMode)
|
|
mvex_rounding = get_sanitized_enum(reader, ZydisRoundingMode)
|
|
mvex_swizzle = get_sanitized_enum(reader, ZydisSwizzleMode)
|
|
mvex_sae = bool(reader.read_uint8())
|
|
mvex_eviction_hint = bool(reader.read_uint8())
|
|
reader.read_bytes(2)
|
|
test_case = {
|
|
'machine_mode': machine_mode.name,
|
|
'allowed_encodings': get_decomposed_flags(allowed_encoding),
|
|
'mnemonic': mnemonic.name,
|
|
'prefixes': get_decomposed_flags(prefixes),
|
|
'branch_type': branch_type.name,
|
|
'branch_width': branch_width.name,
|
|
'address_size_hint': address_size_hint.name,
|
|
'operand_size_hint': operand_size_hint.name,
|
|
'operands': operands,
|
|
'evex': {
|
|
'broadcast': evex_broadcast.name,
|
|
'rounding': evex_rounding.name,
|
|
'sae': evex_sae,
|
|
'zeroing_mask': evex_zeroing_mask,
|
|
},
|
|
'mvex': {
|
|
'broadcast': mvex_broadcast.name,
|
|
'conversion': mvex_conversion.name,
|
|
'rounding': mvex_rounding.name,
|
|
'swizzle': mvex_swizzle.name,
|
|
'sae': mvex_sae,
|
|
'eviction_hint': mvex_eviction_hint,
|
|
},
|
|
}
|
|
if return_dict:
|
|
return test_case
|
|
return to_json(test_case)
|
|
|
|
|
|
def convert_re_enc_crash_to_json(crash, zydis_info, return_dict=False):
|
|
reader = BinaryReader(crash)
|
|
machine_mode = ZydisMachineMode(reader.read_uint32())
|
|
stack_width = ZydisStackWidth(reader.read_uint32())
|
|
payload = reader.read_bytes().hex().upper()
|
|
test_case = {
|
|
'machine_mode': machine_mode.name,
|
|
'stack_width': stack_width.name,
|
|
'payload': payload,
|
|
'description': get_disasm(zydis_info, machine_mode, stack_width, payload),
|
|
}
|
|
if return_dict:
|
|
return test_case
|
|
return to_json(test_case)
|
|
|
|
|
|
def convert_enc_json_to_crash(test_case_json, from_dict=False):
|
|
if from_dict:
|
|
test_case = test_case_json
|
|
else:
|
|
test_case = json.loads(test_case_json)
|
|
writer = BinaryWriter()
|
|
writer.write_uint32(ZydisMachineMode[test_case['machine_mode']])
|
|
writer.write_uint32(get_combined_flags(test_case['allowed_encodings'], ZydisEncodableEncoding))
|
|
writer.write_uint32(ZydisMnemonic[test_case['mnemonic']])
|
|
writer.write_padding(4)
|
|
writer.write_uint64(get_combined_flags(test_case['prefixes'], ZydisInstructionAttributes))
|
|
writer.write_uint32(ZydisBranchType[test_case['branch_type']])
|
|
writer.write_uint32(ZydisBranchWidth[test_case['branch_width']])
|
|
writer.write_uint32(ZydisAddressSizeHint[test_case['address_size_hint']])
|
|
writer.write_uint32(ZydisOperandSizeHint[test_case['operand_size_hint']])
|
|
operand_count = len(test_case['operands'])
|
|
writer.write_uint8(operand_count)
|
|
writer.write_padding(7)
|
|
for i in range(ZYDIS_ENCODER_MAX_OPERANDS):
|
|
if i >= operand_count:
|
|
writer.write_padding(SIZE_OF_ZYDIS_ENCODER_OPERAND)
|
|
continue
|
|
|
|
op = test_case['operands'][i]
|
|
op_type = ZydisOperandType[op['operand_type']]
|
|
writer.write_uint32(op_type - ZydisOperandType.ZYDIS_OPERAND_TYPE_REGISTER.value)
|
|
if op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_REGISTER:
|
|
writer.write_uint32(ZydisRegister[op['reg']['value']])
|
|
writer.write_uint8(int(op['reg']['is4']))
|
|
writer.write_padding(7)
|
|
writer.write_padding(SIZE_OF_ZYDIS_ENCODER_OPERAND - 16)
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_MEMORY:
|
|
writer.write_padding(12)
|
|
writer.write_uint32(ZydisRegister[op['mem']['base']])
|
|
writer.write_uint32(ZydisRegister[op['mem']['index']])
|
|
writer.write_uint8(op['mem']['scale'])
|
|
writer.write_padding(7)
|
|
writer.write_int64(int(op['mem']['displacement']))
|
|
writer.write_uint16(op['mem']['size'])
|
|
writer.write_padding(6)
|
|
writer.write_padding(SIZE_OF_ZYDIS_ENCODER_OPERAND - 48)
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_POINTER:
|
|
writer.write_padding(44)
|
|
writer.write_uint16(op['ptr']['segment'])
|
|
writer.write_padding(2)
|
|
writer.write_uint32(op['ptr']['offset'])
|
|
writer.write_padding(SIZE_OF_ZYDIS_ENCODER_OPERAND - 56)
|
|
elif op_type == ZydisOperandType.ZYDIS_OPERAND_TYPE_IMMEDIATE:
|
|
writer.write_padding(52)
|
|
writer.write_uint64(int(op['imm']['value']))
|
|
else:
|
|
raise RuntimeError('Invalid operand type: ' + op_type.name)
|
|
writer.write_uint32(ZydisBroadcastMode[test_case['evex']['broadcast']])
|
|
writer.write_uint32(ZydisRoundingMode[test_case['evex']['rounding']])
|
|
writer.write_uint8(int(test_case['evex']['sae']))
|
|
writer.write_uint8(int(test_case['evex']['zeroing_mask']))
|
|
writer.write_padding(2)
|
|
writer.write_uint32(ZydisBroadcastMode[test_case['mvex']['broadcast']])
|
|
writer.write_uint32(ZydisConversionMode[test_case['mvex']['conversion']])
|
|
writer.write_uint32(ZydisRoundingMode[test_case['mvex']['rounding']])
|
|
writer.write_uint32(ZydisSwizzleMode[test_case['mvex']['swizzle']])
|
|
writer.write_uint8(int(test_case['mvex']['sae']))
|
|
writer.write_uint8(int(test_case['mvex']['eviction_hint']))
|
|
writer.write_padding(2)
|
|
return writer.get_data()
|
|
|
|
|
|
def convert_re_enc_json_to_crash(test_case_json, from_dict=False):
|
|
if from_dict:
|
|
test_case = test_case_json
|
|
else:
|
|
test_case = json.loads(test_case_json)
|
|
writer = BinaryWriter()
|
|
writer.write_uint32(ZydisMachineMode[test_case['machine_mode']])
|
|
writer.write_uint32(ZydisStackWidth[test_case['stack_width']])
|
|
writer.write_bytes(bytes.fromhex(test_case['payload']))
|
|
return writer.get_data()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(description='Provides two-way conversion between crash files and human-readable test cases')
|
|
parser.add_argument('input_type', choices=['enc', 're-enc'])
|
|
parser.add_argument('input_format', choices=['crash', 'json'])
|
|
parser.add_argument('input_file')
|
|
parser.add_argument('output_file', help='Pass "stdout" to print result to stdout')
|
|
parser.add_argument('--zydis-info')
|
|
parser.add_argument('--extract-single', type=int, default=-1)
|
|
parser.add_argument('--append', action='store_true')
|
|
args = parser.parse_args()
|
|
|
|
if args.input_format == 'crash':
|
|
read_mode = 'rb'
|
|
write_mode = 'w'
|
|
else:
|
|
read_mode = 'r'
|
|
write_mode = 'wb'
|
|
with open(args.input_file, read_mode) as f:
|
|
content = f.read()
|
|
|
|
if args.input_format == 'crash':
|
|
if args.input_type == 'enc':
|
|
result = convert_enc_crash_to_json(content)
|
|
else:
|
|
result = convert_re_enc_crash_to_json(content, args.zydis_info)
|
|
else:
|
|
if args.extract_single >= 0:
|
|
content = to_json(json.loads(content)[args.extract_single])
|
|
if args.input_type == 'enc':
|
|
result = convert_enc_json_to_crash(content)
|
|
else:
|
|
result = convert_re_enc_json_to_crash(content)
|
|
|
|
if args.output_file == 'stdout':
|
|
if write_mode == 'wb':
|
|
sys.stdout.buffer.write(bytes(result))
|
|
else:
|
|
print(result)
|
|
else:
|
|
if args.append and write_mode == 'w':
|
|
with open(args.output_file, 'r') as f:
|
|
existing_db = json.loads(f.read())
|
|
existing_db.append(json.loads(result))
|
|
result = to_json(existing_db)
|
|
with open(args.output_file, write_mode) as f:
|
|
f.write(result)
|