Skip to content

jobs.py

JobTable

Bases: Table

A base table with no definition. Allows reserving jobs

Source code in datajoint/jobs.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class JobTable(Table):
    """
    Job reservation table (`~jobs`) of a single database.

    The table has no user-supplied definition: its definition is generated in `__init__`
    and the table is declared on first use.  Each row describes one job, identified by the
    name of the table being computed and the hash of the job's primary key, together with
    the host, process, connection and user that created the row.
    """

    def __init__(self, conn, database):
        """
        Bind the jobs table to a database and declare it if it does not exist yet.

        :param conn: the connection used to access the database
        :param database: name of the database that holds the jobs table
        """
        self.database = database
        self._connection = conn
        self._heading = Heading(
            table_info=dict(
                conn=conn, database=database, table_name=self.table_name, context=None
            )
        )
        self._support = [self.full_table_name]

        self._definition = """    # job reservation table for `{database}`
        table_name  :varchar(255)  # className of the table
        key_hash  :char(32)  # key hash
        ---
        status  :enum('reserved','error','ignore')  # if tuple is missing, the job is available
        key=null  :blob  # structure containing the key
        error_message=""  :varchar({error_message_length})  # error message returned if failed
        error_stack=null  :mediumblob  # error stack if failed
        user="" :varchar(255) # database user
        host=""  :varchar(255)  # system hostname
        pid=0  :int unsigned  # system process id
        connection_id = 0  : bigint unsigned          # connection_id()
        timestamp=CURRENT_TIMESTAMP  :timestamp   # automatic timestamp
        """.format(
            database=database, error_message_length=ERROR_MESSAGE_LENGTH
        )
        if not self.is_declared:
            self.declare()
        # looked up once here and reused in every row written by this instance
        self._user = self.connection.get_user()

    @property
    def definition(self):
        """The table definition string generated in `__init__`."""
        return self._definition

    @property
    def table_name(self):
        """The fixed name of the jobs table: `~jobs`."""
        return "~jobs"

    def delete(self):
        """bypass interactive prompts and dependencies"""
        self.delete_quick()

    def drop(self):
        """bypass interactive prompts and dependencies"""
        self.drop_quick()

    def _job_record(self, table_name, key, status, **extra):
        """
        Build the row that describes a job and the process handling it.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :param status: value for the `status` column
        :param extra: additional column values to include in the row
        :return: dict suitable for `insert1`
        """
        record = dict(
            table_name=table_name,
            key_hash=key_hash(key),
            status=status,
            host=platform.node(),
            pid=os.getpid(),
            connection_id=self.connection.connection_id,
            key=key,
            user=self._user,
        )
        record.update(extra)
        return record

    def _insert_unless_taken(self, job):
        """
        Insert a job row, reporting a duplicate entry as False instead of raising.

        :param job: the row to insert
        :return: True if the row was inserted. False if the insert raised DuplicateError
        """
        try:
            with config(enable_python_native_blobs=True):
                self.insert1(job, ignore_extra_fields=True)
        except DuplicateError:
            return False
        return True

    def reserve(self, table_name, key):
        """
        Reserve a job for computation.  When a job is reserved, the job table contains an entry for the
        job key, identified by its hash. When jobs are completed, the entry is removed.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :return: True if the job was reserved successfully. False if the job is already taken
        """
        return self._insert_unless_taken(self._job_record(table_name, key, "reserved"))

    def ignore(self, table_name, key):
        """
        Set a job to be ignored for computation.  When a job is ignored, the job table contains an entry for the
        job key, identified by its hash, with status "ignore".

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :return: True if the job was marked as ignored successfully. False if the job is already taken
        """
        return self._insert_unless_taken(self._job_record(table_name, key, "ignore"))

    def complete(self, table_name, key):
        """
        Log a completed job.  When a job is completed, its reservation entry is deleted.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        """
        job_key = dict(table_name=table_name, key_hash=key_hash(key))
        (self & job_key).delete_quick()

    def error(self, table_name, key, error_message, error_stack=None):
        """
        Log an error message.  The job reservation is replaced with an error entry.
        If an error occurs, leave an entry describing the problem.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :param error_message: string error message
        :param error_stack: stack trace
        """
        if len(error_message) > ERROR_MESSAGE_LENGTH:
            # keep the stored message within the column width, marking it as truncated
            error_message = (
                error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)]
                + TRUNCATION_APPENDIX
            )
        with config(enable_python_native_blobs=True):
            self.insert1(
                self._job_record(
                    table_name,
                    key,
                    "error",
                    error_message=error_message,
                    error_stack=error_stack,
                ),
                # replace=True so that an existing entry for this job is overwritten
                replace=True,
                ignore_extra_fields=True,
            )

delete()

bypass interactive prompts and dependencies

