Logo Search packages:      
Sourcecode: qm version File versions

web.py

########################################################################
#
# File:   web.py
# Author: Alex Samuel
# Date:   2001-04-09
#
# Contents:
#   Common code for QMTest web user interface.
#
# Copyright (c) 2001, 2002, 2003 by CodeSourcery, LLC.  All rights reserved. 
#
# For license terms see the file COPYING.
#
########################################################################

########################################################################
# imports
########################################################################

import os
import qm
import qm.attachment
import qm.common
from   qm.extension import *
import qm.fields
import qm.label
import qm.test.base
import qm.test.cmdline
from   qm.test.context import *
from   qm.test.database import *
from   qm.test.execution_thread import *
from   qm.test.result import *
from   qm.test.result_stream import *
from   qm.test.suite import *
import qm.web
import string
import StringIO
import sys
import time

########################################################################
# classes
########################################################################

class DefaultDtmlPage(qm.web.DtmlPage):
    """Subclass of DTML page class for QMTest pages."""

    html_generator = "QMTest"

    def __init__(self, dtml_template, **attributes):
        """Construct a new 'DefaultDtmlPage'.

        'dtml_template' -- The file name of the DTML template, relative
        to the DTML directory.

        'attributes' -- Additional keyword arguments.  They are passed
        through unchanged to the 'qm.web.DtmlPage' constructor."""

        # In the build tree, tool-specific DTML pages are in a different
        # location.
        if (not qm.common.is_installed
            and os.path.dirname(dtml_template) == "test"):
            dtml_template \
                = os.path.join("..", "..", "qm", "test", "share",
                               "dtml", os.path.basename(dtml_template))

        # Use the extended call syntax rather than the deprecated
        # 'apply' built-in to forward the keyword arguments.
        qm.web.DtmlPage.__init__(self, dtml_template, **attributes)


    def GetName(self):
        """Return the name of the application."""

        return self.html_generator


    def MakeListingUrl(self):
        """Return the URL of the 'dir' script.

        returns -- A URL string built from a 'dir' request that is
        based on the request currently being served."""

        return qm.web.WebRequest("dir", base=self.request).AsUrl()


    def GetMainPageUrl(self):
        """Return the URL of the main page.

        returns -- The same URL as 'MakeListingUrl'."""

        return self.MakeListingUrl()



class QMTestPage(DefaultDtmlPage):
    """A 'QMTestPage' is a 'DtmlPage' for pages generated by QMTest.

    A 'QMTestPage' automatically looks for DTML templates in the
    directory that contains QMTest DTML templates."""

    def __init__(self, dtml_template, server):
        """Construct a new 'QMTestPage'.

        'dtml_template' -- The file name of the DTML template, relative
        to the directory that contains QMTest DTML templates.  (Usually,
        this is just a basename.)

        'server' -- The 'QMTestServer' creating this page."""

        # Set up the menus first; the attributes might override them.
        # Items that create new tests, suites, or resources are only
        # offered when the database can be modified.
        if server.GetDatabase().IsModifiable():
            self.file_menu_items = [
                ('New Test', "new-test"),
                ('New Suite', "new-suite"),
                ('New Resource', "new-resource"),
                ]
        else:
            self.file_menu_items = []
        self.file_menu_items.extend([
            ('Load Results', "javascript:load_results();"),
            ('Save Results', qm.test.cmdline.QMTest.results_file_name),
            ('Load Expectations', "javascript:load_expected_results();"),
            ('Save Expectations',
             qm.test.cmdline.QMTest.expectations_file_name),
            ('Load Context', "javascript:load_context();"),
            ('Save Context', qm.test.cmdline.QMTest.context_file_name),
            ('Exit', 'shutdown')
            ])
        self.edit_menu_items = [
            ('Clear Results', "clear-results"),
            ('Edit Context', "edit-context"),
            ]
        self.view_menu_items = [
            ('Directory', "/test/dir"),
            ('Results', "show-results")
            ]
        self.run_menu_items = [
            ('All Tests', "run-tests")
            ]
        self.help_menu_items = [
            ('Manual', "javascript:popup_manual();"),
            ('QMTest Web Site', "http://www.qmtest.com")
            ]

        # Initialize the base class.
        DefaultDtmlPage.__init__(self, os.path.join("test", dtml_template))
        # Remember the server.
        self.server = server
        # Make the QMTest object available to the DTML pages.
        self.qmtest = qm.test.cmdline.get_qmtest()


    def GetServer(self):
        """Returns the 'QMTestServer' serving this page.

        returns -- The 'QMTestServer' serving this page."""

        return self.server


    def GetDatabase(self):
        """Returns the 'Database' in use.

        returns -- The 'Database' in use."""

        return self.GetServer().GetDatabase()


    def FormatId(self, id, type, style="basic"):
        """Format 'id' as HTML.

        'id' -- The name of a test or resource.

        'type' -- The kind of item named by 'id'.  Either 'resource',
        'suite', or 'test'.  (The 'navigation' style also passes 'dir'
        when formatting parent directories.)

        'style' -- The formatting style to use.  One of 'plain',
        'basic', 'navigation', or 'tree'.

        returns -- A string containing HTML to use for 'id'.

        raises -- 'AssertionError' if 'style' is not one of the
        supported styles."""

        # The link target is the "show-<type>" script for this ID.
        script = "show-" + type
        request = qm.web.WebRequest(script, base=self.request, id=id)
        url = request.AsUrl()
        parent_suite_id, name = self.GetDatabase().SplitLabel(id)

        if style == "plain":
            return '<span class="id">%s</span>' % id

        elif style == "basic":
            return '<a href="%s"><span class="id">%s</span></a>' % (url, id)

        elif style == "navigation":
            if parent_suite_id == "":
                parent = ""
            else:
                # Render each enclosing directory as its own link,
                # followed by the separator character that appears in
                # 'id' right after the parent's label.
                parent = self.FormatId(parent_suite_id, "dir", style)
                parent += id[len(parent_suite_id)]
            return parent \
                   + '<a href="%s"><span class="id">%s</span></a>' \
                   % (url, name)

        elif style == "tree":
            return '<a href="%s"><span class="id">%s</span></a>' \
                   % (url, name)

        # An unknown style is a programming error.  Raise explicitly so
        # that the check is not silently dropped (returning 'None')
        # when assertions are disabled with '-O'.
        raise AssertionError("unsupported style '%s'" % (style,))


    def GenerateStartBody(self, decorations=1):
        """Return the HTML that starts the body of the page.

        'decorations' -- If true, the navigation bar containing the
        menus is generated right after the '<body>' tag.

        returns -- A string containing the opening HTML for the body."""

        if not decorations:
            return "<body>"

        # If the server is in the midst of executing tests, it
        # is not safe to edit tests, or to rerun the tests.
        if not self.GetServer().GetResultsStream().IsFinished():
            # The basic edit menu items are OK.
            edit_menu_items = self.edit_menu_items[0:2]
            # The run menu should offer nothing but stopping the run.
            run_menu_items = [
                ('Stop Tests', "stop-tests")
                ]
        # Otherwise, just use the values specified.
        else:
            edit_menu_items = self.edit_menu_items
            run_menu_items = self.run_menu_items

        # Figure out whether to use click-to-activate menus.
        click_menus = 0
        if qm.common.rc.has_option("common", "click_menus"):
            try:
                click_menus = qm.common.rc.getboolean("common",
                                                      "click_menus")
            except ValueError:
                # Keep the default when the configured value is not a
                # valid boolean.
                pass

        # Generate the navigation bar.
        navigation_bar = \
          DefaultDtmlPage(os.path.join("test", "navigation-bar.dtml"),
                          file_menu_items=self.file_menu_items,
                          edit_menu_items=edit_menu_items,
                          view_menu_items=self.view_menu_items,
                          run_menu_items=run_menu_items,
                          help_menu_items=self.help_menu_items,
                          click_menus = click_menus)
        return "<body>%s<br />" % navigation_bar(self.request)


    def IsFinished(self):
        """Returns true if the data on this page is complete.

        returns -- True if the data on this page should be considered
        complete.  False if it should be considered incomplete (for
        instance because tests are still running), in which case
        'GenerateHtmlHeader' arranges for the page to be refreshed.
        The base implementation always returns true."""

        return 1


    def GetRefreshDelay(self):
        """Returns the number of seconds to wait before refreshing the page.

        returns -- The number of seconds to wait before refreshing this
        page.  Zero is intended to mean that the page should never be
        refreshed.  This function is only called by
        'GenerateHtmlHeader' if 'IsFinished' returns false."""

        return 0


    def GenerateHtmlHeader(self, description, headers=""):
        """Return the header for an HTML document.

        'description' -- A string describing this page.

        'headers' -- Any additional HTML headers to place in the
        '<head>' section of the HTML document.

        returns -- The result of the base class 'GenerateHtmlHeader',
        with a refresh directive added to 'headers' when the page is
        not finished."""

        # If the page isn't finished, automatically refresh it
        # every few seconds.
        if not self.IsFinished():
            headers = (headers
                       + ('<meta http-equiv="refresh" content="%d" />'
                          % self.GetRefreshDelay()))

        return DefaultDtmlPage.GenerateHtmlHeader(self, description,
                                                  headers)

    
        
class ContextPage(QMTestPage):
    """DTML page on which the context is set."""

    def __init__(self, server):
        """Construct a new 'ContextPage'.

        'server' -- The 'QMTestServer' creating this page.  Its
        context is stored on the page as 'context'."""

        # The page is rendered from the "context.dtml" template.
        QMTestPage.__init__(self, "context.dtml", server)

        # Make the server's context available to the DTML template.
        context = server.GetContext()
        self.context = context
        
    

