[Biopython-dev] hmmpfam parser

Brad Chapman chapmanb at uga.edu
Thu Feb 12 18:52:32 EST 2004


Hi Wagied;

> Hopefully this meets the guidelines.
[...New version of the parser deleted...]

I've had a good hard look at this and done a number of fix-ups and
things to try and make things more understandable to myself, easier
to maintain, and more conformant to the normal "Biopython way" of
doing things.

I've attached a new version to this mail which cleans up a number of
things:

1. Iterator and parser are separate entities, and now behave like
standard Biopython iterators and parsers.

2. Cleaned up functions -- some functions (specifically parse()) ran
over multiple pages with heavy indenting. This is really hard to
follow and maintain -- I split things into separate internal
functions.

3. Got rid of lots of use of constants, for standard things like
newlines and other things. Again, these make the code harder to
follow.

4. Removed some of the unnecessary variables which look like they
are left over from coding the parser.

5. Wrapped lines to 80 characters or less.

This leaves things much better and gives a better idea of where the
parser is at. If you like these changes and want to keep working on
it, I'd suggest that a couple of things are still missing which
could use coding.

1. The domains and families can be extracted as XML, but not
accessed through a class. The HmmpfamRecord class really needs to
have family_scores and parsed_domains be lists of objects which have
all of the elements (model, description, e-value...) as attributes
of these classes. An excellent example of how this is done is the
BLAST Record class in Bio/Blast/Record.py, which is also documented
at:

http://biopython.org/docs/tutorial/Tutorial004.html#toc10

2. There needs to be some similar way to access the alignments, so
that they are also parsed into classes.

I think things are really coming along well -- let me know what you
think about expanding the Record class to include the families,
domains and alignments and we can get this finished and all checked
in. Please also let me know if I messed any of your code during my
work on it.

Hope this helps -- glad it's coming along!
Brad
-------------- next part --------------
####################################################################### */
# COPYRIGHT INFORMATION
# Pfam DOMAIN RESULTS PARSER
# @AUTHOR: Wagied Davids
# @DATE: 22.01.2004
# @COPYRIGHT: Wagied Davids, 2004
####################################################################### */

import sys
import string
import re
import time
from types import FileType