Source code in datajoint/jobs.py
56
57
58
def delete(self):
    """
    Delegate to `delete_quick`, bypassing interactive prompts and dependencies.

    The result of `delete_quick` is not returned.
    """
    self.delete_quick()

drop()

bypass interactive prompts and dependencies

Source code in datajoint/jobs.py
60
61
62
def drop(self):
    """
    Delegate to `drop_quick`, bypassing interactive prompts and dependencies.

    The result of `drop_quick` is not returned.
    """
    self.drop_quick()

reserve(table_name, key)

Reserve a job for computation. When a job is reserved, the job table contains an entry for the job key, identified by its hash. When jobs are completed, the entry is removed.

Parameters:

Name Type Description Default
table_name

database.table_name

required
key

the dict of the job's primary key

required

Returns:

Type Description

True if the job was reserved successfully; False if the job is already taken.

Source code in datajoint/jobs.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def reserve(self, table_name, key):
    """
    Claim a job for computation by inserting a row with status "reserved".

    The row is identified by the table name and the hash of the job key, and also records
    the host name, process id, connection id and user of the caller.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    :return: True if the row was inserted. False if the insert raised DuplicateError,
        i.e. the job is already taken
    """
    reservation = {
        "table_name": table_name,
        "key_hash": key_hash(key),
        "status": "reserved",
        "host": platform.node(),
        "pid": os.getpid(),
        "connection_id": self.connection.connection_id,
        "key": key,
        "user": self._user,
    }
    try:
        with config(enable_python_native_blobs=True):
            self.insert1(reservation, ignore_extra_fields=True)
    except DuplicateError:
        # an entry for this job already exists
        return False
    else:
        return True

ignore(table_name, key)

Set a job to be ignored for computation. When a job is ignored, the job table contains an entry for the job key, identified by its hash, with status "ignore".

Args: table_name (str): the table name, in the form database.table_name. key (dict): the job's primary key.

Returns: True if the job was marked as ignored successfully; False if the job is already taken.

Source code in datajoint/jobs.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def ignore(self, table_name, key):
    """
    Mark a job as ignored by inserting a row with status "ignore".

    The row is identified by the table name and the hash of the job key, and also records
    the host name, process id, connection id and user of the caller.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    :return: True if the row was inserted. False if the insert raised DuplicateError,
        i.e. the job is already taken
    """
    ignored = {
        "table_name": table_name,
        "key_hash": key_hash(key),
        "status": "ignore",
        "host": platform.node(),
        "pid": os.getpid(),
        "connection_id": self.connection.connection_id,
        "key": key,
        "user": self._user,
    }
    try:
        with config(enable_python_native_blobs=True):
            self.insert1(ignored, ignore_extra_fields=True)
    except DuplicateError:
        # an entry for this job already exists
        return False
    else:
        return True

complete(table_name, key)

Log a completed job. When a job is completed, its reservation entry is deleted.

Parameters:

Name Type Description Default
table_name

database.table_name

required
key

the dict of the job's primary key

required
Source code in datajoint/jobs.py
121
122
123
124
125
126
127
128
129
def complete(self, table_name, key):
    """
    Record that a job has finished by deleting its entry from the jobs table.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    """
    finished = {"table_name": table_name, "key_hash": key_hash(key)}
    # restrict the table to the entry of this job before deleting
    entry = self & finished
    entry.delete_quick()

error(table_name, key, error_message, error_stack=None)

Log an error message. The job reservation is replaced with an error entry. If an error occurs, leave an entry describing the problem.

Parameters:

Name Type Description Default
table_name

database.table_name

required
key

the dict of the job's primary key

required
error_message

string error message

required
error_stack

stack trace

None
Source code in datajoint/jobs.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def error(self, table_name, key, error_message, error_stack=None):
    """
    Store an error entry for a job, replacing any existing entry for the same job.

    Messages longer than ERROR_MESSAGE_LENGTH are cut so that, with TRUNCATION_APPENDIX
    added at the end, they are exactly ERROR_MESSAGE_LENGTH characters long.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    :param error_message: string error message
    :param error_stack: stack trace
    """
    if len(error_message) > ERROR_MESSAGE_LENGTH:
        kept = ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)
        error_message = error_message[:kept] + TRUNCATION_APPENDIX
    with config(enable_python_native_blobs=True):
        failure = {
            "table_name": table_name,
            "key_hash": key_hash(key),
            "status": "error",
            "host": platform.node(),
            "pid": os.getpid(),
            "connection_id": self.connection.connection_id,
            "user": self._user,
            "key": key,
            "error_message": error_message,
            "error_stack": error_stack,
        }
        # replace=True overwrites an existing entry for this job
        self.insert1(failure, replace=True, ignore_extra_fields=True)