[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [coldsync-hackers] Python Conduits?
- To: coldsync-hackers at lusars dot net
- Subject: Re: [coldsync-hackers] Python Conduits?
- From: Peter Haight <peterh at sapros dot com>
- Date: Tue, 20 Aug 2002 14:54:59 -0700
- cc: "Dale P. Smith" <dsmith at altustech dot com>
- Reply-To: coldsync-hackers at lusars dot net
- Sender: owner-coldsync-hackers at lusars dot net
>Does anyone know of any conduits for coldsync written in Python? Would
>anyone be interested in a Python api that is similar to the Perl one?
>
>I might try and write one, but I'm making no promises.
I don't have a conduit, but I do have a small python module that you can use
to read and write PDB files. I used it to convert some flash card data from
one flash card program to another.
#!/usr/bin/env python
#
# $Id: pdb.py,v 1.2 2002/07/26 22:22:03 dawnthorn Exp $
#
import os
import stat
import struct
import sys
import mx.DateTime
class PDBErr(Exception):
pass
def palm_epoch_from_date_time(date_time):
palm_epoch = mx.DateTime.DateTime(1904, 1, 1)
palm_time = date_time - palm_epoch
return palm_time.seconds
def date_time_from_palm_epoch(palm_time):
palm_epoch = mx.DateTime.DateTime(1904, 1, 1)
palm_time = mx.DateTime.RelativeDateTime(seconds = palm_time)
return palm_epoch + palm_time
def hex_print(data, output = sys.stdout):
current_pos = 0
length = len(data)
while current_pos < length:
ascii = '|'
output.write('%08x ' % (current_pos))
for i in range(0, 8):
if (current_pos >= length):
current_byte = '\x00'
else:
current_byte = data[current_pos]
output.write('%02x ' % (ord(current_byte)))
if ord(current_byte) > 33 and ord(current_byte) < 126:
ascii += current_byte
else:
ascii += '.'
current_pos += 1
output.write(' ')
for i in range(0, 8):
if (current_pos >= length):
current_byte = '\x00'
else:
current_byte = data[current_pos]
output.write('%02x ' % (ord(current_byte)))
if ord(current_byte) > 33 and ord(current_byte) < 126:
ascii += current_byte
else:
ascii += '.'
current_pos += 1
ascii += '|'
output.write(' %s\n' % ascii)
class Structure:
def __init__(self):
self.load_structure()
def load_structure(self):
self.struct = '!'
for member in self.structure:
if member[1][-1] == 'S':
self.struct += member[1][:-1] + 's'
elif member[1][-1] == 'D':
self.struct += member[1][:-1] + 'I'
else:
self.struct += member[1]
self.struct_len = struct.calcsize(self.struct)
def pack(self):
args = [self.struct]
for member in self.structure:
args.append(self.__dict__[member[0]])
packed_data = apply(struct.pack, args)
return packed_data
def unpack(self, data):
unpacked_data = struct.unpack(self.struct, data)
i = 0
for member in self.structure:
if member[1][-1] == 'D':
value = date_time_from_palm_epoch(unpacked_data[i])
else:
value = unpacked_data[i]
self.__dict__[member[0]] = value
i += 1
def unpack_from_file(self, data_file):
self.data = data_file.read(self.struct_len)
self.unpack(self.data)
def pack_to_file(self, data_file):
packed_data = self.pack()
self.length = len(packed_data)
data_file.write(packed_data)
class FileHeader(Structure):
structure = \
(
('name', '32S'),
('attributes', 'H'),
('version', 'H'),
('creation_date', 'D'),
('modification_date', 'D'),
('last_backup_date', 'D'),
('modification_number', 'I'),
('appinfo_area', 'I'),
('sortinfo_area', 'I'),
('database_type', '4s'),
('creator_id', '4s'),
('unique_id_seed', 'I'),
('next_record_list_id', 'I'),
('number_of_records', 'H')
)
attribute_bitmask = \
(
(0x0002, 'Read-Only'),
(0x0004, 'Dirty AppInfo Area'),
(0x0008, 'Backup this database'),
(0x0010, 'Okay to install newer over existing copy'),
(0x0020, 'Reset after installing database'),
(0x0040, 'Don\'t allow copy of file to be beamed')
)
def __init__(self):
Structure.__init__(self)
def attribute_list(self):
attributes = []
for attribute in self.attribute_bitmask:
if self.attributes & attribute[0]:
attributes.append(attribute[1])
return attributes
class CategoryHeader(Structure):
structure = \
(
('renamed_categories', 'H'),
('category_label1', '16S'),
('category_label2', '16S'),
('category_label3', '16S'),
('category_label4', '16S'),
('category_label5', '16S'),
('category_label6', '16S'),
('category_label7', '16S'),
('category_label8', '16S'),
('category_label9', '16S'),
('category_label10', '16S'),
('category_label11', '16S'),
('category_label12', '16S'),
('category_label13', '16S'),
('category_label14', '16S'),
('category_label15', '16S'),
('category_label16', '16S'),
('category_id1', 'c'),
('category_id2', 'c'),
('category_id3', 'c'),
('category_id4', 'c'),
('category_id5', 'c'),
('category_id6', 'c'),
('category_id7', 'c'),
('category_id8', 'c'),
('category_id9', 'c'),
('category_id10', 'c'),
('category_id11', 'c'),
('category_id12', 'c'),
('category_id13', 'c'),
('category_id14', 'c'),
('category_id15', 'c'),
('category_id16', 'c'),
('last_id', 'c'),
('padding', 'c'),
)
def unpack(self, data):
Structure.unpack(self, data)
self.categories = []
for i in range(1, 17):
category = self.__dict__['category_label%d' % (i)]
category = category[:category.find('\000')]
self.categories.append(category)
class RecordHeader(Structure):
structure = \
(
('offset', 'I'),
('unique_id', 'I')
)
def __init__(self):
Structure.__init__(self)
def unpack(self, data):
Structure.unpack(self, data)
self.attributes = (0xFF000000 & self.unique_id) >> 24
self.unique_id = 0x00FFFFFF & self.unique_id
self.category = self.attributes & 0x0f
class Record:
def __init__(self, record_header):
self.data = None
self.record_header = record_header
self.length = self.record_header.record_len
def __getattr__(self, name):
if name == 'length':
return self.record_header.record_len
raise AttributeError, "%s instance has no attribute '%s'" % (self.__class__, name)
def __setattr__(self, name, value):
if name == 'length':
self.record_header.record_len = value
self.__dict__[name] = value
def load_from_file(self, data_file):
self.data = data_file.read(self.length)
def pack_to_file(self, data_file):
data_file.write(self.data)
def hex_print(self, output = sys.stdout):
hex_print(self.data, output)
class PDBFile:
def __init__(self, file_name, has_categories = 0):
self.file_name = file_name
self.sortinfo_len = 0
self.appinfo_len = 0
self.sortinfo_data = None
self.appinfo_data = None
self.has_categories = has_categories
def init(self):
self.record_headers = []
self.records = []
self.categories = [[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]]
self.file_header = FileHeader()
def load(self):
self.init()
self.file_size = os.stat(self.file_name)[stat.ST_SIZE]
self.pdb_file = open(self.file_name, 'r')
self.file_header.unpack_from_file(self.pdb_file)
self.load_record_headers()
self.load_info_area()
self.load_categories()
self.load_records()
def save_header(self):
self.pdb_file = open(self.file_name, 'w')
self.file_header.pack_to_file(self.pdb_file)
def add_record(self, data):
record_header = RecordHeader()
record_header.record_len = len(data)
record = Record(record_header)
record.data = data
self.record_headers.append(record_header)
self.records.append(record)
def save(self):
num_records = len(self.records)
self.file_header.number_of_records = num_records
self.save_header()
offset = self.file_header.length
record_entries_len = num_records * self.record_headers[0].struct_len
offset += record_entries_len
for record in self.records:
record.record_header.offset = offset
record.record_header.attributes = 0
record.record_header.unique_id = 0
offset += len(record.data)
for record_header in self.record_headers:
record_header.pack_to_file(self.pdb_file)
for record in self.records:
record.pack_to_file(self.pdb_file)
self.pdb_file.close()
def load_info_area(self):
if self.file_header.number_of_records > 0:
sortinfo_end = self.record_headers[0].offset
else:
sortinfo_end = self.file_size
if self.file_header.sortinfo_area > 0:
self.sortinfo_len = sortinfo_end - self.file_header.sortinfo_area
appinfo_end = self.file_header.sortinfo_area
else:
self.sortinfo_len = 0
appinfo_end = sortinfo_end
if self.file_header.appinfo_area > 0:
self.appinfo_len = appinfo_end - self.file_header.appinfo_area
if self.appinfo_len > 0:
self.pdb_file.seek(self.file_header.appinfo_area)
self.appinfo_data = self.pdb_file.read(self.appinfo_len)
if self.sortinfo_len > 0:
self.pdb_file.seek(self.file_header.sortinfo_area)
self.sortinfo_data = self.pdb_file.read(self.sortinfo_len)
def load_categories(self):
if not self.has_categories:
return
if self.appinfo_data is None:
self.has_categories = 0
return
self.category_header = CategoryHeader()
if len(self.appinfo_data) < self.category_header.struct_len:
self.has_categories = 0
return
self.category_header.unpack(self.appinfo_data[:self.category_header.struct_len])
def load_record_headers(self):
self.record_headers = []
last_record_header = None
for record_num in range(0, self.file_header.number_of_records):
record_header = RecordHeader()
self.record_headers.append(record_header)
record_header.unpack_from_file(self.pdb_file)
if last_record_header is not None:
last_record_header.record_len = record_header.offset - last_record_header.offset
last_record_header = record_header
if last_record_header is not None:
last_record_header.record_len = self.file_size - last_record_header.offset
def load_records(self):
self.records = []
if len(self.record_headers) == 0:
return
record_start = self.record_headers[0].offset
self.pdb_file.seek(record_start)
for record_header in self.record_headers:
record = Record(record_header)
record.load_from_file(self.pdb_file)
self.records.append(record)
self.categories[record_header.category].append(record)
def count_bits(num):
count = 0
mask = 0x00000001
for i in range(0, 32):
if (mask & num):
count += 1
mask = mask << 1
def print_bits(num):
string = ''
mask = 0x00000001
for i in range(0, 32):
if (mask & num):
string = '1' + string
else:
string = '0' + string
mask = mask << 1
return string
def _test():
pdb_file = PDBFile(sys.argv[1], 1)
pdb_file.load()
print pdb_file.file_header.struct_len
hex_print(pdb_file.file_header.data)
sys.stdout.write('\n')
hex_print(pdb_file.appinfo_data)
i = 1
print pdb_file.category_header.categories
for category in pdb_file.categories:
print i, len(category)
i += 1
# for record in pdb_file.records:
# sys.stdout.write('\n')
# header = record.record_header
# print header.offset, header.attributes, header.unique_id, header.category
# record.hex_print()
if __name__ == '__main__':
_test()