Files
xinzhu.yin c157e774e5 1.1.0版本
2026-04-16 16:51:05 +08:00

61 lines
2.7 KiB
Python

import os
import warnings
import shutil
from ctypes import sizeof
from datetime import datetime
from UniTAP.dev.modules.capturer.result_object import ResultObject
from UniTAP.dev.modules.capturer.types import CapturedDataType
from UniTAP.dev.ports.modules.capturer.bulk.private_bulk_types import BulkHeader
class ResultBulkObject(ResultObject):
    """
    Capture result that can dump its buffer to binary files.

    Class `ResultBulkObject` inherits from class `ResultObject` and adds
    `save_to_bin_file`, which writes the captured items held in `self.buffer`
    to disk. All the `ResultObject` functionality stays available.
    """

    # Constant written first in every event record (the value the record
    # writer emits as its "sync" field).
    _EVENT_SYNC = 0x0B41550B
    # Event bytes before this offset are not copied to the events file.
    _EVENT_PAYLOAD_OFFSET = 12

    def __init__(self, assume_scrambler_disabled: bool = False):
        """
        Args:
            assume_scrambler_disabled (bool) - when True, `save_to_bin_file`
                sets bit 0x2 in `Attribute.Packing` of every bulk header it
                writes.
        """
        super().__init__()
        self.assume_scrambler_disabled = assume_scrambler_disabled

    def save_to_bin_file(self, directory_name: str):
        """
        Saving captured bulk data to file.

        Items of type `CapturedDataType.Event` are appended to
        `capture_<timestamp>.events.bin`; items of type
        `CapturedDataType.Bulk` are appended to
        `capture_<timestamp>.mainlink.bin`. Both names share one timestamp.
        A file is only created once the first item of its type is met.

        WARNING: if `directory_name` already exists it is removed with all of
        its content and re-created empty before anything is written.

        If the buffer is empty nothing is touched on disk and a warning is
        issued instead.

        Args:
            directory_name (str) - path to save
        """
        # NOTE(review): the warning is tied to the empty-buffer case, which is
        # what its message text describes - confirm against the upstream file.
        if len(self.buffer) == 0:
            warnings.warn("Buffer size is equal 0, without bulk data.")
            return

        # Always start from an empty directory.
        if os.path.exists(directory_name):
            shutil.rmtree(directory_name)
        os.makedirs(directory_name)

        # One timestamp for both files, so their names can never disagree
        # when the call straddles a second boundary.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        main_link_path = os.path.join(directory_name, f'capture_{stamp}.mainlink.bin')
        events_path = os.path.join(directory_name, f'capture_{stamp}.events.bin')

        header_size = sizeof(BulkHeader)
        events_file = None
        main_link_file = None
        try:
            for data in self.buffer:
                if data.type == CapturedDataType.Event:
                    # Open lazily: no events file unless an event exists.
                    if events_file is None:
                        events_file = open(events_path, 'ab')
                    self._write_event_record(events_file, data.data)
                elif data.type == CapturedDataType.Bulk:
                    # The main-link file is created even when the item turns
                    # out to be too short to carry any payload.
                    if main_link_file is None:
                        main_link_file = open(main_link_path, 'ab')
                    if len(data.data) > header_size:
                        self._write_bulk_record(main_link_file, data.data, header_size)
        finally:
            # Close whatever was opened, also when a write raised.
            for handle in (events_file, main_link_file):
                if handle is not None:
                    handle.close()

    def _write_event_record(self, out_file, raw):
        """Append one event record (four 32-bit little-endian words + payload) to `out_file`."""
        offset = self._EVENT_PAYLOAD_OFFSET
        words = (
            self._EVENT_SYNC,
            0x0,
            int(raw[13] & 0xF),          # low nibble of byte 13
            (len(raw) - offset) // 4,    # payload length counted in 4-byte units
        )
        for word in words:
            out_file.write(word.to_bytes(length=4, byteorder='little'))
        out_file.write(raw[offset:])

    def _write_bulk_record(self, out_file, raw, header_size):
        """Append one bulk item (header followed by the remaining bytes) to `out_file`."""
        # NOTE: ctypes `from_buffer` shares memory with `raw`, so the flag
        # change below is also visible in the captured data itself.
        header = BulkHeader.from_buffer(raw)
        if self.assume_scrambler_disabled:
            header.Attribute.Packing |= 0x2
        out_file.write(bytearray(header))
        out_file.write(raw[header_size:])

    def __str__(self):
        """Return the start time, end time and timestamp of the capture, one per line."""
        lines = (
            f"Start capture time: {self.start_capture_time}\n",
            f"End capture time: {self.end_capture_time}\n",
            f"Timestamp: {self.timestamp!s}\n",
        )
        return "".join(lines)