class DirPage(QMTestPage):
    """A test database directory page.

    These attributes are available in DTML:

    'path' -- The label directory that is being displayed.

    'subdir_ids' -- A sequence of labels giving the subdirectories of
    this directory.

    'test_ids' -- A sequence of labels giving the tests in this
    directory.

    'suite_ids' -- A sequence of labels giving the (non-implicit)
    suites in this directory.

    'resource_ids' -- A sequence of labels giving the resources in
    this directory."""

    SORT_NAME = 'name'
    """Sort by name."""

    SORT_OUTCOME = 'outcome'
    """Sort by outcome."""

    SORT_EXPECTATION = 'expectation'
    """Sort by expectation.  In other words, put unexpected outcomes
    before expected outcomes."""

    SORT_KINDS = [ SORT_NAME, SORT_OUTCOME, SORT_EXPECTATION ]
    """The kinds of sorting available."""

    NEGATIVE_UNEXPECTED = Result.FAIL
    """A test's result was unfavorably unexpected."""

    POSITIVE_UNEXPECTED = Result.PASS
    """A test's result was favorably unexpected."""

    EXPECTED = "EXPECTED"
    """A test's result was as expected."""

    EXPECTATION_KINDS \
        = [ NEGATIVE_UNEXPECTED, EXPECTED, POSITIVE_UNEXPECTED ]
    """The kinds of expectations."""

    class _TestInformation:
        """A '_TestInformation' stores information about a single test."""

        def __init__(self, id, outcome, expectation):
            """Construct a new '_TestInformation'.

            'id' -- The name of the test.

            'outcome' -- A string giving the test outcome, or 'None'
            if the test has not been run.

            'expectation' -- A string giving the expected outcome of the
            test, or 'None' if there is no expected outcome."""

            self.id = id
            self.outcome = outcome
            self.expectation = expectation


    def __init__(self, server, path):
        """Construct a 'DirPage'.

        'server' -- The 'QMTestServer' creating this page.

        'path' -- The label directory to display."""

        # Initialize the base class.
        QMTestPage.__init__(self, "dir.dtml", server)

        self.path = path
        database = server.GetDatabase()
        # Subdirectories are reported relative to 'path'; turn them
        # into full labels.
        self.subdir_ids = [database.JoinLabels(path, l)
                           for l in database.GetSubdirectories(path)]
        self.test_ids = database.GetTestIds(path, scan_subdirs=0)
        # Do not show implicit suites.  Otherwise, there are two
        # entries for a directory: one as a subdirectory entry, and
        # the other as a test suite.
        self.suite_ids = [s for s
                          in database.GetSuiteIds(path, scan_subdirs=0)
                          if not database.GetSuite(s).IsImplicit()]
        self.resource_ids = database.GetResourceIds(path, scan_subdirs=0)

        # Get the results to date.
        results_stream = server.GetResultsStream()
        # It is important that we ask for IsFinished before asking
        # for GetTestResults.  The stream could be finished between
        # the two calls, and it is better to show all the results but
        # claim they are incomplete than to show only some of the
        # results and claim they are complete.
        self.__is_finished = results_stream.IsFinished()
        self.test_results = results_stream.GetTestResults()
        self.expected_outcomes = server.GetExpectedOutcomes()

        # Make it easy for the DTML page to get at all the outcomes.
        self.outcomes = Result.outcomes + [self.EXPECTED]

        # Provide a menu choice to allow running all of the tests in
        # this directory.
        self.run_menu_items.append(("This Directory", "javascript:run_dir();"))


    def GetExpectationUrl(self, id, expectation):
        """Return the URL for setting the expectation associated with 'id'.

        'id' -- The name of a test.

        'expectation' -- The current expectation associated with the
        test, or 'None' if there is no associated expectation.

        returns -- The URL of a 'set-expectation' request.  A missing
        expectation is encoded as the string "None", and the URL of
        the current request is passed along in the 'url' field."""

        return qm.web.WebRequest("set-expectation",
                                 base=self.request,
                                 id=id,
                                 expectation=expectation or "None",
                                 url=self.request.AsUrl()).AsUrl()


    def GetRunUrl(self):
        """Return the URL for running this directory."""

        return qm.web.WebRequest("run-tests",
                                 base=self.request,
                                 ids=self.path) \
               .AsUrl()


    def GetTestResultsForDirectory(self, directory):
        """Return all of the test results for tests in 'directory'.

        'directory' -- A string giving the label for a directory.

        returns -- A sequence of 'Result' instances corresponding to
        results for tests from the indicated directory."""

        all_results = self.test_results.values()

        # If the directory is the root, just return all the results.
        if directory == "":
            return all_results

        return [r for r in all_results
                if self.__IsLabelInDirectory(r.GetId(), directory)]


    def GetResultsByOutcome(self, results):
        """Compute the tests in 'results' with each outcome.

        'results' -- A sequence of 'Result' instances.

        returns -- A dictionary mapping outcomes to the sequence of
        tests that have the indicated outcome in 'results'."""

        results_by_outcome = {}
        # At first, there are no results for any outcome.
        for o in self.outcomes:
            results_by_outcome[o] = []

        # Iterate through the results, adding each one to
        # 'results_by_outcome'.
        for r in results:
            results_by_outcome[r.GetOutcome()].append(r)

        return results_by_outcome


    def GetOutcomePercentages(self, results):
        """Compute the percentage (by outcome) of the 'results'.

        'results' -- A sequence of 'Result' instances.

        returns -- A dictionary mapping outcomes to the fraction (as
        a floating point number between 0.0 and 1.0) of tests in
        'results' that have that outcome.  Every fraction is 0.0 when
        'results' is empty."""

        # Compute the total number of tests for which results are
        # available.
        total = len(results)

        # Get the test results, organized by outcome.
        results = self.GetResultsByOutcome(results)

        # Compute the percentages.
        percentages = {}
        for o in self.outcomes:
            if total:
                percentages[o] = float(len(results[o])) / float(total)
            else:
                percentages[o] = 0.0

        return percentages


    def GetUnexpectedResultsByOutcome(self, results):
        """Compute the unexpected tests in 'results' with each outcome.

        'results' -- A sequence of 'Result' instances.

        returns -- A dictionary mapping outcomes to the results with
        that outcome -- and for which that outcome is unexpected.
        The (fake) outcome 'self.EXPECTED' is mapped to expected
        results.  A test without an explicit expectation is expected
        to pass."""

        results_by_outcome = {}
        # At first, there are no results for any outcome.
        for o in self.outcomes:
            results_by_outcome[o] = []

        for r in results:
            # See what outcome was expected.
            expectation = self.GetExpectation(r.GetId()) or Result.PASS
            # Update results_by_outcome.
            if r.GetOutcome() != expectation:
                results_by_outcome[r.GetOutcome()].append(r)
            else:
                results_by_outcome[self.EXPECTED].append(r)

        return results_by_outcome


    def GetUnexpectedOutcomePercentages(self, results):
        """Compute percentages of unexpected 'results'.

        'results' -- A sequence of 'Result' instances.

        returns -- A dictionary mapping the 'EXPECTATION_KINDS' to the
        fraction (as a floating point number) of tests in 'results'
        that have that expectation."""

        # Compute the total number of tests for which results are
        # available.
        total = len(results)

        # Get the test results, organized by outcome.
        results_by_outcome \
            = self.GetUnexpectedResultsByOutcome(results)

        # Compute the absolute number of tests in each category.
        percentages = {}
        percentages[self.POSITIVE_UNEXPECTED] \
            = len(results_by_outcome[Result.PASS])
        percentages[self.NEGATIVE_UNEXPECTED] \
            = (len(results_by_outcome[Result.FAIL])
               + len(results_by_outcome[Result.ERROR])
               + len(results_by_outcome[Result.UNTESTED]))
        percentages[self.EXPECTED] \
            = len(results_by_outcome[self.EXPECTED])

        # And the corresponding percentages.  A nonzero count implies
        # that 'total' is nonzero, so the division is safe.
        for e in self.EXPECTATION_KINDS:
            if percentages[e]:
                percentages[e] = float(percentages[e]) / float(total)
            else:
                percentages[e] = 0.0

        return percentages


    def CountUnexpected(self, results):
        """Count the unexpected 'results'.

        'results' -- A dictionary of the form returned by
        'GetUnexpectedResultsByOutcome'.

        returns -- The total number of unexpected results."""

        total = 0
        # Go through all the outcomes except 'EXPECTED'.
        for o in Result.outcomes:
            total += len(results[o])

        return total


    def GetTests(self, sort):
        """Return information about all of the tests.

        'sort' -- One of the 'SORT_KINDS' indicating how the results
        should be sorted.  Any other value is ignored and the tests
        are returned in their original order.

        returns -- A sequence of '_TestInformation' instances
        corresponding to all of the tests in this directory."""

        # Gather the outcome and expectation for each of the tests.
        tests = [self._TestInformation(id,
                                       self.GetTestOutcome(id),
                                       self.GetExpectation(id))
                 for id in self.test_ids]

        if sort == self.SORT_NAME:
            # The tests are already sorted by name.
            pass
        elif sort == self.SORT_OUTCOME:
            # Sort the test by outcome; interesting outcomes come first
            # and tests without a result come last.
            order = Result.outcomes + [None]
            buckets = {}
            for o in order:
                buckets[o] = []

            # Go through the tests dropping each in the right bucket.
            for t in tests:
                buckets[t.outcome].append(t)

            # Combine the buckets.
            tests = []
            for o in order:
                tests += buckets[o]
        elif sort == self.SORT_EXPECTATION:
            # Sort the test by expectations; unexpected outcomes come
            # first, then expected ones, then tests without a result.
            order = ['UNEXPECTED', self.EXPECTED, None]
            buckets = {}
            for o in order:
                buckets[o] = []

            # Go through the tests dropping each in the right bucket.
            # A test without an explicit expectation is expected to
            # pass.
            for t in tests:
                if (t.outcome == (t.expectation or Result.PASS)):
                    buckets[self.EXPECTED].append(t)
                elif t.outcome:
                    buckets['UNEXPECTED'].append(t)
                else:
                    buckets[None].append(t)

            # Combine the buckets.
            tests = []
            for o in order:
                tests += buckets[o]
        else:
            # Ignore the sort request.  (We cannot assert that this case
            # never happens because users can type any URL they like
            # into their web browser.)
            pass

        return tests


    def GetTestOutcome(self, test_id):
        """Return the outcome recorded for 'test_id'.

        'test_id' -- The name of the test whose outcome is requested.

        returns -- The outcome of the result associated with
        'test_id', or 'None' if no result is available."""

        result = self.test_results.get(test_id)
        # Test for 'None' explicitly so that the answer does not depend
        # on the truth value of the result object itself.
        if result is None:
            return None
        return result.GetOutcome()


    def GetDetailURL(self, test_id):
        """Return the detail URL for 'test_id'.

        'test_id' -- The name of the test.

        returns -- The URL that contains details about the 'test_id'."""

        return qm.web.WebRequest("show-result",
                                 base=self.request,
                                 id=test_id).AsUrl()


    def GetExpectation(self, test_id):
        """Return the expected outcome for 'test_id'.

        'test_id' -- The name of the test.

        returns -- A string giving the expected outcome for 'test_id',
        or 'None' if there is no expectation."""

        return self.expected_outcomes.get(test_id)


    def GetSortURL(self, sort):
        """Get the URL for this page, but sorted as indicated.

        'sort' -- One of the 'SORT_KINDS'.

        returns -- A URL indicating this page, but sorted as
        indicated."""

        return qm.web.WebRequest("show-dir",
                                 base=self.request,
                                 id=self.path,
                                 sort=sort).AsUrl()


    def IsFinished(self):
        """Returns true if the data on this page is complete.

        returns -- The value the results stream reported from
        'IsFinished' when this page was constructed.  False means the
        data on this page should be considered incomplete due to the
        fact that tests are still running."""

        return self.__is_finished


    def GetRefreshDelay(self):
        """Returns the number of seconds to wait before refreshing the page.

        returns -- The number of seconds to wait before refreshing this
        page: 10 when fewer than 50 results are available, and 30
        otherwise."""

        if len(self.test_results) < 50:
            return 10
        else:
            return 30


    def __IsLabelInDirectory(self, id, directory):
        """Returns true if 'id' is in 'directory'.

        'id' -- The label of a test.

        'directory' -- The label of a directory.

        returns -- True if 'id' indicates a test contained in
        'directory', or one of its subdirectories."""

        database = self.GetDatabase()
        # Walk up through the parents of 'id'.  Once the label is
        # shorter than 'directory' it can no longer match.
        while len(id) >= len(directory):
            if id == directory:
                return 1
            id = database.SplitLabel(id)[0]

        return 0


    
