21 окт. 2008 г.

Сохранение в SQLAlchemy под контролем

Большинство задач, для которых используется SQLAlchemy — это веб-приложения, отличающиеся небольшим количеством действий, выполняемых на один запрос, короткими транзакциями. И для этих целей типовая схема работы, расписанная в документации, подходит очень хорошо. Но работа с базами данных нужна не только в веб-приложениях, и даже в веб-приложениях иногда есть отдельные процессы с более сложными операциями.
Типовая схемы работы предполагает накопление некоторого количества изменений и вызов метода flush() у сессии, который сохраняет все изменения в базе. А теперь представьте, что будет, если в ходе работы на одной из итераций мы получаем исключение, мы это исключение обрабатываем и продолжаем работу? Вполне резонно, что часть ("ошибочных") накопленных изменений должна пропасть, то есть не попасть в базу. Но ведь метод flush() предполагает сохранения именно всех изменений. Конечно, мы можем очистить сессию и произвести инициализацию заново — достаточно неудобно, да и зачем снова загружать данные, которые не могли измениться? Кто-то резонно заметит, что в метод flush() можно передать список объектов для сохранения. Да, это именно то, что нужно. Только следует понимать, что в этом случае сохраняться будут только эти объекты, но не объекты, которые от них зависят, то есть cascade rules перестают работать. В итоге мы не можем использовать autoflush=True и должны самостоятельно отслеживать каскадные правила при сохранении. Аналогично не стоит использовать transactional=True, так как в этом случае транзакция открывается сразу же после закрытия предыдущей, и при длительной работе без commit()-ов могут возникать значительные замедления в работе базы данных.
Ещё одна неприятная особенность есть у SessionTransaction. Используя другие библиотеки для работы с базами данных я привык, что можно определить метод с некоторой транзакцией, а затем вызывать его из другого метода, в котором к исходным действиям добавляются ещё какие-то, и всё это, конечно, в одной общей транзакции. Но дело в том, что сессионные транзакции в SQLAlchemy не могут быть вложенными. На самом деле всё гораздо хуже, они могут быть вложенными, но результат будет отличным от ожидаемого: транзакция будет закрыта уже при при вызове commit() внутренней транзакции. Проблема решается использованием объекта транзакции для соединения, который работает как нужно.
Подытожу всё сказанное в классе Storage (недостающие методы не представляют сложности в реализации):
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class Storage(object):

    def __init__(self, dbURL):
        self._engine = create_engine(dbURL)
        self._conn = self._engine.connect()
        self._session = sessionmaker(bind=self._conn, autoflush=False,
                                     transactional=False)()

    def transaction(self):
        return self._conn.begin()

    def store(self, obj):
        with self.transaction():
            self._session.save_or_update(obj)
            from sqlalchemy.orm.session import _cascade_iterator
            cascaded = [o for o, m in _cascade_iterator('save-update', obj)]
            self._session.flush([obj]+cascaded)

25 сент. 2008 г.

Сохранение времени в базе данных

Очень часто бывает, что практически все знают, как надо делать правильно, но при этом всё равно постоянно делают неправильно. Один из таких случаев — сохрание времени в базе данных. Понятно, что на персональном блоге вполне можно обойтись наивных подходом, не учитывающим перевод времени. Но для круглосуточно работающих приложений строгой системы отчётности вроде биллинга это неприемлемо.
Я не буду здесь рассматривать все возможные варианты корректной работы со временем. Покажу лишь насколько просто можно реализовать самый распространённый вариант — хранение в базе в UTC — на примере SQLAlchemy. Для этого достаточно определить новый тип колонки:
from sqlalchemy import types
from dateutil.tz import tzutc
from datetime import datetime

class UTCDateTime(types.TypeDecorator):

    impl = types.DateTime

    def process_bind_param(self, value, engine):
        if value is not None:
            return value.astimezone(tzutc()).replace(tzinfo=None)

    def process_result_value(self, value, engine):
        if value is not None:
            return datetime(value.year, value.month, value.day,
                            value.hour, value.minute, value.second,
                            value.microsecond, tzinfo=tzutc())