class HmmpfamRecord:
    '''
    Prototype class Entry structure
    @author: Wagied Davids
    @date: 22.01.2004
    @copyright: Wagied Davids, 2004
    '''
    # STATIC DATA
    FAMILY_CLASSIFICATION_HEADER = 'Scores for sequence family ' \
     'classification (score includes all domains):\nModel           ' \
     'Description                             Score    E-value  N\n' \
     '--------        -----------                             ' \
     '-----    ------- ---'
    PARSED_DOMAIN_HEADER = 'Parsed for domains:\nModel           ' \
     'Domain  seq-f seq-t    hmm-f hmm-t      score  E-value\n' \
     '--------        ------- ----- -----    ----- -----      -----  -------'
    NO_HITS= '[no hits above thresholds]'

    # STATIC REGEX OBJECTS
    REGEX_FAMILY_SCORES= re.compile(
            r'((\S.*?)\s+(\S.*?)\s+((-| )\S.*?)\s+(\S.*?)\s+(\d+))',
            re.MULTILINE | re.DOTALL )

    def __init__( self, query, accession= None, description= None,
            family_scores= [], parsed_domains= [], alignments= [] ):
        '''
        Constructor for Pfam Entry structure
        @param ( query, accession= None, description= None, 
        family_scores= [], parsed_domains= [], alignments= [] )
        @return (None)
        '''
        self.query= query
        self.accession= accession
        self.description= description
        self.family_scores= family_scores
        # FAMILY SCORES HITLIST FOR SCORE ENTRIES
        self.family_scores_hitlist= []
        self.parsed_domains= parsed_domains
        self.alignments= alignments
    
    def __str__( self ):
        '''
        Retrieves a string representation of parser entry class
        @param (None)
        @return (String:  representation of HmmpfamRecord class)
        '''
        strBuffer= '' 
        strBuffer= strBuffer + "<HMMER>\n" 
        strBuffer= strBuffer + "\t<QUERY>%s</QUERY>\n" % (self.get_query())
        strBuffer= strBuffer + "\t<ACCESSION>%s</ACCESSION>\n" \
                % (self.get_accession())
        strBuffer= strBuffer + "\t<DESCRIPTION>%s</DESCRIPTION>\n" \
                % (self.get_description())
        strBuffer= strBuffer + "\t%s" % (self.get_family_scores_ml())
        strBuffer= strBuffer + "\t%s" % (self.get_parsed_domains_ml())
        strBuffer= strBuffer + "\t<ALIGNMENTS>%s</ALIGNMENTS>\n" \
                % (self.get_alignments())
        strBuffer= strBuffer + "</HMMER>" 
        
        return strBuffer
      
    def get_query( self ):
        '''
        Retrieves the QUERY
        @param (None)
        @return (String: QUERY )
        '''
        return self.query

    def get_accession( self ):
        '''
        Retrieves the ACCESSION
        @param (None)
        @return (String: ACCESSION)
        '''
        return self.accession

    def get_description( self ):
        '''
        Retrieves the DESCRIPTION
        @param (None)
        @return (String: DESCRIPTION)
        '''
        return self.description

    def get_family_scores_raw(self):
        '''
        Retrieves a list of FAMILY SCORES
        @param (None)
        @return (List: FAMILY SCORES)
        '''
        return self.family_scores

    def get_no_of_family_entries(self):
        '''
        Retrieves the number of hits per query
        @param (None)
        @return (Integer: number of hits per query)
        '''
        return len(self.family_scores)

    def get_family_scores_ml( self ):
        '''
        FINE-GRAINED CONTROL OVER FAMILY CLASSIFICATION AND SCORE RESULTS
        @param (None)
        @return (String: Marked-Up text format of Family Classificatio Scores)
        '''
        # BEGIN FAMILY_SCORE_LIST TAG
        family_scores= "<FAMILY_SCORES_HITLIST>\n" 
        family_model= '' 
        family_description= '' 
        family_score_value= '' 
        family_e_value= '' 
        family_n_value= '' 
        family_scores_counter= 1 
        
        for score_entry in self.get_family_scores_raw():
            MatchScoreEntry= HmmpfamRecord.REGEX_FAMILY_SCORES.search(score_entry)
            if MatchScoreEntry != None:
                # BEGIN FAMILY_SCORE_HIT TAG
                family_scores= family_scores + \
                        "\t\t<FAMILY_SCORE_HIT= %d>\n" \
                        % ( family_scores_counter )
                # EXTRACT INFORMATION FROM MATCH_SCORE_ENTRY
                # MatchScoreEntry.group( 1 ) equals WHOLE ENTRY
                family_model= MatchScoreEntry.group( 2 )
                family_description= MatchScoreEntry.group( 3 )
                family_score_value= MatchScoreEntry.group( 4 )
                # MatchScoreEntry.group( 5 ) equals '-' IF PRESENT
                family_e_value= MatchScoreEntry.group( 6 )
                family_n_value= MatchScoreEntry.group( 7 )

                for data, tag in [(family_model, "FAMILY_SCORE_MODEL"),
                     (family_description, "FAMILY_DESCRIPTION"),
                     (family_score_value, "FAMILY_SCORE_VALUE"),
                     (family_e_value, "FAMILY_E_VALUE"),
                     (family_n_value, "FAMILY_N_VALUE")]:
                    family_scores += "\t\t\t<%s>%s</%s>\n" % \
                            (tag, data, tag)

                # COMPLETE FAMILY_SCORE_HIT TAG
                family_scores= family_scores + "\t\t</FAMILY_SCORE_HIT>\n" 

                # INCREMENT family_scores_counter
                family_scores_counter= family_scores_counter + 1 

        # COMPLETE FAMILY_SCORE_LIST TAG
        family_scores= family_scores + "\t</FAMILY_SCORES_HITLIST>\n" 
        return family_scores
    
    def get_parsed_domains_raw(self):
        '''
        Retrieves a list of PARSED DOMAINS
        @param (None)
        @return (List: PARSED DOMAINS)
        '''
        return self.parsed_domains

    def get_no_of_parsed_domains( self ):
        '''
        Retrieves the number of parsed hits per query
        @param (None)
        @return (Integer: number of parsed hits per query)
        '''
        return len( self.parsed_domains )

    def get_parsed_domains_ml( self ):
        '''
        FINE-GRAINED CONTROL OVER PARSED DOMAINS AND SCORE RESULTS
        @param (None)
        @return (String: Marked-Up text format of Parsed Domain section)
        '''
        parsed_domain_list= []
        parsed_model= '' 
        parsed_domain_number= '' 
        parsed_domain_seq_f= '' 
        parsed_domain_seq_t= '' 
        parsed_domain_hmm_f= '' 
        parsed_domain_hmm_t= '' 
        parsed_domain_2_dots= '' 
        parsed_domain_brackets= '' 
        parsed_domain_score= '' 
        parsed_domain_e_value= '' 
        parsed_domains_counter= 1 


        # BEGIN PARSED_DOMAINS_LIST TAG
        parsed_domains= '<PARSED_DOMAINS_HITLIST>\n' 
    
        for domain in self.get_parsed_domains_raw():
            # IF NO_HITS NOT FOUND, THEN EXTRACT DATA
            if string.find( domain, HmmpfamRecord.NO_HITS ) < 0:
                parsed_domain_list= string.split( domain )
                parsed_model= parsed_domain_list[0]
                parsed_domain_number= parsed_domain_list[1]
                parsed_domain_seq_f= parsed_domain_list[2]
                parsed_domain_seq_t= parsed_domain_list[3]
                #parsed_domain_2_dots= parsed_domain_list[4]            
                parsed_domain_hmm_f= parsed_domain_list[5]
                parsed_domain_hmm_t= parsed_domain_list[6]
                #parsed_domain_brackets= parsed_domain_list[7] 
                parsed_domain_score= parsed_domain_list[8]
                parsed_domain_e_value= parsed_domain_list[9]

                # BEGIN PARSED_DOMAIN_HIT TAG
                parsed_domains= parsed_domains + \
                  "\t\t<PARSED_DOMAIN_HIT= %d>\n" % (parsed_domains_counter)

                # FORMAT ENTRY TAGS
                for data, tag in [(parsed_model, "PARSED_MODEL"),
                            (parsed_domain_number, "PARSED_DOMAIN_NUMBER"),
                            (parsed_domain_seq_f, "PARSED_DOMAIN_SEQ_F"),
                            (parsed_domain_seq_t, "PARSED_DOMAIN_SEQ_T"),
                            (parsed_domain_hmm_f, "PARSED_DOMAIN_HMM_F"),
                            (parsed_domain_hmm_t, "PARSED_DOMAIN_HMM_T"),
                            (parsed_domain_score, "PARSED_DOMAIN_SCORE"),
                            (parsed_domain_e_value, "PARSED_DOMAIN_E_VALUE")]:
                    parsed_domains += "\t\t\t<%s>%s</%s>\n" % \
                            (tag, data, tag)

                # COMPLETE PARSED_DOMAIN_HIT TAG
                parsed_domains= parsed_domains + "\t\t</PARSED_DOMAIN_HIT>\n" 

                # INCREMENT parsed_domains_counter
                parsed_domains_counter= parsed_domains_counter + 1 

            else:
                # NO_HITS FOUND
                return domain

        # COMPLETE PARSED_DOMAINS_LIST TAG
        parsed_domains= parsed_domains + '</PARSED_DOMAINS_HITLIST>\n' 
        return parsed_domains
        
    def get_alignments( self ):
        '''
        Retrieves a list of TOP SCORING ALIGNMENTS
        @param (None)
        @return (List: TOP SCORING ALIGNMENTS)
        '''
        return self.alignments

    def get_regex_family_scores( self ):
        '''
        Retrieves the Regex object for Pfam family scores
        @param (None)
        @return (Regex: Regex object for Pfam family scores)
        '''
        return HmmpfamRecord.REGEX_FAMILY_SCORES