class LoadContextPage(QMTestPage):
    """DTML page used to upload a context file."""

    title = "Load Context"
    """Page title."""

    heading = "Load the context from a file."
    """Heading shown across the top of the page."""

    prompt = "The file from which to load the context."
    """Prompt shown for the file name."""

    submit_url = "submit-context-file"
    """URL to which the chosen file is submitted."""

    def __init__(self, server):
        """Construct a new 'LoadContextPage'.

        'server' -- The 'QMTestServer' creating this page."""

        # All of the "load" pages share the "load.dtml" template.
        template = "load.dtml"
        QMTestPage.__init__(self, template, server)

        
        
class LoadExpectationsPage(QMTestPage):
    """DTML page for uploading expectations."""

    title = "Load Expectations"
    """The title for the page."""

    heading = "Load expectations from a file."
    """The heading printed across the top of the page."""

    prompt = "The file from which to load expectations."
    """The prompt for the file name."""

    submit_url = "submit-expectations"
    """The URL to which the file should be submitted."""

    def __init__(self, server):
        """Construct a new 'LoadExpectationsPage'.

        'server' -- The 'QMTestServer' creating this page."""

        QMTestPage.__init__(self, "load.dtml", server)


        
class LoadResultsPage(QMTestPage):
    """DTML page for uploading results."""

    title = "Load Results"
    """The title for the page."""

    heading = "Load results from a file."
    """The heading printed across the top of the page."""

    prompt = "The file from which to load the results."
    """The prompt for the file name."""

    submit_url = "submit-results"
    """The URL to which the file should be submitted."""

    def __init__(self, server):
        """Construct a new 'LoadResultsPage'.

        'server' -- The 'QMTestServer' creating this page."""

        QMTestPage.__init__(self, "load.dtml", server)



class NewItemPage(QMTestPage):
    """Page for creating a new test or resource."""

    def __init__(self,
                 server,
                 type,
                 item_id="",
                 class_name="",
                 field_errors=None):
        """Construct a new 'NewItemPage'.

        'server' -- The 'QMTestServer' creating this page.

        'type' -- Either "test" or "resource".

        'item_id' -- The item ID to show.

        'class_name' -- The class name to show.

        'field_errors' -- A mapping of error messages for fields.  Keys
        may be "_id" or "_class".  If 'None' (the default), a new
        empty mapping is used."""

        # Initialize the base class.
        QMTestPage.__init__(self, "new.dtml", server)
        # Set up attributes.
        assert type in ["test", "resource"]
        self.database = server.GetDatabase()
        self.type = type
        self.item_id = item_id
        self.class_name = class_name
        if type == "test":
            self.class_names = self.database.GetTestClassNames()
        elif type == "resource":
            self.class_names = self.database.GetResourceClassNames()
        # Give every page its own mapping; a mutable default argument
        # would be shared by all pages created without 'field_errors'.
        if field_errors is None:
            field_errors = {}
        self.field_errors = field_errors


    def GetTitle(self):
        """Return the title of this page."""

        return "Create a New %s" % string.capwords(self.type)


    def GetClassDescriptions(self):
        """Return a description of the available classes.

        returns -- Structured text describing each of the available
        test or resource classes."""

        parts = ["**Available Classes**\n\n"]
        for n in self.class_names:
            c = qm.test.base.get_extension_class(n, self.type,
                                                 self.database)
            d = qm.extension.get_class_description(c, brief=1)
            # One bullet per class: the class name followed by its
            # brief description.
            parts.append("  * " + n + "\n\n    " + d + "\n\n")

        return "".join(parts)


    def MakeSubmitUrl(self):
        """Return the URL for submitting the form.

        The URL is for the script 'create-test' or 'create-resource' as
        appropriate."""

        return qm.web.WebRequest("create-" + self.type,
                                 base=self.request) \
               .AsUrl()



class NewSuitePage(QMTestPage):
    """Page for creating a new test suite."""

    def __init__(self, server, suite_id="", field_errors=None):
        """Construct a new 'NewSuitePage'.

        'server' -- The 'QMTestServer' creating this page.

        'suite_id' -- Initial value for the new test suite ID field.

        'field_errors' -- A mapping of error messages to fields.  If
        empty, there are no errors.  If 'None' (the default), a new
        empty mapping is used."""

        # Initialize the base class.
        QMTestPage.__init__(self, "new-suite.dtml", server)
        # Set up attributes.
        self.suite_id = suite_id
        # Give every page its own mapping; a mutable default argument
        # would be shared by all pages created without 'field_errors'.
        if field_errors is None:
            field_errors = {}
        self.field_errors = field_errors



class ResultPage(QMTestPage):
    """DTML page that displays the details of one result."""

    def __init__(self, server, result):
        """Construct a new 'ResultPage'.

        'server' -- The 'QMTestServer' creating this page.

        'result' -- The result to display."""

        QMTestPage.__init__(self, "result.dtml", server)
        self.result = result

        # Only a result of kind 'Result.TEST' gets a menu entry for
        # running the test again.
        if result.GetKind() != Result.TEST:
            return
        rerun_item = ("This Test", "javascript:run_test();")
        self.run_menu_items.append(rerun_item)


    def GetResultURL(self, id):
        """Return the URL of the 'show-result' script for 'id'.

        'id' -- The ID passed along in the request."""

        request = qm.web.WebRequest("show-result",
                                    base=self.request,
                                    id=id)
        return request.AsUrl()


    def GetRunURL(self):
        """Return the URL of the 'run-tests' script for this result.

        The ID of the displayed result is passed in the 'ids' field."""

        request = qm.web.WebRequest("run-tests",
                                    base=self.request,
                                    ids=self.result.GetId())
        return request.AsUrl()



class SetExpectationPage(QMTestPage):
    """DTML page on which the expectation for a test is chosen."""

    def __init__(self, server, id):
        """Construct a new 'SetExpectationPage'.

        'server' -- The 'QMTestServer' creating this page.

        'id' -- The name of the test whose expectation is being set.
        The constructor itself does not use this value."""

        QMTestPage.__init__(self, "set-expectation.dtml", server)

        # The choices offered: "None" (as a string) followed by every
        # outcome known to 'Result'.
        choices = ["None"]
        choices.extend(Result.outcomes)
        self.outcomes = choices
        

        