Теперь вы можете сохранять время с произвольной зоной, все преобразования будут сделаны автоматически. Но сохранить время без зоны не получится — метод astimezone() выбросит исключение ValueError, что позволит избежать случайных ошибок.

22 сент. 2008 г.

Логгинг в базу или борьба с рекурсией

Как-то понадобилось мне сделать логгинг в базу для модуля logging. Сразу видна очевидная проблема: внутри такого обработчика идёт сохранения в базу некоторыми принятыми в проекте средствами, которые сами используют logging. В результате обработчик рекусивно будет вызывать себя и зацикливаться. Понятно, что можно отбросить привычные средства и сделать либо логгинг своими средствами без использования стандартного пакета logging, либо в базу писать низкоуровневым кодом, который logging не использует. Но это всё не интересно и ведёт за собой неудобства в использовании.
Первая мысль была выставлять флаг в обработчике перед записью в базу и снимать его после. Но это без ухищрений не будет работать в многопоточном приложении, а ведь есть ещё и обработчики сигналов. А нельзя ли определить, что обработчик был вызван из самого себя? Оказывается, можно — с помощью средств работы со стеком интерпретатора в модуле inspect. Достаточно убедиться, чтобы текущего метода не было в стеке вызовов. Сам фрейм не подходит для сравнения, так как он создаётся новый на каждый вызов, но можно сравнивать объекты кода. Первая версия на базе inspect.stack() оказалась достаточно медленной, а ведь запись в лог — это то, что используется постоянно. Дело в том, что эта функция подготавливает много лишней информации, которая нам не нужна. Зато проход по стеку "вручную" оказался достаточно быстрым, примерно на 2 порядка быстрее варианта с inspect.stack():
def isRecursive():
   '''Returns whether it's recursive call of caller function.'''
   frame = inspect.currentframe().f_back
   try:
       code = frame.f_code
       while True:
           frame = frame.f_back
           if frame is None:
               break
           if frame.f_code is code:
               return True
       return False
   finally:
       del frame
Сам обработчик достаточно прост (здесь storage — произвольное хранилище, в моём случае оно сделано на базе SQLAlchemy):
class DBServiceLogHandler(logging.Handler):

   def __init__(self, storage):
       logging.Handler.__init__(self)
       self._storage = storage

   def emit(self, record):
       # We can't check this in filter() method, since recursive call is from
       # emit, not filter.
       if isRecursive():
           return
       traceback = None
       if record.exc_info:
           # It's used by logging to cache
           if not record.exc_text:
               record.exc_text = self.formatException(record.exc_info)
           traceback = record.exc_text
       entry = LogEntry(name=record.name, level=record.levelno,
                        message=record.getMessage(), traceback=traceback)
       try:
           self._storage.store(entry)
       except self._storage.Error:
           logger.exception('Error logging to DB (%s):', record.getMessage())
       self._storage.clear()

7 авг. 2008 г.

Всемогущий ssh

Думаю, сложно найти веб-разработичка, который бы не использовал ssh в своей работе. Но далеко не каждый знает о его возможностях. Когда-то очень давно я испытал ребяческий восторг впервые установив прямой канал между двумя компьютерами в несвязанных друг с другом private network. Сейчас уже стало привычным попросить человека в заNATченной сетке запустить на пару минут команду вида
ssh -R publicport:privatehost:22 user@publichost
и уже самому с privatehost запустить аналогичную команду, но уже через autossh. А тут вчера мой коллега из дальнего зарубежья вдруг просит дать ему попробовать кое-какие действия по HTTP с российских IP. Поднимать локальный прокси-сервер мне не хотелось, поэтому первая же моя мысль была, а не поможет ли мне здесь ssh? Но одно дело перенаправить соединение на определенный порт определенной машины, а другое дело дать возможность устанавливать соединение куда угодно. Надо отдать должное, и в этот раз нашлась нужная опция. Всего две команды
ssh -R remotesshport:localhost:22 me@remotehost
и уже оттуда
ssh -D '*:proxyport' -p remotesshport me@localhost
и SOCKS прокси настроен.

