[Biopython] SeqRecord and MultipleSeqAlignment pickle-proof (for multiprocessing)?

Jan T Kim jttkim at googlemail.com
Thu Jul 12 19:10:45 UTC 2018


Hi All,

I'll start with the short version of the question: Are instances of the
SeqRecord and MultipleSeqAlignment classes safe to pickle and unpickle,
i.e. suitable for serialisation?

The longer version with some background: I'd like to use pools as provided
by the standard multiprocessing package to run multiple sequence aligners
(currently mafft) in parallel. This package uses serialisation to to send
input objects to worker processes in the pool, and to send results back to
the caller. Looking at the BioPython docs I haven't found any statement
regarding the pickle-safety of the classes mentioned above. Tests
(see code below) indicate that all works as intended, but rather than
assuming that all is good I'd like to find out whether I've overlooked
something that could cause problems. Additionally, I'd also like to be
told if I'm re-inventing something, or overlooking some better / easier /
more elegant way to parallelise multiple alignment computation.

I append the code below mainly for reading, but as an example usage,
saving the code below as "mp" and running

    ./mp -o a.fasta g*.fasta
    
works for me in a directory with suitable g*.fasta files.

Best regards & thanks in advance for comments, Jan

----- 8< --- demo code -------------------------------------------

#!/usr/bin/env python

import sys
import getopt
import os
import time
import random
import copy
import multiprocessing
import subprocess

import Bio
import Bio.Seq
import Bio.SeqRecord
import Bio.SeqIO
import Bio.Align
import Bio.AlignIO

        
class AlignmentGenerator(object):
    
    def __init__(self):
        pass

    def alignSeqs(self, srList):
        gapChar = '-'
        maxLength = 0
        for sr in srList:
            l = len(sr)
            if l > maxLength:
                maxLength = l
        alignedSrList = []
        for sr in srList:
            l = len(sr)
            alignedSrList.append(Bio.SeqRecord.SeqRecord(Bio.Seq.Seq(str(sr.seq) + gapChar * (maxLength - l)), id=sr.id, description=''))
        time.sleep(random.random() + 0.1)
        sys.stderr.write('aligned: pid %d, %s et.al.\n' % (os.getpid(), srList[0].id))
        return Bio.Align.MultipleSeqAlignment(alignedSrList)

    
class MafftAlignmentGenerator(AlignmentGenerator):
    
    def __init__(self):
        super(MafftAlignmentGenerator, self).__init__()
        
    def alignSeqs(self, srList):
        mafftArgv = ['mafft', '--localpair', '--maxiterate', '1000', '--thread', '8', '-']
        sys.stderr.write('%s\n' % ' '.join(mafftArgv))
        mafftProcess = subprocess.Popen(mafftArgv, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        pid = os.fork()
        if pid == 0:
            mafftProcess.stdout.close()
            Bio.SeqIO.write(srList, sys.stderr, 'fasta')
            Bio.SeqIO.write(srList, mafftProcess.stdin, 'fasta')
            mafftProcess.stdin.close()
            os._exit(0)
        mafftProcess.stdin.close()
        mafftAlignment = Bio.AlignIO.read(mafftProcess.stdout, 'fasta')
        mafftProcess.stdout.close()
        wPid, wExit = os.waitpid(pid, 0)
        if pid != wPid:
            raise StandardError, 'wait returned pid %s (expected %d)' % (wPid, pid)
        if wExit != 0:
            raise StandardError, 'wait on forked process returned %d' % wExit
        mafftReturncode = mafftProcess.wait()
        if mafftReturncode != 0:
            raise StandardError, 'process "%s" returned %d' % (' '.join(mafftArgv), mafftReturncode)
        return mafftAlignment

        
def alignWrapper(srList, alignmentGenerator):
    return alignmentGenerator.alignSeqs(srList)


poolSize = 5
outfileName = None
options, args = getopt.getopt(sys.argv[1:], 'p:o:h')
for opt, par in options:
    if opt == '-h':
        print 'options:'
        print '-h: print this help and exit'
        sys.exit()
    elif opt == '-p':
        poolSize = int(par)
    elif opt == '-o':
        outfileName = par
    else:
        raise StandardError, 'unhandled option "%s"' % opt
srListList = []
for infileName in args:
    srListList.append(list(Bio.SeqIO.parse(infileName, 'fasta')))
pool = multiprocessing.Pool(poolSize)
alignmentGenerator = MafftAlignmentGenerator()
# a = alignmentGenerator.alignSeqs(srListList[0])
# Bio.AlignIO.write(a, sys.stderr, 'fasta')
# sys.exit()
resultList = []
for srList in srListList:
    resultList.append(pool.apply_async(alignWrapper, [srList], {'alignmentGenerator': alignmentGenerator}))
pool.close()
alignmentList = []
i = 0
for result in resultList:
    alignmentList.append(result.get())
    sys.stderr.write('got result %d\n' % i)
    i = i + 1
pool.join()
sys.stderr.write('pool joined\n')
if outfileName is None:
    for alignment in alignmentList:
        Bio.AlignIO.write(alignment, sys.stdout, 'fasta')
else:
    with open(outfileName, 'w') as outfile:
        for alignment in alignmentList:
            Bio.AlignIO.write(alignment, outfile, 'fasta')


More information about the Biopython mailing list