class ShowItemPage(QMTestPage):
    """DTML page for showing and editing tests and resources."""

    def __init__(self, server, item, edit, new, type, field_errors=None):
        """Construct a new 'ShowItemPage'.

        These parameters are also available in DTML under the same name:

        'server' -- The 'QMTestServer' creating this page.

        'item' -- The 'TestDescriptor' or 'ResourceDescriptor' for the
        test being shown.

        'edit' -- True for editing the item; false for displaying it
        only.

        'new' -- True for editing a newly-created item ('edit' is then
        also true).

        'type' -- Either "test" or "resource".

        'field_errors' -- A map from field names to corresponding error
        messages.  If 'None' (the default), a new empty map is used."""

        # Initialize the base class.
        QMTestPage.__init__(self, "show.dtml", server)
        # Set up attributes.
        self.__database = server.GetDatabase()
        self.item = item
        self.fields = item.GetClassArguments()
        self.edit = edit
        self.new = new
        assert type in ["test", "resource"]
        self.type = type
        # Give every page its own map; a mutable default argument
        # would be shared by all pages created without 'field_errors'.
        if field_errors is None:
            field_errors = {}
        self.field_errors = field_errors

        # Editing and deleting are only offered when the database can
        # be modified.
        if self.__database.IsModifiable():
            kind = type.capitalize()
            self.edit_menu_items.append(("Edit %s" % kind,
                                         "javascript:edit_item();"))
            self.edit_menu_items.append(("Delete %s" % kind,
                                         "javascript:delete_item();"))

        # A test that is merely being displayed can be run directly.
        if type == "test" and not edit:
            self.run_menu_items.append(("This Test", "javascript:run_test();"))


    def GetTitle(self):
        """Return the page title for this page.

        returns -- A title derived from the script name of the current
        request, followed by the ID of the item.

        raises -- 'KeyError' if the script name is not one of the
        show, edit, or create scripts for tests and resources."""

        # Map the scriptname to a nicely-formatted title.
        url = self.request.GetScriptName()
        title = {
            "show-test":       "Show Test",
            "edit-test":       "Edit Test",
            "create-test":     "New Test",
            "show-resource":   "Show Resource",
            "edit-resource":   "Edit Resource",
            "create-resource": "New Resource",
            }[url]
        # Show the item's ID too.
        title = title + " " + self.item.GetId()
        return title


    def FormatFieldValue(self, field):
        """Return an HTML rendering of the value for 'field'.

        'field' -- The field whose value should be rendered.  The
        value is taken from the item's arguments, or is the field's
        default value if the item does not provide one.

        returns -- The value formatted as HTML: for display ("full")
        when the page is not an editing page, and as a form input
        otherwise."""

        # Extract the field value.
        arguments = self.item.GetArguments()
        field_name = field.GetName()
        try:
            value = arguments[field_name]
        except KeyError:
            # Use the default value if none is provided.
            value = field.GetDefaultValue()
        # Format it appropriately.
        server = self.server
        if not self.edit:
            return field.FormatValueAsHtml(server, value, "full")
        if field.IsHidden():
            return field.FormatValueAsHtml(server, value, "hidden")
        if field.IsReadOnly():
            # For read-only fields, we still need a form input, but
            # the user shouldn't be able to change anything.  Use a
            # hidden input, and display the contents as if this
            # wasn't an editing form.
            return field.FormatValueAsHtml(server, value, "hidden") \
                   + field.FormatValueAsHtml(server, value, "full")
        return field.FormatValueAsHtml(server, value, "edit")


01024     def GetClassDescription(self):
        """Return a full description of the test or resource class.

        returns -- The description, formatted as HTML."""

        d = qm.extension.get_class_description(self.item.GetClass())
        return qm.web.format_structured_text(d)


01033     def GetBriefClassDescription(self):
        """Return a brief description of the test or resource class.

        returns -- The brief description, formatted as HTML."""

        d = qm.extension.get_class_description(self.item.GetClass(),
                                               brief=1)
        return qm.web.format_structured_text(d)


01043     def MakeEditUrl(self):
        """Return the URL for editing this item."""

        return qm.web.WebRequest("edit-" + self.type,
                                 base=self.request,
                                 id=self.item.GetId()).AsUrl()

        
01051     def MakeRunUrl(self):
        """Return the URL for running this item."""

        return qm.web.WebRequest("run-tests",
                                 base=self.request,
                                 ids=self.item.GetId()).AsUrl()


01059     def MakeShowUrl(self):
        """Return the URL for showing this item."""

        return qm.web.WebRequest("show-" + self.type,
                                 base=self.request,
                                 id=self.item.GetId()).AsUrl()


01067     def MakeSubmitUrl(self):
        """Return the URL for submitting edits."""

        return qm.web.WebRequest("submit-" + self.type,
                                 base=self.request).AsUrl()


01074     def MakeDeleteScript(self):
        """Make a script to confirm deletion of the test or resource.

        returns -- JavaScript source to handle deletion of the
        test or resource."""

        item_id = self.item.GetId()
        delete_url = qm.web.make_url("delete-" + self.type,
                                     base_request=self.request,
                                     id=item_id)
        message = """
        <p>Are you sure you want to delete the %s %s?</p>
        """ % (self.type, item_id)
        return self.server.MakeConfirmationDialog(message, delete_url)



class ShowSuitePage(QMTestPage):
    """Page that displays, and optionally edits, a test suite."""

    def __init__(self, server, suite, edit, is_new_suite):
        """Construct a new DTML context.

        'server' -- The 'QMTestServer' creating this page.

        'suite' -- The 'Suite' instance to display.

        'edit' -- If true, display controls for editing the suite.

        'is_new_suite' -- If true, the suite being displayed is being
        created at this time.  A new suite must be shown for editing."""

        def discard(candidates, taken):
            # Remove from 'candidates', in place, the first occurrence
            # of every entry of 'taken' that it contains.
            for entry in taken:
                if entry in candidates:
                    candidates.remove(entry)
            return candidates

        # Initialize the base class.
        QMTestPage.__init__(self, "suite.dtml", server)

        # A new suite has no contents yet, so showing it without the
        # editing controls would be pointless.
        assert edit or not is_new_suite

        # Record what the template needs.
        database = server.GetDatabase()
        self.suite = suite
        self.test_ids = suite.GetTestIds()
        self.suite_ids = suite.GetSuiteIds()
        self.edit = edit
        self.is_new_suite = is_new_suite

        # Only explicit suites in a modifiable database can be changed.
        if not suite.IsImplicit() and database.IsModifiable():
            for menu_item in (("Edit Suite", "javascript:edit_suite();"),
                              ("Delete Suite", "javascript:delete_suite();")):
                self.edit_menu_items.append(menu_item)

        if not edit:
            # A suite that is only being displayed can be run; nothing
            # else is needed for a read-only page.
            self.run_menu_items.append(("This Suite",
                                        "javascript:run_suite();"))
            return

        # Split the suite's ID into its containing directory and its
        # own name within that directory.
        (dirname, basename) = self.GetDatabase().SplitLabel(suite.GetId())

        # The tests that could still be added are those under the
        # directory that the suite does not already include.
        available_tests = discard(database.GetTestIds(dirname),
                                  self.test_ids)
        self.test_id_controls = qm.web.make_choose_control(
            "test_ids",
            "Included Tests",
            self.test_ids,
            "Available Tests",
            available_tests)

        # The same for suites, except that the suite itself is never
        # offered for inclusion in itself.
        available_suites = discard(database.GetSuiteIds(dirname),
                                   self.suite_ids)
        discard(available_suites, [basename])
        self.suite_id_controls = qm.web.make_choose_control(
            "suite_ids",
            "Included Suites",
            self.suite_ids,
            "Available Suites",
            available_suites)


    def MakeEditUrl(self):
        """Return the URL of the page for editing this suite."""

        edit_request = qm.web.WebRequest("edit-suite",
                                         base=self.request,
                                         id=self.suite.GetId())
        return edit_request.AsUrl()


    def MakeRunUrl(self):
        """Return the URL that runs the tests in this suite."""

        run_request = qm.web.WebRequest("run-tests",
                                        base=self.request,
                                        ids=self.suite.GetId())
        return run_request.AsUrl()


    def MakeDeleteScript(self):
        """Make a script to confirm deletion of the suite.

        returns -- The JavaScript source produced by the server's
        'MakeConfirmationDialog' for a confirmation message and the
        'delete-suite' URL of this suite."""

        suite_id = self.suite.GetId()
        message = """
        <p>Are you sure you want to delete the suite %s?</p>
        """ % suite_id
        delete_url = qm.web.make_url("delete-suite",
                                     base_request=self.request,
                                     id=suite_id)
        return self.server.MakeConfirmationDialog(message, delete_url)

        
        