19 июн. 2008 г.

Борьба с обNULLением в SQLAlchemy

SQLAlchemy, пожалуй, самый продвинутый ORM для питона. Но, к сожалению, он постоянно подбрасывает неприятные сюрпризы. В очередной раз натолкнувшись на одну из проблем и потратив время на повторный поиск её решения, я решил его задокументировать. Речь об установки в NULL поля с идентификатором при удалении объекта, на который он ссылается, если в маппере для связи используется relation. Для тех, кто привык работать с SQL, такое поведение по умолчанию в лучшем случае вызывает недоумение. Фактически оно означает использование на уровне кода по умолчанию правила ON DELETE SET NULL, вместо привычного (и логичного!) ON DELETE RESTRICT. Если бы не моя чрезмерная педантичность в проставлении nullable=False для полей с FOREIGN KEY, этот сюрприз мог бы привести к весьма печальным последствиям - потери данных. Упоминание об описанном поведении в документации к SQLAlchemy встречается только один раз - при описании ключа passive_deletes функции relation(). Собственно его установка в 'all' и решает проблему. Так как имя ключа ничего не говорит о его истинном назначении, то соответствующий комментарий явно не помешает.

5 июн. 2008 г.

Использование copy в mercurial для разбиения модуля

Продолжаю смотреть на поведение mercurial при слиянии изменений. Тест второй, copy + merge. Простое дублирование одинаковой информации вряд ли когда-либо понадобится, поэтому я решил рассмотреть ситуацию, когда часть содержимого черезчур разросшегося модуля выносится в отдельный модуль.
Для начала создаём репозиторий trunk с одним модулем module.py:
def func1():
    return 'initial'

def func2():
    return 'initial'
и клонируем trunk в branch. Теперь изменяем функции в trunk:
def func1():
    return 'changed'

def func2():
    return 'changed'
В branch мы выносим функцию func2 в модуль submodule.py. Но для того, чтобы mercurial "знал", откуда появилась новая функция в submodule.py, мы выполняем команду hg copy module.py submodule.py и удаляем из первого func1, а из второго func2.
Остаётся только перелить изменения из trunk и сделать merge. Однако теперь процесс проходит не так гладко: изменения обоих функций успешно попадают куда нужно, но изменения второй отсутствующей функции mercurial для обоих подмодулей считает конфликтом, который и предлагает разрешить вручную. Впрочем, в данном случае всё разрешение конфликта сводится к удалению ненужной функции. К сожалению, такую операцию, как перенос части кода из одного файла в другой существующий, mercurial никак не позволяет отработать.

Обработка rename и merge в mercurial

Иван Сагалаев написал про проблемы слияния веток репозитория при использовании subversion. Это навело меня на мысль проверить, как с такими вещами в используемом мной mercurial? По ходу обнаружилось, что на моей локальной машине до сих пор установленная версия 0.8, в том время как все вкусности появились только начиная с 1.0. Чтож, хороший повод обновиться.
Итак, тест первый.
mkdir trunk
cd trunk
hg init
Создаю в нём модуль old_name.py с одной функцией:
def old_func():
  return 'initial'
Клонирую репозиторий
cd ..
hg clone trunk branch
и переименовываю модуль в новой ветке
cd branch
hg rename old_name.py new_name.py
hg ci -m 'old_name.py -> new_name.py' old_name.py new_name.py
Меняю функцию в old_name.py в trunk и добавляю новую, в результате получаю такой код:
def old_func():
   return 'changed'

def new_func():
   pass
Комичу изменения и пробую слить trunk с branch:
hg ci -m 'old_func is changed; new_func is added' old_name.py
cd ../branch
hg pull ../trunk
hg merge
Всё проходит замечательно и все изменения old_name.py в trunk успешно попадают в new_name.py в branch.
Ради интереса, я повторил тест со старой версией. При слиянии mercurial сообщает о том, что был изменён удалённый файл и предлагает либо добавить его, либо удалить. То есть ни о каком слиении изменений речи не идёт.