class Iterator:
    """Iterate over a hmmpfam result file one record at a time.
    """
    def __init__(self, handle, parser = None):
        """Initalize with a handle to the hmmpfam output and optional parser.
        """
        if type(handle) is not FileType and type(handle) is not InstanceType:
            raise ValueError, "I expected a file handle or file-like object"
        self._handle = handle
        self._parser = parser

    def __iter__(self):
        return iter(self.next, None)

    def next(self):
        """Return the next hmmpfam output record, parsed if appropriate.
        """
        lines = []
        while 1:
            line= self._handle.readline()
            if not line:
                break
            # Pfam ENTRY DETECTED
            if line.find('Query sequence:') == 0:
                lines.append(line.rstrip())
                while 1:
                    line= self._handle.readline()
                    lines.append(line.rstrip())
                    if not line:
                        break 
                    if line.find("//") == 0:
                        break
        
        if len(lines) == 0: # nothing left
            return None
        else:
            if self._parser:
                data = "\n".join(lines)
                return self._parser.parse(data)
            else:
                return "\n".join(lines)

class RecordParser:
    '''
    Prototype class for parsing hmmpfam output
    @author: Wagied Davids
    @date: 22.01.2004
    @copyright: Wagied Davids, 2004
    '''
    # STATIC REGEX OBJECTS
    REGEX_HMM_ENTRY= re.compile( r'(Query sequence:\s+\S.*\s+//)',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_QUERY= re.compile( r'Query sequence:\s+(\S.*?)\s+Accession',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_ACC= re.compile( r'Accession:\s+(\S.*?)\s+Description',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_DESCRIPTION= re.compile( r'Description:\s+(\S.*?)\s+Scores',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_SEQ_FAMILY_SCORES= re.compile( r'(Scores\s+\S.*)\s+Parsed',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_PARSED_DOMAINS= re.compile(
            r'(Parsed for domains:\s+\S.*)\s+Alignments',
            re.MULTILINE | re.DOTALL )
    REGEX_HMM_ALIGNMENTS= re.compile(
            r'(Alignments of top-scoring domains:\s+\S.*)\s+//',
            re.MULTILINE | re.DOTALL )

    def __init__(self):
        '''
        Constructor for RecordParser
        @param (Filename)
        @return (None)
        '''
        self.debug= 0 
     
    def set_debug( self, debug= 0 ):
        '''
        Sets the debug level when parsing
        debug= 0 No debug information
        debug= 1 Pfam Entry level debug information  
        debug= 2 Regex level debug information
        debug= 3 Incoming data 
        @param (Integer representing the verbosity/ debug level)
        @return (None)
        '''
        self.debug= debug

    def _print_debug(self, level, info):
        """Simple class to print out debug info if it matches a given level.
        """
        if level == self.debug:
            sys.stdout.write(info + "\n")

    def parse(self, data_entry):
        """Initialize with a single hmmpfam record to parse.

        Returns the record parsed into an HmmpfamRecord class.
        """
        if self.debug == 3:
            print data_entry
        
        # MATCH ENTRY STRUCTURE
        match_hmm_entry= RecordParser.REGEX_HMM_ENTRY.search(data_entry)
        if self.debug == 2:
            print "%s: %s" % (match_hmm_entry, match_hmm_entry.re.pattern)
        if match_hmm_entry is not None:
            entry = match_hmm_entry.group(1)
            query, accession, description = self._parse_query_info(entry)
            family_scores_list = self._parse_family_scores(entry)
            parsed_domains_list = self._parse_domains(entry)
            domain_alignments_list = self._parse_alignments(entry)

            # Construct Pfam Entry structure
            record = HmmpfamRecord(query, accession, description,
                    family_scores_list, parsed_domains_list,
                    domain_alignments_list )

            if self.debug == 1:
                print "%s => %s" % ( record.get_query(), record.get_description() )
                print record.get_family_scores_ml()
                print record.get_parsed_domains_ml()

        return record

    def _parse_query_info(self, entry):
        """Retrieve the query name, accession and description.
        """
        hmm_query, hmm_accession, hmm_description = ('', '', '')
        # MATCH QUERY SEQUENCE
        match_hmm_query= RecordParser.REGEX_HMM_QUERY.search(entry)
        if self.debug == 2:
            print "%s: %s" % ( match_hmm_query, match_hmm_query.re.pattern )
        if match_hmm_query is not None:
            hmm_query= match_hmm_query.group(1)

        # MATCH ACCESSION
        match_hmm_accession= RecordParser.REGEX_HMM_ACC.search( entry ) 
        if self.debug == 2:
            print "%s: %s" % ( match_hmm_accession, match_hmm_accession.re.pattern ) 
        if match_hmm_accession is not None:
            hmm_accession= match_hmm_accession.group( 1 )

        # MATCH DESCRIPTION
        match_hmm_description= RecordParser.REGEX_HMM_DESCRIPTION.search( entry )
        if self.debug == 2:
            print "%s: %s" % ( match_hmm_description, match_hmm_description.re.pattern )
        if match_hmm_description is not None:
            hmm_description= match_hmm_description.group(1)

        return hmm_query, hmm_accession, hmm_description

    def _parse_family_scores(self, entry):
        """Retrieve the family scores from the hmmpfam search.
        """
        match_hmm_scores= RecordParser.REGEX_HMM_SEQ_FAMILY_SCORES.search(entry)
        if self.debug == 2:
            print "%s: %s" % (match_hmm_scores, match_hmm_scores.re.pattern)
        
        family_scores_list = []
        if match_hmm_scores != None:
            hmm_scores= match_hmm_scores.group( 1 )
            family_scores_info_list= string.split(
                    hmm_scores, "\n")
            
            # NOTE: LAST ELEMENT = EMPTY SPACE
            family_scores_list = family_scores_info_list[ 3: -1 ]
        return family_scores_list

    def _parse_domains(self, entry):
        """Parse domain information from the hmmpfam output.
        """
        match_hmm_parsed_domains= RecordParser.REGEX_HMM_PARSED_DOMAINS.search( entry )

        if self.debug == 2:
            print "%s: %s" % ( match_hmm_parsed_domains, match_hmm_parsed_domains.re.pattern )
           
        parsed_domains_list = []
        if match_hmm_parsed_domains != None:
            hmm_domains= match_hmm_parsed_domains.group(1)
            parsed_domains_info_list= string.split(hmm_domains, "\n")

            # NOTE: LAST ELEMENT = EMPTY SPACE
            parsed_domains_list= parsed_domains_info_list[3: -1]
        return parsed_domains_list

    def _parse_alignments(self, entry):
        """Parse out alignment information from the hmmpfam output.
        """
        match_hmm_alignments= RecordParser.REGEX_HMM_ALIGNMENTS.search(  entry )
        if self.debug == 2:
            print "%s: %s" % (match_hmm_alignments, match_hmm_alignments.re.pattern)
            
        if match_hmm_alignments is not None:
            hmm_aligments= match_hmm_alignments.group(1)
            damain_aligments_info_list= string.split(hmm_aligments, "\n")
            domain_alignments_list= damain_aligments_info_list[3:-2]

        return domain_alignments_list

    def get_regex_hmm_entry( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_ENTRY
        @param (None)
        @return (Regex: HMM_ENTRY)
        '''
        return RecordParser.REGEX_HMM_ENTRY

    def get_regex_query( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_QUERY
        @param (None)
        @return (Regex: REGEX_HMM_QUERY)
        '''
        return RecordParser.REGEX_HMM_QUERY
    
    def get_regex_accession(self):
        '''
        Retrieves the Regex object for REGEX_HMM_ACC
        @param (None)
        @return (Regex: REGEX_HMM_ACC)
        '''
        return RecordParser.REGEX_HMM_ACC
        
    def get_regex_description( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_DESCRIPTION
        @param (None)
        @return (Regex: REGEX_HMM_DESCRIPTION)
        '''
        return RecordParser.REGEX_HMM_DESCRIPTION

    def get_regex_family_scores( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_SEQ_FAMILY_SCORES
        @param (None)
        @return (Regex: REGEX_HMM_SEQ_FAMILY_SCORES)
        '''
        return RecordParser.REGEX_HMM_SEQ_FAMILY_SCORES

    def get_regex_parsed_domains( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_DOMAINS
        @param (None)
        @return (Regex: REGEX_HMM_DOMAINS)
        '''
        return RecordParser.REGEX_HMM_PARSED_DOMAINS

    def get_regex_alignments( self ):
        '''
        Retrieves the Regex object for REGEX_HMM_ALIGNMENTS
        @param (None)
        @return (Regex: REGEX_HMM_ALIGNMENTS)
        '''
        return RecordParser.REGEX_HMM_ALIGNMENTS
    
    def __str__( self ):
        '''
        Retrieves a string representation of parser class
        @param (None)
        @return (String: Retrieves a string representation of parser class)
        '''
        strBuffer= 'ParserType: RecordParser' 
        return strBuffer
        
# __END__
-------------- next part --------------
#!/usr/bin/env python

###################################################################### */
# COPYRIGHT INFORMATION
# Test program for Pfam domain results parser
# @AUTHOR: Wagied Davids
# @DATE: 22.01.2004
# @COPYRIGHT: Wagied Davids, 2004
###################################################################### */

import Hmmpfam

# Module level re-name

# DATA LOCATION
filename= 'hmmpfam_output.example'
handle = open(filename, "r")

# INSTANTIATE Parser with debugging info
parser= Hmmpfam.RecordParser()
# parser.set_debug(1)

iterator = Hmmpfam.Iterator(handle, parser)
for rec in iter(iterator):
    print "--> %s : %s : %s" % (rec.query, rec.accession, rec.description)
    print rec.get_family_scores_ml()
    print rec.get_parsed_domains_ml()


More information about the Biopython-dev mailing list