class StorageResultsStream(ResultStream):
    """A 'StorageResultsStream' stores results.

    A 'StorageResultsStream' does not write any output.  It simply
    stores the results for future display."""

    def __init__(self):
        """Construct a 'StorageResultsStream'."""

        super(StorageResultsStream, self).__init__({})
        # Test results by ID, and in the order in which they arrived.
        self.__test_results = {}
        self.__test_results_in_order = []
        # Resource results by ID.
        self.__resource_results = {}
        # The stream is not finished yet.
        self.__is_finished = 0
        # And there are no annotations yet.
        self.__annotations = {}

        # Create a lock for synchronization between the test execution
        # thread (which will call methods like 'WriteResults' and
        # 'Summarize') and the GUI thread (which will call
        # 'GetTestResults' and 'IsFinished').
        self.__lock = Lock()


    def GetAnnotations(self):
        """Return the annotations for this run.

        returns -- The map of annotations written so far.  The map
        itself is returned, not a copy, and the lock is not taken."""

        return self.__annotations


    def WriteAnnotation(self, key, value):
        """Record an annotation for this run.

        'key' -- The name of the annotation.

        'value' -- The value of the annotation.  It replaces any value
        previously recorded under 'key'."""

        self.__annotations[key] = value


    def WriteResult(self, result):
        """Output a test result.

        'result' -- A 'Result'.  Test results are stored by ID and also
        appended to the ordered list; any other result is stored as a
        resource result."""

        self.__lock.acquire()
        try:
            if result.GetKind() == Result.TEST:
                self.__test_results[result.GetId()] = result
                self.__test_results_in_order.append(result)
            else:
                self.__resource_results[result.GetId()] = result
        finally:
            self.__lock.release()


    def Summarize(self):
        """Output summary information about the results.

        When this method is called, the test run is complete.  Summary
        information should be displayed for the user, if appropriate.
        Any finalization, such as the closing of open files, should
        also be performed at this point.

        Derived class methods may override this method.  They should,
        however, invoke this version before returning."""

        # Mark the stream as finished.  The lock is released even if
        # the base class raises, so the GUI thread can never be left
        # waiting on it forever.
        self.__lock.acquire()
        try:
            ResultStream.Summarize(self)
            self.__is_finished = 1
        finally:
            self.__lock.release()


    def Start(self, test_ids):
        """Start collecting results.

        'test_ids' -- The names of the tests that we are about to run.

        Start collecting new results.  Discard results for the
        'test_ids', but not for other tests."""

        self.__lock.acquire()
        try:
            self.__is_finished = 0
            # Go through all of the tests we are about to run and remove
            # corresponding results.
            for test_id in test_ids:
                if test_id in self.__test_results:
                    del self.__test_results[test_id]
            # Rebuild the ordered list once, after all the deletions,
            # keeping only the results that are still stored by ID.
            self.__test_results_in_order \
                = [r for r in self.__test_results_in_order
                   if r.GetId() in self.__test_results]
        finally:
            self.__lock.release()


    def IsFinished(self):
        """Return true iff no more results are forthcoming.

        returns -- True if no more results will be written to this
        stream."""

        self.__lock.acquire()
        try:
            return self.__is_finished
        finally:
            self.__lock.release()


    def GetTestResults(self):
        """Return the accumulated test results.

        returns -- A dictionary mapping test names to 'Result' objects.
        The stored dictionary itself is returned, not a copy."""

        self.__lock.acquire()
        try:
            return self.__test_results
        finally:
            self.__lock.release()


    def GetTestResultsInOrder(self):
        """Return the test results in the order they appeared.

        returns -- A sequence of test results, in the order that they
        appeared.  The stored list itself is returned, not a copy."""

        self.__lock.acquire()
        try:
            return self.__test_results_in_order
        finally:
            self.__lock.release()


    def GetResourceResults(self):
        """Return the accumulated resource results.

        returns -- A dictionary mapping resource names to 'Result'
        objects.  The stored dictionary itself is returned, not a
        copy."""

        self.__lock.acquire()
        try:
            return self.__resource_results
        finally:
            self.__lock.release()


    def GetResult(self, name):
        """Return the 'Result' with the indicated 'name'.

        'name' -- A string giving the name of a test or resource result.

        returns -- The 'Result' instance corresponding to 'name'.  Test
        results are consulted first, then resource results.  'None' is
        returned if neither contains 'name'."""

        self.__lock.acquire()
        try:
            result = self.__test_results.get(name)
            # Compare with 'None' explicitly, so that a stored result is
            # returned whatever its truth value happens to be.
            if result is None:
                result = self.__resource_results.get(name)
            return result
        finally:
            self.__lock.release()


class TestResultsPage(QMTestPage):
    """DTML page that summarizes the results of the tests run so far."""

    def __init__(self, server):
        """Construct a new 'TestResultsPage'.

        'server' -- The 'QMTestServer' creating this page.  Its results
        stream and expected outcomes are sampled when the page is
        constructed."""

        # Initialize the base class.
        QMTestPage.__init__(self, "results.dtml", server)

        stream = server.GetResultsStream()
        # The order of the next two statements matters.  The run may
        # finish between them; by sampling 'IsFinished' first, the worst
        # case is a complete set of results reported as incomplete,
        # never a partial set reported as complete.
        self.__is_finished = stream.IsFinished()
        self.test_results = stream.GetTestResultsInOrder()
        self.expected_outcomes = server.GetExpectedOutcomes()


    def GetOutcomes(self):
        """Return the list of result outcomes.

        returns -- The sequence 'Result.outcomes'."""

        return Result.outcomes


    def GetTotal(self):
        """Return the total number of test results.

        returns -- The number of test results shown on this page."""

        return len(self.test_results)


    def GetTotalUnexpected(self):
        """Return the total number of unexpected results.

        returns -- The number of test results whose outcome differs
        from the expected outcome."""

        unexpected = self.GetRelativeResults(self.test_results, 0)
        return len(unexpected)


    def GetResultsWithOutcome(self, outcome):
        """Return the test results that have the given 'outcome'.

        'outcome' -- One of the 'Result.outcomes'.

        returns -- The results with the given 'outcome'."""

        def has_outcome(result, wanted=outcome):
            return result.GetOutcome() == wanted

        return filter(has_outcome, self.test_results)


    def GetCount(self, outcome):
        """Return the number of tests with the given 'outcome'.

        'outcome' -- One of the 'Result.outcomes'.

        returns -- The number of tests with the given 'outcome'."""

        matching = self.GetResultsWithOutcome(outcome)
        return len(matching)


    def GetUnexpectedCount(self, outcome):
        """Return the number of unexpected results with 'outcome'.

        'outcome' -- One of the 'Result.outcomes'.

        returns -- The number of tests with the given 'outcome' that
        were expected to have some other outcome."""

        matching = self.GetResultsWithOutcome(outcome)
        return len(self.GetRelativeResults(matching, 0))


    def GetRelativeResults(self, results, expected):
        """Return the results that match, or fail to match, expectations.

        'results' -- A sequence of 'Result' objects.

        'expected' -- A boolean.  If true, expected results are
        returned.  If false, unexpected results are returned.

        returns -- The selected results.  A test with no entry in the
        expected outcomes is expected to have outcome 'Result.PASS'."""

        def is_expected(result, expectations=self.expected_outcomes):
            actual = result.GetOutcome()
            return actual == expectations.get(result.GetId(), Result.PASS)

        def is_unexpected(result, expectations=self.expected_outcomes):
            actual = result.GetOutcome()
            return actual != expectations.get(result.GetId(), Result.PASS)

        if expected:
            selector = is_expected
        else:
            selector = is_unexpected
        return filter(selector, results)


    def GetDetailUrl(self, test_id):
        """Return the detail URL for a test.

        'test_id' -- The name of the test.

        returns -- The URL of the 'show-result' page for 'test_id'."""

        detail_request = qm.web.WebRequest("show-result",
                                           base=self.request,
                                           id=test_id)
        return detail_request.AsUrl()


    def IsFinished(self):
        """Return true if the results stream had finished.

        returns -- The value that the 'IsFinished' method of the results
        stream returned when this page was constructed.  A false value
        means that more results may still arrive, so the data on this
        page should be considered incomplete."""

        return self.__is_finished


    def GetRefreshDelay(self):
        """Return the number of seconds to wait before refreshing the page.

        returns -- The number of seconds to wait before refreshing this
        page; currently always ten.

        NOTE(review): the previous documentation said that zero means
        the page is never refreshed, and that this method is only
        called when 'IsFinished' returns true.  Neither can be
        confirmed from this file; check 'results.dtml'."""

        return 10

    
    
01491 class QMTestServer(qm.web.WebServer):
    """A 'QMTestServer' is the web GUI interface to QMTest."""

