[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [coldsync-hackers] Python Conduits?



>Does anyone know of any conduits for coldsync written in Python?  Would
>anyone be interested in a Python api that is similar to the Perl one?
>
>I might try and write one, but I'm making no promises.

I don't have a conduit, but I do have a small Python module that you can use
to read and write PDB files. I used it to convert some flash card data from
one flash card program to another.

#!/usr/bin/env python
#
# $Id: pdb.py,v 1.2 2002/07/26 22:22:03 dawnthorn Exp $
#

import os
import stat
import struct
import sys
import mx.DateTime

class PDBErr(Exception):
	"""Module-specific exception type.

	It adds nothing to ``Exception``; none of the code visible in this
	module raises it.
	"""

def palm_epoch_from_date_time(date_time):
	"""Return *date_time* as an offset from 1 January 1904.

	The result is the ``seconds`` attribute of the difference between
	*date_time* and ``mx.DateTime.DateTime(1904, 1, 1)``.
	"""
	# NOTE(review): presumably date_time is an mx.DateTime value and the
	# delta's ``seconds`` is the total elapsed seconds - confirm against the
	# mx.DateTime documentation.
	elapsed = date_time - mx.DateTime.DateTime(1904, 1, 1)
	return elapsed.seconds


def date_time_from_palm_epoch(palm_time):
	"""Return the date/time lying *palm_time* seconds after 1 January 1904.

	*palm_time* is handed to ``mx.DateTime.RelativeDateTime`` as its
	``seconds`` argument and the resulting offset is added to
	``mx.DateTime.DateTime(1904, 1, 1)``.
	"""
	offset = mx.DateTime.RelativeDateTime(seconds = palm_time)
	return mx.DateTime.DateTime(1904, 1, 1) + offset


def hex_print(data, output = None):
	"""Write a hex/ASCII dump of *data* to *output*, 16 bytes per row.

	Each row consists of the offset of its first byte (8 hex digits), two
	groups of eight two-digit hex values, and the same bytes as text
	between ``|`` characters.

	data   -- byte string to dump (``bytes``, ``bytearray`` or a Python 2
	          ``str``); ``None`` and empty input produce no output.
	output -- object with a ``write()`` method; defaults to the current
	          ``sys.stdout``, looked up at call time so that a later
	          redirection of ``sys.stdout`` is honoured.
	"""
	if output is None:
		output = sys.stdout
	if data is None:
		return
	# bytearray() yields integers on every Python version, so no ord() call
	# is needed whether data[i] would be a 1-character string or an int.
	octets = bytearray(data)
	for row_start in range(0, len(octets), 16):
		row = octets[row_start:row_start + 16]
		groups = []
		for group in (row[:8], row[8:]):
			cells = ['%02x ' % octet for octet in group]
			# Pad a short final row with blanks, not fake "00" bytes, so the
			# dump never shows data that is not actually there.
			cells.extend(['   '] * (8 - len(group)))
			groups.append(''.join(cells))
		# 0x20..0x7e is the full printable ASCII range, '!' and '~' included;
		# everything else is shown as '.'.
		text = ''.join([chr(octet) if 32 <= octet <= 126 else '.' for octet in row])
		output.write('%08x  %s %s |%s|\n' % (row_start, groups[0], groups[1], text))


class Structure:
	"""Base class for fixed-size binary records described by a field table.

	Subclasses provide ``structure``: a sequence of ``(name, code)`` pairs.
	``code`` is a ``struct`` format code with two extensions recognised by
	their last character:

	  'S' -- stored as ``struct`` code 's' (fixed-length byte string)
	  'D' -- stored as ``struct`` code 'I' (unsigned 32-bit integer);
	         unpack() turns the integer into a date/time object with
	         date_time_from_palm_epoch()

	Fields are packed in network byte order ('!') and live as plain
	instance attributes named after the table entries.
	"""
	def __init__(self):
		self.load_structure()


	def load_structure(self):
		"""Build ``self.struct`` (format string) and ``self.struct_len`` (its size in bytes)."""
		codes = ['!']
		for member in self.structure:
			code = member[1]
			if code[-1] == 'S':
				codes.append(code[:-1] + 's')
			elif code[-1] == 'D':
				codes.append(code[:-1] + 'I')
			else:
				codes.append(code)
		self.struct = ''.join(codes)
		self.struct_len = struct.calcsize(self.struct)


	def pack(self):
		"""Return the current field values as a packed byte string.

		A 'D' field holding a plain number is packed unchanged.  Any other
		value in a 'D' field (such as the date/time object stored there by
		unpack()) is first converted back to a second count with
		palm_epoch_from_date_time(), so that unpack() followed by pack()
		round-trips.

		Raises KeyError if a field listed in ``structure`` has not been set.
		"""
		import numbers
		values = []
		for member in self.structure:
			value = self.__dict__[member[0]]
			if member[1][-1] == 'D' and not isinstance(value, numbers.Number):
				# struct's 'I' code needs an integer, not a date object.
				value = int(palm_epoch_from_date_time(value))
			values.append(value)
		return struct.pack(self.struct, *values)


	def unpack(self, data):
		"""Decode *data* (exactly ``struct_len`` bytes) into instance attributes.

		Raises struct.error if *data* does not have the expected length.
		"""
		values = struct.unpack(self.struct, data)
		for member, value in zip(self.structure, values):
			if member[1][-1] == 'D':
				value = date_time_from_palm_epoch(value)
			self.__dict__[member[0]] = value


	def unpack_from_file(self, data_file):
		"""Read ``struct_len`` bytes from *data_file*, keep them in ``self.data`` and unpack them."""
		self.data = data_file.read(self.struct_len)
		self.unpack(self.data)


	def pack_to_file(self, data_file):
		"""Write the packed record to *data_file* and store its size in ``self.length``."""
		packed_data = self.pack()
		self.length = len(packed_data)
		data_file.write(packed_data)



class FileHeader(Structure):
	"""Field layout of the header at the start of a PDB file.

	``structure`` lists the header fields in file order and
	``attribute_bitmask`` pairs each known bit of the ``attributes`` field
	with a human-readable description.
	"""
	structure = (
		('name', '32S'),
		('attributes', 'H'),
		('version', 'H'),
		('creation_date', 'D'),
		('modification_date', 'D'),
		('last_backup_date', 'D'),
		('modification_number', 'I'),
		('appinfo_area', 'I'),
		('sortinfo_area', 'I'),
		('database_type', '4s'),
		('creator_id', '4s'),
		('unique_id_seed', 'I'),
		('next_record_list_id', 'I'),
		('number_of_records', 'H'),
	)
	attribute_bitmask = (
		(0x0002, 'Read-Only'),
		(0x0004, 'Dirty AppInfo Area'),
		(0x0008, 'Backup this database'),
		(0x0010, 'Okay to install newer over existing copy'),
		(0x0020, 'Reset after installing database'),
		(0x0040, "Don't allow copy of file to be beamed"),
	)

	def attribute_list(self):
		"""Return the descriptions of every flag set in ``self.attributes``, in table order."""
		return [label for mask, label in self.attribute_bitmask if self.attributes & mask]


class CategoryHeader(Structure):
	"""Field layout of the category block unpacked from the AppInfo data.

	The table holds ``renamed_categories``, sixteen 16-byte labels
	(``category_label1`` .. ``category_label16``), sixteen one-byte ids
	(``category_id1`` .. ``category_id16``), ``last_id`` and ``padding``.
	After unpack() the labels are also available, with the NUL padding
	removed, as the list ``self.categories``.
	"""
	# The label and id fields differ only in their number, so the table is
	# generated instead of being spelled out row by row.
	structure = \
		(('renamed_categories', 'H'),) + \
		tuple(('category_label%d' % (number), '16S') for number in range(1, 17)) + \
		tuple(('category_id%d' % (number), 'c') for number in range(1, 17)) + \
		(('last_id', 'c'), ('padding', 'c'))

	def unpack(self, data):
		"""Unpack *data* and build ``self.categories`` from the sixteen labels.

		Each label is cut at its first NUL byte.  A label that fills all
		16 bytes without a NUL is kept whole.
		"""
		Structure.unpack(self, data)
		self.categories = []
		for number in range(1, 17):
			label = self.__dict__['category_label%d' % (number)]
			# split() leaves the label intact when no NUL is present, whereas
			# slicing with find()'s -1 would silently drop the last character.
			self.categories.append(label.split(b'\000', 1)[0])
			
			


class RecordHeader(Structure):
	"""One entry of the record list: a record's file offset and its id word.

	The 32-bit ``unique_id`` word read from the file carries the record
	attributes in its top byte.  unpack() splits it into ``attributes``
	(top 8 bits), ``unique_id`` (low 24 bits) and ``category`` (low 4 bits
	of the attributes); pack() joins the two parts again.
	"""
	structure = \
	(
		('offset', 'I'),
		('unique_id', 'I')
	)

	def __init__(self):
		Structure.__init__(self)


	def unpack(self, data):
		"""Unpack *data* and split the id word into its component fields."""
		Structure.unpack(self, data)
		id_word = self.unique_id
		self.attributes = (0xFF000000 & id_word) >> 24
		self.unique_id = 0x00FFFFFF & id_word
		self.category = self.attributes & 0x0f


	def pack(self):
		"""Return the packed entry with ``attributes`` restored to the top byte.

		Without this the attribute byte separated by unpack() would be lost
		on an unpack()/pack() round trip.  An instance that never had
		``attributes`` set is packed as if it were 0, i.e. ``unique_id`` is
		written unchanged.
		"""
		attributes = self.__dict__.get('attributes', 0)
		id_word = ((attributes & 0xFF) << 24) | self.unique_id
		return struct.pack(self.struct, self.offset, id_word)



class Record:
	"""The raw data of one record together with its record-list header.

	``length`` is not stored on the record: reading it returns
	``record_header.record_len`` and assigning it sets that attribute, so
	the two can never disagree.
	"""
	def __init__(self, record_header):
		self.data = None
		self.record_header = record_header


	def __getattr__(self, name):
		# Only called when normal attribute lookup fails, which is always the
		# case for 'length' because __setattr__ never stores it.
		if name == 'length':
			return self.record_header.record_len
		raise AttributeError("%s instance has no attribute '%s'" % (self.__class__, name))


	def __setattr__(self, name, value):
		if name == 'length':
			# Delegate only; keeping a second copy in __dict__ would shadow
			# __getattr__ and go stale when record_len changes later.
			self.record_header.record_len = value
			return
		self.__dict__[name] = value


	def load_from_file(self, data_file):
		"""Read ``self.length`` bytes from the current position of *data_file* into ``self.data``."""
		self.data = data_file.read(self.length)


	def pack_to_file(self, data_file):
		"""Write ``self.data`` to *data_file*."""
		data_file.write(self.data)


	def hex_print(self, output = None):
		"""Dump ``self.data`` with the module-level hex_print(); *output* defaults to the current ``sys.stdout``."""
		if output is None:
			output = sys.stdout
		hex_print(self.data, output)




class PDBFile:
	"""In-memory representation of the PDB file named ``file_name``.

	load() reads the header, record list, AppInfo/SortInfo blocks, the
	optional category table and every record.  add_record() and save()
	write a file made of the header, the record list, the AppInfo and
	SortInfo blocks (when present) and the record data, in that order.

	file_name      -- path of the file to read or write
	has_categories -- true if the AppInfo block should be decoded as a
	                  CategoryHeader; load() resets it to 0 when the block
	                  is missing or too short
	"""
	def __init__(self, file_name, has_categories = 0):
		self.file_name = file_name
		self.sortinfo_len = 0
		self.appinfo_len = 0
		self.sortinfo_data = None
		self.appinfo_data = None
		self.has_categories = has_categories
		# Start with empty containers so that add_record() and save() work
		# on a new object without an explicit init() call.
		self.init()


	def init(self):
		"""Reset the record lists, the 16 per-category lists and the file header."""
		self.record_headers = []
		self.records = []
		self.categories = [[] for unused in range(16)]
		self.file_header = FileHeader()


	def load(self):
		"""Read the whole file into memory; the file is closed again before returning."""
		self.init()
		self.file_size = os.stat(self.file_name)[stat.ST_SIZE]
		# PDB files are binary: text mode would translate line endings on
		# some platforms and decode the bytes on Python 3.
		self.pdb_file = open(self.file_name, 'rb')
		try:
			self.file_header.unpack_from_file(self.pdb_file)
			self.load_record_headers()
			self.load_info_area()
			self.load_categories()
			self.load_records()
		finally:
			self.pdb_file.close()


	def save_header(self):
		"""Open the file for writing (truncating it) and write the file header.

		The file is left open in ``self.pdb_file`` for the caller to continue
		writing and to close.
		"""
		self.pdb_file = open(self.file_name, 'wb')
		self.file_header.pack_to_file(self.pdb_file)


	def add_record(self, data):
		"""Append a new record holding the byte string *data*."""
		record_header = RecordHeader()
		record_header.record_len = len(data)
		record = Record(record_header)
		record.data = data
		self.record_headers.append(record_header)
		self.records.append(record)


	def save(self):
		"""Write header, record list, AppInfo/SortInfo blocks and records to the file.

		The header's record count and AppInfo/SortInfo offsets and every
		record's offset are recomputed for the new layout.  Works with zero
		records.  The file is closed even if writing fails.
		"""
		self.file_header.number_of_records = len(self.records)
		# Everything after the fixed header and the record list has to be
		# laid out before the header is written, because the header stores
		# the block offsets.  Summing also copes with an empty record list.
		offset = self.file_header.struct_len
		offset += sum([header.struct_len for header in self.record_headers])
		if self.appinfo_data:
			self.file_header.appinfo_area = offset
			offset += len(self.appinfo_data)
		else:
			self.file_header.appinfo_area = 0
		if self.sortinfo_data:
			self.file_header.sortinfo_area = offset
			offset += len(self.sortinfo_data)
		else:
			self.file_header.sortinfo_area = 0
		for record in self.records:
			record.record_header.offset = offset
			# NOTE(review): attributes and unique ids are deliberately reset,
			# as before; this discards the values read by load() - confirm
			# that this is intended.
			record.record_header.attributes = 0
			record.record_header.unique_id = 0
			offset += len(record.data)
		self.save_header()
		try:
			for record_header in self.record_headers:
				record_header.pack_to_file(self.pdb_file)
			if self.appinfo_data:
				self.pdb_file.write(self.appinfo_data)
			if self.sortinfo_data:
				self.pdb_file.write(self.sortinfo_data)
			for record in self.records:
				record.pack_to_file(self.pdb_file)
		finally:
			self.pdb_file.close()


	def load_info_area(self):
		"""Read the AppInfo and SortInfo blocks, if the header declares them.

		A block's length is the distance from its offset to whatever follows
		it: SortInfo runs up to the first record (or the end of the file when
		there are no records) and AppInfo runs up to SortInfo (or, without
		SortInfo, to the same end point).
		"""
		header = self.file_header
		if header.number_of_records > 0:
			sortinfo_end = self.record_headers[0].offset
		else:
			sortinfo_end = self.file_size
		if header.sortinfo_area > 0:
			self.sortinfo_len = sortinfo_end - header.sortinfo_area
			appinfo_end = header.sortinfo_area
		else:
			self.sortinfo_len = 0
			appinfo_end = sortinfo_end
		if header.appinfo_area > 0:
			self.appinfo_len = appinfo_end - header.appinfo_area
		if self.appinfo_len > 0:
			self.pdb_file.seek(header.appinfo_area)
			self.appinfo_data = self.pdb_file.read(self.appinfo_len)
		if self.sortinfo_len > 0:
			self.pdb_file.seek(header.sortinfo_area)
			self.sortinfo_data = self.pdb_file.read(self.sortinfo_len)


	def load_categories(self):
		"""Decode the start of the AppInfo block as a CategoryHeader.

		Does nothing unless ``has_categories`` is true; clears that flag when
		there is no AppInfo data or it is shorter than a CategoryHeader.
		"""
		if not self.has_categories:
			return
		if self.appinfo_data is None:
			self.has_categories = 0
			return
		self.category_header = CategoryHeader()
		header_len = self.category_header.struct_len
		if len(self.appinfo_data) < header_len:
			self.has_categories = 0
			return
		self.category_header.unpack(self.appinfo_data[:header_len])


	def load_record_headers(self):
		"""Read the record list that follows the file header.

		The list stores offsets only, so each record's length
		(``record_len``) is derived from the offset of the next record, or
		from the file size for the last one.
		"""
		self.record_headers = []
		previous = None
		for record_num in range(0, self.file_header.number_of_records):
			record_header = RecordHeader()
			self.record_headers.append(record_header)
			record_header.unpack_from_file(self.pdb_file)
			if previous is not None:
				previous.record_len = record_header.offset - previous.offset
			previous = record_header
		if previous is not None:
			previous.record_len = self.file_size - previous.offset


	def load_records(self):
		"""Read every record's data and file each record under its category."""
		self.records = []
		if len(self.record_headers) == 0:
			return
		# The records are read back to back, starting at the first offset.
		self.pdb_file.seek(self.record_headers[0].offset)
		for record_header in self.record_headers:
			record = Record(record_header)
			record.load_from_file(self.pdb_file)
			self.records.append(record)
			self.categories[record_header.category].append(record)


def count_bits(num):
	"""Return how many of the low 32 bits of *num* are set.

	Bits above the 32nd are ignored.
	"""
	# Masking first restricts the count to the 32 bits that are examined
	# and makes negative numbers count as their 32-bit two's complement.
	return bin(num & 0xFFFFFFFF).count('1')


def print_bits(num):
	"""Return the low 32 bits of *num* as a 32-character string of '0'/'1'.

	The most significant of the 32 bits comes first.  Nothing is printed,
	despite the name.
	"""
	return format(num & 0xFFFFFFFF, '032b')


def _test():
	"""Load the PDB file named by the first command-line argument and dump it.

	Prints the header size, a hex dump of the raw header, a hex dump of the
	AppInfo block and the category names (when the file has them), then the
	number of records in each of the 16 categories, numbered from 1.
	Exits with status 1 and a usage message when no file name is given.
	"""
	if len(sys.argv) < 2:
		sys.stderr.write('usage: %s PDB_FILE\n' % (sys.argv[0]))
		sys.exit(1)
	pdb_file = PDBFile(sys.argv[1], 1)
	pdb_file.load()
	# sys.stdout.write() is used instead of print so that the function
	# behaves the same whether print is a statement or a function.
	sys.stdout.write('%s\n' % (pdb_file.file_header.struct_len))
	hex_print(pdb_file.file_header.data)
	sys.stdout.write('\n')
	# A file without an AppInfo block leaves appinfo_data as None.
	if pdb_file.appinfo_data is not None:
		hex_print(pdb_file.appinfo_data)
	# load() clears has_categories when no category table could be decoded,
	# in which case category_header does not exist.
	if pdb_file.has_categories:
		sys.stdout.write('%s\n' % (pdb_file.category_header.categories,))
	i = 1
	for category in pdb_file.categories:
		sys.stdout.write('%d %d\n' % (i, len(category)))
		i += 1
#	for record in pdb_file.records:
#		sys.stdout.write('\n')
#		header = record.record_header
#		print header.offset, header.attributes, header.unique_id, header.category
#		record.hex_print()
	

# Run the self-test only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
	_test()