01494     def __init__(self, database, port, address, log_file,
                 targets, context, expectations):
        """Create and bind an HTTP server.

        'database' -- The test database to serve.

        'port' -- The port number on which to accept HTTP requests.

        'address' -- The local address to which to bind the server.  An
        empty string indicates all local addresses.

        'log_file' -- A file object to which the server will log requests.
        'None' for no logging.

        'targets' -- A sequence of 'Target' objects to use when running
        tests.

        'context' -- The 'Context' in which tests will execute."""

        qm.web.WebServer.__init__(self, port, address, log_file=log_file)

        self.__database = database
        self.__targets = targets
        self.__context = context
        
        # Base URL path for QMTest stuff.
        script_base = "/test/"
        # Register all our web pages.
        for name, function in [
            ( "clear-results", self.HandleClearResults ),
            ( "create-resource", self.HandleShowItem ),
            ( "create-suite", self.HandleCreateSuite ),
            ( "create-test", self.HandleShowItem ),
            ( "delete-resource", self.HandleDeleteItem ),
            ( "delete-suite", self.HandleDeleteSuite ),
            ( "delete-test", self.HandleDeleteItem ),
            ( "dir", self.HandleDir ),
            ( "edit-context", self.HandleEditContext ),
            ( "edit-resource", self.HandleShowItem ),
            ( "edit-suite", self.HandleEditSuite ),
            ( "edit-test", self.HandleShowItem ),
            ( "load-context", self.HandleLoadContext ),
            ( "load-expectations", self.HandleLoadExpectations ),
            ( "load-results", self.HandleLoadResults ),
            ( "new-resource", self.HandleNewResource ),
            ( "new-suite", self.HandleNewSuite ),
            ( "new-test", self.HandleNewTest ),
            ( "run-tests", self.HandleRunTests ),
            ( "set-expectation", self.HandleSetExpectation ),
            ( "show-dir", self.HandleDir ),
            ( "show-resource", self.HandleShowItem ),
            ( "show-result", self.HandleShowResult ),
            ( "show-results", self.HandleShowResults ),
            ( "show-suite", self.HandleShowSuite ),
            ( "show-test", self.HandleShowItem ),
            ( "shutdown", self.HandleShutdown ),
            ( "stop-tests", self.HandleStopTests ),
            ( "submit-context", self.HandleSubmitContext ),
            ( "submit-context-file", self.HandleSubmitContextFile ),
            ( "submit-expectation", self.HandleSubmitExpectation ),
            ( "submit-resource", self.HandleSubmitItem ),
            ( "submit-expectations", self.HandleSubmitExpectations ),
            ( "submit-expectations-form", self.HandleSubmitExpectationsForm ),
            ( "submit-results", self.HandleSubmitResults ),
            ( "submit-suite", self.HandleSubmitSuite ),
            ( "submit-test", self.HandleSubmitItem ),
            ( qm.test.cmdline.QMTest.context_file_name,
              self.HandleSaveContext ),
            ( qm.test.cmdline.QMTest.expectations_file_name,
              self.HandleSaveExpectations ),
            ( qm.test.cmdline.QMTest.results_file_name,
              self.HandleSaveResults ),
            ]:
            self.RegisterScript(script_base + name, function)
        self.RegisterPathTranslation(
            "/stylesheets", qm.get_share_directory("web", "stylesheets"))
        self.RegisterPathTranslation(
            "/images", qm.get_share_directory("web", "images"))
        self.RegisterPathTranslation(
            "/static", qm.get_share_directory("web", "static"))
        # Register the QM manual.
        self.RegisterPathTranslation(
            "/manual", qm.get_doc_directory("test", "html"))

        # The DB's attachment store processes download requests for
        # attachment data.
        attachment_store = database.GetAttachmentStore()
        if attachment_store:
            self.RegisterScript(qm.fields.AttachmentField.download_url,
                                attachment_store.HandleDownloadRequest)

        self.__expected_outcomes = expectations
        # There are no results yet.        
        self.__results_stream = StorageResultsStream()
        self.__results_stream.Summarize()
        # There is no execution thread.
        self.__execution_thread = None
        
        # Bind the server to the specified address.
        try:
            self.Bind()
        except qm.web.AddressInUseError, address:
            raise RuntimeError, qm.error("address in use", address=address)
        except qm.web.PrivilegedPortError:
            raise RuntimeError, qm.error("privileged port", port=port)


01601     def GetContext(self):
        """Return the 'Context' in which tests will be run.

        returns -- The 'Context' in which tests will be run."""

        return self.__context


01609     def GetDatabase(self):
        """Return the 'Database' handled by this server.

        returns -- The 'Database' handled by this server."""

        return self.__database
    
        
01617     def GetExpectedOutcomes(self):
        """Return the current expected outcomes for the test database.

        returns -- A map from test IDs to outcomes.  Some tests may have
        not have an entry in the map."""

        return self.__expected_outcomes


01626     def GetHTMLClassForOutcome(self, outcome):
        """Return the CSS class for the 'outcome'.

        'outcome' -- One of the result outcomes.
        
        returns -- The name of a CSS class.  These are used with <span>
        elements.  See 'qm.css'."""

        return {
            Result.PASS: "qmtest_pass",
            Result.FAIL: "qmtest_fail",
            Result.UNTESTED: "qmtest_untested",
            Result.ERROR: "qmtest_error",
            "EXPECTED" : "qmtest_expected"
            }[outcome]


01643     def GetResultsStream(self):
        """Return the 'StorageResultsStream' containing test results.

        returns -- The 'StorageResultsStream' associated with this
        server."""

        return self.__results_stream
    

01652     def HandleClearResults(self, request):
        """Handle a request to clear the current test results.

        'request' -- A 'WebRequest' object."""

        # Eliminate the old results stream.
        del self.__results_stream
        # And create a new one.
        self.__results_stream = StorageResultsStream()
        self.__results_stream.Summarize()

        # Redirect to the main page.
        request = qm.web.WebRequest("dir", base=request)
        raise qm.web.HttpRedirect, request
    

01668     def HandleCreateSuite(self, request):
        """Handle a submission of a new test suite.

        'request' -- A 'WebRequest' object."""

        field_errors = {}
        database = self.__database

        # Extract the suite ID of the new suite from the request.
        suite_id = request["id"]
        # Check that the ID is valid.
        if not database.IsValidLabel(suite_id, is_component = 0):
            field_errors["_id"] = qm.error("invalid id", id=suite_id)
        # Check that the ID doesn't already exist.
        elif database.HasSuite(suite_id):
            field_errors["_id"] = qm.error("suite already exists",
                                           suite_id=suite_id)

        # Were there any validation errors?
        if len(field_errors) > 0:
            # Yes.  Instead of showing the page for editing the suite,
            # redisplay the new suite page with error messages.
            return NewSuitePage(self, suite_id, field_errors)(request)
        else:
            # Everything looks good.  Make an empty suite.
            suite_class = qm.test.base.get_extension_class(
               "explicit_suite.ExplicitSuite",
               "suite",
               self.GetDatabase())
            extras = { suite_class.EXTRA_DATABASE : self.GetDatabase(),
                       suite_class.EXTRA_ID : suite_id }
            suite = suite_class({}, **extras)
            # Show the editing page.
            return ShowSuitePage(self, suite, edit=1, is_new_suite=1)(request)


01704     def HandleDeleteItem(self, request):
        """Handle a request to delete a test or resource.

        This function handles the script requests 'delete-test' and
        'delete-resource'.

        'request' -- A 'WebRequest' object.

        The ID of the test or resource to delete is specified in the 'id'
        field of the request."""

        database = self.__database
        # Extract the item ID.
        item_id = request["id"]
        # The script name determines whether we're deleting a test or an
        # resource. 
        script_name = request.GetScriptName()
        if script_name == "delete-test":
            database.RemoveExtension(item_id, database.TEST)
        elif script_name == "delete-resource":
            database.RemoveExtension(item_id, database.RESOURCE)
        else:
            raise RuntimeError, "unrecognized script name"
        # Redirect to the main page.
        request = qm.web.WebRequest("dir", base=request)
        raise qm.web.HttpRedirect, request


01732     def HandleDeleteSuite(self, request):
        """Handle a request to delete a test suite.

        'request' -- A 'WebRequest' object.

        The ID of the suite to delete is specified in the 'id' field of the
        request."""

        database = self.__database
        # Extract the suite ID.
        suite_id = request["id"]
        database.RemoveExtension(suite_id, database.SUITE)
        # Redirect to the main page.
        raise qm.web.HttpRedirect, qm.web.WebRequest("dir", base=request)


01748     def HandleDir(self, request):
        """Generate a directory page.

        'request' -- A 'WebRequest' object.

        The request has these fields:

        'path' -- A path in test/resource/suite ID space.  If specified,
        only tests and resources in this subtree are displayed, and their
        IDs are displayed relative to this path.  If omitted, the entire
        contents of the test database are shown."""

        path = request.get("id", "")
        return DirPage(self, path)(request)


01764     def HandleEditContext(self, request):
        """Handle a request to edit the context.

        'request' -- The 'WebRequest' that caused the event."""

        context_page = ContextPage(self)
        return context_page(request)
        

01773     def HandleEditSuite(self, request):
        """Generate the page for editing a test suite."""

        return self.HandleShowSuite(request, edit=1)


01779     def HandleLoadContext(self, request):
        """Handle a request to upload a context file.
        
        'request' -- The 'WebRequest' that caused the event."""

        return LoadContextPage(self)(request)
    
        
01787     def HandleLoadExpectations(self, request):
        """Handle a request to upload results.
        
        'request' -- The 'WebRequest' that caused the event."""

        return LoadExpectationsPage(self)(request)

        
01795     def HandleLoadResults(self, request):
        """Handle a request to upload results.
        
        'request' -- The 'WebRequest' that caused the event."""

        return LoadResultsPage(self)(request)


01803     def HandleNewResource(self, request):
        """Handle a request to create a new test.

        'request' -- The 'WebRequest' that caused the event."""

        return NewItemPage(self, "resource")(request)


01811     def HandleNewTest(self, request):
        """Handle a request to create a new test.

        'request' -- The 'WebRequest' that caused the event."""

        return NewItemPage(self, "test")(request)


01819     def HandleNewSuite(self, request):
        """Handle a request to create a new suite.

        'request' -- The 'WebRequest' that caused the event."""

        return NewSuitePage(self)(request)


01827     def HandleRunTests(self, request):
        """Handle a request to run tests.

        'request' -- The 'WebRequest' that caused the event.

        These fields in 'request' are used:

          'ids' -- A comma-separated list of test and suite IDs.  These IDs
          are expanded into the list of IDs of tests to run.

        """
        
        # Extract and expand the IDs of tests to run.
        if request.has_key("ids"):
            ids = string.split(request["ids"], ",")
        else:
            ids = [""]
        test_ids = self.GetDatabase().ExpandIds(ids)[0]

        # Let the results stream know that we are going to start
        # providing it with results.
        self.__results_stream.Start(test_ids)
        
        # Create the thread that will run all of the tests.
        del self.__execution_thread
        test_ids.sort()
        self.__execution_thread = \
          ExecutionThread(self.__database, test_ids, self.__context,
                          self.__targets, [self.__results_stream],
                          self.__expected_outcomes)
        # Start the thread.
        self.__execution_thread.start()

        # Sleep for a few seconds so that if we're only running one
        # test there's a good chance that it will finish before we
        # show the results page.
        time.sleep(5)
        
        # Redirect to the results page.
        request = qm.web.WebRequest("show-results", base=request)
        raise qm.web.HttpRedirect, request


01870     def HandleSaveContext(self, request):
        """Handlea  request to save the context to a file.

        'request' -- The 'WebRequest' that caused the event."""

        # Start with the empty string.
        s = ""
        # Run through all of the context variables.
        for (name, value) in self.__context.items():
            s = s + "%s=%s\n" % (name, value)
            
        return ("application/x-qmtest-context", s)
    

01884     def HandleSaveExpectations(self, request):
        """Handle a request to save expectations to a file.

        'request' -- The 'WebRequest' that caused the event."""
        
        # Create a string stream to store the results.
        s = StringIO.StringIO()
        # Create a results stream for storing the results.
        rsc = qm.test.cmdline.get_qmtest().GetFileResultStreamClass()
        rs = rsc({ "file" : s })
        # Write all the results.
        for (id, outcome) in self.__expected_outcomes.items():
            r = Result(Result.TEST, id, outcome)
            rs.WriteResult(r)
        # Terminate the stream.
        rs.Summarize()
        # Extract the data.
        data = s.getvalue()
        # Close the stream.
        s.close()
        
        return ("application/x-qmtest-results", data)
        

01908     def HandleSaveResults(self, request):
        """Handle a request to save results to a file.

        'request' -- The 'WebRequest' that caused the event."""

        # Create a string stream to store the results.
        s = StringIO.StringIO()
        # Create a results stream for storing the results.
        rsc = qm.test.cmdline.get_qmtest().GetFileResultStreamClass()
        rs = rsc({ "file" : s })
        # Write all the annotations.
        rs.WriteAllAnnotations(self.__results_stream.GetAnnotations())
        # Write all the results.
        for r in self.__results_stream.GetTestResults().values():
            rs.WriteResult(r)
        for r in self.__results_stream.GetResourceResults().values():
            rs.WriteResult(r)
        # Terminate the stream.
        rs.Summarize()
        # Extract the data.
        data = s.getvalue()
        # Close the stream.
        s.close()
        
        return ("application/x-qmtest-results", data)
    

01935     def HandleSetExpectation(self, request):
        """Handle a request to set expectations.

        'request' -- A 'WebRequest' object."""

        return SetExpectationPage(self, request["id"])(request)
    
        
01943     def HandleShowItem(self, request):
        """Handle a request to show a test or resource.

        'request' -- A 'WebRequest' object.

        This function generates pages to handle these requests:

          'create-test' -- Generate a form for initial editing of a test
          about to be created, given its test ID and test class.

          'create-resource' -- Likewise for an resource.

          'show-test' -- Display a test.

          'show-resource' -- Likewise for an resource.

          'edit-test' -- Generate a form for editing an existing test.

          'edit-resource' -- Likewise for an resource.

        This function distinguishes among these cases by checking the script
        name of the request object.

        The request must have the following fields:

          'id' -- A test or resource ID.  For show or edit pages, the ID of an
          existing item.  For create pages, the ID of the item being
          created.

          'class' -- For create pages, the name of the test or resource
          class.

        """

        # Paramaterize this function based on the request's script name.
        url = request.GetScriptName()
        edit, create, type = {
            "show-test":       (0, 0, "test"),
            "edit-test":       (1, 0, "test"),
            "create-test":     (1, 1, "test"),
            "show-resource":   (0, 0, "resource"),
            "edit-resource":   (1, 0, "resource"),
            "create-resource": (1, 1, "resource"),
            }[url]

        database = self.__database

        try:
            # Determine the ID of the item.
            item_id = request["id"]
        except KeyError:
            # The user probably submitted the form without entering an ID.
            message = qm.error("no id for show")
            return qm.web.generate_error_page(request, message)

        if create:
            # We're in the middle of creating a new item.  
            class_name = request["class"]

            # First perform some validation.
            field_errors = {}
            # Check that the ID is valid.
            if not database.IsValidLabel(item_id, is_component = 0):
                field_errors["_id"] = qm.error("invalid id", id=item_id)
            else:
                # Check that the ID doesn't already exist.
                if type is "resource":
                    if database.HasResource(item_id):
                        field_errors["_id"] \
                           = qm.error("resource already exists",
                                      resource_id=item_id)
                elif type is "test":
                    if database.HasTest(item_id):
                        field_errors["_id"] = qm.error("test already exists",
                                                       test_id=item_id)
            # Check that the class exists.
            try:
                qm.test.base.get_extension_class(class_name, type,
                                                 database)
            except ValueError:
                # The class name was incorrectly specified.
                field_errors["_class"] = qm.error("invalid class name",
                                                  class_name=class_name)
            except:
                # Can't find the class.
                field_errors["_class"] = qm.error("class not found",
                                                  class_name=class_name)
            # Were there any errors?
            if len(field_errors) > 0:
                # Yes.  Instead of showing the edit page, re-show the new
                # item page.
                page = NewItemPage(server=self,
                                   type=type,
                                   item_id=item_id,
                                   class_name=class_name,
                                   field_errors=field_errors)
                return page(request)

            # Construct a test with default argument values, as the
            # starting point for editing.
            if type is "resource":
                item = self.MakeNewResource(class_name, item_id)
            elif type is "test":
                item = self.MakeNewTest(class_name, item_id)
        else:
            # We're showing or editing an existing item.
            # Look it up in the database.
            if type is "resource":
                try:
                    item = database.GetResource(item_id)
                except qm.test.database.NoSuchTestError, e:
                    # An test with the specified test ID was not fount.
                    # Show a page indicating the error.
                    return qm.web.generate_error_page(request, str(e))
            elif type is "test":
                try:
                    item = database.GetTest(item_id)
                except qm.test.database.NoSuchResourceError, e:
                    # An test with the specified resource ID was not fount.
                    # Show a page indicating the error.
                    return qm.web.generate_error_page(request, str(e))

        # Generate HTML.
        return ShowItemPage(self, item, edit, create, type)(request)


02069     def HandleShowResult(self, request):
        """Handle a request to show result detail.

        'request' -- The 'WebRequest' that caused the event."""

        name = request["id"]
        result = self.__results_stream.GetResult(name)
        return ResultPage(self, result)(request)
    

02079     def HandleShowResults(self, request):
        """Handle a request to show results.

        'request' -- The 'WebRequest' that caused the event."""

        # Display the results.
        results_page = TestResultsPage(self)
        return results_page(request)


02089     def HandleShowSuite(self, request, edit=0):
        """Generate the page for displaying or editing a test suite.

        'request' -- A 'WebRequest' object.

        'edit' -- If true, display the page for editing the suite.
        Otherwise, just display the suite.

        The request has the following fields:

          'id' -- The ID of the suite to display or edit."""

        database = self.__database

        try:
            # Determine the suite ID.
            suite_id = request["id"]
        except KeyError:
            # No suite ID was given.
            message = qm.error("no id for show")
            return qm.web.generate_error_page(request, message)
        else:
            suite = database.GetSuite(suite_id)
        # Generate HTML.
        return ShowSuitePage(self, suite, edit, is_new_suite=0)(request)


02116     def HandleShutdown(self, request):
        """Handle a request to shut down the server.

        'request' -- The 'WebRequest' that caused the event."""

        raise SystemExit, None


02124     def HandleStopTests(self, request):
        """Handle a request to stop test execution.

        'request' -- The 'WebRequest' that caused the event."""

        # Stop the thread.
        self.__execution_thread.RequestTermination()
        # Redirect to the results page.
        request = qm.web.WebRequest("show-results", base=request)
        raise qm.web.HttpRedirect, request


02136     def HandleSubmitContext(self, request):
        """Handle a context submission..

        'request' -- The 'WebRequest' that caused the event.  The
        'request' must have a 'context_vars' key, whose value is the
        the context variables."""

        vars = qm.web.decode_properties(request["context_vars"])
        self.__context = Context()
        for k in vars.keys():
            self.__context[k] = vars[k]

        # Redirect to the main page.
        request = qm.web.WebRequest("dir", base=request)
        raise qm.web.HttpRedirect, request


02153     def HandleSubmitContextFile(self, request):
        """Handle a context file submission..

        'request' -- The 'WebRequest' that caused the event."""

        # The context data.
        data = request["file"]
        # Create a file objet to read from.
        file = StringIO.StringIO(data)
        # Parse the assignments in the context file.
        assignments = qm.common.read_assignments(file)
        # Add them to the context.
        for (name, value) in assignments.items():
            try:
                self.__context[name] = value
            except ValueError:
                # Skip any invalid assignments.
                pass
        # Redirect to the main page.
        return self._ClosePopupAndRedirect("dir")


02175     def HandleSubmitExpectation(self, request):
        """Handle setting a single expectation.

        'request' -- The 'WebRequest' that caused the event."""

        id = request["id"]
        outcome = request["outcome"]
        self.__expected_outcomes[id] = outcome
        # Close the upload popup window, and reload the main window.
        return self._ClosePopupAndRedirect(request["url"])
        
        
02187     def HandleSubmitExpectations(self, request):
        """Handle uploading expected results.

        'request' -- The 'WebRequest' that caused the event."""

        # Get the results file data.
        data = request["file"]
        # Create a file object from the data.
        f = StringIO.StringIO(data)
        # Read the results.
        self.__expected_outcomes = \
            qm.test.base.load_outcomes(f, self.GetDatabase())
        # Close the upload popup window, and redirect the main window
        # to the root of the database.
        return self._ClosePopupAndRedirect("dir")
        

02204     def HandleSubmitExpectationsForm(self, request):
        """Handle uploading expected results.

        'request' -- The 'WebRequest' that caused the event."""

        # Clear out the current set of expected outcomes; the entire
        # set of new 
        self.__expected_outcomes = {}
        
        # Loop over all the tests.
        for id in self.GetDatabase().ExpandIds("")[0]:
            outcome = request[id]
            if outcome != "None":
                self.__expected_outcomes[id] = outcome

        # Redirect to the main page.
        request = qm.web.WebRequest("dir", base=request)
        raise qm.web.HttpRedirect, request
    
        
02224     def HandleSubmitItem(self, request):
        """Handle a test or resource submission.

        This function handles submission of the test or resource editing form
        generated by 'handle_show'.  The script name in 'request' should be
        'submit-test' or 'submit-resource'.  It constructs the appropriate
        'Test' or 'Resource' object and writes it to the database, either as a
        new item or overwriting an existing item.

        The request must have the following form fields:

        'id' -- The test or resource ID of the item being edited or created.

        'class' -- The name of the test or resource class of this item.

        arguments -- Argument values are encoded in fields whose names start
        with 'qm.fields.Field.form_field_prefix'."""

        if request.GetScriptName() == "submit-test":
            type = "test"
        elif request.GetScriptName() == "submit-resource":
            type = "resource"

        # Make sure there's an ID in the request, and extract it.
        try:
            item_id = request["id"]
        except KeyError:
            message = qm.error("no id for submit")
            return qm.web.generate_error_page(request, message)

        database = self.__database
        # Learn whether or not this is a new item.
        is_new = int(request["is_new"])
        # Extract the class and field specification.
        item_class_name = request["class"]
        item_class = qm.test.base.get_extension_class(item_class_name,
                                                      type,
                                                      database)
        fields = get_class_arguments(item_class)

        # We'll perform various kinds of validation as we extract form
        # fields.  Errors are placed into this map.
        field_errors = {}
        redisplay = 0
        
        # Loop over fields of the class, looking for arguments in the
        # submitted request.
        arguments = {}
        temporary_store = self.GetTemporaryAttachmentStore()
        main_store = database.GetAttachmentStore()
        attachment_stores = { id(temporary_store): temporary_store,
                              id(main_store): main_store }
        for field in fields:
            # Construct the name we expect for the corresponding argument.
            field_name = field.GetName()
            form_field_name = field.GetHtmlFormFieldName()
            # Parse the value for this field.
            try:
                value, r = field.ParseFormValue(request, form_field_name,
                                                attachment_stores)
                if r:
                    redisplay = 1
                arguments[field_name] = value
            except:
                # Something went wrong parsing the value.  Associate an
                # error message with this field.
                message = str(sys.exc_info()[1])
                field_errors[field_name] = message
                redisplay = 1

        if type is "test":
            # Create a new test.
            item = TestDescriptor(
                    database,
                    test_id=item_id,
                    test_class_name=item_class_name,
                    arguments=arguments)

        elif type is "resource":
            # Create a new resource.
            item = ResourceDescriptor(database, item_id,
                                      item_class_name, arguments)

        # If necessary, redisplay the form.
        if redisplay:
          request = qm.web.WebRequest("edit-" + type, base=request, 
                                        id=item_id)
          return ShowItemPage(self, item, 1, is_new, type,
                                field_errors)(request)

        # Store it in the database.
        database.WriteExtension(item_id, item.GetItem())

        # Redirect to a page that displays the newly-edited item.
        request = qm.web.WebRequest("show-" + type, base=request, id=item_id)
        raise qm.web.HttpRedirect, request


02322     def HandleSubmitResults(self, request):
        """Handle uploading results.

        'request' -- The 'WebRequest' that caused the event."""

        # Get the results file data.
        data = request["file"]
        # Create a file object from the data.
        f = StringIO.StringIO(data)
        # Read the results.
        results = qm.test.base.load_results(f, self.GetDatabase())
        # Enter them into a new results stream.
        self.__results_stream = StorageResultsStream()
        annotations = results.GetAnnotations()
        self.__results_stream.WriteAllAnnotations(annotations)
        for r in results:
            self.__results_stream.WriteResult(r)
        self.__results_stream.Summarize()
        # Close the upload popup window, and redirect the main window
        # to a view of the results.
        return self._ClosePopupAndRedirect("show-results")


02345     def HandleSubmitSuite(self, request):
        """Handle test suite submission.

        'request' -- A 'WebRequest' object.

        The request object has these fields:

          'id' -- The ID of the test suite being edited.  If a suite with
          this ID exists, it is replaced (it must not be an implicit suite
          though).  Otherwise a new suite is edited.

          'test_ids' -- A comma-separated list of test IDs to include in the
          suite, relative to the suite's own ID.

          'suite_ids' -- A comma-separated list of other test suite IDs to
          include in the suite, relative to the suite's own ID.
        """

        database = self.__database
        # Extract fields from the request.
        suite_id = request["id"]
        test_ids = request["test_ids"]
        if string.strip(test_ids) == "":
            test_ids = []
        else:
            test_ids = string.split(test_ids, ",")
        suite_ids = request["suite_ids"]
        if string.strip(suite_ids) == "":
            suite_ids = []
        else:
            suite_ids = string.split(suite_ids, ",")
        # Construct a new suite.
        suite_class = qm.test.base.get_extension_class(
            "explicit_suite.ExplicitSuite",
            "suite",
            self.GetDatabase())
        extras = { suite_class.EXTRA_DATABASE : self.GetDatabase(),
                   suite_class.EXTRA_ID : suite_id }
        suite = suite_class({ "test_ids" : test_ids,
                              "suite_ids" : suite_ids },
                            **extras)
        # Store it.
        database.WriteExtension(suite_id, suite)
        # Redirect to a page that displays the newly-edited item.
        raise qm.web.HttpRedirect, \
              qm.web.WebRequest("show-suite", base=request, id=suite_id)


02393     def MakeNewTest(self, test_class_name, test_id):
        """Create a new test with default arguments.

        'test_class_name' -- The name of the test class of which to create a
        new test.

        'test_id' -- The test ID of the new test.

        returns -- A new 'TestDescriptor' object."""

        test_class = qm.test.base.get_test_class(test_class_name,
                                                 self.GetDatabase())
        # Make sure there isn't already such a test.
        if self.GetDatabase().HasTest(test_id):
            raise RuntimeError, qm.error("test already exists",
                                         test_id=test_id)
        # Construct an argument map containing default values.
        arguments = {}
        for field in get_class_arguments(test_class):
            name = field.GetName()
            value = field.GetDefaultValue()
            arguments[name] = value
        # Construct a default test instance.
        return TestDescriptor(self.GetDatabase(), test_id,
                              test_class_name, arguments)


02420     def MakeNewResource(self, resource_class_name, resource_id):
        """Create a new resource with default arguments.

        'resource_class_name' -- The name of the resource class of which to
        create a new resource.

        'resource_id' -- The resource ID of the new resource.

        returns -- A new 'ResourceDescriptor' object."""

        resource_class \
          = qm.test.base.get_resource_class(resource_class_name,
                                            self.GetDatabase())
        # Make sure there isn't already such a resource.
        if self.GetDatabase().HasResource(resource_id):
            raise RuntimeError, qm.error("resource already exists",
                                         resource_id=resource_id)
        # Construct an argument map containing default values.
        arguments = {}
        for field in get_class_arguments(resource_class):
            name = field.GetName()
            value = field.GetDefaultValue()
            arguments[name] = value
        # Construct a default resource instance.
        return ResourceDescriptor(self.GetDatabase(), resource_id,
                                  resource_class_name, arguments)


02448     def _HandleRoot(self, request):
        """Handle the '/' URL."""

        raise qm.web.HttpRedirect, qm.web.WebRequest("/test/dir")


02454     def _ClosePopupAndRedirect(self, url):
        """Close the current window.  Redirect the main window to 'url'.

        'url' -- A string giving the URL to which the main window should
        be redirected.

        returns -- A string giving HTML that will close the current
        window and redirect the main window to 'url'."""

        return """<html><body><script language="JavaScript">
                  window.opener.location = '%s';
                  window.close();</script></body></html>""" % url
        
########################################################################
# initialization
########################################################################

# Use our 'DefaultDtmlPage' subclass even when generating generic
# (non-QMTest) pages.  This assignment is executed when this module is
# imported; it replaces the 'default_class' attribute of
# 'qm.web.DtmlPage'.
qm.web.DtmlPage.default_class = DefaultDtmlPage

########################################################################
# Local Variables:
# mode: python
# indent-tabs-mode: nil
# fill-column: 72
# End:

# Generated by Doxygen 1.6.0 (footer of the source listing; not part of web.py).