Skip to content

Resources

text2everything_sdk.resources.projects.ProjectsResource

Bases: BaseResource

Client for managing projects in the Text2Everything API.

Projects are the top-level containers for organizing contexts, schema metadata, golden examples, and other resources.

Source code in resources/projects.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class ProjectsResource(BaseResource):
    """
    Resource client for the ``projects`` endpoints of the Text2Everything API.

    A project is the top-level container under which contexts, schema
    metadata, golden examples, and other resources are organized.
    """

    def list(
        self,
        page: int = 1,
        per_page: int = 50,
        search: Optional[str] = None
    ) -> List[Project]:
        """
        Fetch projects through the paginated ``projects`` endpoint.

        Args:
            page: Page number sent as the ``page`` query parameter (default: 1)
            per_page: Page size sent as the ``per_page`` query parameter (default: 50)
            search: Optional search term; only sent when it is truthy

        Returns:
            List of Project instances

        Example:
            >>> for project in client.projects.list(search="sales"):
            ...     print(f"{project.name}: {project.description}")
        """
        query = {'page': page, 'per_page': per_page}
        # An empty or missing search term is left out of the query entirely.
        if search:
            query['search'] = search
        return self._paginate("projects", params=query, model_class=Project)

    def get(self, project_id: str) -> Project:
        """
        Fetch a single project by its ID.

        Args:
            project_id: The project ID

        Returns:
            Project instance

        Raises:
            NotFoundError: If project doesn't exist

        Example:
            >>> project = client.projects.get("proj_123")
            >>> print(project.name)
        """
        raw = self._client.get(self._build_endpoint("projects", project_id))
        return self._create_model_instance(raw, Project)

    def create(
        self,
        name: str,
        description: Optional[str] = None,
        **kwargs
    ) -> Project:
        """
        Create a new project.

        The arguments are passed through ``ProjectCreate`` and dumped with
        ``exclude_none=True``, so fields left as ``None`` are not sent.

        Args:
            name: Project name
            description: Project description
            **kwargs: Additional project fields forwarded to ``ProjectCreate``

        Returns:
            Created Project instance

        Example:
            >>> project = client.projects.create(
            ...     name="My Project",
            ...     description="A sample project"
            ... )
        """
        draft = ProjectCreate(name=name, description=description, **kwargs)
        payload = draft.model_dump(exclude_none=True)
        created = self._client.post("projects", data=payload)
        return self._create_model_instance(created, Project)

    def update(
        self,
        project_id: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
        **kwargs
    ) -> Project:
        """
        Update an existing project.

        The current project is fetched first; any of ``name`` / ``description``
        left as ``None`` falls back to the stored value, and the merged data is
        sent with a PUT request.

        Args:
            project_id: The project ID
            name: New project name (``None`` keeps the current one)
            description: New project description (``None`` keeps the current one)
            **kwargs: Additional fields forwarded to ``ProjectCreate``

        Returns:
            Updated Project instance

        Example:
            >>> project = client.projects.update(
            ...     "proj_123",
            ...     name="Updated Name"
            ... )
        """
        # The API expects complete data, so start from what is stored today.
        existing = self.get(project_id)
        if name is None:
            name = existing.name
        if description is None:
            description = existing.description

        payload = ProjectCreate(
            name=name,
            description=description,
            **kwargs
        ).model_dump(exclude_none=True)

        updated = self._client.put(
            self._build_endpoint("projects", project_id),
            data=payload
        )
        return self._create_model_instance(updated, Project)

    def delete(self, project_id: str) -> Dict[str, Any]:
        """
        Delete a project.

        Args:
            project_id: The project ID

        Returns:
            Whatever the client's ``delete`` call returns (deletion confirmation response)

        Example:
            >>> result = client.projects.delete("proj_123")
            >>> print(result["message"])
        """
        return self._client.delete(self._build_endpoint("projects", project_id))

    def get_by_name(self, name: str) -> Optional[Project]:
        """
        Look up a project by exact name.

        Calls ``list(search=name)`` and returns the first result whose
        ``name`` equals ``name`` exactly.

        Args:
            name: Project name to search for

        Returns:
            Project instance if found, None otherwise

        Example:
            >>> project = client.projects.get_by_name("My Project")
            >>> if project:
            ...     print(f"Found project: {project.id}")
        """
        candidates = self.list(search=name)
        return next((item for item in candidates if item.name == name), None)

    def exists(self, project_id: str) -> bool:
        """
        Report whether ``get(project_id)`` succeeds.

        Note that *any* exception raised by ``get`` (not only a not-found
        error) results in ``False``.

        Args:
            project_id: The project ID

        Returns:
            True if the project could be fetched, False otherwise

        Example:
            >>> if client.projects.exists("proj_123"):
            ...     print("Project exists")
        """
        try:
            self.get(project_id)
        except Exception:
            return False
        return True

create(name, description=None, **kwargs)

Create a new project.

Parameters:

Name Type Description Default
name str

Project name

required
description Optional[str]

Project description

None
**kwargs

Additional project fields

{}

Returns:

Type Description
Project

Created Project instance

Example

project = client.projects.create(
    name="My Project",
    description="A sample project"
)

Source code in resources/projects.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def create(
    self,
    name: str,
    description: Optional[str] = None,
    **kwargs
) -> Project:
    """
    Create a new project.

    The arguments are passed through ``ProjectCreate`` and dumped with
    ``exclude_none=True``, so fields left as ``None`` are not sent.

    Args:
        name: Project name
        description: Project description
        **kwargs: Additional project fields forwarded to ``ProjectCreate``

    Returns:
        Created Project instance

    Example:
        >>> project = client.projects.create(
        ...     name="My Project",
        ...     description="A sample project"
        ... )
    """
    draft = ProjectCreate(name=name, description=description, **kwargs)
    payload = draft.model_dump(exclude_none=True)
    created = self._client.post("projects", data=payload)
    return self._create_model_instance(created, Project)

delete(project_id)

Delete a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
Dict[str, Any]

Deletion confirmation response

Example

result = client.projects.delete("proj_123")
print(result["message"])

Source code in resources/projects.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def delete(self, project_id: str) -> Dict[str, Any]:
    """
    Delete a project.

    Args:
        project_id: The project ID

    Returns:
        Whatever the client's ``delete`` call returns (deletion confirmation response)

    Example:
        >>> result = client.projects.delete("proj_123")
        >>> print(result["message"])
    """
    return self._client.delete(self._build_endpoint("projects", project_id))

exists(project_id)

Check if a project exists.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
bool

True if project exists, False otherwise

Example

if client.projects.exists("proj_123"):
    print("Project exists")

Source code in resources/projects.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def exists(self, project_id: str) -> bool:
    """
    Report whether ``get(project_id)`` succeeds.

    Note that *any* exception raised by ``get`` (not only a not-found
    error) results in ``False``.

    Args:
        project_id: The project ID

    Returns:
        True if the project could be fetched, False otherwise

    Example:
        >>> if client.projects.exists("proj_123"):
        ...     print("Project exists")
    """
    try:
        self.get(project_id)
    except Exception:
        return False
    return True

get(project_id)

Get a specific project by ID.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
Project

Project instance

Raises:

Type Description
NotFoundError

If project doesn't exist

Example

project = client.projects.get("proj_123")
print(project.name)

Source code in resources/projects.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def get(self, project_id: str) -> Project:
    """
    Fetch a single project by its ID.

    Args:
        project_id: The project ID

    Returns:
        Project instance

    Raises:
        NotFoundError: If project doesn't exist

    Example:
        >>> project = client.projects.get("proj_123")
        >>> print(project.name)
    """
    raw = self._client.get(self._build_endpoint("projects", project_id))
    return self._create_model_instance(raw, Project)

get_by_name(name)

Get a project by name.

Parameters:

Name Type Description Default
name str

Project name to search for

required

Returns:

Type Description
Optional[Project]

Project instance if found, None otherwise

Example

project = client.projects.get_by_name("My Project")
if project:
    print(f"Found project: {project.id}")

Source code in resources/projects.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_by_name(self, name: str) -> Optional[Project]:
    """
    Look up a project by exact name.

    Calls ``list(search=name)`` and returns the first result whose
    ``name`` equals ``name`` exactly.

    Args:
        name: Project name to search for

    Returns:
        Project instance if found, None otherwise

    Example:
        >>> project = client.projects.get_by_name("My Project")
        >>> if project:
        ...     print(f"Found project: {project.id}")
    """
    candidates = self.list(search=name)
    return next((item for item in candidates if item.name == name), None)

list(page=1, per_page=50, search=None)

List all projects.

Parameters:

Name Type Description Default
page int

Page number (default: 1)

1
per_page int

Items per page (default: 50)

50
search Optional[str]

Search term to filter projects by name

None

Returns:

Type Description
List[Project]

List of Project instances

Example

projects = client.projects.list()
for project in projects:
    print(f"{project.name}: {project.description}")

Source code in resources/projects.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def list(
    self,
    page: int = 1,
    per_page: int = 50,
    search: Optional[str] = None
) -> List[Project]:
    """
    Fetch projects through the paginated ``projects`` endpoint.

    Args:
        page: Page number sent as the ``page`` query parameter (default: 1)
        per_page: Page size sent as the ``per_page`` query parameter (default: 50)
        search: Optional search term; only sent when it is truthy

    Returns:
        List of Project instances

    Example:
        >>> for project in client.projects.list(search="sales"):
        ...     print(f"{project.name}: {project.description}")
    """
    query = {'page': page, 'per_page': per_page}
    # An empty or missing search term is left out of the query entirely.
    if search:
        query['search'] = search
    return self._paginate("projects", params=query, model_class=Project)

update(project_id, name=None, description=None, **kwargs)

Update an existing project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name Optional[str]

New project name

None
description Optional[str]

New project description

None
**kwargs

Additional fields to update

{}

Returns:

Type Description
Project

Updated Project instance

Example

project = client.projects.update(
    "proj_123",
    name="Updated Name"
)

Source code in resources/projects.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def update(
    self,
    project_id: str,
    name: Optional[str] = None,
    description: Optional[str] = None,
    **kwargs
) -> Project:
    """
    Update an existing project.

    The current project is fetched first; any of ``name`` / ``description``
    left as ``None`` falls back to the stored value, and the merged data is
    sent with a PUT request.

    Args:
        project_id: The project ID
        name: New project name (``None`` keeps the current one)
        description: New project description (``None`` keeps the current one)
        **kwargs: Additional fields forwarded to ``ProjectCreate``

    Returns:
        Updated Project instance

    Example:
        >>> project = client.projects.update(
        ...     "proj_123",
        ...     name="Updated Name"
        ... )
    """
    # The API expects complete data, so start from what is stored today.
    existing = self.get(project_id)
    if name is None:
        name = existing.name
    if description is None:
        description = existing.description

    payload = ProjectCreate(
        name=name,
        description=description,
        **kwargs
    ).model_dump(exclude_none=True)

    updated = self._client.put(
        self._build_endpoint("projects", project_id),
        data=payload
    )
    return self._create_model_instance(updated, Project)

text2everything_sdk.resources.contexts.ContextsResource

Bases: BaseResource

Client for managing contexts in the Text2Everything API.

Contexts provide business rules and domain knowledge to improve SQL generation. They can contain text content or structured information that helps the AI understand the business context and generate more accurate SQL queries.

Source code in resources/contexts.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
class ContextsResource(BaseResource):
    """
    Resource client for the per-project ``contexts`` endpoints of the
    Text2Everything API.

    Contexts carry business rules and domain knowledge (text or structured
    information) that help the AI understand the business setting and
    generate more accurate SQL queries.
    """

    def list(
        self,
        project_id: str,
        page: int = 1,
        per_page: int = 50,
        search: Optional[str] = None
    ) -> List[Context]:
        """
        Fetch contexts of one project through the paginated endpoint.

        Args:
            project_id: The project ID to list contexts for
            page: Page number sent as the ``page`` query parameter (default: 1)
            per_page: Page size sent as the ``per_page`` query parameter (default: 50)
            search: Optional search term; only sent when it is truthy

        Returns:
            List of Context instances

        Example:
            >>> contexts = client.contexts.list(project_id="proj_123")
            >>> for context in contexts:
            ...     print(f"{context.name}: {context.is_always_displayed}")
        """
        query = {'page': page, 'per_page': per_page}
        # An empty or missing search term is left out of the query entirely.
        if search:
            query['search'] = search

        return self._paginate(
            self._build_endpoint("projects", project_id, "contexts"),
            params=query,
            model_class=Context
        )

    def get(self, project_id: str, context_id: str) -> Context:
        """
        Fetch a single context by its ID.

        Args:
            project_id: The project ID
            context_id: The context ID

        Returns:
            Context instance

        Raises:
            NotFoundError: If context doesn't exist

        Example:
            >>> context = client.contexts.get("proj_123", "ctx_456")
            >>> print(context.content)
        """
        raw = self._client.get(
            self._build_endpoint("projects", project_id, "contexts", context_id)
        )
        return self._create_model_instance(raw, Context)

    def create(
        self,
        project_id: str,
        name: str,
        content: str,
        description: Optional[str] = None,
        is_always_displayed: bool = False,
        **kwargs
    ) -> Context:
        """
        Create a new context in a project.

        The arguments are passed through ``ContextCreate`` and dumped with
        ``exclude_none=True``, so fields left as ``None`` are not sent.

        Args:
            project_id: The project ID
            name: Context name
            content: Context content (business rules, domain knowledge, etc.)
            description: Context description
            is_always_displayed: Whether this context should always be included
            **kwargs: Additional context fields forwarded to ``ContextCreate``

        Returns:
            Created Context instance

        Example:
            >>> context = client.contexts.create(
            ...     project_id="proj_123",
            ...     name="Business Rules",
            ...     content="Active customers have status = 'active'",
            ...     is_always_displayed=True
            ... )
        """
        draft = ContextCreate(
            name=name,
            content=content,
            description=description,
            is_always_displayed=is_always_displayed,
            **kwargs
        )
        payload = draft.model_dump(exclude_none=True)

        created = self._client.post(
            self._build_endpoint("projects", project_id, "contexts"),
            data=payload
        )
        return self._create_model_instance(created, Context)

    def update(
        self,
        project_id: str,
        context_id: str,
        name: Optional[str] = None,
        content: Optional[str] = None,
        description: Optional[str] = None,
        is_always_displayed: Optional[bool] = None,
        **kwargs
    ) -> Context:
        """
        Update an existing context.

        The current context is fetched first; every argument left as ``None``
        falls back to the stored value, and the merged data is sent with a
        PUT request.

        Args:
            project_id: The project ID
            context_id: The context ID
            name: New context name (``None`` keeps the current one)
            content: New context content (``None`` keeps the current one)
            description: New context description (``None`` keeps the current one)
            is_always_displayed: Whether this context should always be included
                (``None`` keeps the current setting)
            **kwargs: Additional fields forwarded to ``ContextCreate``

        Returns:
            Updated Context instance

        Example:
            >>> context = client.contexts.update(
            ...     "proj_123", "ctx_456",
            ...     content="Updated business rules..."
            ... )
        """
        # The API expects complete data, so start from what is stored today.
        existing = self.get(project_id, context_id)
        if name is None:
            name = existing.name
        if content is None:
            content = existing.content
        if description is None:
            description = existing.description
        if is_always_displayed is None:
            is_always_displayed = existing.is_always_displayed

        payload = ContextCreate(
            name=name,
            content=content,
            description=description,
            is_always_displayed=is_always_displayed,
            **kwargs
        ).model_dump(exclude_none=True)

        updated = self._client.put(
            self._build_endpoint("projects", project_id, "contexts", context_id),
            data=payload
        )
        return self._create_model_instance(updated, Context)

    def delete(self, project_id: str, context_id: str) -> Dict[str, Any]:
        """
        Delete a context.

        Args:
            project_id: The project ID
            context_id: The context ID

        Returns:
            Whatever the client's ``delete`` call returns (deletion confirmation response)

        Example:
            >>> result = client.contexts.delete("proj_123", "ctx_456")
            >>> print(result["message"])
        """
        return self._client.delete(
            self._build_endpoint("projects", project_id, "contexts", context_id)
        )

    def bulk_create(
        self,
        project_id: str,
        contexts: List[Dict[str, Any]],
        parallel: bool = True,
        max_workers: Optional[int] = None,
        max_concurrent: int = 8,
        use_connection_isolation: bool = True
    ) -> List[Context]:
        """
        Create several contexts, optionally in parallel with rate limiting.

        All items are validated first (non-blank ``name`` and ``content``).
        In parallel mode the first item is still created sequentially and
        only the remaining items are submitted to the executor.

        Args:
            project_id: The project ID
            contexts: List of context data dictionaries
            parallel: Whether to execute requests in parallel (default: True)
            max_workers: Maximum number of parallel workers; when ``None`` it is
                ``min(16, number of items after the first)``
            max_concurrent: Maximum number of concurrent requests, passed to
                ``RateLimitedExecutor`` (default: 8)
            use_connection_isolation: Create each parallel item through
                ``_create_with_isolated_client`` instead of ``create`` (default: True)

        Returns:
            List of created Context instances in the same order as input
            (an empty list when ``contexts`` is empty)

        Raises:
            ValidationError: If pre-validation finds a problem, or if any
                parallel item fails (the message lists every failure)

        Example:
            >>> contexts_data = [
            ...     {"name": "Rule 1", "content": "Business rule 1"},
            ...     {"name": "Rule 2", "content": "Business rule 2"}
            ... ]
            >>> # Parallel execution with rate limiting (default)
            >>> contexts = client.contexts.bulk_create("proj_123", contexts_data)
            >>> # Sequential execution
            >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, parallel=False)
            >>> # Custom rate limiting
            >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, max_concurrent=4)
            >>> # Shared connection pool instead of isolated clients
            >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, use_connection_isolation=False)
        """
        if not contexts:
            return []

        # Validate everything up front so nothing is created when any item is bad.
        problems = []
        for position, item in enumerate(contexts):
            try:
                if not item.get("name", "").strip():
                    problems.append(f"Item {position} ({item.get('name', 'unnamed')}): Name cannot be empty")
                if not item.get("content", "").strip():
                    problems.append(f"Item {position} ({item.get('name', 'unnamed')}): Content cannot be empty")
            except Exception as exc:
                # e.g. the item is not a dict, or a field is not a string
                problems.append(f"Item {position}: Invalid data structure - {str(exc)}")

        if problems:
            raise ValidationError(f"Bulk validation failed: {'; '.join(problems)}")

        # Sequential path: requested explicitly, or nothing to parallelize.
        if not parallel or len(contexts) == 1:
            return [
                self.create(project_id=project_id, **item)
                for item in contexts
            ]

        # The first item goes through sequentially to avoid race conditions
        # when creating collections.
        first_created = self.create(project_id=project_id, **contexts[0])

        rest = contexts[1:]
        if max_workers is None:
            max_workers = min(16, len(rest))

        def _create_one(task):
            """Create one context; return (index, result, error message)."""
            idx, item = task
            try:
                if use_connection_isolation:
                    # Dedicated HTTP client per request to avoid connection conflicts
                    created = self._create_with_isolated_client(
                        project_id=project_id,
                        context_data=item
                    )
                else:
                    # Shared connection pool of the main client
                    created = self.create(project_id=project_id, **item)
            except Exception as exc:
                return idx, None, f"Item {idx} ({item.get('name', 'unnamed')}): {str(exc)}"
            return idx, created, None

        # Slot 0 is already filled; the rest are filled in as futures complete.
        ordered = [first_created] + [None] * len(rest)
        failures = []

        with RateLimitedExecutor(max_workers=max_workers, max_concurrent=max_concurrent) as pool:
            # Original indices of the remaining items start at 1.
            pending = {}
            for task in enumerate(rest, start=1):
                pending[pool.submit_rate_limited(_create_one, task)] = task[0]

            for finished in concurrent.futures.as_completed(pending):
                idx, created, failure = finished.result()
                if failure:
                    failures.append(failure)
                else:
                    ordered[idx] = created

        if failures:
            succeeded = len([entry for entry in ordered if entry is not None])
            raise ValidationError(
                f"Bulk create partially failed: {succeeded}/{len(contexts)} succeeded. "
                f"Errors: {'; '.join(failures)}"
            )

        return ordered

    def _create_with_isolated_client(self, project_id: str, context_data: Dict[str, Any]) -> Context:
        """
        Create a context through a one-off ``httpx.Client``.

        The URL, default headers and response handling still come from the
        main client (``self._client``); only the transport is separate.

        Args:
            project_id: The project ID
            context_data: Context data dictionary; must contain ``"name"`` and
                ``"content"``, other keys are forwarded to ``ContextCreate``

        Returns:
            Created Context instance
        """
        request_timeouts = httpx.Timeout(
            connect=30,   # connection timeout
            read=180,     # read timeout, generous for long requests
            write=30,     # write timeout
            pool=300      # pool timeout
        )
        # One connection, no keep-alive: nothing is reused between requests.
        connection_limits = httpx.Limits(
            max_connections=1,
            max_keepalive_connections=0,
            keepalive_expiry=0
        )
        named_fields = ["name", "content", "description", "is_always_displayed"]

        with httpx.Client(
            timeout=request_timeouts,
            limits=connection_limits,
            http2=False  # HTTP/1.1 for better compatibility
        ) as http:
            # Anything beyond the named fields is passed through as-is.
            extras = {
                key: value
                for key, value in context_data.items()
                if key not in named_fields
            }
            payload = ContextCreate(
                name=context_data["name"],
                content=context_data["content"],
                description=context_data.get("description"),
                is_always_displayed=context_data.get("is_always_displayed", False),
                **extras
            ).model_dump(exclude_none=True)

            target_url = self._client._build_url(
                self._build_endpoint("projects", project_id, "contexts")
            )
            request_headers = self._client._get_default_headers()

            reply = http.post(target_url, json=payload, headers=request_headers)
            parsed = self._client._handle_response(reply)

            return self._create_model_instance(parsed, Context)

    def get_by_name(self, project_id: str, name: str) -> Optional[Context]:
        """
        Look up a context by exact name within a project.

        Calls ``list(project_id=..., search=name)`` and returns the first
        result whose ``name`` equals ``name`` exactly.

        Args:
            project_id: The project ID
            name: Context name to search for

        Returns:
            Context instance if found, None otherwise

        Example:
            >>> context = client.contexts.get_by_name("proj_123", "Business Rules")
            >>> if context:
            ...     print(f"Found context: {context.id}")
        """
        candidates = self.list(project_id=project_id, search=name)
        return next((item for item in candidates if item.name == name), None)

    def list_always_displayed(self, project_id: str) -> List[Context]:
        """
        Return the contexts of a project flagged as always displayed.

        Filters the result of ``list(project_id=...)`` (called with its
        default paging arguments) on a truthy ``is_always_displayed``.

        Args:
            project_id: The project ID

        Returns:
            List of Context instances with is_always_displayed=True

        Example:
            >>> always_contexts = client.contexts.list_always_displayed("proj_123")
            >>> print(f"Found {len(always_contexts)} always-displayed contexts")
        """
        pinned = []
        for candidate in self.list(project_id=project_id):
            if candidate.is_always_displayed:
                pinned.append(candidate)
        return pinned

bulk_create(project_id, contexts, parallel=True, max_workers=None, max_concurrent=8, use_connection_isolation=True)

Create multiple contexts with optional parallel execution and rate limiting.

Parameters:

Name Type Description Default
project_id str

The project ID

required
contexts List[Dict[str, Any]]

List of context data dictionaries

required
parallel bool

Whether to execute requests in parallel (default: True)

True
max_workers Optional[int]

Maximum number of parallel workers (default: min(16, len(items)))

None
max_concurrent int

Maximum number of concurrent requests to prevent server overload (default: 8)

8
use_connection_isolation bool

Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

True

Returns:

Type Description
List[Context]

List of created Context instances in the same order as input

Raises:

Type Description
ValidationError

If any validation fails

Example

contexts_data = [
    {"name": "Rule 1", "content": "Business rule 1"},
    {"name": "Rule 2", "content": "Business rule 2"}
]

Parallel execution with rate limiting (default)

contexts = client.contexts.bulk_create("proj_123", contexts_data)

Sequential execution

contexts = client.contexts.bulk_create("proj_123", contexts_data, parallel=False)

Custom rate limiting

contexts = client.contexts.bulk_create("proj_123", contexts_data, max_concurrent=4)

Disable connection isolation for shared connection pool

contexts = client.contexts.bulk_create("proj_123", contexts_data, use_connection_isolation=False)

Source code in resources/contexts.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def bulk_create(
    self,
    project_id: str,
    contexts: List[Dict[str, Any]],
    parallel: bool = True,
    max_workers: Optional[int] = None,
    max_concurrent: int = 8,
    use_connection_isolation: bool = True
) -> List[Context]:
    """
    Create several contexts in one call, sequentially or via a rate-limited pool.

    Every item is checked for a non-blank ``name`` and ``content`` before any
    request is issued. In parallel mode the first item is still created on the
    calling thread; only the remaining items are submitted to the executor.

    Args:
        project_id: The project ID
        contexts: List of context data dictionaries
        parallel: Whether to execute requests in parallel (default: True).
            A single-item list is always handled sequentially.
        max_workers: Worker count for the executor
            (default: min(16, number of items after the first))
        max_concurrent: Concurrency limit handed to the rate-limited executor (default: 8)
        use_connection_isolation: When True, parallel items are created through
            ``_create_with_isolated_client``; when False they go through ``create`` (default: True)

    Returns:
        List of created Context instances in the same order as input

    Raises:
        ValidationError: If pre-validation rejects any item, or if at least one
            parallel creation fails (the message lists the collected errors)

    Example:
        >>> contexts_data = [
        ...     {"name": "Rule 1", "content": "Business rule 1"},
        ...     {"name": "Rule 2", "content": "Business rule 2"}
        ... ]
        >>> # Parallel execution with rate limiting (default)
        >>> contexts = client.contexts.bulk_create("proj_123", contexts_data)
        >>>
        >>> # Sequential execution
        >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, parallel=False)
        >>>
        >>> # Custom rate limiting
        >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, max_concurrent=4)
        >>>
        >>> # Disable connection isolation for shared connection pool
        >>> contexts = client.contexts.bulk_create("proj_123", contexts_data, use_connection_isolation=False)
    """
    if not contexts:
        return []

    # Reject the whole batch up front so nothing is sent when an item is malformed.
    required_fields = (
        ("name", "Name cannot be empty"),
        ("content", "Content cannot be empty"),
    )
    problems = []
    for position, payload in enumerate(contexts):
        try:
            for field, complaint in required_fields:
                if not payload.get(field, "").strip():
                    problems.append(f"Item {position} ({payload.get('name', 'unnamed')}): {complaint}")
        except Exception as exc:
            problems.append(f"Item {position}: Invalid data structure - {str(exc)}")

    if problems:
        raise ValidationError(f"Bulk validation failed: {'; '.join(problems)}")

    total = len(contexts)
    if total == 1 or not parallel:
        # Sequential path: one create call per item, in input order.
        return [self.create(project_id=project_id, **payload) for payload in contexts]

    # The first item goes through on its own, ahead of the pool, to avoid race
    # conditions when creating collections.
    head = self.create(project_id=project_id, **contexts[0])

    tail = contexts[1:]
    worker_count = min(16, len(tail)) if max_workers is None else max_workers

    def _attempt(pair):
        """Create one context; return (index, context, None) or (index, None, error text)."""
        position, payload = pair
        try:
            if not use_connection_isolation:
                # Shared connection pool
                made = self.create(project_id=project_id, **payload)
            else:
                # Dedicated HTTP client for this request
                made = self._create_with_isolated_client(
                    project_id=project_id,
                    context_data=payload
                )
        except Exception as exc:
            return position, None, f"Item {position} ({payload.get('name', 'unnamed')}): {str(exc)}"
        return position, made, None

    # Slot 0 is already filled; the remaining slots are filled by index as futures finish.
    ordered = [head] + [None] * len(tail)
    failures = []

    with RateLimitedExecutor(max_workers=worker_count, max_concurrent=max_concurrent) as executor:
        # Original positions of the remaining items start at 1.
        pending = [
            executor.submit_rate_limited(_attempt, pair)
            for pair in enumerate(tail, start=1)
        ]

        for finished in concurrent.futures.as_completed(pending):
            position, outcome, failure = finished.result()
            if failure:
                failures.append(failure)
                continue
            ordered[position] = outcome

    if failures:
        succeeded = len([item for item in ordered if item is not None])
        raise ValidationError(
            f"Bulk create partially failed: {succeeded}/{total} succeeded. "
            f"Errors: {'; '.join(failures)}"
        )

    return ordered

create(project_id, name, content, description=None, is_always_displayed=False, **kwargs)

Create a new context.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name str

Context name

required
content str

Context content (business rules, domain knowledge, etc.)

required
description Optional[str]

Context description

None
is_always_displayed bool

Whether this context should always be included

False
**kwargs

Additional context fields

{}

Returns:

Type Description
Context

Created Context instance

Example

context = client.contexts.create(
    project_id="proj_123",
    name="Business Rules",
    content="Active customers have status = 'active'",
    is_always_displayed=True
)

Source code in resources/contexts.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def create(
    self,
    project_id: str,
    name: str,
    content: str,
    description: Optional[str] = None,
    is_always_displayed: bool = False,
    **kwargs
) -> Context:
    """
    Create a context inside a project.

    The arguments are assembled into a ``ContextCreate`` model, dumped with
    ``exclude_none=True`` (so unset optional fields are not sent) and POSTed
    to the project's ``contexts`` endpoint.

    Args:
        project_id: The project ID
        name: Context name
        content: Context content (business rules, domain knowledge, etc.)
        description: Context description
        is_always_displayed: Whether this context should always be included
        **kwargs: Additional context fields, forwarded to ``ContextCreate``

    Returns:
        Created Context instance

    Example:
        >>> context = client.contexts.create(
        ...     project_id="proj_123",
        ...     name="Business Rules",
        ...     content="Active customers have status = 'active'",
        ...     is_always_displayed=True
        ... )
    """
    payload_model = ContextCreate(
        name=name,
        content=content,
        description=description,
        is_always_displayed=is_always_displayed,
        **kwargs
    )
    body = payload_model.model_dump(exclude_none=True)

    target = self._build_endpoint("projects", project_id, "contexts")
    return self._create_model_instance(self._client.post(target, data=body), Context)

delete(project_id, context_id)

Delete a context.

Parameters:

Name Type Description Default
project_id str

The project ID

required
context_id str

The context ID

required

Returns:

Type Description
Dict[str, Any]

Deletion confirmation response

Example

result = client.contexts.delete("proj_123", "ctx_456")
print(result["message"])

Source code in resources/contexts.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def delete(self, project_id: str, context_id: str) -> Dict[str, Any]:
    """
    Remove a context from a project.

    Args:
        project_id: The project ID
        context_id: The context ID

    Returns:
        Whatever the client's ``delete`` call returns for the context
        endpoint (the deletion confirmation response)

    Example:
        >>> result = client.contexts.delete("proj_123", "ctx_456")
        >>> print(result["message"])
    """
    target = self._build_endpoint("projects", project_id, "contexts", context_id)
    confirmation = self._client.delete(target)
    return confirmation

get(project_id, context_id)

Get a specific context by ID.

Parameters:

Name Type Description Default
project_id str

The project ID

required
context_id str

The context ID

required

Returns:

Type Description
Context

Context instance

Raises:

Type Description
NotFoundError

If context doesn't exist

Example

context = client.contexts.get("proj_123", "ctx_456")
print(context.content)

Source code in resources/contexts.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def get(self, project_id: str, context_id: str) -> Context:
    """
    Fetch a single context by its ID.

    Args:
        project_id: The project ID
        context_id: The context ID

    Returns:
        Context instance built from the API response

    Raises:
        NotFoundError: If context doesn't exist (raised below this method,
            presumably by the HTTP client - TODO confirm)

    Example:
        >>> context = client.contexts.get("proj_123", "ctx_456")
        >>> print(context.content)
    """
    target = self._build_endpoint("projects", project_id, "contexts", context_id)
    return self._create_model_instance(self._client.get(target), Context)

get_by_name(project_id, name)

Get a context by name within a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name str

Context name to search for

required

Returns:

Type Description
Optional[Context]

Context instance if found, None otherwise

Example

context = client.contexts.get_by_name("proj_123", "Business Rules")
if context:
    print(f"Found context: {context.id}")

Source code in resources/contexts.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def get_by_name(self, project_id: str, name: str) -> Optional[Context]:
    """
    Look up a context in a project by its exact name.

    The name is passed to ``list`` as the search term, and the first result
    whose ``name`` equals it exactly is returned.

    Args:
        project_id: The project ID
        name: Context name to search for

    Returns:
        Context instance if found, None otherwise

    Example:
        >>> context = client.contexts.get_by_name("proj_123", "Business Rules")
        >>> if context:
        ...     print(f"Found context: {context.id}")
    """
    candidates = self.list(project_id=project_id, search=name)
    # The search may return more than exact matches; keep only an exact one.
    return next((item for item in candidates if item.name == name), None)

list(project_id, page=1, per_page=50, search=None)

List contexts for a specific project.

Parameters:

Name Type Description Default
project_id str

The project ID to list contexts for

required
page int

Page number (default: 1)

1
per_page int

Items per page (default: 50)

50
search Optional[str]

Search term to filter contexts by name

None

Returns:

Type Description
List[Context]

List of Context instances

Example

contexts = client.contexts.list(project_id="proj_123")
for context in contexts:
    print(f"{context.name}: {context.is_always_displayed}")

Source code in resources/contexts.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def list(
    self,
    project_id: str,
    page: int = 1,
    per_page: int = 50,
    search: Optional[str] = None
) -> List[Context]:
    """
    Return the contexts that belong to a project.

    Args:
        project_id: The project ID to list contexts for
        page: Page number (default: 1)
        per_page: Items per page (default: 50)
        search: Search term to filter contexts by name; only sent to the
            API when it is truthy

    Returns:
        List of Context instances

    Example:
        >>> contexts = client.contexts.list(project_id="proj_123")
        >>> for context in contexts:
        ...     print(f"{context.name}: {context.is_always_displayed}")
    """
    query = {'page': page, 'per_page': per_page}
    if search:
        query['search'] = search

    return self._paginate(
        self._build_endpoint("projects", project_id, "contexts"),
        params=query,
        model_class=Context
    )

list_always_displayed(project_id)

Get all contexts that are always displayed for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
List[Context]

List of Context instances with is_always_displayed=True

Example

always_contexts = client.contexts.list_always_displayed("proj_123")
print(f"Found {len(always_contexts)} always-displayed contexts")

Source code in resources/contexts.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def list_always_displayed(self, project_id: str) -> List[Context]:
    """
    Return the project's contexts that are flagged as always displayed.

    The filtering is done client-side on the result of ``list`` called with
    its default paging arguments.

    Args:
        project_id: The project ID

    Returns:
        List of Context instances with is_always_displayed=True

    Example:
        >>> always_contexts = client.contexts.list_always_displayed("proj_123")
        >>> print(f"Found {len(always_contexts)} always-displayed contexts")
    """
    return [
        candidate
        for candidate in self.list(project_id=project_id)
        if candidate.is_always_displayed
    ]

update(project_id, context_id, name=None, content=None, description=None, is_always_displayed=None, **kwargs)

Update an existing context.

Parameters:

Name Type Description Default
project_id str

The project ID

required
context_id str

The context ID

required
name Optional[str]

New context name

None
content Optional[str]

New context content

None
description Optional[str]

New context description

None
is_always_displayed Optional[bool]

Whether this context should always be included

None
**kwargs

Additional fields to update

{}

Returns:

Type Description
Context

Updated Context instance

Example

context = client.contexts.update(
    "proj_123", "ctx_456",
    content="Updated business rules..."
)

Source code in resources/contexts.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def update(
    self,
    project_id: str,
    context_id: str,
    name: Optional[str] = None,
    content: Optional[str] = None,
    description: Optional[str] = None,
    is_always_displayed: Optional[bool] = None,
    **kwargs
) -> Context:
    """
    Modify an existing context.

    The current context is fetched first, and any argument left as ``None``
    falls back to the stored value, so a complete payload is always sent.

    Args:
        project_id: The project ID
        context_id: The context ID
        name: New context name
        content: New context content
        description: New context description
        is_always_displayed: Whether this context should always be included
        **kwargs: Additional fields to update, forwarded to ``ContextCreate``

    Returns:
        Updated Context instance

    Example:
        >>> context = client.contexts.update(
        ...     "proj_123", "ctx_456",
        ...     content="Updated business rules..."
        ... )
    """
    # The API expects complete data, so start from what is currently stored.
    existing = self.get(project_id, context_id)

    # Fill every argument the caller left unset from the stored context.
    if name is None:
        name = existing.name
    if content is None:
        content = existing.content
    if description is None:
        description = existing.description
    if is_always_displayed is None:
        is_always_displayed = existing.is_always_displayed

    merged = ContextCreate(
        name=name,
        content=content,
        description=description,
        is_always_displayed=is_always_displayed,
        **kwargs
    )
    body = merged.model_dump(exclude_none=True)

    target = self._build_endpoint("projects", project_id, "contexts", context_id)
    return self._create_model_instance(self._client.put(target, data=body), Context)

text2everything_sdk.resources.schema_metadata.SchemaMetadataResource

Bases: BaseResource

Resource for managing schema metadata with nested field validation.

Source code in resources/schema_metadata.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
class SchemaMetadataResource(BaseResource):
    """Resource for managing schema metadata with nested field validation."""

    def __init__(self, client: Text2EverythingClient):
        """Initialize the resource by handing ``client`` to the ``BaseResource`` constructor."""
        super().__init__(client)

    def create(
        self,
        project_id: str,
        name: str,
        schema_data: Dict[str, Any],
        description: Optional[str] = None,
        is_always_displayed: bool = False,
        validate: bool = True,
        **kwargs
    ) -> SchemaMetadataResponse:
        """Create a schema metadata entry, optionally validating it first.

        The arguments are assembled into a ``SchemaMetadataCreate`` object. When
        ``validate`` is true it is checked with ``validate_schema_metadata_create``
        before anything is sent to the API.

        Args:
            project_id: The project ID to create schema metadata in
            name: Schema metadata name
            schema_data: The schema data structure
            description: Optional description
            is_always_displayed: Whether this schema should always be displayed
            validate: Whether to perform nested field validation (default: True)
            **kwargs: Additional schema metadata fields, forwarded to ``SchemaMetadataCreate``

        Returns:
            The created schema metadata with response details

        Raises:
            ValidationError: If validation fails and validate=True, or if the
                API answers with an empty list

        Example:
            ```python
            # Create a table schema
            result = client.schema_metadata.create(
                project_id=project_id,
                name="users_table",
                description="User information table",
                schema_data={
                    "table": {
                        "name": "users",
                        "columns": [
                            {"name": "id", "type": "integer"},
                            {"name": "email", "type": "string"}
                        ]
                    }
                },
                is_always_displayed=True
            )
            ```
        """
        payload = SchemaMetadataCreate(
            name=name,
            schema_data=schema_data,
            description=description,
            is_always_displayed=is_always_displayed,
            **kwargs
        )

        if validate:
            problems = validate_schema_metadata_create(payload)
            if problems:
                raise ValidationError(f"Schema metadata validation failed: {'; '.join(problems)}")

        raw = self._client.post(
            f"/projects/{project_id}/schema-metadata",
            data=payload.model_dump()
        )

        # The response may be a single object or a list; a list contributes its first item.
        if not isinstance(raw, list):
            return SchemaMetadataResponse(**raw)
        if not raw:
            raise ValidationError("API returned empty list")
        return SchemaMetadataResponse(**raw[0])

    def get(self, project_id: str, schema_metadata_id: str) -> SchemaMetadataResponse:
        """Fetch one schema metadata entry by its ID.

        Args:
            project_id: The project ID
            schema_metadata_id: The schema metadata ID

        Returns:
            The schema metadata details

        Raises:
            ValidationError: If the API answers with an empty list

        Example:
            ```python
            schema_metadata = client.schema_metadata.get(project_id, schema_metadata_id)
            print(f"Schema type: {detect_schema_type(schema_metadata.schema_data)}")
            ```
        """
        raw = self._client.get(f"/projects/{project_id}/schema-metadata/{schema_metadata_id}")

        # The response may be a single object or a list; a list contributes its first item.
        if not isinstance(raw, list):
            return SchemaMetadataResponse(**raw)
        if not raw:
            raise ValidationError("API returned empty list")
        return SchemaMetadataResponse(**raw[0])

    def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[SchemaMetadataResponse]:
        """Return the schema metadata entries of a project.

        Args:
            project_id: The project ID
            limit: Maximum number of items to return
            offset: Number of items to skip

        Returns:
            List of schema metadata

        Example:
            ```python
            all_schemas = client.schema_metadata.list(project_id)
            tables = [s for s in all_schemas if detect_schema_type(s.schema_data) == "table"]
            ```
        """
        return self._paginate(
            f"/projects/{project_id}/schema-metadata",
            params={"limit": limit, "offset": offset},
            model_class=SchemaMetadataResponse
        )

    def update(
        self,
        project_id: str,
        schema_metadata_id: str,
        name: Optional[str] = None,
        schema_data: Optional[Dict[str, Any]] = None,
        description: Optional[str] = None,
        is_always_displayed: Optional[bool] = None,
        validate: bool = True,
        **kwargs
    ) -> SchemaMetadataResponse:
        """Modify an existing schema metadata entry, optionally validating the result.

        The current entry is fetched first, and any argument left as ``None``
        falls back to the stored value, so a complete payload is always sent.

        Args:
            project_id: The project ID
            schema_metadata_id: The schema metadata ID to update
            name: New schema metadata name
            schema_data: New schema data structure
            description: New description
            is_always_displayed: New always displayed setting
            validate: Whether to perform nested field validation (default: True)
            **kwargs: Additional fields to update, forwarded to ``SchemaMetadataCreate``

        Returns:
            The updated schema metadata

        Raises:
            ValidationError: If validation fails and validate=True, or if the
                API answers with an empty list

        Example:
            ```python
            # Update a dimension schema
            result = client.schema_metadata.update(
                project_id=project_id,
                schema_metadata_id=schema_id,
                description="Updated dimension description",
                schema_data={
                    "table": {
                        "dimension": {
                            "name": "updated_dimension",
                            "content": {"type": "categorical", "values": ["A", "B", "C"]}
                        }
                    }
                }
            )
            ```
        """
        # The API expects complete data, so start from what is currently stored.
        existing = self.get(project_id, schema_metadata_id)

        # Fill every argument the caller left unset from the stored entry.
        if name is None:
            name = existing.name
        if description is None:
            description = existing.description
        if schema_data is None:
            schema_data = existing.schema_data
        if is_always_displayed is None:
            is_always_displayed = existing.is_always_displayed

        merged = SchemaMetadataCreate(
            name=name,
            description=description,
            schema_data=schema_data,
            is_always_displayed=is_always_displayed,
            **kwargs
        )

        if validate:
            problems = validate_schema_metadata_create(merged)
            if problems:
                raise ValidationError(f"Schema metadata validation failed: {'; '.join(problems)}")

        raw = self._client.put(
            f"/projects/{project_id}/schema-metadata/{schema_metadata_id}",
            data=merged.model_dump()
        )

        # The response may be a single object or a list; a list contributes its first item.
        if not isinstance(raw, list):
            return SchemaMetadataResponse(**raw)
        if not raw:
            raise ValidationError("API returned empty list")
        return SchemaMetadataResponse(**raw[0])

    def delete(self, project_id: str, schema_metadata_id: str) -> bool:
        """Remove a schema metadata entry.

        Args:
            project_id: The project ID
            schema_metadata_id: The schema metadata ID to delete

        Returns:
            True once the client's delete call has returned without raising

        Example:
            ```python
            success = client.schema_metadata.delete(project_id, schema_metadata_id)
            ```
        """
        target = f"/projects/{project_id}/schema-metadata/{schema_metadata_id}"
        # The response body is ignored; this method always reports True on return.
        self._client.delete(target)
        return True

    def list_by_type(self, project_id: str, schema_type: str) -> List[SchemaMetadataResponse]:
        """Return the project's schema metadata entries of one schema type.

        The filtering is done client-side: every entry returned by ``list`` (with
        its default limit and offset) is classified with ``detect_schema_type``.

        Args:
            project_id: The project ID
            schema_type: The schema type to filter by ('table', 'dimension', 'metric', 'relationship')

        Returns:
            List of schema metadata of the specified type

        Example:
            ```python
            # Get all table schemas
            tables = client.schema_metadata.list_by_type(project_id, "table")

            # Get all metrics
            metrics = client.schema_metadata.list_by_type(project_id, "metric")
            ```
        """
        return [
            entry
            for entry in self.list(project_id)
            if detect_schema_type(entry.schema_data) == schema_type
        ]

    def validate_schema(self, schema_data: Dict[str, Any], expected_type: Optional[str] = None) -> List[str]:
        """Run nested field validation on a schema data structure.

        The data is wrapped in a dictionary with the placeholder name
        ``"validation_test"`` and passed to ``validate_schema_metadata``.

        Args:
            schema_data: The schema data to validate
            expected_type: Optional expected schema type

        Returns:
            List of validation error messages (empty if valid)

        Example:
            ```python
            # Validate a table schema
            table_data = {
                "table": {
                    "name": "users",
                    "columns": [{"name": "id", "type": "integer"}]
                }
            }
            errors = client.schema_metadata.validate_schema(table_data, "table")
            if not errors:
                print("Schema is valid!")
            ```
        """
        candidate = {"name": "validation_test", "schema_data": schema_data}
        return validate_schema_metadata(candidate, expected_type)

    def get_schema_type(self, schema_data: Dict[str, Any]) -> Optional[str]:
        """Report which schema type a schema data structure represents.

        This is a thin wrapper that delegates to ``detect_schema_type``.

        Args:
            schema_data: The schema data to analyze

        Returns:
            The detected schema type or None if unable to detect

        Example:
            ```python
            schema_type = client.schema_metadata.get_schema_type(schema_data)
            print(f"Detected schema type: {schema_type}")
            ```
        """
        detected = detect_schema_type(schema_data)
        return detected

    def bulk_create(
        self, 
        project_id: str, 
        schema_metadata_list: List[Dict[str, Any]], 
        validate: bool = True,
        parallel: bool = True,
        max_workers: Optional[int] = None,
        max_concurrent: int = 8,
        use_connection_isolation: bool = True
    ) -> List[SchemaMetadataResponse]:
        """Create multiple schema metadata items with validation and optional parallel execution.

        Args:
            project_id: The project ID
            schema_metadata_list: List of schema metadata dictionaries to create
            validate: Whether to perform nested field validation (default: True)
            parallel: Whether to execute requests in parallel (default: True)
            max_workers: Maximum number of parallel workers (default: min(16, len(items)))
            max_concurrent: Maximum number of concurrent requests (default: 8, rate limiting)
            use_connection_isolation: Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

        Returns:
            List of created schema metadata in the same order as input

        Raises:
            ValidationError: If any validation fails and validate=True, or if any creation fails

        Example:
            ```python
            schemas = [
                {
                    "name": "table1", 
                    "schema_data": {"table": {"columns": []}},
                    "description": "First table"
                },
                {
                    "name": "dim1", 
                    "schema_data": {"table": {"dimension": {"content": {}}}},
                    "is_always_displayed": True
                }
            ]
            # Parallel execution (default)
            results = client.schema_metadata.bulk_create(project_id, schemas)

            # Sequential execution
            results = client.schema_metadata.bulk_create(project_id, schemas, parallel=False)

            # Disable connection isolation for shared connection pool
            results = client.schema_metadata.bulk_create(project_id, schemas, use_connection_isolation=False)
            ```
        """
        if not schema_metadata_list:
            return []

        # Pre-validate all items if validation is enabled
        if validate:
            all_errors = []
            for i, schema_data in enumerate(schema_metadata_list):
                try:
                    # Create temporary object for validation
                    temp_schema = SchemaMetadataCreate(**schema_data)
                    validation_errors = validate_schema_metadata_create(temp_schema)
                    if validation_errors:
                        all_errors.append(f"Item {i} ({schema_data.get('name', 'unnamed')}): {'; '.join(validation_errors)}")
                except Exception as e:
                    all_errors.append(f"Item {i} ({schema_data.get('name', 'unnamed')}): Invalid data structure - {str(e)}")

            if all_errors:
                raise ValidationError(f"Bulk validation failed: {'; '.join(all_errors)}")

        if not parallel or len(schema_metadata_list) == 1:
            # Sequential execution
            results = []
            for schema_data in schema_metadata_list:
                result = self.create(
                    project_id=project_id,
                    validate=False,  # Already validated
                    **schema_data
                )
                results.append(result)
            return results

        # Create the first item sequentially to avoid race conditions when creating collections
        first_result = self.create(
            project_id=project_id,
            validate=False,
            **schema_metadata_list[0]
        )

        # Parallel execution for remaining items with rate limiting
        remaining = schema_metadata_list[1:]
        if max_workers is None:
            max_workers = min(16, len(remaining))

        def create_single_schema(indexed_data):
            """Helper function to create a single schema with error handling."""
            index, schema_data = indexed_data
            try:
                if use_connection_isolation:
                    # Create isolated HTTP client for this request to avoid connection conflicts
                    return index, self._create_with_isolated_client(
                        project_id=project_id,
                        schema_data=schema_data
                    ), None
                else:
                    # Use shared connection pool
                    return index, self.create(
                        project_id=project_id,
                        validate=False,  # Already validated
                        **schema_data
                    ), None
            except Exception as e:
                return index, None, f"Item {index} ({schema_data.get('name', 'unnamed')}): {str(e)}"

        # Execute in parallel with rate-limited executor
        results = [None] * len(schema_metadata_list)
        results[0] = first_result
        errors = []

        with RateLimitedExecutor(max_workers=max_workers, max_concurrent=max_concurrent) as executor:
            # Submit tasks for remaining items with their original indices starting at 1
            indexed_data = list(enumerate(remaining, start=1))
            future_to_index = {
                executor.submit_rate_limited(create_single_schema, item): item[0] 
                for item in indexed_data
            }

            # Collect results as they complete
            for future in concurrent.futures.as_completed(future_to_index):
                index, result, error = future.result()
                if error:
                    errors.append(error)
                else:
                    results[index] = result

        # Check for any errors
        if errors:
            successful_count = sum(1 for r in results if r is not None)
            raise ValidationError(
                f"Bulk create partially failed: {successful_count}/{len(schema_metadata_list)} succeeded. "
                f"Errors: {'; '.join(errors)}"
            )

        return results

    def _create_with_isolated_client(self, project_id: str, schema_data: Dict[str, Any]) -> SchemaMetadataResponse:
        """
        Create one schema metadata item through a dedicated, single-use HTTP client.

        A fresh ``httpx.Client`` (one connection, keep-alive disabled, HTTP/1.1)
        is opened for this request only and closed again before returning, so the
        request does not share a connection with any other request. URL
        building, default headers and response handling are still delegated to
        ``self._client``.

        Args:
            project_id: The project ID
            schema_data: Schema metadata dictionary; must contain the keys
                ``"name"`` and ``"schema_data"``. ``"description"`` and
                ``"is_always_displayed"`` are optional; any further keys are
                forwarded to ``SchemaMetadataCreate`` unchanged.

        Returns:
            Created SchemaMetadataResponse instance

        Raises:
            KeyError: If ``"name"`` or ``"schema_data"`` is missing from ``schema_data``
            ValidationError: If the API answers with an empty list
        """
        # Single connection with no keep-alive: nothing survives this call.
        pool_limits = httpx.Limits(
            max_connections=1,
            max_keepalive_connections=0,
            keepalive_expiry=0
        )

        # httpx timeouts are expressed in seconds.
        request_timeouts = httpx.Timeout(
            connect=30,
            read=180,
            write=30,
            pool=300
        )

        # Keys that are passed explicitly below; everything else is an "extra" field.
        explicit_keys = ("name", "schema_data", "description", "is_always_displayed")

        with httpx.Client(
            timeout=request_timeouts,
            limits=pool_limits,
            http2=False
        ) as one_shot_client:
            payload_model = SchemaMetadataCreate(
                name=schema_data["name"],
                schema_data=schema_data["schema_data"],
                description=schema_data.get("description"),
                is_always_displayed=schema_data.get("is_always_displayed", False),
                **{key: value for key, value in schema_data.items() if key not in explicit_keys}
            )

            # Reuse the main client's URL and header logic so only the transport differs.
            target_url = self._client._build_url(f"/projects/{project_id}/schema-metadata")
            request_headers = self._client._get_default_headers()

            raw_response = one_shot_client.post(
                target_url,
                json=payload_model.model_dump(),
                headers=request_headers
            )
            parsed = self._client._handle_response(raw_response)

            # The API may wrap the created object in a list; unwrap the first entry.
            if isinstance(parsed, list):
                if not parsed:
                    raise ValidationError("API returned empty list")
                parsed = parsed[0]

            return SchemaMetadataResponse(**parsed)

    def list_always_displayed(self, project_id: str) -> List[SchemaMetadataResponse]:
        """Return the project's schema metadata whose ``is_always_displayed`` flag is truthy.

        The filtering happens client-side on whatever ``self.list(project_id)``
        returns (called with its default arguments).
        NOTE(review): confirm that the default ``list`` call covers every item
        of large projects.

        Args:
            project_id: The project ID

        Returns:
            List of schema metadata marked as always displayed, in listing order

        Example:
            ```python
            important_schemas = client.schema_metadata.list_always_displayed(project_id)
            ```
        """
        pinned = []
        for entry in self.list(project_id):
            if entry.is_always_displayed:
                pinned.append(entry)
        return pinned

bulk_create(project_id, schema_metadata_list, validate=True, parallel=True, max_workers=None, max_concurrent=8, use_connection_isolation=True)

Create multiple schema metadata items with validation and optional parallel execution.

Parameters:

Name Type Description Default
project_id str

The project ID

required
schema_metadata_list List[Dict[str, Any]]

List of schema metadata dictionaries to create

required
validate bool

Whether to perform nested field validation (default: True)

True
parallel bool

Whether to execute requests in parallel (default: True)

True
max_workers Optional[int]

Maximum number of parallel workers (default: min(16, len(items)))

None
max_concurrent int

Maximum number of concurrent requests (default: 8, rate limiting)

8
use_connection_isolation bool

Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

True

Returns:

Type Description
List[SchemaMetadataResponse]

List of created schema metadata in the same order as input

Raises:

Type Description
ValidationError

If any validation fails and validate=True, or if any creation fails

Example
schemas = [
    {
        "name": "table1", 
        "schema_data": {"table": {"columns": []}},
        "description": "First table"
    },
    {
        "name": "dim1", 
        "schema_data": {"table": {"dimension": {"content": {}}}},
        "is_always_displayed": True
    }
]
# Parallel execution (default)
results = client.schema_metadata.bulk_create(project_id, schemas)

# Sequential execution
results = client.schema_metadata.bulk_create(project_id, schemas, parallel=False)

# Disable connection isolation for shared connection pool
results = client.schema_metadata.bulk_create(project_id, schemas, use_connection_isolation=False)
Source code in resources/schema_metadata.py
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def bulk_create(
    self,
    project_id: str,
    schema_metadata_list: List[Dict[str, Any]],
    validate: bool = True,
    parallel: bool = True,
    max_workers: Optional[int] = None,
    max_concurrent: int = 8,
    use_connection_isolation: bool = True
) -> List[SchemaMetadataResponse]:
    """Create several schema metadata items, optionally validated up front and in parallel.

    Processing happens in three stages:

    1. With ``validate=True`` every item is checked first; nothing is sent
       to the API if any item fails.
    2. With ``parallel=False`` (or a single item) the items are created one
       after another through ``self.create``.
    3. Otherwise the first item is created on its own, and the remaining
       items are submitted to a ``RateLimitedExecutor``.

    Args:
        project_id: The project ID
        schema_metadata_list: List of schema metadata dictionaries to create
        validate: Whether to perform nested field validation (default: True)
        parallel: Whether to execute requests in parallel (default: True)
        max_workers: Maximum number of parallel workers
            (default: min(16, number of items after the first))
        max_concurrent: Maximum number of concurrent requests (default: 8, rate limiting)
        use_connection_isolation: Use an isolated HTTP client for each parallel
            request to prevent connection conflicts (default: True)

    Returns:
        List of created schema metadata in the same order as input; an empty
        list when ``schema_metadata_list`` is empty

    Raises:
        ValidationError: If any validation fails and validate=True, or if any
            parallel creation fails

    Example:
        ```python
        schemas = [
            {
                "name": "table1",
                "schema_data": {"table": {"columns": []}},
                "description": "First table"
            },
            {
                "name": "dim1",
                "schema_data": {"table": {"dimension": {"content": {}}}},
                "is_always_displayed": True
            }
        ]
        # Parallel execution (default)
        results = client.schema_metadata.bulk_create(project_id, schemas)

        # Sequential execution
        results = client.schema_metadata.bulk_create(project_id, schemas, parallel=False)

        # Disable connection isolation for shared connection pool
        results = client.schema_metadata.bulk_create(project_id, schemas, use_connection_isolation=False)
        ```
    """
    if not schema_metadata_list:
        return []

    # Stage 1: validate everything before the first request goes out.
    if validate:
        problems = []
        for position, item in enumerate(schema_metadata_list):
            try:
                candidate = SchemaMetadataCreate(**item)
                findings = validate_schema_metadata_create(candidate)
                if findings:
                    problems.append(f"Item {position} ({item.get('name', 'unnamed')}): {'; '.join(findings)}")
            except Exception as exc:
                # Anything that prevents building/validating the model is reported per item.
                problems.append(f"Item {position} ({item.get('name', 'unnamed')}): Invalid data structure - {str(exc)}")

        if problems:
            raise ValidationError(f"Bulk validation failed: {'; '.join(problems)}")

    # Stage 2: plain sequential creation.
    run_sequentially = not parallel or len(schema_metadata_list) == 1
    if run_sequentially:
        return [
            self.create(
                project_id=project_id,
                validate=False,  # validation (if requested) already ran above
                **item
            )
            for item in schema_metadata_list
        ]

    # Stage 3: the first item goes alone to avoid race conditions when
    # collections are created; the rest run through the executor.
    head_result = self.create(
        project_id=project_id,
        validate=False,
        **schema_metadata_list[0]
    )

    tail_items = schema_metadata_list[1:]
    if max_workers is None:
        max_workers = min(16, len(tail_items))

    def _attempt(numbered_item):
        """Create one item; return ``(index, result, error_message)`` instead of raising."""
        position, item = numbered_item
        try:
            if not use_connection_isolation:
                # Shared connection pool of the main client
                created = self.create(
                    project_id=project_id,
                    validate=False,
                    **item
                )
            else:
                # Dedicated HTTP client per request
                created = self._create_with_isolated_client(
                    project_id=project_id,
                    schema_data=item
                )
            return position, created, None
        except Exception as exc:
            return position, None, f"Item {position} ({item.get('name', 'unnamed')}): {str(exc)}"

    total = len(schema_metadata_list)
    ordered = [None] * total
    ordered[0] = head_result
    failures = []

    with RateLimitedExecutor(max_workers=max_workers, max_concurrent=max_concurrent) as executor:
        # Indices start at 1 because slot 0 is already taken by the first item.
        pending = [
            executor.submit_rate_limited(_attempt, numbered)
            for numbered in enumerate(tail_items, start=1)
        ]

        # Results arrive in completion order; the index puts them back in input order.
        for finished in concurrent.futures.as_completed(pending):
            position, created, failure = finished.result()
            if failure:
                failures.append(failure)
            else:
                ordered[position] = created

    if failures:
        succeeded = len([slot for slot in ordered if slot is not None])
        raise ValidationError(
            f"Bulk create partially failed: {succeeded}/{total} succeeded. "
            f"Errors: {'; '.join(failures)}"
        )

    return ordered

create(project_id, name, schema_data, description=None, is_always_displayed=False, validate=True, **kwargs)

Create new schema metadata with validation.

Parameters:

Name Type Description Default
project_id str

The project ID to create schema metadata in

required
name str

Schema metadata name

required
schema_data Dict[str, Any]

The schema data structure

required
description Optional[str]

Optional description

None
is_always_displayed bool

Whether this schema should always be displayed

False
validate bool

Whether to perform nested field validation (default: True)

True
**kwargs

Additional schema metadata fields

{}

Returns:

Type Description
SchemaMetadataResponse

The created schema metadata with response details

Raises:

Type Description
ValidationError

If validation fails and validate=True

Example
# Create a table schema
result = client.schema_metadata.create(
    project_id=project_id,
    name="users_table",
    description="User information table",
    schema_data={
        "table": {
            "name": "users",
            "columns": [
                {"name": "id", "type": "integer"},
                {"name": "email", "type": "string"}
            ]
        }
    },
    is_always_displayed=True
)
Source code in resources/schema_metadata.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def create(
    self,
    project_id: str,
    name: str,
    schema_data: Dict[str, Any],
    description: Optional[str] = None,
    is_always_displayed: bool = False,
    validate: bool = True,
    **kwargs
) -> SchemaMetadataResponse:
    """Create a schema metadata item in a project, validating it first by default.

    The arguments are assembled into a ``SchemaMetadataCreate`` model, which
    is optionally validated and then POSTed to the project's
    ``schema-metadata`` endpoint.

    Args:
        project_id: The project ID to create schema metadata in
        name: Schema metadata name
        schema_data: The schema data structure
        description: Optional description
        is_always_displayed: Whether this schema should always be displayed
        validate: Whether to perform nested field validation (default: True)
        **kwargs: Additional schema metadata fields, passed to ``SchemaMetadataCreate``

    Returns:
        The created schema metadata with response details

    Raises:
        ValidationError: If validation fails and validate=True, or if the
            API answers with an empty list

    Example:
        ```python
        # Create a table schema
        result = client.schema_metadata.create(
            project_id=project_id,
            name="users_table",
            description="User information table",
            schema_data={
                "table": {
                    "name": "users",
                    "columns": [
                        {"name": "id", "type": "integer"},
                        {"name": "email", "type": "string"}
                    ]
                }
            },
            is_always_displayed=True
        )
        ```
    """
    payload = SchemaMetadataCreate(
        name=name,
        schema_data=schema_data,
        description=description,
        is_always_displayed=is_always_displayed,
        **kwargs
    )

    if validate:
        problems = validate_schema_metadata_create(payload)
        if problems:
            raise ValidationError(f"Schema metadata validation failed: {'; '.join(problems)}")

    api_result = self._client.post(
        f"/projects/{project_id}/schema-metadata",
        data=payload.model_dump()
    )

    # The API may answer with a list; in that case the first entry is the created object.
    if isinstance(api_result, list):
        if not api_result:
            raise ValidationError("API returned empty list")
        api_result = api_result[0]

    return SchemaMetadataResponse(**api_result)

delete(project_id, schema_metadata_id)

Delete schema metadata.

Parameters:

Name Type Description Default
project_id str

The project ID

required
schema_metadata_id str

The schema metadata ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.schema_metadata.delete(project_id, schema_metadata_id)
Source code in resources/schema_metadata.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def delete(self, project_id: str, schema_metadata_id: str) -> bool:
    """Delete one schema metadata item from a project.

    Issues a DELETE request through ``self._client``; any exception raised
    by the client propagates to the caller.

    Args:
        project_id: The project ID
        schema_metadata_id: The schema metadata ID to delete

    Returns:
        True once the delete call has returned without raising

    Example:
        ```python
        success = client.schema_metadata.delete(project_id, schema_metadata_id)
        ```
    """
    endpoint = f"/projects/{project_id}/schema-metadata/{schema_metadata_id}"
    self._client.delete(endpoint)
    return True

get(project_id, schema_metadata_id)

Get schema metadata by ID.

Parameters:

Name Type Description Default
project_id str

The project ID

required
schema_metadata_id str

The schema metadata ID

required

Returns:

Type Description
SchemaMetadataResponse

The schema metadata details

Example
schema_metadata = client.schema_metadata.get(project_id, schema_metadata_id)
print(f"Schema type: {detect_schema_type(schema_metadata.schema_data)}")
Source code in resources/schema_metadata.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def get(self, project_id: str, schema_metadata_id: str) -> SchemaMetadataResponse:
    """Fetch a single schema metadata item by its ID.

    Args:
        project_id: The project ID
        schema_metadata_id: The schema metadata ID

    Returns:
        The schema metadata details

    Raises:
        ValidationError: If the API answers with an empty list

    Example:
        ```python
        schema_metadata = client.schema_metadata.get(project_id, schema_metadata_id)
        print(f"Schema type: {detect_schema_type(schema_metadata.schema_data)}")
        ```
    """
    fetched = self._client.get(f"/projects/{project_id}/schema-metadata/{schema_metadata_id}")

    # The API may answer with a list; in that case the first entry is used.
    record = fetched
    if isinstance(fetched, list):
        if not fetched:
            raise ValidationError("API returned empty list")
        record = fetched[0]

    return SchemaMetadataResponse(**record)

get_schema_type(schema_data)

Detect the schema type from schema data structure.

Parameters:

Name Type Description Default
schema_data Dict[str, Any]

The schema data to analyze

required

Returns:

Type Description
Optional[str]

The detected schema type or None if unable to detect

Example
schema_type = client.schema_metadata.get_schema_type(schema_data)
print(f"Detected schema type: {schema_type}")
Source code in resources/schema_metadata.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def get_schema_type(self, schema_data: Dict[str, Any]) -> Optional[str]:
    """Report which schema type a schema data structure represents.

    Thin convenience wrapper around the module-level ``detect_schema_type``
    helper; no request is made and no instance state is read.

    Args:
        schema_data: The schema data to analyze

    Returns:
        The detected schema type or None if unable to detect

    Example:
        ```python
        schema_type = client.schema_metadata.get_schema_type(schema_data)
        print(f"Detected schema type: {schema_type}")
        ```
    """
    detected_type = detect_schema_type(schema_data)
    return detected_type

list(project_id, limit=100, offset=0)

List schema metadata for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
limit int

Maximum number of items to return

100
offset int

Number of items to skip

0

Returns:

Type Description
List[SchemaMetadataResponse]

List of schema metadata

Example
all_schemas = client.schema_metadata.list(project_id)
tables = [s for s in all_schemas if detect_schema_type(s.schema_data) == "table"]
Source code in resources/schema_metadata.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[SchemaMetadataResponse]:
    """Return schema metadata items that belong to a project.

    The request is delegated to ``self._paginate``, which receives the
    project's ``schema-metadata`` endpoint, the ``limit``/``offset`` query
    parameters and ``SchemaMetadataResponse`` as the model class.

    Args:
        project_id: The project ID
        limit: Maximum number of items to return
        offset: Number of items to skip

    Returns:
        List of schema metadata

    Example:
        ```python
        all_schemas = client.schema_metadata.list(project_id)
        tables = [s for s in all_schemas if detect_schema_type(s.schema_data) == "table"]
        ```
    """
    return self._paginate(
        f"/projects/{project_id}/schema-metadata",
        params={"limit": limit, "offset": offset},
        model_class=SchemaMetadataResponse
    )

list_always_displayed(project_id)

List schema metadata that are marked as always displayed.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
List[SchemaMetadataResponse]

List of schema metadata marked as always displayed

Example
important_schemas = client.schema_metadata.list_always_displayed(project_id)
Source code in resources/schema_metadata.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
def list_always_displayed(self, project_id: str) -> List[SchemaMetadataResponse]:
    """Return the project's schema metadata whose ``is_always_displayed`` flag is truthy.

    The filtering happens client-side on whatever ``self.list(project_id)``
    returns (called with its default arguments).
    NOTE(review): confirm that the default ``list`` call covers every item
    of large projects.

    Args:
        project_id: The project ID

    Returns:
        List of schema metadata marked as always displayed, in listing order

    Example:
        ```python
        important_schemas = client.schema_metadata.list_always_displayed(project_id)
        ```
    """
    pinned = []
    for entry in self.list(project_id):
        if entry.is_always_displayed:
            pinned.append(entry)
    return pinned

list_by_type(project_id, schema_type)

List schema metadata filtered by type.

Parameters:

Name Type Description Default
project_id str

The project ID

required
schema_type str

The schema type to filter by ('table', 'dimension', 'metric', 'relationship')

required

Returns:

Type Description
List[SchemaMetadataResponse]

List of schema metadata of the specified type

Example
# Get all table schemas
tables = client.schema_metadata.list_by_type(project_id, "table")

# Get all metrics
metrics = client.schema_metadata.list_by_type(project_id, "metric")
Source code in resources/schema_metadata.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def list_by_type(self, project_id: str, schema_type: str) -> List[SchemaMetadataResponse]:
    """Return the project's schema metadata whose detected type equals ``schema_type``.

    Every item returned by ``self.list(project_id)`` (default arguments) is
    classified client-side with ``detect_schema_type`` and kept only when
    the result equals ``schema_type``.
    NOTE(review): confirm that the default ``list`` call covers every item
    of large projects.

    Args:
        project_id: The project ID
        schema_type: The schema type to filter by ('table', 'dimension', 'metric', 'relationship')

    Returns:
        List of schema metadata of the specified type, in listing order

    Example:
        ```python
        # Get all table schemas
        tables = client.schema_metadata.list_by_type(project_id, "table")

        # Get all metrics
        metrics = client.schema_metadata.list_by_type(project_id, "metric")
        ```
    """
    matches = []
    for candidate in self.list(project_id):
        if detect_schema_type(candidate.schema_data) == schema_type:
            matches.append(candidate)
    return matches

update(project_id, schema_metadata_id, name=None, schema_data=None, description=None, is_always_displayed=None, validate=True, **kwargs)

Update schema metadata with validation.

Parameters:

Name Type Description Default
project_id str

The project ID

required
schema_metadata_id str

The schema metadata ID to update

required
name Optional[str]

New schema metadata name

None
schema_data Optional[Dict[str, Any]]

New schema data structure

None
description Optional[str]

New description

None
is_always_displayed Optional[bool]

New always displayed setting

None
validate bool

Whether to perform nested field validation (default: True)

True
**kwargs

Additional fields to update

{}

Returns:

Type Description
SchemaMetadataResponse

The updated schema metadata

Raises:

Type Description
ValidationError

If validation fails and validate=True

Example
# Update a dimension schema
result = client.schema_metadata.update(
    project_id=project_id,
    schema_metadata_id=schema_id,
    description="Updated dimension description",
    schema_data={
        "table": {
            "dimension": {
                "name": "updated_dimension",
                "content": {"type": "categorical", "values": ["A", "B", "C"]}
            }
        }
    }
)
Source code in resources/schema_metadata.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def update(
    self,
    project_id: str,
    schema_metadata_id: str,
    name: Optional[str] = None,
    schema_data: Optional[Dict[str, Any]] = None,
    description: Optional[str] = None,
    is_always_displayed: Optional[bool] = None,
    validate: bool = True,
    **kwargs
) -> SchemaMetadataResponse:
    """Update a schema metadata item, keeping current values for fields left as None.

    The API expects a complete object, so the current item is fetched first
    and every argument that is ``None`` falls back to the stored value. As a
    consequence a field cannot be reset to ``None`` through this method.

    Args:
        project_id: The project ID
        schema_metadata_id: The schema metadata ID to update
        name: New schema metadata name
        schema_data: New schema data structure
        description: New description
        is_always_displayed: New always displayed setting
        validate: Whether to perform nested field validation (default: True)
        **kwargs: Additional fields to update, passed to ``SchemaMetadataCreate``

    Returns:
        The updated schema metadata

    Raises:
        ValidationError: If validation fails and validate=True, or if the
            API answers with an empty list

    Example:
        ```python
        # Update a dimension schema
        result = client.schema_metadata.update(
            project_id=project_id,
            schema_metadata_id=schema_id,
            description="Updated dimension description",
            schema_data={
                "table": {
                    "dimension": {
                        "name": "updated_dimension",
                        "content": {"type": "categorical", "values": ["A", "B", "C"]}
                    }
                }
            }
        )
        ```
    """
    # One extra GET: the stored item supplies the defaults for omitted fields.
    existing = self.get(project_id, schema_metadata_id)

    merged_name = existing.name if name is None else name
    merged_description = existing.description if description is None else description
    merged_schema_data = existing.schema_data if schema_data is None else schema_data
    merged_always_displayed = (
        existing.is_always_displayed if is_always_displayed is None else is_always_displayed
    )

    replacement = SchemaMetadataCreate(
        name=merged_name,
        description=merged_description,
        schema_data=merged_schema_data,
        is_always_displayed=merged_always_displayed,
        **kwargs
    )

    if validate:
        problems = validate_schema_metadata_create(replacement)
        if problems:
            raise ValidationError(f"Schema metadata validation failed: {'; '.join(problems)}")

    api_result = self._client.put(
        f"/projects/{project_id}/schema-metadata/{schema_metadata_id}",
        data=replacement.model_dump()
    )

    # The API may answer with a list; in that case the first entry is the updated object.
    if isinstance(api_result, list):
        if not api_result:
            raise ValidationError("API returned empty list")
        api_result = api_result[0]

    return SchemaMetadataResponse(**api_result)

validate_schema(schema_data, expected_type=None)

Validate schema data structure with nested field checks.

Parameters:

Name Type Description Default
schema_data Dict[str, Any]

The schema data to validate

required
expected_type Optional[str]

Optional expected schema type

None

Returns:

Type Description
List[str]

List of validation error messages (empty if valid)

Example
# Validate a table schema
table_data = {
    "table": {
        "name": "users",
        "columns": [{"name": "id", "type": "integer"}]
    }
}
errors = client.schema_metadata.validate_schema(table_data, "table")
if not errors:
    print("Schema is valid!")
Source code in resources/schema_metadata.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
def validate_schema(
    self,
    schema_data: Dict[str, Any],
    expected_type: Optional[str] = None,
) -> List[str]:
    """Run the schema-metadata validator against a bare schema payload.

    The payload is wrapped in a metadata dict carrying the placeholder name
    "validation_test" and handed to ``validate_schema_metadata`` together
    with ``expected_type``.

    Args:
        schema_data: The schema data to validate
        expected_type: Optional expected schema type

    Returns:
        List of validation error messages (empty if valid)

    Example:
        ```python
        # Validate a table schema
        table_data = {
            "table": {
                "name": "users",
                "columns": [{"name": "id", "type": "integer"}]
            }
        }
        errors = client.schema_metadata.validate_schema(table_data, "table")
        if not errors:
            print("Schema is valid!")
        ```
    """
    # The validator takes a whole metadata record, so supply a stand-in name.
    candidate = {"name": "validation_test", "schema_data": schema_data}
    return validate_schema_metadata(candidate, expected_type)

text2everything_sdk.resources.golden_examples.GoldenExamplesResource

Bases: BaseResource

Resource for managing golden examples (query-SQL pairs).

Source code in resources/golden_examples.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
class GoldenExamplesResource(BaseResource):
    """Resource for managing golden examples (query-SQL pairs).

    All requests target ``/projects/{project_id}/golden-examples`` or a
    single example beneath it.
    """

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    @staticmethod
    def _unwrap_single_payload(payload: Any) -> Any:
        """Reduce an API payload to the single record it carries.

        A response may arrive either as one object or as a list; a list is
        reduced to its first element.

        Args:
            payload: Decoded response body.

        Returns:
            ``payload`` itself when it is not a list, otherwise its first item.

        Raises:
            ValidationError: If ``payload`` is an empty list.
        """
        if not isinstance(payload, list):
            return payload
        if not payload:
            raise ValidationError("API returned empty list")
        return payload[0]

    def create(
        self,
        project_id: str,
        user_query: str,
        sql_query: str,
        description: Optional[str] = None,
        is_always_displayed: bool = False,
        **kwargs
    ) -> GoldenExampleResponse:
        """Create a new golden example.

        Args:
            project_id: The project ID to create the golden example in
            user_query: The natural language query
            sql_query: The corresponding SQL query
            description: Optional description of the golden example
            is_always_displayed: Whether this example should always be displayed
            **kwargs: Additional golden example fields

        Returns:
            The created golden example with response details

        Raises:
            ValidationError: If either query is empty or whitespace-only, or
                if the API answers with an empty list.

        Example:
            ```python
            result = client.golden_examples.create(
                project_id=project_id,
                user_query="How many users do we have?",
                sql_query="SELECT COUNT(*) FROM users;",
                description="Count total users",
                is_always_displayed=True
            )
            ```
        """
        # Basic validation
        if not user_query or not user_query.strip():
            raise ValidationError("User query cannot be empty")

        if not sql_query or not sql_query.strip():
            raise ValidationError("SQL query cannot be empty")

        # Build the GoldenExampleCreate object internally
        golden_example = GoldenExampleCreate(
            user_query=user_query,
            sql_query=sql_query,
            description=description,
            is_always_displayed=is_always_displayed,
            **kwargs
        )

        response = self._client.post(
            f"/projects/{project_id}/golden-examples",
            data=golden_example.model_dump()
        )
        return GoldenExampleResponse(**self._unwrap_single_payload(response))

    def get(self, project_id: str, golden_example_id: str) -> GoldenExampleResponse:
        """Get a golden example by ID.

        Args:
            project_id: The project ID
            golden_example_id: The golden example ID

        Returns:
            The golden example details

        Raises:
            ValidationError: If the API answers with an empty list.

        Example:
            ```python
            golden_example = client.golden_examples.get(project_id, golden_example_id)
            print(f"Query: {golden_example.user_query}")
            print(f"SQL: {golden_example.sql_query}")
            ```
        """
        response = self._client.get(f"/projects/{project_id}/golden-examples/{golden_example_id}")
        return GoldenExampleResponse(**self._unwrap_single_payload(response))

    def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[GoldenExampleResponse]:
        """List golden examples for a project.

        Args:
            project_id: The project ID
            limit: Maximum number of items to return
            offset: Number of items to skip

        Returns:
            List of golden examples

        Example:
            ```python
            examples = client.golden_examples.list(project_id)
            for example in examples:
                print(f"{example.user_query}")
            ```
        """
        endpoint = f"/projects/{project_id}/golden-examples"
        params = {"limit": limit, "offset": offset}
        return self._paginate(endpoint, params=params, model_class=GoldenExampleResponse)

    def update(
        self,
        project_id: str,
        golden_example_id: str,
        user_query: Optional[str] = None,
        sql_query: Optional[str] = None,
        description: Optional[str] = None,
        is_always_displayed: Optional[bool] = None,
        **kwargs
    ) -> GoldenExampleResponse:
        """Update a golden example.

        The current example is fetched first and every argument left as
        ``None`` keeps its stored value, so a field cannot be reset to
        ``None`` through this method.

        Args:
            project_id: The project ID
            golden_example_id: The golden example ID to update
            user_query: New user query
            sql_query: New SQL query
            description: New description
            is_always_displayed: New always displayed setting
            **kwargs: Additional fields to update

        Returns:
            The updated golden example

        Raises:
            ValidationError: If the merged user or SQL query is empty or
                whitespace-only, or if the API answers with an empty list.

        Example:
            ```python
            result = client.golden_examples.update(
                project_id=project_id,
                golden_example_id=example_id,
                description="Updated description",
                sql_query="SELECT COUNT(*) as total_users FROM users WHERE active = true;"
            )
            ```
        """
        # Get current golden example first since API expects complete data
        current_example = self.get(project_id, golden_example_id)

        # Use current values as defaults, override with provided values
        update_data = GoldenExampleCreate(
            user_query=user_query if user_query is not None else current_example.user_query,
            sql_query=sql_query if sql_query is not None else current_example.sql_query,
            description=description if description is not None else current_example.description,
            is_always_displayed=is_always_displayed if is_always_displayed is not None else current_example.is_always_displayed,
            **kwargs
        )

        # Validate non-empty queries
        if not update_data.user_query or not update_data.user_query.strip():
            raise ValidationError("User query cannot be empty")

        if not update_data.sql_query or not update_data.sql_query.strip():
            raise ValidationError("SQL query cannot be empty")

        response = self._client.put(
            f"/projects/{project_id}/golden-examples/{golden_example_id}",
            data=update_data.model_dump()
        )
        return GoldenExampleResponse(**self._unwrap_single_payload(response))

    def delete(self, project_id: str, golden_example_id: str) -> bool:
        """Delete a golden example.

        Args:
            project_id: The project ID
            golden_example_id: The golden example ID to delete

        Returns:
            True once the delete request has returned without raising

        Example:
            ```python
            success = client.golden_examples.delete(project_id, golden_example_id)
            ```
        """
        self._client.delete(f"/projects/{project_id}/golden-examples/{golden_example_id}")
        return True

    def search_by_query(self, project_id: str, search_term: str) -> List[GoldenExampleResponse]:
        """Search golden examples by user query text.

        Filtering happens client-side: the examples returned by ``list`` are
        matched with a case-insensitive substring test.

        Args:
            project_id: The project ID
            search_term: Term to search for in user queries

        Returns:
            List of golden examples matching the search term

        Example:
            ```python
            # Find examples related to users
            user_examples = client.golden_examples.search_by_query(project_id, "user")
            ```
        """
        # NOTE(review): list() is called with its default limit/offset; confirm
        # that _paginate follows all pages, otherwise matches may be missed.
        all_examples = self.list(project_id)
        search_term_lower = search_term.lower()

        return [
            example for example in all_examples
            if search_term_lower in example.user_query.lower()
        ]

    def bulk_create(
        self,
        project_id: str,
        golden_examples: List[Dict[str, Any]],
        parallel: bool = True,
        max_workers: Optional[int] = None,
        max_concurrent: int = 8,
        use_connection_isolation: bool = True
    ) -> List[GoldenExampleResponse]:
        """Create multiple golden examples with optional parallel execution and rate limiting.

        All items are pre-validated before any request is sent. In parallel
        mode the first item is created on its own, then the rest are submitted
        to a ``RateLimitedExecutor``.

        Args:
            project_id: The project ID
            golden_examples: List of golden example dictionaries to create
            parallel: Whether to execute requests in parallel (default: True)
            max_workers: Maximum number of parallel workers
                (default: min(16, number of items after the first))
            max_concurrent: Maximum number of concurrent requests to prevent server overload (default: 8)
            use_connection_isolation: Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

        Returns:
            List of created golden examples in the same order as input

        Raises:
            ValidationError: If pre-validation fails, or if any item fails
                during parallel execution (the message lists each failure).
                Errors from sequential creation, including the first item in
                parallel mode, propagate unchanged from ``create``.

        Example:
            ```python
            examples = [
                {
                    "user_query": "How many users?",
                    "sql_query": "SELECT COUNT(*) FROM users;",
                    "description": "Count total users"
                },
                {
                    "user_query": "How many active users?",
                    "sql_query": "SELECT COUNT(*) FROM users WHERE active = true;",
                    "is_always_displayed": True
                }
            ]
            # Parallel execution with rate limiting (default)
            results = client.golden_examples.bulk_create(project_id, examples)

            # Sequential execution
            results = client.golden_examples.bulk_create(project_id, examples, parallel=False)

            # Custom rate limiting
            results = client.golden_examples.bulk_create(project_id, examples, max_concurrent=4)

            # Disable connection isolation for shared connection pool
            results = client.golden_examples.bulk_create(project_id, examples, use_connection_isolation=False)
            ```
        """
        if not golden_examples:
            return []

        # Pre-validate all examples so nothing is sent when any input is bad
        all_errors = []
        for i, example_data in enumerate(golden_examples):
            try:
                # `or ""` treats an explicit None like a missing key, so it is
                # reported as "cannot be empty" just as create() would.
                user_query = example_data.get("user_query") or ""
                sql_query = example_data.get("sql_query") or ""
                label = user_query or "unnamed"
                if not user_query.strip():
                    all_errors.append(f"Item {i} ({label}): User query cannot be empty")
                if not sql_query.strip():
                    all_errors.append(f"Item {i} ({label}): SQL query cannot be empty")
            except (AttributeError, TypeError) as e:
                # Item is not a mapping, or a query value is not a string
                all_errors.append(f"Item {i}: Invalid data structure - {str(e)}")

        if all_errors:
            raise ValidationError(f"Bulk validation failed: {'; '.join(all_errors)}")

        if not parallel or len(golden_examples) == 1:
            # Sequential execution
            return [
                self.create(project_id=project_id, **example_data)
                for example_data in golden_examples
            ]

        # Create the first item sequentially to avoid race conditions when creating collections
        first_result = self.create(
            project_id=project_id,
            **golden_examples[0]
        )

        # Parallel execution for remaining items
        remaining = golden_examples[1:]
        if max_workers is None:
            max_workers = min(16, len(remaining))

        def create_single_example(indexed_data):
            """Create one example; return (index, result, error_message)."""
            index, example_data = indexed_data
            try:
                if use_connection_isolation:
                    # Create isolated HTTP client for this request to avoid connection conflicts
                    created = self._create_with_isolated_client(
                        project_id=project_id,
                        example_data=example_data
                    )
                else:
                    # Use shared connection pool
                    created = self.create(
                        project_id=project_id,
                        **example_data
                    )
            except Exception as e:
                # Worker boundary: collect the failure so the other items still run
                return index, None, f"Item {index} ({example_data.get('user_query', 'unnamed')}): {str(e)}"
            return index, created, None

        # Slots are filled by original index so output order matches input order
        results = [None] * len(golden_examples)
        results[0] = first_result
        errors = []

        with RateLimitedExecutor(max_workers=max_workers, max_concurrent=max_concurrent) as executor:
            # Submit tasks for remaining items with their original indices starting at 1
            futures = [
                executor.submit_rate_limited(create_single_example, item)
                for item in enumerate(remaining, start=1)
            ]

            # Collect results as they complete
            for future in concurrent.futures.as_completed(futures):
                index, result, error = future.result()
                if error:
                    errors.append(error)
                else:
                    results[index] = result

        # Check for any errors
        if errors:
            successful_count = sum(1 for r in results if r is not None)
            raise ValidationError(
                f"Bulk create partially failed: {successful_count}/{len(golden_examples)} succeeded. "
                f"Errors: {'; '.join(errors)}"
            )

        return results

    def _create_with_isolated_client(self, project_id: str, example_data: Dict[str, Any]) -> GoldenExampleResponse:
        """
        Create a golden example using an isolated HTTP client to avoid connection conflicts.

        A fresh ``httpx.Client`` limited to one connection with keep-alive
        disabled is opened for the single POST and closed afterwards. The URL,
        headers and response handling come from the main client's helpers.

        Args:
            project_id: The project ID
            example_data: Golden example data dictionary; must contain
                "user_query" and "sql_query"

        Returns:
            Created GoldenExampleResponse instance

        Raises:
            ValidationError: If the API answers with an empty list.
        """
        # NOTE(review): these timeouts are hard-coded here; confirm they match
        # the main client's configuration.
        timeout_config = httpx.Timeout(
            connect=30,      # Connection timeout
            read=180,        # Read timeout for long requests
            write=30,        # Write timeout
            pool=300         # Pool timeout
        )

        limits_config = httpx.Limits(
            max_connections=1,           # Single connection for isolation
            max_keepalive_connections=0, # No keep-alive to avoid state issues
            keepalive_expiry=0           # Immediate expiry
        )

        with httpx.Client(
            timeout=timeout_config,
            limits=limits_config,
            http2=False  # Use HTTP/1.1 for better compatibility
        ) as isolated_client:
            # Prepare golden example; unknown keys are passed through as extra fields
            golden_example = GoldenExampleCreate(
                user_query=example_data["user_query"],
                sql_query=example_data["sql_query"],
                description=example_data.get("description"),
                is_always_displayed=example_data.get("is_always_displayed", False),
                **{k: v for k, v in example_data.items() if k not in ["user_query", "sql_query", "description", "is_always_displayed"]}
            )

            # Build endpoint and headers
            url = self._client._build_url(f"/projects/{project_id}/golden-examples")
            headers = self._client._get_default_headers()

            # Make isolated request
            response = isolated_client.post(url, json=golden_example.model_dump(), headers=headers)
            response_data = self._client._handle_response(response)

            return GoldenExampleResponse(**self._unwrap_single_payload(response_data))

    def list_always_displayed(self, project_id: str) -> List[GoldenExampleResponse]:
        """List golden examples that are marked as always displayed.

        Filtering happens client-side over the examples returned by ``list``.

        Args:
            project_id: The project ID

        Returns:
            List of golden examples marked as always displayed

        Example:
            ```python
            important_examples = client.golden_examples.list_always_displayed(project_id)
            ```
        """
        all_examples = self.list(project_id)
        return [example for example in all_examples if example.is_always_displayed]

bulk_create(project_id, golden_examples, parallel=True, max_workers=None, max_concurrent=8, use_connection_isolation=True)

Create multiple golden examples with optional parallel execution and rate limiting.

Parameters:

Name Type Description Default
project_id str

The project ID

required
golden_examples List[Dict[str, Any]]

List of golden example dictionaries to create

required
parallel bool

Whether to execute requests in parallel (default: True)

True
max_workers Optional[int]

Maximum number of parallel workers (default: min(16, number of items after the first, which is created sequentially))

None
max_concurrent int

Maximum number of concurrent requests to prevent server overload (default: 8)

8
use_connection_isolation bool

Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

True

Returns:

Type Description
List[GoldenExampleResponse]

List of created golden examples in the same order as input

Raises:

Type Description
ValidationError

If pre-validation fails, or if any item fails to be created during parallel execution

Example
examples = [
    {
        "user_query": "How many users?",
        "sql_query": "SELECT COUNT(*) FROM users;",
        "description": "Count total users"
    },
    {
        "user_query": "How many active users?",
        "sql_query": "SELECT COUNT(*) FROM users WHERE active = true;",
        "is_always_displayed": True
    }
]
# Parallel execution with rate limiting (default)
results = client.golden_examples.bulk_create(project_id, examples)

# Sequential execution
results = client.golden_examples.bulk_create(project_id, examples, parallel=False)

# Custom rate limiting
results = client.golden_examples.bulk_create(project_id, examples, max_concurrent=4)

# Disable connection isolation for shared connection pool
results = client.golden_examples.bulk_create(project_id, examples, use_connection_isolation=False)
Source code in resources/golden_examples.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def bulk_create(
    self,
    project_id: str,
    golden_examples: List[Dict[str, Any]],
    parallel: bool = True,
    max_workers: Optional[int] = None,
    max_concurrent: int = 8,
    use_connection_isolation: bool = True
) -> List[GoldenExampleResponse]:
    """Create multiple golden examples with optional parallel execution and rate limiting.

    All items are pre-validated before any request is sent. In parallel
    mode the first item is created on its own, then the rest are submitted
    to a ``RateLimitedExecutor``.

    Args:
        project_id: The project ID
        golden_examples: List of golden example dictionaries to create
        parallel: Whether to execute requests in parallel (default: True)
        max_workers: Maximum number of parallel workers
            (default: min(16, number of items after the first))
        max_concurrent: Maximum number of concurrent requests to prevent server overload (default: 8)
        use_connection_isolation: Use isolated HTTP clients for each request to prevent connection conflicts (default: True)

    Returns:
        List of created golden examples in the same order as input

    Raises:
        ValidationError: If pre-validation fails, or if any item fails
            during parallel execution (the message lists each failure).
            Errors from sequential creation, including the first item in
            parallel mode, propagate unchanged from ``create``.

    Example:
        ```python
        examples = [
            {
                "user_query": "How many users?",
                "sql_query": "SELECT COUNT(*) FROM users;",
                "description": "Count total users"
            },
            {
                "user_query": "How many active users?",
                "sql_query": "SELECT COUNT(*) FROM users WHERE active = true;",
                "is_always_displayed": True
            }
        ]
        # Parallel execution with rate limiting (default)
        results = client.golden_examples.bulk_create(project_id, examples)

        # Sequential execution
        results = client.golden_examples.bulk_create(project_id, examples, parallel=False)

        # Custom rate limiting
        results = client.golden_examples.bulk_create(project_id, examples, max_concurrent=4)

        # Disable connection isolation for shared connection pool
        results = client.golden_examples.bulk_create(project_id, examples, use_connection_isolation=False)
        ```
    """
    if not golden_examples:
        return []

    # Pre-validate all examples so nothing is sent when any input is bad
    all_errors = []
    for i, example_data in enumerate(golden_examples):
        try:
            # `or ""` treats an explicit None like a missing key, so it is
            # reported as "cannot be empty" just as create() would.
            user_query = example_data.get("user_query") or ""
            sql_query = example_data.get("sql_query") or ""
            label = user_query or "unnamed"
            if not user_query.strip():
                all_errors.append(f"Item {i} ({label}): User query cannot be empty")
            if not sql_query.strip():
                all_errors.append(f"Item {i} ({label}): SQL query cannot be empty")
        except (AttributeError, TypeError) as e:
            # Item is not a mapping, or a query value is not a string
            all_errors.append(f"Item {i}: Invalid data structure - {str(e)}")

    if all_errors:
        raise ValidationError(f"Bulk validation failed: {'; '.join(all_errors)}")

    if not parallel or len(golden_examples) == 1:
        # Sequential execution
        return [
            self.create(project_id=project_id, **example_data)
            for example_data in golden_examples
        ]

    # Create the first item sequentially to avoid race conditions when creating collections
    first_result = self.create(
        project_id=project_id,
        **golden_examples[0]
    )

    # Parallel execution for remaining items
    remaining = golden_examples[1:]
    if max_workers is None:
        max_workers = min(16, len(remaining))

    def create_single_example(indexed_data):
        """Create one example; return (index, result, error_message)."""
        index, example_data = indexed_data
        try:
            if use_connection_isolation:
                # Create isolated HTTP client for this request to avoid connection conflicts
                created = self._create_with_isolated_client(
                    project_id=project_id,
                    example_data=example_data
                )
            else:
                # Use shared connection pool
                created = self.create(
                    project_id=project_id,
                    **example_data
                )
        except Exception as e:
            # Worker boundary: collect the failure so the other items still run
            return index, None, f"Item {index} ({example_data.get('user_query', 'unnamed')}): {str(e)}"
        return index, created, None

    # Slots are filled by original index so output order matches input order
    results = [None] * len(golden_examples)
    results[0] = first_result
    errors = []

    with RateLimitedExecutor(max_workers=max_workers, max_concurrent=max_concurrent) as executor:
        # Submit tasks for remaining items with their original indices starting at 1
        futures = [
            executor.submit_rate_limited(create_single_example, item)
            for item in enumerate(remaining, start=1)
        ]

        # Collect results as they complete
        for future in concurrent.futures.as_completed(futures):
            index, result, error = future.result()
            if error:
                errors.append(error)
            else:
                results[index] = result

    # Check for any errors
    if errors:
        successful_count = sum(1 for r in results if r is not None)
        raise ValidationError(
            f"Bulk create partially failed: {successful_count}/{len(golden_examples)} succeeded. "
            f"Errors: {'; '.join(errors)}"
        )

    return results

create(project_id, user_query, sql_query, description=None, is_always_displayed=False, **kwargs)

Create a new golden example.

Parameters:

Name Type Description Default
project_id str

The project ID to create the golden example in

required
user_query str

The natural language query

required
sql_query str

The corresponding SQL query

required
description Optional[str]

Optional description of the golden example

None
is_always_displayed bool

Whether this example should always be displayed

False
**kwargs

Additional golden example fields

{}

Returns:

Type Description
GoldenExampleResponse

The created golden example with response details

Example
result = client.golden_examples.create(
    project_id=project_id,
    user_query="How many users do we have?",
    sql_query="SELECT COUNT(*) FROM users;",
    description="Count total users",
    is_always_displayed=True
)
Source code in resources/golden_examples.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def create(
    self,
    project_id: str,
    user_query: str,
    sql_query: str,
    description: Optional[str] = None,
    is_always_displayed: bool = False,
    **kwargs
) -> GoldenExampleResponse:
    """Create a golden example (a query-SQL pair) inside a project.

    Args:
        project_id: The project ID to create the golden example in
        user_query: The natural language query
        sql_query: The corresponding SQL query
        description: Optional description of the golden example
        is_always_displayed: Whether this example should always be displayed
        **kwargs: Additional golden example fields

    Returns:
        The created golden example with response details

    Raises:
        ValidationError: If either query is empty or whitespace-only, or
            if the API answers with an empty list.

    Example:
        ```python
        result = client.golden_examples.create(
            project_id=project_id,
            user_query="How many users do we have?",
            sql_query="SELECT COUNT(*) FROM users;",
            description="Count total users",
            is_always_displayed=True
        )
        ```
    """
    # Reject missing or whitespace-only text before building the request
    if not (user_query and user_query.strip()):
        raise ValidationError("User query cannot be empty")
    if not (sql_query and sql_query.strip()):
        raise ValidationError("SQL query cannot be empty")

    record = GoldenExampleCreate(
        user_query=user_query,
        sql_query=sql_query,
        description=description,
        is_always_displayed=is_always_displayed,
        **kwargs
    )
    endpoint = f"/projects/{project_id}/golden-examples"
    payload = self._client.post(endpoint, data=record.model_dump())

    # The API may answer with a list; only its first element is used
    if isinstance(payload, list):
        if not payload:
            raise ValidationError("API returned empty list")
        payload = payload[0]

    return GoldenExampleResponse(**payload)

delete(project_id, golden_example_id)

Delete a golden example.

Parameters:

Name Type Description Default
project_id str

The project ID

required
golden_example_id str

The golden example ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.golden_examples.delete(project_id, golden_example_id)
Source code in resources/golden_examples.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def delete(self, project_id: str, golden_example_id: str) -> bool:
    """Remove a single golden example from a project.

    Args:
        project_id: The project ID
        golden_example_id: The golden example ID to delete

    Returns:
        True once the DELETE request has returned without raising

    Example:
        ```python
        success = client.golden_examples.delete(project_id, golden_example_id)
        ```
    """
    endpoint = f"/projects/{project_id}/golden-examples/{golden_example_id}"
    self._client.delete(endpoint)
    return True

get(project_id, golden_example_id)

Get a golden example by ID.

Parameters:

Name Type Description Default
project_id str

The project ID

required
golden_example_id str

The golden example ID

required

Returns:

Type Description
GoldenExampleResponse

The golden example details

Example
golden_example = client.golden_examples.get(project_id, golden_example_id)
print(f"Query: {golden_example.user_query}")
print(f"SQL: {golden_example.sql_query}")
Source code in resources/golden_examples.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def get(self, project_id: str, golden_example_id: str) -> GoldenExampleResponse:
    """Fetch one golden example by its ID.

    Args:
        project_id: The project ID
        golden_example_id: The golden example ID

    Returns:
        The golden example details

    Raises:
        ValidationError: If the API answers with an empty list

    Example:
        ```python
        golden_example = client.golden_examples.get(project_id, golden_example_id)
        print(f"Query: {golden_example.user_query}")
        print(f"SQL: {golden_example.sql_query}")
        ```
    """
    payload = self._client.get(f"/projects/{project_id}/golden-examples/{golden_example_id}")

    # The response may be a bare object or a list wrapping it.
    if not isinstance(payload, list):
        return GoldenExampleResponse(**payload)
    if not payload:
        raise ValidationError("API returned empty list")
    # Only the first element of a list response is used.
    return GoldenExampleResponse(**payload[0])

list(project_id, limit=100, offset=0)

List golden examples for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
limit int

Maximum number of items to return

100
offset int

Number of items to skip

0

Returns:

Type Description
List[GoldenExampleResponse]

List of golden examples

Example
examples = client.golden_examples.list(project_id)
for example in examples:
    print(f"{example.user_query}")
Source code in resources/golden_examples.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[GoldenExampleResponse]:
    """Return the golden examples that belong to a project.

    Args:
        project_id: The project ID
        limit: Maximum number of items to return
        offset: Number of items to skip

    Returns:
        List of golden examples

    Example:
        ```python
        examples = client.golden_examples.list(project_id)
        for example in examples:
            print(f"{example.user_query}")
        ```
    """
    # Pagination is delegated to the shared helper; limit/offset are sent as query parameters.
    return self._paginate(
        f"/projects/{project_id}/golden-examples",
        params={"limit": limit, "offset": offset},
        model_class=GoldenExampleResponse,
    )

list_always_displayed(project_id)

List golden examples that are marked as always displayed.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
List[GoldenExampleResponse]

List of golden examples marked as always displayed

Example
important_examples = client.golden_examples.list_always_displayed(project_id)
Source code in resources/golden_examples.py
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
def list_always_displayed(self, project_id: str) -> List[GoldenExampleResponse]:
    """Return only the golden examples flagged as always displayed.

    The filtering happens client-side: the examples are fetched with
    ``self.list(project_id)`` (default paging arguments) and then narrowed
    down by their ``is_always_displayed`` attribute.

    Args:
        project_id: The project ID

    Returns:
        List of golden examples marked as always displayed

    Example:
        ```python
        important_examples = client.golden_examples.list_always_displayed(project_id)
        ```
    """
    pinned = []
    for candidate in self.list(project_id):
        if candidate.is_always_displayed:
            pinned.append(candidate)
    return pinned

search_by_query(project_id, search_term)

Search golden examples by user query text.

Parameters:

Name Type Description Default
project_id str

The project ID

required
search_term str

Term to search for in user queries

required

Returns:

Type Description
List[GoldenExampleResponse]

List of golden examples matching the search term

Example
# Find examples related to users
user_examples = client.golden_examples.search_by_query(project_id, "user")
Source code in resources/golden_examples.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def search_by_query(self, project_id: str, search_term: str) -> List[GoldenExampleResponse]:
    """Find golden examples whose user query contains a term.

    The match is a case-insensitive substring test performed client-side on
    the examples returned by ``self.list(project_id)``.

    Args:
        project_id: The project ID
        search_term: Term to search for in user queries

    Returns:
        List of golden examples matching the search term

    Example:
        ```python
        # Find examples related to users
        user_examples = client.golden_examples.search_by_query(project_id, "user")
        ```
    """
    candidates = self.list(project_id)
    needle = search_term.lower()

    matches = []
    for candidate in candidates:
        if needle in candidate.user_query.lower():
            matches.append(candidate)
    return matches

update(project_id, golden_example_id, user_query=None, sql_query=None, description=None, is_always_displayed=None, **kwargs)

Update a golden example.

Parameters:

Name Type Description Default
project_id str

The project ID

required
golden_example_id str

The golden example ID to update

required
user_query Optional[str]

New user query

None
sql_query Optional[str]

New SQL query

None
description Optional[str]

New description

None
is_always_displayed Optional[bool]

New always displayed setting

None
**kwargs

Additional fields to update

{}

Returns:

Type Description
GoldenExampleResponse

The updated golden example

Example
result = client.golden_examples.update(
    project_id=project_id,
    golden_example_id=example_id,
    description="Updated description",
    sql_query="SELECT COUNT(*) as total_users FROM users WHERE active = true;"
)
Source code in resources/golden_examples.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def update(
    self,
    project_id: str,
    golden_example_id: str,
    user_query: Optional[str] = None,
    sql_query: Optional[str] = None,
    description: Optional[str] = None,
    is_always_displayed: Optional[bool] = None,
    **kwargs
) -> GoldenExampleResponse:
    """Modify an existing golden example.

    Any field left as None keeps the value currently stored for the example;
    the stored example is fetched first and a complete payload is sent.

    Args:
        project_id: The project ID
        golden_example_id: The golden example ID to update
        user_query: New user query
        sql_query: New SQL query
        description: New description
        is_always_displayed: New always displayed setting
        **kwargs: Additional fields to update

    Returns:
        The updated golden example

    Raises:
        ValidationError: If the resulting user query or SQL query is empty
            or whitespace-only, or if the API answers with an empty list

    Example:
        ```python
        result = client.golden_examples.update(
            project_id=project_id,
            golden_example_id=example_id,
            description="Updated description",
            sql_query="SELECT COUNT(*) as total_users FROM users WHERE active = true;"
        )
        ```
    """
    # The API expects complete data, so start from the stored example.
    existing = self.get(project_id, golden_example_id)

    # Explicit arguments win; None means "keep what is stored".
    merged_user_query = existing.user_query if user_query is None else user_query
    merged_sql_query = existing.sql_query if sql_query is None else sql_query
    merged_description = existing.description if description is None else description
    merged_always_displayed = (
        existing.is_always_displayed if is_always_displayed is None else is_always_displayed
    )

    payload = GoldenExampleCreate(
        user_query=merged_user_query,
        sql_query=merged_sql_query,
        description=merged_description,
        is_always_displayed=merged_always_displayed,
        **kwargs
    )

    # Both queries must contain something other than whitespace.
    required_queries = (
        (payload.user_query, "User query cannot be empty"),
        (payload.sql_query, "SQL query cannot be empty"),
    )
    for text, message in required_queries:
        if not text or not text.strip():
            raise ValidationError(message)

    result = self._client.put(
        f"/projects/{project_id}/golden-examples/{golden_example_id}",
        data=payload.model_dump()
    )

    # The response may be a bare object or a list wrapping it.
    if not isinstance(result, list):
        return GoldenExampleResponse(**result)
    if not result:
        raise ValidationError("API returned empty list")
    return GoldenExampleResponse(**result[0])

text2everything_sdk.resources.connectors.ConnectorsResource

Bases: BaseResource

Resource for managing database connectors.

Source code in resources/connectors.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class ConnectorsResource(BaseResource):
    """Resource for managing database connectors."""

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    def create(
        self,
        name: str,
        db_type: str,
        host: str,
        username: str,
        password: str,
        database: str,
        port: Optional[int] = None,
        description: Optional[str] = None,
        config: Optional[dict] = None,
        **kwargs
    ) -> Connector:
        """Create a new database connector.

        Args:
            name: Connector name
            db_type: Database type (postgres, mysql, sqlserver, snowflake)
            host: Database host
            port: Database port; when omitted a default is chosen from db_type
            username: Database username
            password: Database password
            database: Database name
            description: Optional connector description
            config: Optional additional configuration
            **kwargs: Additional connector fields

        Returns:
            The created connector

        Raises:
            ValidationError: If db_type is not a ConnectorType value, or if
                name, host, username, password or database is empty or
                whitespace-only

        Example:
            ```python
            result = client.connectors.create(
                name="Production DB",
                description="Main production database",
                db_type="postgres",
                host="db.example.com",
                port=5432,
                username="app_user",
                password="secure_password",
                database="production"
            )
            ```
        """
        # Validate connector type
        if db_type.lower() not in [e.value for e in ConnectorType]:
            valid_types = ", ".join([e.value for e in ConnectorType])
            raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")

        # Basic validation
        if not name or not name.strip():
            raise ValidationError("Connector name cannot be empty")

        if not host or not host.strip():
            raise ValidationError("Host cannot be empty")

        if not username or not username.strip():
            raise ValidationError("Username cannot be empty")

        if not password or not password.strip():
            raise ValidationError("Password cannot be empty")

        if not database or not database.strip():
            raise ValidationError("Database name cannot be empty")

        # Set default port based on database type if not provided
        if port is None:
            port_defaults = {
                "postgres": 5432,
                "mysql": 3306,
                "sqlserver": 1433,
                "snowflake": 443  # Snowflake typically uses HTTPS port
            }
            port = port_defaults.get(db_type.lower(), 5432)

        # Build the ConnectorCreate object internally
        connector = ConnectorCreate(
            name=name,
            db_type=db_type,
            host=host,
            port=port,
            username=username,
            password=password,
            database=database,
            description=description,
            config=config,
            **kwargs
        )

        response = self._client.post(
            "/connectors",
            data=connector.model_dump()
        )
        return Connector(**response)

    def get(self, connector_id: str) -> Connector:
        """Get a connector by ID.

        Args:
            connector_id: The connector ID

        Returns:
            The connector details

        Example:
            ```python
            connector = client.connectors.get(connector_id)
            print(f"Database: {connector.db_type} at {connector.host}")
            ```
        """
        response = self._client.get(f"/connectors/{connector_id}")
        return Connector(**response)

    def list(self, limit: int = 100, offset: int = 0) -> List[Connector]:
        """List all connectors.

        Args:
            limit: Maximum number of items to return
            offset: Number of items to skip

        Returns:
            List of connectors

        Example:
            ```python
            connectors = client.connectors.list()
            for connector in connectors:
                print(f"{connector.name}: {connector.db_type}")
            ```
        """
        endpoint = "/connectors"
        # The offset is sent to this endpoint under the "skip" query parameter.
        params = {"limit": limit, "skip": offset}
        return self._paginate(endpoint, params=params, model_class=Connector)

    def update(
        self,
        connector_id: str,
        name: Optional[str] = None,
        db_type: Optional[str] = None,
        host: Optional[str] = None,
        port: Optional[int] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        database: Optional[str] = None,
        description: Optional[str] = None,
        config: Optional[dict] = None,
        **kwargs
    ) -> Connector:
        """Update a connector.

        Any field left as None keeps the value currently stored for the
        connector; the stored connector is fetched first and a complete
        payload is sent.

        Args:
            connector_id: The connector ID to update
            name: New connector name
            db_type: New database type
            host: New database host
            port: New database port
            username: New database username
            password: New database password
            database: New database name
            description: New connector description
            config: New additional configuration
            **kwargs: Additional fields to update

        Returns:
            The updated connector

        Raises:
            ValidationError: If a non-empty db_type is given that is not a
                ConnectorType value

        Example:
            ```python
            result = client.connectors.update(
                connector_id,
                description="Updated production database",
                port=5433
            )
            ```
        """
        # Validate connector type if provided
        if db_type and db_type.lower() not in [e.value for e in ConnectorType]:
            valid_types = ", ".join([e.value for e in ConnectorType])
            raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")

        # Get current connector first since API expects complete data
        current_connector = self.get(connector_id)

        # Use current values as defaults, override with provided values
        update_data = ConnectorCreate(
            name=name if name is not None else current_connector.name,
            description=description if description is not None else current_connector.description,
            db_type=db_type if db_type is not None else current_connector.db_type,
            host=host if host is not None else current_connector.host,
            port=port if port is not None else current_connector.port,
            username=username if username is not None else current_connector.username,
            password=password if password is not None else current_connector.password,
            database=database if database is not None else current_connector.database,
            config=config if config is not None else current_connector.config,
            **kwargs
        )

        response = self._client.put(
            f"/connectors/{connector_id}",
            data=update_data.model_dump()
        )
        return Connector(**response)

    def delete(self, connector_id: str) -> bool:
        """Delete a connector.

        Args:
            connector_id: The connector ID to delete

        Returns:
            True if deletion was successful

        Example:
            ```python
            success = client.connectors.delete(connector_id)
            ```
        """
        self._client.delete(f"/connectors/{connector_id}")
        return True

    def test_connection(self, connector_id: str) -> bool:
        """Test a connector's database connection.

        The current implementation only checks that the connector can be
        fetched with ``get``; it does not call a dedicated test endpoint.

        Args:
            connector_id: The connector ID to test

        Returns:
            True if connection is successful

        Raises:
            ValidationError: If connection fails (the original error is
                attached as the exception cause)

        Example:
            ```python
            try:
                success = client.connectors.test_connection(connector_id)
                print("Connection successful!")
            except ValidationError as e:
                print(f"Connection failed: {e}")
            ```
        """
        try:
            # This would typically be a separate endpoint, but for now we'll use get
            # In a real implementation, there might be a POST /connectors/{id}/test endpoint
            self.get(connector_id)
        except Exception as e:
            # Chain the cause so the underlying failure stays visible in tracebacks.
            raise ValidationError(f"Connection test failed: {str(e)}") from e
        return True

    def list_by_type(self, db_type: str) -> List[Connector]:
        """List connectors by database type.

        The filter is applied client-side, case-insensitively, to the
        connectors returned by ``self.list()`` with its default arguments.

        Args:
            db_type: The database type to filter by

        Returns:
            List of connectors of the specified type

        Raises:
            ValidationError: If db_type is not a ConnectorType value

        Example:
            ```python
            postgres_connectors = client.connectors.list_by_type("postgres")
            ```
        """
        # Validate db_type
        if db_type.lower() not in [e.value for e in ConnectorType]:
            valid_types = ", ".join([e.value for e in ConnectorType])
            raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")

        all_connectors = self.list()
        return [conn for conn in all_connectors if conn.db_type.lower() == db_type.lower()]

create(name, db_type, host, username, password, database, port=None, description=None, config=None, **kwargs)

Create a new database connector.

Parameters:

Name Type Description Default
name str

Connector name

required
db_type str

Database type (postgres, mysql, sqlserver, snowflake)

required
host str

Database host

required
port Optional[int]

Database port

None
username str

Database username

required
password str

Database password

required
database str

Database name

required
description Optional[str]

Optional connector description

None
config Optional[dict]

Optional additional configuration

None
**kwargs

Additional connector fields

{}

Returns:

Type Description
Connector

The created connector

Example
result = client.connectors.create(
    name="Production DB",
    description="Main production database",
    db_type="postgres",
    host="db.example.com",
    port=5432,
    username="app_user",
    password="secure_password",
    database="production"
)
Source code in resources/connectors.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def create(
    self,
    name: str,
    db_type: str,
    host: str,
    username: str,
    password: str,
    database: str,
    port: Optional[int] = None,
    description: Optional[str] = None,
    config: Optional[dict] = None,
    **kwargs
) -> Connector:
    """Register a new database connector.

    Args:
        name: Connector name
        db_type: Database type (postgres, mysql, sqlserver, snowflake)
        host: Database host
        port: Database port; when omitted a default is chosen from db_type
        username: Database username
        password: Database password
        database: Database name
        description: Optional connector description
        config: Optional additional configuration
        **kwargs: Additional connector fields

    Returns:
        The created connector

    Raises:
        ValidationError: If db_type is not a ConnectorType value, or if
            name, host, username, password or database is empty or
            whitespace-only

    Example:
        ```python
        result = client.connectors.create(
            name="Production DB",
            description="Main production database",
            db_type="postgres",
            host="db.example.com",
            port=5432,
            username="app_user",
            password="secure_password",
            database="production"
        )
        ```
    """
    # The type check is case-insensitive and runs before any other validation.
    type_key = db_type.lower()
    supported = [member.value for member in ConnectorType]
    if type_key not in supported:
        valid_types = ", ".join(supported)
        raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")

    # Required text fields, checked in this order; the first failure wins.
    required_text = (
        (name, "Connector name cannot be empty"),
        (host, "Host cannot be empty"),
        (username, "Username cannot be empty"),
        (password, "Password cannot be empty"),
        (database, "Database name cannot be empty"),
    )
    for value, message in required_text:
        if not value or not value.strip():
            raise ValidationError(message)

    # Fall back to a per-type default port when none was given.
    if port is None:
        default_ports = {
            "postgres": 5432,
            "mysql": 3306,
            "sqlserver": 1433,
            "snowflake": 443,  # Snowflake typically uses HTTPS port
        }
        port = default_ports.get(type_key, 5432)

    # db_type is forwarded exactly as the caller supplied it.
    payload = ConnectorCreate(
        name=name,
        db_type=db_type,
        host=host,
        port=port,
        username=username,
        password=password,
        database=database,
        description=description,
        config=config,
        **kwargs
    )

    created = self._client.post("/connectors", data=payload.model_dump())
    return Connector(**created)

delete(connector_id)

Delete a connector.

Parameters:

Name Type Description Default
connector_id str

The connector ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.connectors.delete(connector_id)
Source code in resources/connectors.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def delete(self, connector_id: str) -> bool:
    """Remove a connector.

    Args:
        connector_id: The connector ID to delete

    Returns:
        True once the DELETE request has returned without raising

    Example:
        ```python
        success = client.connectors.delete(connector_id)
        ```
    """
    endpoint = f"/connectors/{connector_id}"
    self._client.delete(endpoint)
    return True

get(connector_id)

Get a connector by ID.

Parameters:

Name Type Description Default
connector_id str

The connector ID

required

Returns:

Type Description
Connector

The connector details

Example
connector = client.connectors.get(connector_id)
print(f"Database: {connector.db_type} at {connector.host}")
Source code in resources/connectors.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def get(self, connector_id: str) -> Connector:
    """Fetch a single connector by its ID.

    Args:
        connector_id: The connector ID

    Returns:
        The connector details

    Example:
        ```python
        connector = client.connectors.get(connector_id)
        print(f"Database: {connector.db_type} at {connector.host}")
        ```
    """
    endpoint = f"/connectors/{connector_id}"
    payload = self._client.get(endpoint)
    return Connector(**payload)

list(limit=100, offset=0)

List all connectors.

Parameters:

Name Type Description Default
limit int

Maximum number of items to return

100
offset int

Number of items to skip

0

Returns:

Type Description
List[Connector]

List of connectors

Example
connectors = client.connectors.list()
for connector in connectors:
    print(f"{connector.name}: {connector.db_type}")
Source code in resources/connectors.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def list(self, limit: int = 100, offset: int = 0) -> List[Connector]:
    """Return the available connectors.

    Args:
        limit: Maximum number of items to return
        offset: Number of items to skip

    Returns:
        List of connectors

    Example:
        ```python
        connectors = client.connectors.list()
        for connector in connectors:
            print(f"{connector.name}: {connector.db_type}")
        ```
    """
    # The offset is sent to this endpoint under the "skip" query parameter.
    return self._paginate(
        "/connectors",
        params={"limit": limit, "skip": offset},
        model_class=Connector,
    )

list_by_type(db_type)

List connectors by database type.

Parameters:

Name Type Description Default
db_type str

The database type to filter by

required

Returns:

Type Description
List[Connector]

List of connectors of the specified type

Example
postgres_connectors = client.connectors.list_by_type("postgres")
Source code in resources/connectors.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def list_by_type(self, db_type: str) -> List[Connector]:
    """Return the connectors that use a given database type.

    The filter is applied client-side, case-insensitively, to the connectors
    returned by ``self.list()`` with its default arguments.

    Args:
        db_type: The database type to filter by

    Returns:
        List of connectors of the specified type

    Raises:
        ValidationError: If db_type is not a ConnectorType value

    Example:
        ```python
        postgres_connectors = client.connectors.list_by_type("postgres")
        ```
    """
    wanted = db_type.lower()
    supported = [member.value for member in ConnectorType]
    if wanted not in supported:
        valid_types = ", ".join(supported)
        raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")

    matching = []
    for connector in self.list():
        if connector.db_type.lower() == wanted:
            matching.append(connector)
    return matching

test_connection(connector_id)

Test a connector's database connection.

Parameters:

Name Type Description Default
connector_id str

The connector ID to test

required

Returns:

Type Description
bool

True if connection is successful

Raises:

Type Description
ValidationError

If connection fails

Example
try:
    success = client.connectors.test_connection(connector_id)
    print("Connection successful!")
except ValidationError as e:
    print(f"Connection failed: {e}")
Source code in resources/connectors.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def test_connection(self, connector_id: str) -> bool:
    """Test a connector's database connection.

    The current implementation only checks that the connector can be fetched
    with ``get``; it does not call a dedicated test endpoint.

    Args:
        connector_id: The connector ID to test

    Returns:
        True if connection is successful

    Raises:
        ValidationError: If connection fails (the original error is attached
            as the exception cause)

    Example:
        ```python
        try:
            success = client.connectors.test_connection(connector_id)
            print("Connection successful!")
        except ValidationError as e:
            print(f"Connection failed: {e}")
        ```
    """
    try:
        # This would typically be a separate endpoint, but for now we'll use get
        # In a real implementation, there might be a POST /connectors/{id}/test endpoint
        self.get(connector_id)
    except Exception as e:
        # Chain the cause so the underlying failure stays visible in tracebacks.
        raise ValidationError(f"Connection test failed: {str(e)}") from e
    return True

update(connector_id, name=None, db_type=None, host=None, port=None, username=None, password=None, database=None, description=None, config=None, **kwargs)

Update a connector.

Parameters:

Name Type Description Default
connector_id str

The connector ID to update

required
name Optional[str]

New connector name

None

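db_type Optional[str]

New database type

None
host Optional[str]

New database host

None
port Optional[int]

New database port

None
username Optional[str]

New database username

None
password Optional[str]

New database password

None
database Optional[str]

New database name

None
description Optional[str]

New connector description

None
config Optional[dict]

New additional configuration

None
**kwargs

Additional fields to update

{}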

Returns:

Type Description
Connector

The updated connector

Example
result = client.connectors.update(
    connector_id,
    description="Updated production database",
    port=5433
)
Source code in resources/connectors.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def update(
    self,
    connector_id: str,
    name: Optional[str] = None,
    db_type: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    description: Optional[str] = None,
    config: Optional[dict] = None,
    **kwargs
) -> Connector:
    """Update a connector.

    The current connector is fetched first and every field that is left as
    ``None`` keeps its current value, because the full payload is sent with
    a PUT request.

    Args:
        connector_id: The connector ID to update
        name: New connector name
        db_type: New database type (matched case-insensitively against the
            ``ConnectorType`` values and sent in lowercase)
        host: New database host
        port: New database port
        username: New database username
        password: New database password
        database: New database name
        description: New connector description
        config: New additional configuration
        **kwargs: Additional fields to update

    Returns:
        The updated connector

    Raises:
        ValidationError: If ``db_type`` is given and is not a supported type

    Example:
        ```python
        result = client.connectors.update(
            connector_id,
            description="Updated production database",
            port=5433
        )
        ```
    """
    # Validate connector type if provided
    if db_type:
        supported_types = [e.value for e in ConnectorType]
        normalized_type = db_type.lower()
        if normalized_type not in supported_types:
            valid_types = ", ".join(supported_types)
            raise ValidationError(f"Invalid database type. Supported types are: {valid_types}")
        # Forward the same lowercase form that was just validated, so input
        # such as "PostgreSQL" is not accepted here and rejected later.
        db_type = normalized_type

    # Get current connector first since API expects complete data
    current_connector = self.get(connector_id)

    # Use current values as defaults, override with provided values
    update_data = ConnectorCreate(
        name=name if name is not None else current_connector.name,
        description=description if description is not None else current_connector.description,
        db_type=db_type if db_type is not None else current_connector.db_type,
        host=host if host is not None else current_connector.host,
        port=port if port is not None else current_connector.port,
        username=username if username is not None else current_connector.username,
        password=password if password is not None else current_connector.password,
        database=database if database is not None else current_connector.database,
        config=config if config is not None else current_connector.config,
        **kwargs
    )

    response = self._client.put(
        f"/connectors/{connector_id}",
        data=update_data.model_dump()
    )
    return Connector(**response)

text2everything_sdk.resources.chat_sessions.ChatSessionsResource

Bases: BaseResource

Resource for managing H2OGPTE chat sessions.

Source code in resources/chat_sessions.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class ChatSessionsResource(BaseResource):
    """Resource for managing H2OGPTE chat sessions.

    Every method addresses sessions under
    ``/projects/{project_id}/chat-sessions`` through the shared client.
    """

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    def create(
        self,
        project_id: str,
        name: Optional[str] = None,
        custom_tool_id: Optional[str] = None,
        **kwargs
    ) -> ChatSessionResponse:
        """Create a new H2OGPTE chat session.

        Args:
            project_id: The project ID
            name: Optional name for the session
            custom_tool_id: Optional custom tool to associate
            **kwargs: Additional session fields, passed to ``ChatSessionCreate``

        Returns:
            The created chat session

        Example:
            ```python
            result = client.chat_sessions.create(
                project_id=project_id,
                name="Data Analysis Session",
                custom_tool_id="tool-123"
            )
            print(f"Session created: {result.id}")
            ```
        """
        # Build the ChatSessionCreate object internally
        session = ChatSessionCreate(
            name=name,
            custom_tool_id=custom_tool_id,
            **kwargs
        )

        response = self._client.post(
            f"/projects/{project_id}/chat-sessions",
            data=session.model_dump()
        )
        return ChatSessionResponse(**response)

    # TODO: The get chat session API endpoint is not working properly
    # def get(self, project_id: str, session_id: str) -> ChatSessionResponse:
    #     """Get a specific H2OGPTE chat session.
    #
    #     Args:
    #         project_id: The project ID
    #         session_id: The chat session ID
    #
    #     Returns:
    #         The chat session details
    #
    #     Example:
    #         ```python
    #         session = client.chat_sessions.get(project_id, session_id)
    #         print(f"Session: {session.name}")
    #         print(f"Created: {session.created_at}")
    #         ```
    #     """
    #     response = self._client.get(f"/projects/{project_id}/chat-sessions/{session_id}")
    #     return ChatSessionResponse(**response)

    def list(self, project_id: str, offset: int = 0, limit: int = 10) -> List[ChatSessionResponse]:
        """List recent H2OGPTE chat sessions for a project.

        Args:
            project_id: The project ID
            offset: Number of sessions to skip
            limit: Maximum number of sessions to return

        Returns:
            List of chat sessions

        Example:
            ```python
            sessions = client.chat_sessions.list(project_id, limit=20)
            for session in sessions:
                print(f"{session.name}: {session.id}")
            ```
        """
        endpoint = f"/projects/{project_id}/chat-sessions"
        params = {"offset": offset, "limit": limit}
        response = self._client.get(endpoint, params=params)
        return [ChatSessionResponse(**item) for item in response]

    def update_custom_tool(self, project_id: str, session_id: str,
                           custom_tool_id: Optional[str] = None) -> ChatSessionResponse:
        """Update the custom tool associated with a chat session.

        Args:
            project_id: The project ID
            session_id: The chat session ID
            custom_tool_id: The custom tool ID to associate (None to detach)

        Returns:
            The updated chat session

        Example:
            ```python
            # Attach a custom tool
            session = client.chat_sessions.update_custom_tool(
                project_id, session_id, "tool-456"
            )

            # Detach custom tool
            session = client.chat_sessions.update_custom_tool(
                project_id, session_id, None
            )
            ```
        """
        update_request = ChatSessionUpdateRequest(custom_tool_id=custom_tool_id)
        response = self._client.put(
            f"/projects/{project_id}/chat-sessions/{session_id}/custom-tool",
            data=update_request.model_dump()
        )
        return ChatSessionResponse(**response)

    def get_custom_tool(self, project_id: str, session_id: str) -> Optional[CustomTool]:
        """Get the custom tool associated with a chat session.

        Args:
            project_id: The project ID
            session_id: The chat session ID

        Returns:
            The associated custom tool, or None when the API response is
            empty (no tool is associated)

        Example:
            ```python
            tool = client.chat_sessions.get_custom_tool(project_id, session_id)
            if tool:
                print(f"Associated tool: {tool.name}")
            else:
                print("No custom tool associated")
            ```
        """
        response = self._client.get(f"/projects/{project_id}/chat-sessions/{session_id}/custom-tool")
        return CustomTool(**response) if response else None

    def get_questions(self, project_id: str, session_id: str,
                      limit: int = 10) -> List[ChatSessionQuestion]:
        """Get suggested questions for a chat session.

        Args:
            project_id: The project ID
            session_id: The chat session ID
            limit: Maximum number of questions to return

        Returns:
            List of suggested questions

        Example:
            ```python
            questions = client.chat_sessions.get_questions(project_id, session_id)
            for question in questions:
                print(f"Suggested: {question.question}")
            ```
        """
        endpoint = f"/projects/{project_id}/chat-sessions/{session_id}/questions"
        params = {"limit": limit}
        response = self._client.get(endpoint, params=params)
        return [ChatSessionQuestion(**item) for item in response]

    def delete(self, project_id: str, session_id: str) -> bool:
        """Delete a H2OGPTE chat session.

        Args:
            project_id: The project ID
            session_id: The chat session ID to delete

        Returns:
            True once the delete request has returned without raising

        Example:
            ```python
            success = client.chat_sessions.delete(project_id, session_id)
            ```
        """
        self._client.delete(f"/projects/{project_id}/chat-sessions/{session_id}")
        return True

    def create_with_tool(self, project_id: str, name: Optional[str] = None,
                         custom_tool_id: Optional[str] = None) -> ChatSessionResponse:
        """Create a new chat session with an optional custom tool.

        Convenience wrapper that delegates to ``create`` with the same
        arguments.

        Args:
            project_id: The project ID
            name: Optional name for the session
            custom_tool_id: Optional custom tool to associate

        Returns:
            The created chat session

        Example:
            ```python
            session = client.chat_sessions.create_with_tool(
                project_id="proj-123",
                name="Analysis Session",
                custom_tool_id="tool-456"
            )
            ```
        """
        return self.create(
            project_id=project_id,
            name=name,
            custom_tool_id=custom_tool_id
        )

create(project_id, name=None, custom_tool_id=None, **kwargs)

Create a new H2OGPTE chat session.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name Optional[str]

Optional name for the session

None
custom_tool_id Optional[str]

Optional custom tool to associate

None
**kwargs

Additional session fields

{}

Returns:

Type Description
ChatSessionResponse

The created chat session

Example
result = client.chat_sessions.create(
    project_id=project_id,
    name="Data Analysis Session",
    custom_tool_id="tool-123"
)
print(f"Session created: {result.id}")
Source code in resources/chat_sessions.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def create(
    self,
    project_id: str,
    name: Optional[str] = None,
    custom_tool_id: Optional[str] = None,
    **kwargs
) -> ChatSessionResponse:
    """Create a new H2OGPTE chat session in the given project.

    Args:
        project_id: The project ID
        name: Optional name for the session
        custom_tool_id: Optional custom tool to associate
        **kwargs: Additional session fields, passed to ``ChatSessionCreate``

    Returns:
        The created chat session

    Example:
        ```python
        result = client.chat_sessions.create(
            project_id=project_id,
            name="Data Analysis Session",
            custom_tool_id="tool-123"
        )
        print(f"Session created: {result.id}")
        ```
    """
    # Validate and assemble the request model before touching the network.
    new_session = ChatSessionCreate(name=name, custom_tool_id=custom_tool_id, **kwargs)

    sessions_endpoint = f"/projects/{project_id}/chat-sessions"
    request_body = new_session.model_dump()
    created = self._client.post(sessions_endpoint, data=request_body)
    return ChatSessionResponse(**created)

create_with_tool(project_id, name=None, custom_tool_id=None)

Create a new chat session with an optional custom tool.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name str

Optional name for the session

None
custom_tool_id str

Optional custom tool to associate

None

Returns:

Type Description
ChatSessionResponse

The created chat session

Example
session = client.chat_sessions.create_with_tool(
    project_id="proj-123",
    name="Analysis Session",
    custom_tool_id="tool-456"
)
Source code in resources/chat_sessions.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def create_with_tool(self, project_id: str, name: Optional[str] = None,
                     custom_tool_id: Optional[str] = None) -> ChatSessionResponse:
    """Create a new chat session with an optional custom tool.

    Convenience wrapper that delegates to ``create`` with the same
    arguments passed by keyword.

    Args:
        project_id: The project ID
        name: Optional name for the session
        custom_tool_id: Optional custom tool to associate

    Returns:
        The created chat session

    Example:
        ```python
        session = client.chat_sessions.create_with_tool(
            project_id="proj-123",
            name="Analysis Session",
            custom_tool_id="tool-456"
        )
        ```
    """
    return self.create(
        project_id=project_id,
        name=name,
        custom_tool_id=custom_tool_id
    )

delete(project_id, session_id)

Delete a H2OGPTE chat session.

Parameters:

Name Type Description Default
project_id str

The project ID

required
session_id str

The chat session ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.chat_sessions.delete(project_id, session_id)
Source code in resources/chat_sessions.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def delete(self, project_id: str, session_id: str) -> bool:
    """Delete a H2OGPTE chat session.

    Args:
        project_id: The project ID
        session_id: The chat session ID to delete

    Returns:
        True once the delete request has returned without raising

    Example:
        ```python
        success = client.chat_sessions.delete(project_id, session_id)
        ```
    """
    session_path = f"/projects/{project_id}/chat-sessions/{session_id}"
    self._client.delete(session_path)
    return True

get_custom_tool(project_id, session_id)

Get the custom tool associated with a chat session.

Parameters:

Name Type Description Default
project_id str

The project ID

required
session_id str

The chat session ID

required

Returns:

Type Description
Optional[CustomTool]

The associated custom tool, or None if no tool is associated

Example
tool = client.chat_sessions.get_custom_tool(project_id, session_id)
if tool:
    print(f"Associated tool: {tool.name}")
else:
    print("No custom tool associated")
Source code in resources/chat_sessions.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def get_custom_tool(self, project_id: str, session_id: str) -> Optional[CustomTool]:
    """Fetch the custom tool attached to a chat session.

    Args:
        project_id: The project ID
        session_id: The chat session ID

    Returns:
        The associated custom tool, or None when the API response is empty

    Example:
        ```python
        tool = client.chat_sessions.get_custom_tool(project_id, session_id)
        if tool:
            print(f"Associated tool: {tool.name}")
        else:
            print("No custom tool associated")
        ```
    """
    tool_payload = self._client.get(
        f"/projects/{project_id}/chat-sessions/{session_id}/custom-tool"
    )
    # An empty/falsy response means no tool is attached to the session.
    if not tool_payload:
        return None
    return CustomTool(**tool_payload)

get_questions(project_id, session_id, limit=10)

Get suggested questions for a chat session.

Parameters:

Name Type Description Default
project_id str

The project ID

required
session_id str

The chat session ID

required
limit int

Maximum number of questions to return

10

Returns:

Type Description
List[ChatSessionQuestion]

List of suggested questions

Example
questions = client.chat_sessions.get_questions(project_id, session_id)
for question in questions:
    print(f"Suggested: {question.question}")
Source code in resources/chat_sessions.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def get_questions(self, project_id: str, session_id: str, 
                 limit: int = 10) -> List[ChatSessionQuestion]:
    """Fetch suggested questions for a chat session.

    Args:
        project_id: The project ID
        session_id: The chat session ID
        limit: Maximum number of questions to return

    Returns:
        List of suggested questions

    Example:
        ```python
        questions = client.chat_sessions.get_questions(project_id, session_id)
        for question in questions:
            print(f"Suggested: {question.question}")
        ```
    """
    raw_questions = self._client.get(
        f"/projects/{project_id}/chat-sessions/{session_id}/questions",
        params={"limit": limit},
    )
    suggestions = []
    for payload in raw_questions:
        suggestions.append(ChatSessionQuestion(**payload))
    return suggestions

list(project_id, offset=0, limit=10)

List recent H2OGPTE chat sessions for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
offset int

Number of sessions to skip

0
limit int

Maximum number of sessions to return

10

Returns:

Type Description
List[ChatSessionResponse]

List of chat sessions

Example
sessions = client.chat_sessions.list(project_id, limit=20)
for session in sessions:
    print(f"{session.name}: {session.id}")
Source code in resources/chat_sessions.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def list(self, project_id: str, offset: int = 0, limit: int = 10) -> List[ChatSessionResponse]:
    """List recent H2OGPTE chat sessions for a project.

    Args:
        project_id: The project ID
        offset: Number of sessions to skip
        limit: Maximum number of sessions to return

    Returns:
        List of chat sessions

    Example:
        ```python
        sessions = client.chat_sessions.list(project_id, limit=20)
        for session in sessions:
            print(f"{session.name}: {session.id}")
        ```
    """
    raw_sessions = self._client.get(
        f"/projects/{project_id}/chat-sessions",
        params={"offset": offset, "limit": limit},
    )
    sessions = []
    for payload in raw_sessions:
        sessions.append(ChatSessionResponse(**payload))
    return sessions

update_custom_tool(project_id, session_id, custom_tool_id=None)

Update the custom tool associated with a chat session.

Parameters:

Name Type Description Default
project_id str

The project ID

required
session_id str

The chat session ID

required
custom_tool_id str

The custom tool ID to associate (None to detach)

None

Returns:

Type Description
ChatSessionResponse

The updated chat session

Example
# Attach a custom tool
session = client.chat_sessions.update_custom_tool(
    project_id, session_id, "tool-456"
)

# Detach custom tool
session = client.chat_sessions.update_custom_tool(
    project_id, session_id, None
)
Source code in resources/chat_sessions.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def update_custom_tool(self, project_id: str, session_id: str,
                       custom_tool_id: Optional[str] = None) -> ChatSessionResponse:
    """Update the custom tool associated with a chat session.

    Args:
        project_id: The project ID
        session_id: The chat session ID
        custom_tool_id: The custom tool ID to associate (None to detach)

    Returns:
        The updated chat session

    Example:
        ```python
        # Attach a custom tool
        session = client.chat_sessions.update_custom_tool(
            project_id, session_id, "tool-456"
        )

        # Detach custom tool
        session = client.chat_sessions.update_custom_tool(
            project_id, session_id, None
        )
        ```
    """
    update_request = ChatSessionUpdateRequest(custom_tool_id=custom_tool_id)
    response = self._client.put(
        f"/projects/{project_id}/chat-sessions/{session_id}/custom-tool",
        data=update_request.model_dump()
    )
    return ChatSessionResponse(**response)

text2everything_sdk.resources.chat.ChatResource

Bases: BaseResource

Resource for natural language to SQL chat functionality.

Source code in resources/chat.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class ChatResource(BaseResource):
    """Resource for natural language to SQL chat functionality.

    All requests are posted under
    ``/projects/{project_id}/chat-sessions/{chat_session_id}/``.
    """

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    @staticmethod
    def _require_non_blank(value: str, message: str) -> None:
        """Raise ValidationError(message) if value is falsy or only whitespace."""
        if not value or not value.strip():
            raise ValidationError(message)

    def chat_to_sql(
        self,
        project_id: str,
        chat_session_id: str,
        query: str,
        schema_metadata_id: Optional[str] = None,
        contexts_limit: Optional[int] = None,
        examples_limit: Optional[int] = None,
        **kwargs
    ) -> ChatResponse:
        """Convert natural language query to SQL.

        Args:
            project_id: The project ID
            chat_session_id: Target chat session for history/summary context
            query: Natural language query
            schema_metadata_id: Optional schema metadata ID
            contexts_limit: Optional limit for contexts
            examples_limit: Optional limit for examples
            **kwargs: Additional chat request fields, passed to ``ChatRequest``

        Returns:
            Chat response with generated SQL and explanation

        Raises:
            ValidationError: If ``query`` or ``chat_session_id`` is empty or
                only whitespace

        Example:
            ```python
            response = client.chat.chat_to_sql(
                project_id=project_id,
                chat_session_id=chat_session_id,
                query="How many active users do we have?",
                schema_metadata_id="schema-456",
                contexts_limit=5,
                examples_limit=3
            )
            print(f"Generated SQL: {response.sql_query}")
            print(f"Explanation: {response.explanation}")
            ```
        """
        # Basic validation
        self._require_non_blank(query, "Query cannot be empty")
        self._require_non_blank(chat_session_id, "chat_session_id cannot be empty")

        # Build the ChatRequest object internally
        request = ChatRequest(
            query=query,
            schema_metadata_id=schema_metadata_id,
            contexts_limit=contexts_limit,
            examples_limit=examples_limit,
            **kwargs
        )

        response = self._client.post(
            f"/projects/{project_id}/chat-sessions/{chat_session_id}/chat-to-sql",
            data=request.model_dump()
        )
        return ChatResponse(**response)

    def chat_to_answer(
        self,
        project_id: str,
        chat_session_id: str,
        query: str,
        connector_id: str,
        custom_tool_id: Optional[str] = None,
        use_agent: bool = False,
        agent_accuracy: str = "medium",
        auto_add_feedback: Optional[dict] = None,
        **kwargs
    ) -> ChatToAnswerResponse:
        """Convert natural language query to SQL and execute it.

        Args:
            project_id: The project ID
            chat_session_id: Target chat session for history/summary context
            query: Natural language query
            connector_id: Required connector ID for SQL execution
            custom_tool_id: Optional custom tool to use
            use_agent: Whether to use agent functionality
            agent_accuracy: Agent accuracy level ("low", "medium", "high").
                NOTE(review): ``chat_with_agent`` documents a different set
                of values; confirm the accepted values against the API.
            auto_add_feedback: Optional auto feedback configuration; a dict
                is converted to ``AutoFeedbackConfig``. When None the field
                is omitted so the request model uses its default.
            **kwargs: Additional chat to answer request fields

        Returns:
            Chat response with SQL, execution results, and optional feedback

        Raises:
            ValidationError: If ``query``, ``connector_id`` or
                ``chat_session_id`` is empty or only whitespace

        Example:
            ```python
            response = client.chat.chat_to_answer(
                project_id=project_id,
                chat_session_id=chat_session_id,
                query="Show me the top 10 customers by revenue",
                connector_id="conn-789",
                use_agent=True,
                agent_accuracy="high"
            )

            if response.execution_result:
                print(f"Query executed in {response.execution_result.execution_time_ms}ms")
                print(f"Results: {response.execution_result.result}")

            if response.agent_tool_response:
                print(f"Agent response: {response.agent_tool_response}")
            ```
        """
        # Basic validation
        self._require_non_blank(query, "Query cannot be empty")
        self._require_non_blank(connector_id, "Connector ID cannot be empty")
        self._require_non_blank(chat_session_id, "chat_session_id cannot be empty")

        # Build the ChatToAnswerRequest object internally
        # Handle auto_add_feedback parameter - if None, let the model use its default
        request_data = {
            "query": query,
            "connector_id": connector_id,  # Include connector_id in request body
            "custom_tool_id": custom_tool_id,
            "use_agent": use_agent,
            "agent_accuracy": agent_accuracy,
            **kwargs
        }

        # Handle auto_add_feedback - convert dict to AutoFeedbackConfig if needed
        if auto_add_feedback is not None:
            if isinstance(auto_add_feedback, dict):
                auto_add_feedback = AutoFeedbackConfig(**auto_add_feedback)
            request_data["auto_add_feedback"] = auto_add_feedback

        request = ChatToAnswerRequest(**request_data)

        # Send request without query parameters
        endpoint = f"/projects/{project_id}/chat-sessions/{chat_session_id}/chat-to-answer"

        response = self._client.post(
            endpoint,
            data=request.model_dump()
        )
        return ChatToAnswerResponse(**response)

    def chat_with_context(self, project_id: str, chat_session_id: str, query: str,
                          context_id: Optional[str] = None,
                          schema_metadata_id: Optional[str] = None,
                          example_id: Optional[str] = None, **kwargs) -> ChatResponse:
        """Chat with specific context, schema, or example.

        Delegates to ``chat_to_sql``; ``context_id`` and ``example_id`` are
        forwarded as additional request fields (also when they are None).

        Args:
            project_id: The project ID
            chat_session_id: Target chat session for history/summary context
            query: Natural language query
            context_id: Optional specific context to use
            schema_metadata_id: Optional specific schema metadata to use
            example_id: Optional specific example to use
            **kwargs: Additional chat parameters

        Returns:
            Chat response with generated SQL

        Raises:
            ValidationError: If ``query`` or ``chat_session_id`` is empty or
                only whitespace

        Example:
            ```python
            response = client.chat.chat_with_context(
                project_id="proj-123",
                chat_session_id="chat-abc",
                query="Count active users",
                context_id="ctx-789",
                schema_metadata_id="schema-101",
                llm="gpt-4",
                use_agent=True
            )
            ```
        """
        return self.chat_to_sql(
            project_id=project_id,
            chat_session_id=chat_session_id,
            query=query,
            context_id=context_id,
            schema_metadata_id=schema_metadata_id,
            example_id=example_id,
            **kwargs
        )

    def chat_with_agent(self, project_id: str, chat_session_id: str, query: str,
                        connector_id: str, custom_tool_id: Optional[str] = None,
                        agent_accuracy: str = "medium", **kwargs) -> ChatToAnswerResponse:
        """Chat using custom tools and agent functionality.

        Delegates to ``chat_to_answer`` with ``use_agent=True``.

        Args:
            project_id: The project ID
            chat_session_id: Target chat session for history/summary context
            query: Natural language query
            connector_id: Required connector ID for SQL execution
            custom_tool_id: Optional custom tool to use
            agent_accuracy: Agent accuracy level ("quick", "basic", "standard", "maximum").
                NOTE(review): the default "medium" is not in this list and
                ``chat_to_answer`` documents "low"/"medium"/"high"; confirm
                the accepted values against the API.
            **kwargs: Additional chat parameters

        Returns:
            Chat response with agent tool response

        Raises:
            ValidationError: If ``query``, ``connector_id`` or
                ``chat_session_id`` is empty or only whitespace

        Example:
            ```python
            response = client.chat.chat_with_agent(
                project_id="proj-123",
                chat_session_id="chat-abc",
                query="Analyze customer churn patterns",
                connector_id="conn-123",
                custom_tool_id="tool-789",
                agent_accuracy="medium"
            )
            print(f"Agent response: {response.agent_tool_response}")
            ```
        """
        return self.chat_to_answer(
            project_id=project_id,
            chat_session_id=chat_session_id,
            query=query,
            connector_id=connector_id,
            custom_tool_id=custom_tool_id,
            use_agent=True,
            agent_accuracy=agent_accuracy,
            **kwargs
        )
chat_to_answer(project_id, chat_session_id, query, connector_id, custom_tool_id=None, use_agent=False, agent_accuracy='medium', auto_add_feedback=None, **kwargs)

Convert natural language query to SQL and execute it.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_session_id str

Target chat session for history/summary context

required
query str

Natural language query

required
connector_id str

Required connector ID for SQL execution

required
custom_tool_id str

Optional custom tool to use

None
use_agent bool

Whether to use agent functionality

False
agent_accuracy str

Agent accuracy level ("low", "medium", "high")

'medium'
auto_add_feedback dict

Optional auto feedback configuration

None
**kwargs

Additional chat to answer request fields

{}

Returns:

Type Description
ChatToAnswerResponse

Chat response with SQL, execution results, and optional feedback

Example
response = client.chat.chat_to_answer(
    project_id=project_id,
    chat_session_id=chat_session_id,
    query="Show me the top 10 customers by revenue",
    connector_id="conn-789",
    use_agent=True,
    agent_accuracy="high"
)

if response.execution_result:
    print(f"Query executed in {response.execution_result.execution_time_ms}ms")
    print(f"Results: {response.execution_result.result}")

if response.agent_tool_response:
    print(f"Agent response: {response.agent_tool_response}")
Source code in resources/chat.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def chat_to_answer(
    self,
    project_id: str,
    chat_session_id: str,
    query: str,
    connector_id: str,
    custom_tool_id: str = None,
    use_agent: bool = False,
    agent_accuracy: str = "medium",
    auto_add_feedback: dict = None,
    **kwargs
) -> ChatToAnswerResponse:
    """Convert natural language query to SQL and execute it.

    Builds a ``ChatToAnswerRequest`` from the arguments and POSTs it to the
    chat session's ``chat-to-answer`` endpoint.

    Args:
        project_id: The project ID (used in the request path)
        chat_session_id: Target chat session for history/summary context
        query: Natural language query
        connector_id: Required connector ID for SQL execution
        custom_tool_id: Optional custom tool to use
        use_agent: Whether to use agent functionality
        agent_accuracy: Agent accuracy level ("low", "medium", "high")
        auto_add_feedback: Optional auto feedback configuration. A dict is
            converted to ``AutoFeedbackConfig``; when None the key is left
            out of the request so the model default applies.
        **kwargs: Additional chat to answer request fields

    Returns:
        Chat response with SQL, execution results, and optional feedback

    Raises:
        ValidationError: If ``query``, ``connector_id``, ``chat_session_id``
            or ``project_id`` is empty or whitespace-only.

    Example:
        ```python
        response = client.chat.chat_to_answer(
            project_id=project_id,
            chat_session_id=chat_session_id,
            query="Show me the top 10 customers by revenue",
            connector_id="conn-789",
            use_agent=True,
            agent_accuracy="high"
        )

        if response.execution_result:
            print(f"Query executed in {response.execution_result.execution_time_ms}ms")
            print(f"Results: {response.execution_result.result}")

        if response.agent_tool_response:
            print(f"Agent response: {response.agent_tool_response}")
        ```
    """
    # Basic validation of the required text inputs (original order preserved
    # so previously-raised errors are unchanged).
    if not query or not query.strip():
        raise ValidationError("Query cannot be empty")
    if not connector_id or not connector_id.strip():
        raise ValidationError("Connector ID cannot be empty")
    if not chat_session_id or not chat_session_id.strip():
        raise ValidationError("chat_session_id cannot be empty")
    # project_id is interpolated into the URL path below; without this check
    # a None or blank value would silently target "/projects/None/..." or
    # "/projects//...". str() keeps non-string IDs (e.g. UUID objects) working.
    if not project_id or not str(project_id).strip():
        raise ValidationError("Project ID cannot be empty")

    # Build the ChatToAnswerRequest payload; connector_id travels in the body.
    request_data = {
        "query": query,
        "connector_id": connector_id,
        "custom_tool_id": custom_tool_id,
        "use_agent": use_agent,
        "agent_accuracy": agent_accuracy,
        **kwargs
    }

    # Only include auto_add_feedback when supplied, so the request model can
    # fall back to its own default; plain dicts become AutoFeedbackConfig.
    if auto_add_feedback is not None:
        if isinstance(auto_add_feedback, dict):
            auto_add_feedback = AutoFeedbackConfig(**auto_add_feedback)
        request_data["auto_add_feedback"] = auto_add_feedback

    request = ChatToAnswerRequest(**request_data)

    # Everything is sent in the request body; no query parameters are used.
    endpoint = f"/projects/{project_id}/chat-sessions/{chat_session_id}/chat-to-answer"
    response = self._client.post(
        endpoint,
        data=request.model_dump()
    )
    return ChatToAnswerResponse(**response)

chat_to_sql(project_id, chat_session_id, query, schema_metadata_id=None, contexts_limit=None, examples_limit=None, **kwargs)

Convert natural language query to SQL.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_session_id str

Target chat session for history/summary context

required
query str

Natural language query

required
schema_metadata_id str

Optional schema metadata ID

None
contexts_limit int

Optional limit for contexts

None
examples_limit int

Optional limit for examples

None
**kwargs

Additional chat request fields

{}

Returns:

Type Description
ChatResponse

Chat response with generated SQL and explanation

Example
response = client.chat.chat_to_sql(
    project_id=project_id,
    chat_session_id=chat_session_id,
    query="How many active users do we have?",
    schema_metadata_id="schema-456",
    contexts_limit=5,
    examples_limit=3
)
print(f"Generated SQL: {response.sql_query}")
print(f"Explanation: {response.explanation}")
Source code in resources/chat.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def chat_to_sql(
    self,
    project_id: str,
    chat_session_id: str,
    query: str,
    schema_metadata_id: str = None,
    contexts_limit: int = None,
    examples_limit: int = None,
    **kwargs
) -> ChatResponse:
    """Convert natural language query to SQL.

    Builds a ``ChatRequest`` from the arguments and POSTs it to the chat
    session's ``chat-to-sql`` endpoint.

    Args:
        project_id: The project ID (used in the request path)
        chat_session_id: Target chat session for history/summary context
        query: Natural language query
        schema_metadata_id: Optional schema metadata ID
        contexts_limit: Optional limit for contexts
        examples_limit: Optional limit for examples
        **kwargs: Additional chat request fields

    Returns:
        Chat response with generated SQL and explanation

    Raises:
        ValidationError: If ``query``, ``chat_session_id`` or ``project_id``
            is empty or whitespace-only.

    Example:
        ```python
        response = client.chat.chat_to_sql(
            project_id=project_id,
            chat_session_id=chat_session_id,
            query="How many active users do we have?",
            schema_metadata_id="schema-456",
            contexts_limit=5,
            examples_limit=3
        )
        print(f"Generated SQL: {response.sql_query}")
        print(f"Explanation: {response.explanation}")
        ```
    """
    # Basic validation (original order preserved so previously-raised errors
    # are unchanged).
    if not query or not query.strip():
        raise ValidationError("Query cannot be empty")

    if not chat_session_id or not chat_session_id.strip():
        raise ValidationError("chat_session_id cannot be empty")

    # project_id is interpolated into the URL path below; without this check
    # a None or blank value would silently target "/projects/None/..." or
    # "/projects//...". str() keeps non-string IDs (e.g. UUID objects) working.
    if not project_id or not str(project_id).strip():
        raise ValidationError("Project ID cannot be empty")

    # Build the ChatRequest object internally
    request = ChatRequest(
        query=query,
        schema_metadata_id=schema_metadata_id,
        contexts_limit=contexts_limit,
        examples_limit=examples_limit,
        **kwargs
    )

    response = self._client.post(
        f"/projects/{project_id}/chat-sessions/{chat_session_id}/chat-to-sql",
        data=request.model_dump()
    )
    return ChatResponse(**response)

chat_with_agent(project_id, chat_session_id, query, connector_id, custom_tool_id=None, agent_accuracy='medium', **kwargs)

Chat using custom tools and agent functionality.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_session_id str

Target chat session for history/summary context

required
query str

Natural language query

required
connector_id str

Required connector ID for SQL execution

required
custom_tool_id str

Optional custom tool to use

None
agent_accuracy str

Agent accuracy level ("quick", "basic", "standard", "maximum")

'medium'
**kwargs

Additional chat parameters

{}

Returns:

Type Description
ChatToAnswerResponse

Chat response with agent tool response

Example
response = client.chat.chat_with_agent(
    project_id="proj-123",
    chat_session_id="chat-abc",
    query="Analyze customer churn patterns",
    connector_id="conn-123",
    custom_tool_id="tool-789",
    agent_accuracy="quick|basic|standard|maximum"
)
print(f"Agent response: {response.agent_tool_response}")
Source code in resources/chat.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def chat_with_agent(
    self,
    project_id: str,
    chat_session_id: str,
    query: str,
    connector_id: str,
    custom_tool_id: str = None,
    agent_accuracy: str = "medium",
    **kwargs
) -> ChatToAnswerResponse:
    """Run a chat-to-answer request with agent functionality switched on.

    Convenience wrapper: delegates to ``chat_to_answer`` with
    ``use_agent=True`` and forwards every other argument unchanged.

    Args:
        project_id: The project ID
        chat_session_id: Target chat session, forwarded to ``chat_to_answer``
        query: Natural language query
        connector_id: Required connector ID for SQL execution
        custom_tool_id: Optional custom tool to use
        agent_accuracy: Agent accuracy level ("quick", "basic", "standard",
            "maximum"). NOTE(review): the default is "medium", which is not
            in this list; confirm the accepted values against the API.
        **kwargs: Additional chat parameters

    Returns:
        Chat response with agent tool response

    Example:
        ```python
        response = client.chat.chat_with_agent(
            project_id="proj-123",
            chat_session_id="chat-abc",
            query="Analyze customer churn patterns",
            connector_id="conn-123",
            custom_tool_id="tool-789",
            agent_accuracy="quick|basic|standard|maximum"
        )
        print(f"Agent response: {response.agent_tool_response}")
        ```
    """
    # Fixed arguments for the delegated call; use_agent is always forced on.
    agent_call = dict(
        project_id=project_id,
        chat_session_id=chat_session_id,
        query=query,
        connector_id=connector_id,
        custom_tool_id=custom_tool_id,
        use_agent=True,
        agent_accuracy=agent_accuracy,
    )
    return self.chat_to_answer(**agent_call, **kwargs)

chat_with_context(project_id, chat_session_id, query, context_id=None, schema_metadata_id=None, example_id=None, **kwargs)

Chat with specific context, schema, or example.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_session_id str

Target chat session for history/summary context

required
query str

Natural language query

required
context_id str

Optional specific context to use

None
schema_metadata_id str

Optional specific schema metadata to use

None
example_id str

Optional specific example to use

None
**kwargs

Additional chat parameters

{}

Returns:

Type Description
ChatResponse

Chat response with generated SQL

Example
response = client.chat.chat_with_context(
    project_id="proj-123",
    chat_session_id="chat-abc",
    query="Count active users",
    context_id="ctx-789",
    schema_metadata_id="schema-101",
    llm="gpt-4",
    use_agent=True
)
Source code in resources/chat.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def chat_with_context(
    self,
    project_id: str,
    chat_session_id: str,
    query: str,
    context_id: str = None,
    schema_metadata_id: str = None,
    example_id: str = None,
    **kwargs
) -> ChatResponse:
    """Generate SQL while pinning a specific context, schema, or example.

    Convenience wrapper: delegates to ``chat_to_sql``, passing ``context_id``
    and ``example_id`` along as additional keyword arguments (they are
    forwarded even when None).

    Args:
        project_id: The project ID
        chat_session_id: Target chat session, forwarded to ``chat_to_sql``
        query: Natural language query
        context_id: Optional specific context to use
        schema_metadata_id: Optional specific schema metadata to use
        example_id: Optional specific example to use
        **kwargs: Additional chat parameters

    Returns:
        Chat response with generated SQL

    Example:
        ```python
        response = client.chat.chat_with_context(
            project_id="proj-123",
            chat_session_id="chat-abc",
            query="Count active users",
            context_id="ctx-789",
            schema_metadata_id="schema-101",
            llm="gpt-4",
            use_agent=True
        )
        ```
    """
    # Named selectors travel together; caller extras are appended after them.
    sql_call = dict(
        project_id=project_id,
        chat_session_id=chat_session_id,
        query=query,
        context_id=context_id,
        schema_metadata_id=schema_metadata_id,
        example_id=example_id,
    )
    return self.chat_to_sql(**sql_call, **kwargs)

text2everything_sdk.resources.executions.ExecutionsResource

Bases: BaseResource

Resource for executing SQL queries against database connectors.

Source code in resources/executions.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class ExecutionsResource(BaseResource):
    """Client-side resource that runs SQL through database connectors."""

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    def execute_sql(
        self,
        connector_id: str,
        chat_message_id: str = None,
        sql_query: str = None,
        chat_session_id: str = None,
        **kwargs
    ) -> SQLExecuteResponse:
        """Run SQL through a connector, sourced from a chat message or given inline.

        Args:
            connector_id: The database connector ID
            chat_message_id: Optional chat message ID containing SQL query
            sql_query: Optional SQL query to execute directly
            chat_session_id: Optional chat session ID for context
            **kwargs: Additional execution request fields

        Returns:
            SQL execution response with results and performance metrics

        Raises:
            ValidationError: If both or neither of ``chat_message_id`` and
                ``sql_query`` are given, or if ``connector_id`` is blank.

        Example:
            ```python
            # Execute SQL from a chat message
            result = client.executions.execute_sql(
                connector_id="conn-123",
                chat_message_id="msg-456"
            )
            print(f"Execution time: {result.execution_time_ms}ms")
            print(f"Result: {result.result}")

            # Execute SQL directly
            result = client.executions.execute_sql(
                connector_id="conn-123",
                sql_query="SELECT COUNT(*) FROM users"
            )
            ```
        """
        # The two SQL sources are mutually exclusive and one is mandatory.
        has_message = bool(chat_message_id)
        has_query = bool(sql_query)
        if has_message == has_query:
            raise ValidationError("Exactly one of chat_message_id or sql_query must be provided")

        if not (connector_id and connector_id.strip()):
            raise ValidationError("Connector ID cannot be empty")

        # Assemble the request model, then serialize it using field aliases.
        execute_request = SQLExecuteRequest(
            connector_id=connector_id,
            chat_message_id=chat_message_id,
            sql_query=sql_query,
            chat_session_id=chat_session_id,
            **kwargs
        )
        raw = self._client.post(
            "/sql/execute",
            data=execute_request.model_dump(by_alias=True),
        )
        return SQLExecuteResponse(**raw)

    def execute_from_chat(
        self,
        connector_id: str,
        chat_message_id: str,
        chat_session_id: str = None
    ) -> SQLExecuteResponse:
        """Run the SQL attached to a chat message.

        Thin wrapper over ``execute_sql`` that supplies ``chat_message_id``
        and never passes ``sql_query``.

        Args:
            connector_id: The database connector ID
            chat_message_id: The chat message containing the SQL query
            chat_session_id: Optional chat session ID for context

        Returns:
            SQL execution response

        Example:
            ```python
            result = client.executions.execute_from_chat(
                connector_id="conn-123",
                chat_message_id="msg-456"
            )
            ```
        """
        call_args = dict(
            connector_id=connector_id,
            chat_message_id=chat_message_id,
            chat_session_id=chat_session_id,
        )
        return self.execute_sql(**call_args)

    def execute_query(
        self,
        connector_id: str,
        sql_query: str,
        chat_session_id: str = None
    ) -> SQLExecuteResponse:
        """Run a literal SQL string through a connector.

        Args:
            connector_id: The database connector ID
            sql_query: The SQL query to execute
            chat_session_id: Optional chat session ID for context

        Returns:
            SQL execution response

        Raises:
            ValidationError: If ``sql_query`` is empty or whitespace-only.

        Example:
            ```python
            result = client.executions.execute_query(
                connector_id="conn-123",
                sql_query="SELECT * FROM users WHERE active = true LIMIT 10"
            )
            print(f"Found {len(result.result.get('data', []))} rows")
            ```
        """
        # Reject blank SQL here; the query itself is forwarded unmodified.
        if not (sql_query and sql_query.strip()):
            raise ValidationError("SQL query cannot be empty")

        call_args = dict(
            connector_id=connector_id,
            sql_query=sql_query,
            chat_session_id=chat_session_id,
        )
        return self.execute_sql(**call_args)

    def get_execution(self, execution_id: str) -> Execution:
        """Fetch a single execution record by its ID.

        Args:
            execution_id: The execution ID

        Returns:
            Execution details

        Example:
            ```python
            execution = client.executions.get_execution("exec-123")
            print(f"Query: {execution.sql_query}")
            print(f"Time: {execution.execution_time_ms}ms")
            ```
        """
        # Note: This endpoint might not exist in the current API
        # This is a placeholder for future implementation
        payload = self._client.get(f"/executions/{execution_id}")
        return Execution(**payload)

execute_from_chat(connector_id, chat_message_id, chat_session_id=None)

Execute SQL from a chat message.

Parameters:

Name Type Description Default
connector_id str

The database connector ID

required
chat_message_id str

The chat message containing the SQL query

required
chat_session_id str

Optional chat session ID for context

None

Returns:

Type Description
SQLExecuteResponse

SQL execution response

Example
result = client.executions.execute_from_chat(
    connector_id="conn-123",
    chat_message_id="msg-456"
)
Source code in resources/executions.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def execute_from_chat(
    self,
    connector_id: str,
    chat_message_id: str,
    chat_session_id: str = None
) -> SQLExecuteResponse:
    """Run the SQL attached to a chat message.

    Thin wrapper over ``execute_sql`` that supplies ``chat_message_id`` and
    never passes ``sql_query``.

    Args:
        connector_id: The database connector ID
        chat_message_id: The chat message containing the SQL query
        chat_session_id: Optional chat session ID for context

    Returns:
        SQL execution response

    Example:
        ```python
        result = client.executions.execute_from_chat(
            connector_id="conn-123",
            chat_message_id="msg-456"
        )
        ```
    """
    call_args = dict(
        connector_id=connector_id,
        chat_message_id=chat_message_id,
        chat_session_id=chat_session_id,
    )
    return self.execute_sql(**call_args)

execute_query(connector_id, sql_query, chat_session_id=None)

Execute a SQL query directly.

Parameters:

Name Type Description Default
connector_id str

The database connector ID

required
sql_query str

The SQL query to execute

required
chat_session_id str

Optional chat session ID for context

None

Returns:

Type Description
SQLExecuteResponse

SQL execution response

Example
result = client.executions.execute_query(
    connector_id="conn-123",
    sql_query="SELECT * FROM users WHERE active = true LIMIT 10"
)
print(f"Found {len(result.result.get('data', []))} rows")
Source code in resources/executions.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def execute_query(
    self,
    connector_id: str,
    sql_query: str,
    chat_session_id: str = None
) -> SQLExecuteResponse:
    """Run a literal SQL string through a connector.

    Args:
        connector_id: The database connector ID
        sql_query: The SQL query to execute
        chat_session_id: Optional chat session ID for context

    Returns:
        SQL execution response

    Raises:
        ValidationError: If ``sql_query`` is empty or whitespace-only.

    Example:
        ```python
        result = client.executions.execute_query(
            connector_id="conn-123",
            sql_query="SELECT * FROM users WHERE active = true LIMIT 10"
        )
        print(f"Found {len(result.result.get('data', []))} rows")
        ```
    """
    # Reject blank SQL here; the query itself is forwarded unmodified.
    if not (sql_query and sql_query.strip()):
        raise ValidationError("SQL query cannot be empty")

    call_args = dict(
        connector_id=connector_id,
        sql_query=sql_query,
        chat_session_id=chat_session_id,
    )
    return self.execute_sql(**call_args)

execute_sql(connector_id, chat_message_id=None, sql_query=None, chat_session_id=None, **kwargs)

Execute a SQL query using the specified connector.

Parameters:

Name Type Description Default
connector_id str

The database connector ID

required
chat_message_id str

Optional chat message ID containing SQL query

None
sql_query str

Optional SQL query to execute directly

None
chat_session_id str

Optional chat session ID for context

None
**kwargs

Additional execution request fields

{}

Returns:

Type Description
SQLExecuteResponse

SQL execution response with results and performance metrics

Example
# Execute SQL from a chat message
result = client.executions.execute_sql(
    connector_id="conn-123",
    chat_message_id="msg-456"
)
print(f"Execution time: {result.execution_time_ms}ms")
print(f"Result: {result.result}")

# Execute SQL directly
result = client.executions.execute_sql(
    connector_id="conn-123",
    sql_query="SELECT COUNT(*) FROM users"
)
Source code in resources/executions.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def execute_sql(
    self,
    connector_id: str,
    chat_message_id: str = None,
    sql_query: str = None,
    chat_session_id: str = None,
    **kwargs
) -> SQLExecuteResponse:
    """Run SQL through a connector, sourced from a chat message or given inline.

    Args:
        connector_id: The database connector ID
        chat_message_id: Optional chat message ID containing SQL query
        sql_query: Optional SQL query to execute directly
        chat_session_id: Optional chat session ID for context
        **kwargs: Additional execution request fields

    Returns:
        SQL execution response with results and performance metrics

    Raises:
        ValidationError: If both or neither of ``chat_message_id`` and
            ``sql_query`` are given, or if ``connector_id`` is blank.

    Example:
        ```python
        # Execute SQL from a chat message
        result = client.executions.execute_sql(
            connector_id="conn-123",
            chat_message_id="msg-456"
        )
        print(f"Execution time: {result.execution_time_ms}ms")
        print(f"Result: {result.result}")

        # Execute SQL directly
        result = client.executions.execute_sql(
            connector_id="conn-123",
            sql_query="SELECT COUNT(*) FROM users"
        )
        ```
    """
    # The two SQL sources are mutually exclusive and one is mandatory.
    has_message = bool(chat_message_id)
    has_query = bool(sql_query)
    if has_message == has_query:
        raise ValidationError("Exactly one of chat_message_id or sql_query must be provided")

    if not (connector_id and connector_id.strip()):
        raise ValidationError("Connector ID cannot be empty")

    # Assemble the request model, then serialize it using field aliases.
    execute_request = SQLExecuteRequest(
        connector_id=connector_id,
        chat_message_id=chat_message_id,
        sql_query=sql_query,
        chat_session_id=chat_session_id,
        **kwargs
    )
    raw = self._client.post(
        "/sql/execute",
        data=execute_request.model_dump(by_alias=True),
    )
    return SQLExecuteResponse(**raw)

get_execution(execution_id)

Get execution details by ID.

Parameters:

Name Type Description Default
execution_id str

The execution ID

required

Returns:

Type Description
Execution

Execution details

Example
execution = client.executions.get_execution("exec-123")
print(f"Query: {execution.sql_query}")
print(f"Time: {execution.execution_time_ms}ms")
Source code in resources/executions.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def get_execution(self, execution_id: str) -> Execution:
    """Fetch a single execution record by its ID.

    Args:
        execution_id: The execution ID

    Returns:
        Execution details

    Example:
        ```python
        execution = client.executions.get_execution("exec-123")
        print(f"Query: {execution.sql_query}")
        print(f"Time: {execution.execution_time_ms}ms")
        ```
    """
    # Note: This endpoint might not exist in the current API
    # This is a placeholder for future implementation
    payload = self._client.get(f"/executions/{execution_id}")
    return Execution(**payload)

text2everything_sdk.resources.feedback.FeedbackResource

Bases: BaseResource

Resource for managing feedback on chat messages and SQL executions.

Source code in resources/feedback.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class FeedbackResource(BaseResource):
    """Resource for managing feedback on chat messages and SQL executions.

    Every endpoint used here is scoped to a project
    (``/projects/{project_id}/feedback``).
    """

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    @staticmethod
    def _unwrap_single(response):
        """Return the single feedback payload contained in an API response.

        The client may hand back either one object or a list of objects;
        for a list, the first item is used.

        Args:
            response: The decoded API response (object or list of objects)

        Returns:
            The payload to build a ``FeedbackResponse`` from

        Raises:
            ValidationError: If the response is an empty list
        """
        if not isinstance(response, list):
            return response
        if not response:
            raise ValidationError("API returned empty list")
        return response[0]

    def create(
        self,
        project_id: str,
        chat_message_id: str,
        feedback: str,
        is_positive: bool,
        execution_id: Optional[str] = None,
        **kwargs
    ) -> FeedbackResponse:
        """Create feedback for a chat message or execution.

        Args:
            project_id: The project ID
            chat_message_id: The chat message ID
            feedback: The feedback text
            is_positive: Whether the feedback is positive
            execution_id: Optional execution ID
            **kwargs: Additional feedback fields passed to ``FeedbackCreate``

        Returns:
            The created feedback with response details

        Raises:
            ValidationError: If ``chat_message_id`` or ``feedback`` is empty
                or whitespace-only, or if the API returns an empty list

        Example:
            ```python
            result = client.feedback.create(
                project_id=project_id,
                chat_message_id="msg-123",
                feedback="The SQL query worked perfectly!",
                is_positive=True,
                execution_id="exec-456"
            )
            ```
        """
        # Basic validation: reject missing and whitespace-only values early,
        # before any request is made.
        if not chat_message_id or not chat_message_id.strip():
            raise ValidationError("Chat message ID cannot be empty")

        if not feedback or not feedback.strip():
            raise ValidationError("Feedback text cannot be empty")

        # Build the FeedbackCreate object internally
        feedback_data = FeedbackCreate(
            chat_message_id=chat_message_id,
            feedback=feedback,
            is_positive=is_positive,
            execution_id=execution_id,
            **kwargs
        )

        response = self._client.post(
            f"/projects/{project_id}/feedback",
            data=feedback_data.model_dump()
        )
        return FeedbackResponse(**self._unwrap_single(response))

    def get(self, project_id: str, feedback_id: str) -> FeedbackResponse:
        """Get feedback by ID.

        Args:
            project_id: The project ID
            feedback_id: The feedback ID

        Returns:
            The feedback details

        Raises:
            ValidationError: If the API returns an empty list

        Example:
            ```python
            feedback = client.feedback.get(project_id, feedback_id)
            print(f"Feedback: {feedback.feedback}")
            print(f"Positive: {feedback.is_positive}")
            ```
        """
        response = self._client.get(f"/projects/{project_id}/feedback/{feedback_id}")
        return FeedbackResponse(**self._unwrap_single(response))

    def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[FeedbackResponse]:
        """List feedback for a project.

        Args:
            project_id: The project ID
            limit: Maximum number of items to return
            offset: Number of items to skip

        Returns:
            List of feedback items

        Example:
            ```python
            feedback_list = client.feedback.list(project_id)
            for feedback in feedback_list:
                status = "👍" if feedback.is_positive else "👎"
                print(f"{status} {feedback.feedback}")
            ```
        """
        endpoint = f"/projects/{project_id}/feedback"
        params = {"limit": limit, "offset": offset}
        return self._paginate(endpoint, params=params, model_class=FeedbackResponse)

    def update(
        self,
        project_id: str,
        feedback_id: str,
        chat_message_id: Optional[str] = None,
        feedback: Optional[str] = None,
        is_positive: Optional[bool] = None,
        execution_id: Optional[str] = None,
        **kwargs
    ) -> FeedbackResponse:
        """Update feedback.

        Any of the optional fields left as ``None`` keeps the value currently
        stored for the feedback item.

        Args:
            project_id: The project ID
            feedback_id: The feedback ID to update
            chat_message_id: New chat message ID
            feedback: New feedback text
            is_positive: New positive/negative setting
            execution_id: New execution ID
            **kwargs: Additional fields to update

        Returns:
            The updated feedback

        Raises:
            ValidationError: If the API returns an empty list

        Example:
            ```python
            result = client.feedback.update(
                project_id=project_id,
                feedback_id=feedback_id,
                feedback="Updated: The query was excellent!",
                is_positive=True
            )
            ```
        """
        # Get current feedback first since API expects complete data
        current_feedback = self.get(project_id, feedback_id)

        # Use current values as defaults, override with provided values.
        # "is not None" (rather than truthiness) lets is_positive=False
        # override a stored True.
        update_data = FeedbackCreate(
            chat_message_id=chat_message_id if chat_message_id is not None else current_feedback.chat_message_id,
            feedback=feedback if feedback is not None else current_feedback.feedback,
            is_positive=is_positive if is_positive is not None else current_feedback.is_positive,
            execution_id=execution_id if execution_id is not None else current_feedback.execution_id,
            **kwargs
        )

        response = self._client.put(
            f"/projects/{project_id}/feedback/{feedback_id}",
            data=update_data.model_dump()
        )
        return FeedbackResponse(**self._unwrap_single(response))

    def delete(self, project_id: str, feedback_id: str) -> bool:
        """Delete feedback.

        Args:
            project_id: The project ID
            feedback_id: The feedback ID to delete

        Returns:
            True once the delete request has completed without raising

        Example:
            ```python
            success = client.feedback.delete(project_id, feedback_id)
            ```
        """
        self._client.delete(f"/projects/{project_id}/feedback/{feedback_id}")
        return True

    def create_positive(self, project_id: str, chat_message_id: str,
                       feedback_text: str, execution_id: Optional[str] = None) -> FeedbackResponse:
        """Create positive feedback for a chat message.

        Convenience wrapper around ``create`` with ``is_positive=True``.

        Args:
            project_id: The project ID
            chat_message_id: The chat message ID
            feedback_text: The feedback text
            execution_id: Optional execution ID

        Returns:
            The created positive feedback

        Example:
            ```python
            feedback = client.feedback.create_positive(
                project_id="proj-123",
                chat_message_id="msg-456",
                feedback_text="Perfect SQL query!",
                execution_id="exec-789"
            )
            ```
        """
        return self.create(
            project_id=project_id,
            chat_message_id=chat_message_id,
            feedback=feedback_text,
            is_positive=True,
            execution_id=execution_id
        )

    def create_negative(self, project_id: str, chat_message_id: str,
                       feedback: str, execution_id: Optional[str] = None) -> FeedbackResponse:
        """Create negative feedback for a chat message.

        Convenience wrapper around ``create`` with ``is_positive=False``.
        Note that the text parameter is named ``feedback`` here, whereas
        ``create_positive`` names it ``feedback_text``.

        Args:
            project_id: The project ID
            chat_message_id: The chat message ID
            feedback: The feedback text
            execution_id: Optional execution ID

        Returns:
            The created negative feedback

        Example:
            ```python
            feedback = client.feedback.create_negative(
                project_id="proj-123",
                chat_message_id="msg-456",
                feedback="Query returned incorrect results",
                execution_id="exec-789"
            )
            ```
        """
        return self.create(
            project_id=project_id,
            chat_message_id=chat_message_id,
            feedback=feedback,
            is_positive=False,
            execution_id=execution_id
        )

    def list_positive(self, project_id: str) -> List[FeedbackResponse]:
        """List all positive feedback for a project.

        Filters the result of ``list(project_id)`` (called with its default
        ``limit``/``offset``) on the client side.

        Args:
            project_id: The project ID

        Returns:
            List of positive feedback items

        Example:
            ```python
            positive_feedback = client.feedback.list_positive(project_id)
            print(f"Found {len(positive_feedback)} positive feedback items")
            ```
        """
        all_feedback = self.list(project_id)
        return [fb for fb in all_feedback if fb.is_positive]

    def list_negative(self, project_id: str) -> List[FeedbackResponse]:
        """List all negative feedback for a project.

        Filters the result of ``list(project_id)`` (called with its default
        ``limit``/``offset``) on the client side.

        Args:
            project_id: The project ID

        Returns:
            List of negative feedback items

        Example:
            ```python
            negative_feedback = client.feedback.list_negative(project_id)
            print(f"Found {len(negative_feedback)} negative feedback items")
            ```
        """
        all_feedback = self.list(project_id)
        return [fb for fb in all_feedback if not fb.is_positive]

    def get_feedback_for_message(self, project_id: str, chat_message_id: str) -> List[FeedbackResponse]:
        """Get all feedback for a specific chat message.

        Filters the result of ``list(project_id)`` (called with its default
        ``limit``/``offset``) on the client side.

        Args:
            project_id: The project ID
            chat_message_id: The chat message ID

        Returns:
            List of feedback for the message

        Example:
            ```python
            message_feedback = client.feedback.get_feedback_for_message(
                project_id, "msg-123"
            )
            for feedback in message_feedback:
                print(f"Feedback: {feedback.feedback}")
            ```
        """
        all_feedback = self.list(project_id)
        return [fb for fb in all_feedback if fb.chat_message_id == chat_message_id]

create(project_id, chat_message_id, feedback, is_positive, execution_id=None, **kwargs)

Create feedback for a chat message or execution.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_message_id str

The chat message ID

required
feedback str

The feedback text

required
is_positive bool

Whether the feedback is positive

required
execution_id Optional[str]

Optional execution ID

None
**kwargs

Additional feedback fields

{}

Returns:

Type Description
FeedbackResponse

The created feedback with response details

Example
result = client.feedback.create(
    project_id=project_id,
    chat_message_id="msg-123",
    feedback="The SQL query worked perfectly!",
    is_positive=True,
    execution_id="exec-456"
)
Source code in resources/feedback.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def create(
    self,
    project_id: str,
    chat_message_id: str,
    feedback: str,
    is_positive: bool,
    execution_id: Optional[str] = None,
    **kwargs
) -> FeedbackResponse:
    """Submit a new feedback entry for a chat message (and optionally an execution).

    Args:
        project_id: ID of the project the feedback belongs to
        chat_message_id: ID of the chat message being rated
        feedback: Free-text feedback
        is_positive: True for positive feedback, False for negative
        execution_id: ID of the related execution, if any
        **kwargs: Extra fields forwarded to ``FeedbackCreate``

    Returns:
        The feedback as returned by the API

    Raises:
        ValidationError: If ``chat_message_id`` or ``feedback`` is empty or
            whitespace-only, or if the API answers with an empty list

    Example:
        ```python
        result = client.feedback.create(
            project_id=project_id,
            chat_message_id="msg-123",
            feedback="The SQL query worked perfectly!",
            is_positive=True,
            execution_id="exec-456"
        )
        ```
    """
    # Reject missing / blank required text before touching the network.
    if not (chat_message_id and chat_message_id.strip()):
        raise ValidationError("Chat message ID cannot be empty")
    if not (feedback and feedback.strip()):
        raise ValidationError("Feedback text cannot be empty")

    # Assemble the request model from the individual arguments.
    new_feedback = FeedbackCreate(
        chat_message_id=chat_message_id,
        feedback=feedback,
        is_positive=is_positive,
        execution_id=execution_id,
        **kwargs
    )

    endpoint = f"/projects/{project_id}/feedback"
    result = self._client.post(endpoint, data=new_feedback.model_dump())

    # The API may answer with a list; in that case use its first element.
    if isinstance(result, list):
        if not result:
            raise ValidationError("API returned empty list")
        result = result[0]

    return FeedbackResponse(**result)

create_negative(project_id, chat_message_id, feedback, execution_id=None)

Create negative feedback for a chat message.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_message_id str

The chat message ID

required
feedback str

The feedback text

required
execution_id str

Optional execution ID

None

Returns:

Type Description
FeedbackResponse

The created negative feedback

Example
feedback = client.feedback.create_negative(
    project_id="proj-123",
    chat_message_id="msg-456",
    feedback="Query returned incorrect results",
    execution_id="exec-789"
)
Source code in resources/feedback.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def create_negative(self, project_id: str, chat_message_id: str,
                   feedback: str, execution_id: Optional[str] = None) -> FeedbackResponse:
    """Create negative feedback for a chat message.

    Convenience wrapper that calls ``create`` with ``is_positive=False``.
    The text parameter is named ``feedback`` (not ``feedback_text``).

    Args:
        project_id: The project ID
        chat_message_id: The chat message ID
        feedback: The feedback text
        execution_id: Optional execution ID

    Returns:
        The created negative feedback

    Example:
        ```python
        feedback = client.feedback.create_negative(
            project_id="proj-123",
            chat_message_id="msg-456",
            feedback="Query returned incorrect results",
            execution_id="exec-789"
        )
        ```
    """
    return self.create(
        project_id=project_id,
        chat_message_id=chat_message_id,
        feedback=feedback,
        is_positive=False,
        execution_id=execution_id
    )

create_positive(project_id, chat_message_id, feedback_text, execution_id=None)

Create positive feedback for a chat message.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_message_id str

The chat message ID

required
feedback_text str

The feedback text

required
execution_id str

Optional execution ID

None

Returns:

Type Description
FeedbackResponse

The created positive feedback

Example
feedback = client.feedback.create_positive(
    project_id="proj-123",
    chat_message_id="msg-456",
    feedback_text="Perfect SQL query!",
    execution_id="exec-789"
)
Source code in resources/feedback.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def create_positive(self, project_id: str, chat_message_id: str,
                   feedback_text: str, execution_id: Optional[str] = None) -> FeedbackResponse:
    """Create positive feedback for a chat message.

    Convenience wrapper that calls ``create`` with ``is_positive=True``,
    passing ``feedback_text`` as the ``feedback`` argument.

    Args:
        project_id: The project ID
        chat_message_id: The chat message ID
        feedback_text: The feedback text
        execution_id: Optional execution ID

    Returns:
        The created positive feedback

    Example:
        ```python
        feedback = client.feedback.create_positive(
            project_id="proj-123",
            chat_message_id="msg-456",
            feedback_text="Perfect SQL query!",
            execution_id="exec-789"
        )
        ```
    """
    return self.create(
        project_id=project_id,
        chat_message_id=chat_message_id,
        feedback=feedback_text,
        is_positive=True,
        execution_id=execution_id
    )

delete(project_id, feedback_id)

Delete feedback.

Parameters:

Name Type Description Default
project_id str

The project ID

required
feedback_id str

The feedback ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.feedback.delete(project_id, feedback_id)
Source code in resources/feedback.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def delete(self, project_id: str, feedback_id: str) -> bool:
    """Remove a feedback entry from a project.

    Args:
        project_id: ID of the project that owns the feedback
        feedback_id: ID of the feedback entry to remove

    Returns:
        True once the delete request has completed without raising

    Example:
        ```python
        success = client.feedback.delete(project_id, feedback_id)
        ```
    """
    target = f"/projects/{project_id}/feedback/{feedback_id}"
    # The client's return value is ignored; reaching the next line means
    # the call did not raise.
    self._client.delete(target)
    return True

get(project_id, feedback_id)

Get feedback by ID.

Parameters:

Name Type Description Default
project_id str

The project ID

required
feedback_id str

The feedback ID

required

Returns:

Type Description
FeedbackResponse

The feedback details

Example
feedback = client.feedback.get(project_id, feedback_id)
print(f"Feedback: {feedback.feedback}")
print(f"Positive: {feedback.is_positive}")
Source code in resources/feedback.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def get(self, project_id: str, feedback_id: str) -> FeedbackResponse:
    """Fetch a single feedback entry.

    Args:
        project_id: ID of the project that owns the feedback
        feedback_id: ID of the feedback entry to fetch

    Returns:
        The feedback as returned by the API

    Raises:
        ValidationError: If the API answers with an empty list

    Example:
        ```python
        feedback = client.feedback.get(project_id, feedback_id)
        print(f"Feedback: {feedback.feedback}")
        print(f"Positive: {feedback.is_positive}")
        ```
    """
    result = self._client.get(f"/projects/{project_id}/feedback/{feedback_id}")

    # The API may answer with a list; in that case use its first element.
    if isinstance(result, list):
        if not result:
            raise ValidationError("API returned empty list")
        result = result[0]

    return FeedbackResponse(**result)

get_feedback_for_message(project_id, chat_message_id)

Get all feedback for a specific chat message.

Parameters:

Name Type Description Default
project_id str

The project ID

required
chat_message_id str

The chat message ID

required

Returns:

Type Description
List[FeedbackResponse]

List of feedback for the message

Example
message_feedback = client.feedback.get_feedback_for_message(
    project_id, "msg-123"
)
for feedback in message_feedback:
    print(f"Feedback: {feedback.feedback}")
Source code in resources/feedback.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def get_feedback_for_message(self, project_id: str, chat_message_id: str) -> List[FeedbackResponse]:
    """Collect the feedback entries attached to one chat message.

    Calls ``list(project_id)`` with its default paging arguments and keeps
    the entries whose ``chat_message_id`` equals the requested one.

    Args:
        project_id: ID of the project to search
        chat_message_id: ID of the chat message whose feedback is wanted

    Returns:
        The matching feedback entries, in the order ``list`` returned them

    Example:
        ```python
        message_feedback = client.feedback.get_feedback_for_message(
            project_id, "msg-123"
        )
        for feedback in message_feedback:
            print(f"Feedback: {feedback.feedback}")
        ```
    """
    matches = []
    for entry in self.list(project_id):
        if entry.chat_message_id == chat_message_id:
            matches.append(entry)
    return matches

list(project_id, limit=100, offset=0)

List feedback for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
limit int

Maximum number of items to return

100
offset int

Number of items to skip

0

Returns:

Type Description
List[FeedbackResponse]

List of feedback items

Example
feedback_list = client.feedback.list(project_id)
for feedback in feedback_list:
    status = "👍" if feedback.is_positive else "👎"
    print(f"{status} {feedback.feedback}")
Source code in resources/feedback.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[FeedbackResponse]:
    """Retrieve the feedback entries of a project.

    Args:
        project_id: ID of the project whose feedback is listed
        limit: Maximum number of items to return
        offset: Number of items to skip

    Returns:
        Feedback entries as produced by ``_paginate`` for this endpoint

    Example:
        ```python
        feedback_list = client.feedback.list(project_id)
        for feedback in feedback_list:
            status = "👍" if feedback.is_positive else "👎"
            print(f"{status} {feedback.feedback}")
        ```
    """
    # Paging arguments are forwarded as query parameters.
    return self._paginate(
        f"/projects/{project_id}/feedback",
        params={"limit": limit, "offset": offset},
        model_class=FeedbackResponse,
    )

list_negative(project_id)

List all negative feedback for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
List[FeedbackResponse]

List of negative feedback items

Example
negative_feedback = client.feedback.list_negative(project_id)
print(f"Found {len(negative_feedback)} negative feedback items")
Source code in resources/feedback.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
def list_negative(self, project_id: str) -> List[FeedbackResponse]:
    """Return the negative feedback entries of a project.

    Calls ``list(project_id)`` with its default paging arguments and keeps
    the entries whose ``is_positive`` is falsy.

    Args:
        project_id: ID of the project whose feedback is filtered

    Returns:
        The negative feedback entries, in the order ``list`` returned them

    Example:
        ```python
        negative_feedback = client.feedback.list_negative(project_id)
        print(f"Found {len(negative_feedback)} negative feedback items")
        ```
    """
    negatives = []
    for entry in self.list(project_id):
        if entry.is_positive:
            continue
        negatives.append(entry)
    return negatives

list_positive(project_id)

List all positive feedback for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required

Returns:

Type Description
List[FeedbackResponse]

List of positive feedback items

Example
positive_feedback = client.feedback.list_positive(project_id)
print(f"Found {len(positive_feedback)} positive feedback items")
Source code in resources/feedback.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def list_positive(self, project_id: str) -> List[FeedbackResponse]:
    """Return the positive feedback entries of a project.

    Calls ``list(project_id)`` with its default paging arguments and keeps
    the entries whose ``is_positive`` is truthy.

    Args:
        project_id: ID of the project whose feedback is filtered

    Returns:
        The positive feedback entries, in the order ``list`` returned them

    Example:
        ```python
        positive_feedback = client.feedback.list_positive(project_id)
        print(f"Found {len(positive_feedback)} positive feedback items")
        ```
    """
    positives = []
    for entry in self.list(project_id):
        if entry.is_positive:
            positives.append(entry)
    return positives

update(project_id, feedback_id, chat_message_id=None, feedback=None, is_positive=None, execution_id=None, **kwargs)

Update feedback.

Parameters:

Name Type Description Default
project_id str

The project ID

required
feedback_id str

The feedback ID to update

required
chat_message_id Optional[str]

New chat message ID

None
feedback Optional[str]

New feedback text

None
is_positive Optional[bool]

New positive/negative setting

None
execution_id Optional[str]

New execution ID

None
**kwargs

Additional fields to update

{}

Returns:

Type Description
FeedbackResponse

The updated feedback

Example
result = client.feedback.update(
    project_id=project_id,
    feedback_id=feedback_id,
    feedback="Updated: The query was excellent!",
    is_positive=True
)
Source code in resources/feedback.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def update(
    self,
    project_id: str,
    feedback_id: str,
    chat_message_id: Optional[str] = None,
    feedback: Optional[str] = None,
    is_positive: Optional[bool] = None,
    execution_id: Optional[str] = None,
    **kwargs
) -> FeedbackResponse:
    """Modify an existing feedback entry.

    The current entry is fetched first; every optional argument left as
    ``None`` falls back to the value stored on that entry, so a complete
    payload is always sent.

    Args:
        project_id: ID of the project that owns the feedback
        feedback_id: ID of the feedback entry to modify
        chat_message_id: Replacement chat message ID
        feedback: Replacement feedback text
        is_positive: Replacement positive/negative flag
        execution_id: Replacement execution ID
        **kwargs: Extra fields forwarded to ``FeedbackCreate``

    Returns:
        The feedback as returned by the API after the update

    Raises:
        ValidationError: If the API answers with an empty list

    Example:
        ```python
        result = client.feedback.update(
            project_id=project_id,
            feedback_id=feedback_id,
            feedback="Updated: The query was excellent!",
            is_positive=True
        )
        ```
    """
    # The API expects the full record, so start from what is stored now.
    existing = self.get(project_id, feedback_id)

    supplied = {
        "chat_message_id": chat_message_id,
        "feedback": feedback,
        "is_positive": is_positive,
        "execution_id": execution_id,
    }
    # Only None means "not provided"; False / "" are real overrides.
    merged = {
        field: getattr(existing, field) if value is None else value
        for field, value in supplied.items()
    }
    replacement = FeedbackCreate(**merged, **kwargs)

    endpoint = f"/projects/{project_id}/feedback/{feedback_id}"
    result = self._client.put(endpoint, data=replacement.model_dump())

    # The API may answer with a list; in that case use its first element.
    if isinstance(result, list):
        if not result:
            raise ValidationError("API returned empty list")
        result = result[0]

    return FeedbackResponse(**result)

text2everything_sdk.resources.custom_tools.CustomToolsResource

Bases: BaseResource

Resource for managing custom tools with Python script uploads.

Source code in resources/custom_tools.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
class CustomToolsResource(BaseResource):
    """Resource for managing custom tools with Python script uploads.

    All methods operate on the ``/projects/{project_id}/custom-tools``
    endpoints through ``self._client``.
    """

    def __init__(self, client: Text2EverythingClient):
        super().__init__(client)

    @staticmethod
    def _build_file_parts(files: List[Union[str, Path, BinaryIO]],
                          opened_files: List[BinaryIO]) -> List[tuple]:
        """Turn ``files`` into multipart ``("files", (name, fileobj, mime))`` tuples.

        Args:
            files: File paths and/or already-open file objects
            opened_files: Output list; every handle opened here is appended to
                it immediately, so the caller can close them in a ``finally``
                block even if a later entry raises.

        Returns:
            List of multipart file tuples, in the same order as ``files``

        Raises:
            ValidationError: If a path entry does not exist
        """
        parts = []
        for file_item in files:
            if isinstance(file_item, (str, Path)):
                # File path - we need to open it
                file_path = Path(file_item)
                if not file_path.exists():
                    raise ValidationError(f"File not found: {file_path}")
                file_obj = open(file_path, "rb")
                opened_files.append(file_obj)  # Track for cleanup right away
                parts.append(("files", (file_path.name, file_obj, "text/plain")))
            else:
                # File object - use as-is, the caller keeps ownership of it
                filename = getattr(file_item, 'name', 'script.py')
                parts.append(("files", (filename, file_item, "text/plain")))
        return parts

    @staticmethod
    def _close_files(opened_files: List[BinaryIO]) -> None:
        """Best-effort close of the handles this resource opened itself."""
        for file_obj in opened_files:
            try:
                file_obj.close()
            except OSError:
                # Cleanup must not mask the outcome of the request itself.
                pass

    def create(self, project_id: str, name: str, description: str, 
              files: List[Union[str, Path, BinaryIO]]) -> CustomTool:
        """Create a new custom tool with uploaded Python script files.

        Files given as paths are opened in binary mode for the duration of the
        request and are always closed afterwards, including when validation or
        the request fails. File objects passed by the caller are never closed.

        Args:
            project_id: The project ID
            name: Name of the custom tool
            description: Description of the custom tool
            files: List of file paths or file objects to upload

        Returns:
            The created custom tool

        Raises:
            ValidationError: If name or description is empty/blank, if no
                files are given, or if a file path does not exist

        Example:
            ```python
            # Upload files by path
            tool = client.custom_tools.create(
                project_id="proj-123",
                name="Data Analysis Tool",
                description="Custom tool for advanced data analysis",
                files=["analysis.py", "utils.py"]
            )

            # Upload file objects
            with open("script.py", "rb") as f:
                tool = client.custom_tools.create(
                    project_id="proj-123",
                    name="Script Tool",
                    description="Custom script tool",
                    files=[f]
                )
            ```
        """
        # Basic validation
        if not name or not name.strip():
            raise ValidationError("Tool name cannot be empty")

        if not description or not description.strip():
            raise ValidationError("Tool description cannot be empty")

        if not files:
            raise ValidationError("At least one file must be provided")

        # Prepare multipart form data
        form_data = {
            "name": name,
            "description": description
        }

        opened_files = []  # Track files we opened so we can close them

        # File preparation happens inside the try so that handles opened
        # before a failing entry are still released.
        try:
            file_data = self._build_file_parts(files, opened_files)
            response = self._client.post_multipart(
                f"/projects/{project_id}/custom-tools",
                data=form_data,
                files=file_data
            )
            return CustomTool(**response)
        finally:
            # Close only the files we opened
            self._close_files(opened_files)

    def get(self, project_id: str, tool_id: str) -> CustomTool:
        """Get a specific custom tool.

        Args:
            project_id: The project ID
            tool_id: The custom tool ID

        Returns:
            The custom tool details

        Example:
            ```python
            tool = client.custom_tools.get(project_id, tool_id)
            print(f"Tool: {tool.name}")
            print(f"Documents: {len(tool.documents)}")
            ```
        """
        response = self._client.get(f"/projects/{project_id}/custom-tools/{tool_id}")
        return CustomTool(**response)

    def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[CustomTool]:
        """List all custom tools for a project.

        Args:
            project_id: The project ID
            limit: Maximum number of items to return
            offset: Number of items to skip (sent as the ``skip`` query parameter)

        Returns:
            List of custom tools

        Example:
            ```python
            tools = client.custom_tools.list(project_id)
            for tool in tools:
                print(f"{tool.name}: {tool.description}")
            ```
        """
        endpoint = f"/projects/{project_id}/custom-tools"
        params = {"limit": limit, "skip": offset}
        return self._paginate(endpoint, params=params, model_class=CustomTool)

    def update(self, project_id: str, tool_id: str, name: str = None, 
              description: str = None, files: List[Union[str, Path, BinaryIO]] = None) -> CustomTool:
        """Update a custom tool.

        Only the fields that are provided are sent: ``name`` and
        ``description`` are included when they are not ``None``, and files are
        included when ``files`` is a non-empty list. Files given as paths are
        opened for the request and always closed afterwards; file objects
        passed by the caller are never closed.

        Args:
            project_id: The project ID
            tool_id: The custom tool ID to update
            name: Optional new name
            description: Optional new description
            files: Optional new files to replace existing ones

        Returns:
            The updated custom tool

        Raises:
            ValidationError: If a file path does not exist

        Example:
            ```python
            # Update metadata only
            tool = client.custom_tools.update(
                project_id, tool_id,
                name="Updated Tool Name",
                description="Updated description"
            )

            # Update files
            tool = client.custom_tools.update(
                project_id, tool_id,
                files=["new_script.py", "updated_utils.py"]
            )
            ```
        """
        # Prepare multipart form data
        form_data = {}
        if name is not None:
            form_data["name"] = name
        if description is not None:
            form_data["description"] = description

        opened_files = []  # Track files we opened so we can close them

        # File preparation happens inside the try so that handles opened
        # before a failing entry are still released.
        try:
            file_data = self._build_file_parts(files, opened_files) if files else []
            response = self._client.put_multipart(
                f"/projects/{project_id}/custom-tools/{tool_id}",
                data=form_data,
                files=file_data if file_data else None
            )
            return CustomTool(**response)
        finally:
            # Close only the files we opened
            self._close_files(opened_files)

    def delete(self, project_id: str, tool_id: str) -> bool:
        """Delete a custom tool and its associated collection.

        Args:
            project_id: The project ID
            tool_id: The custom tool ID to delete

        Returns:
            True if deletion was successful (the client call did not raise)

        Example:
            ```python
            success = client.custom_tools.delete(project_id, tool_id)
            ```
        """
        self._client.delete(f"/projects/{project_id}/custom-tools/{tool_id}")
        return True

    def create_from_directory(self, project_id: str, name: str, description: str, 
                             directory_path: Union[str, Path]) -> CustomTool:
        """Create a custom tool by uploading all Python files from a directory.

        Only regular ``*.py`` files directly inside the directory are uploaded
        (no recursion), in sorted path order so the upload is deterministic.

        Args:
            project_id: The project ID
            name: Name of the custom tool
            description: Description of the custom tool
            directory_path: Path to directory containing Python files

        Returns:
            The created custom tool

        Raises:
            ValidationError: If the directory does not exist, is not a
                directory, or contains no Python files

        Example:
            ```python
            tool = client.custom_tools.create_from_directory(
                project_id="proj-123",
                name="Analysis Suite",
                description="Complete data analysis toolkit",
                directory_path="./analysis_scripts"
            )
            ```
        """
        dir_path = Path(directory_path)
        # is_dir() is already False for a path that does not exist
        if not dir_path.is_dir():
            raise ValidationError(f"Directory not found: {dir_path}")

        # Find all Python files in the directory; skip directories that happen
        # to be named "*.py" and sort because glob order is arbitrary.
        python_files = sorted(p for p in dir_path.glob("*.py") if p.is_file())
        if not python_files:
            raise ValidationError(f"No Python files found in directory: {dir_path}")

        return self.create(project_id, name, description, python_files)

    def update_metadata(self, project_id: str, tool_id: str, 
                       update_data: CustomToolUpdate) -> CustomTool:
        """Update only the metadata (name/description) of a custom tool.

        Delegates to ``update`` without files, passing ``update_data.name``
        and ``update_data.description``.

        Args:
            project_id: The project ID
            tool_id: The custom tool ID
            update_data: The metadata updates

        Returns:
            The updated custom tool

        Example:
            ```python
            update_data = CustomToolUpdate(
                name="New Tool Name",
                description="Updated description"
            )
            tool = client.custom_tools.update_metadata(project_id, tool_id, update_data)
            ```
        """
        return self.update(
            project_id, 
            tool_id, 
            name=update_data.name, 
            description=update_data.description
        )

    def replace_files(self, project_id: str, tool_id: str, 
                     files: List[Union[str, Path, BinaryIO]]) -> CustomTool:
        """Replace all files in a custom tool.

        Delegates to ``update`` with only ``files`` set.

        Args:
            project_id: The project ID
            tool_id: The custom tool ID
            files: New files to replace existing ones

        Returns:
            The updated custom tool

        Example:
            ```python
            tool = client.custom_tools.replace_files(
                project_id, tool_id, 
                ["new_main.py", "new_utils.py"]
            )
            ```
        """
        return self.update(project_id, tool_id, files=files)

create(project_id, name, description, files)

Create a new custom tool with uploaded Python script files.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name str

Name of the custom tool

required
description str

Description of the custom tool

required
files List[Union[str, Path, BinaryIO]]

List of file paths or file objects to upload

required

Returns:

Type Description
CustomTool

The created custom tool

Example
# Upload files by path
tool = client.custom_tools.create(
    project_id="proj-123",
    name="Data Analysis Tool",
    description="Custom tool for advanced data analysis",
    files=["analysis.py", "utils.py"]
)

# Upload file objects
with open("script.py", "rb") as f:
    tool = client.custom_tools.create(
        project_id="proj-123",
        name="Script Tool",
        description="Custom script tool",
        files=[f]
    )
Source code in resources/custom_tools.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def create(self, project_id: str, name: str, description: str, 
          files: List[Union[str, Path, BinaryIO]]) -> CustomTool:
    """Create a new custom tool with uploaded Python script files.

    Files given as paths are opened in binary mode for the duration of the
    request and are always closed afterwards, including when a later path is
    missing or the request fails. File objects passed by the caller are never
    closed.

    Args:
        project_id: The project ID
        name: Name of the custom tool
        description: Description of the custom tool
        files: List of file paths or file objects to upload

    Returns:
        The created custom tool

    Raises:
        ValidationError: If name or description is empty/blank, if no files
            are given, or if a file path does not exist

    Example:
        ```python
        # Upload files by path
        tool = client.custom_tools.create(
            project_id="proj-123",
            name="Data Analysis Tool",
            description="Custom tool for advanced data analysis",
            files=["analysis.py", "utils.py"]
        )

        # Upload file objects
        with open("script.py", "rb") as f:
            tool = client.custom_tools.create(
                project_id="proj-123",
                name="Script Tool",
                description="Custom script tool",
                files=[f]
            )
        ```
    """
    # Basic validation
    if not name or not name.strip():
        raise ValidationError("Tool name cannot be empty")

    if not description or not description.strip():
        raise ValidationError("Tool description cannot be empty")

    if not files:
        raise ValidationError("At least one file must be provided")

    # Prepare multipart form data
    form_data = {
        "name": name,
        "description": description
    }

    file_data = []
    opened_files = []  # Track files we opened so we can close them

    # The files are prepared inside the try block so that handles opened
    # before a failing entry (e.g. a missing path) are still released.
    try:
        for file_item in files:
            if isinstance(file_item, (str, Path)):
                # File path - we need to open it
                file_path = Path(file_item)
                if not file_path.exists():
                    raise ValidationError(f"File not found: {file_path}")
                file_obj = open(file_path, "rb")
                opened_files.append(file_obj)  # Track for cleanup
                file_data.append(("files", (file_path.name, file_obj, "text/plain")))
            else:
                # File object - use as-is, don't close it
                filename = getattr(file_item, 'name', 'script.py')
                file_data.append(("files", (filename, file_item, "text/plain")))

        response = self._client.post_multipart(
            f"/projects/{project_id}/custom-tools",
            data=form_data,
            files=file_data
        )
        return CustomTool(**response)
    finally:
        # Close only the files we opened
        for file_obj in opened_files:
            try:
                file_obj.close()
            except OSError:
                # Best-effort cleanup: do not mask the request's outcome.
                pass

create_from_directory(project_id, name, description, directory_path)

Create a custom tool by uploading all Python files from a directory.

Parameters:

Name Type Description Default
project_id str

The project ID

required
name str

Name of the custom tool

required
description str

Description of the custom tool

required
directory_path Union[str, Path]

Path to directory containing Python files

required

Returns:

Type Description
CustomTool

The created custom tool

Example
tool = client.custom_tools.create_from_directory(
    project_id="proj-123",
    name="Analysis Suite",
    description="Complete data analysis toolkit",
    directory_path="./analysis_scripts"
)
Source code in resources/custom_tools.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def create_from_directory(self, project_id: str, name: str, description: str, 
                         directory_path: Union[str, Path]) -> CustomTool:
    """Create a custom tool by uploading all Python files from a directory.

    Only regular ``*.py`` files directly inside the directory are uploaded
    (no recursion), in sorted path order so the upload is deterministic.

    Args:
        project_id: The project ID
        name: Name of the custom tool
        description: Description of the custom tool
        directory_path: Path to directory containing Python files

    Returns:
        The created custom tool

    Raises:
        ValidationError: If the directory does not exist, is not a directory,
            or contains no Python files

    Example:
        ```python
        tool = client.custom_tools.create_from_directory(
            project_id="proj-123",
            name="Analysis Suite",
            description="Complete data analysis toolkit",
            directory_path="./analysis_scripts"
        )
        ```
    """
    dir_path = Path(directory_path)
    # is_dir() is already False for a path that does not exist
    if not dir_path.is_dir():
        raise ValidationError(f"Directory not found: {dir_path}")

    # Find all Python files in the directory; skip directories that happen to
    # be named "*.py" and sort because glob order is arbitrary.
    python_files = sorted(p for p in dir_path.glob("*.py") if p.is_file())
    if not python_files:
        raise ValidationError(f"No Python files found in directory: {dir_path}")

    return self.create(project_id, name, description, python_files)

delete(project_id, tool_id)

Delete a custom tool and its associated collection.

Parameters:

Name Type Description Default
project_id str

The project ID

required
tool_id str

The custom tool ID to delete

required

Returns:

Type Description
bool

True if deletion was successful

Example
success = client.custom_tools.delete(project_id, tool_id)
Source code in resources/custom_tools.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def delete(self, project_id: str, tool_id: str) -> bool:
    """Remove a custom tool (and its associated collection) from a project.

    Args:
        project_id: ID of the project that owns the tool
        tool_id: ID of the custom tool to remove

    Returns:
        Always True once the client's delete call has returned without raising

    Example:
        ```python
        success = client.custom_tools.delete(project_id, tool_id)
        ```
    """
    endpoint = f"/projects/{project_id}/custom-tools/{tool_id}"
    self._client.delete(endpoint)
    return True

get(project_id, tool_id)

Get a specific custom tool.

Parameters:

Name Type Description Default
project_id str

The project ID

required
tool_id str

The custom tool ID

required

Returns:

Type Description
CustomTool

The custom tool details

Example
tool = client.custom_tools.get(project_id, tool_id)
print(f"Tool: {tool.name}")
print(f"Documents: {len(tool.documents)}")
Source code in resources/custom_tools.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def get(self, project_id: str, tool_id: str) -> CustomTool:
    """Fetch a single custom tool by ID.

    Args:
        project_id: ID of the project that owns the tool
        tool_id: ID of the custom tool to fetch

    Returns:
        A CustomTool built from the response payload

    Example:
        ```python
        tool = client.custom_tools.get(project_id, tool_id)
        print(f"Tool: {tool.name}")
        print(f"Documents: {len(tool.documents)}")
        ```
    """
    endpoint = f"/projects/{project_id}/custom-tools/{tool_id}"
    payload = self._client.get(endpoint)
    return CustomTool(**payload)

list(project_id, limit=100, offset=0)

List all custom tools for a project.

Parameters:

Name Type Description Default
project_id str

The project ID

required
limit int

Maximum number of items to return

100
offset int

Number of items to skip

0

Returns:

Type Description
List[CustomTool]

List of custom tools

Example
tools = client.custom_tools.list(project_id)
for tool in tools:
    print(f"{tool.name}: {tool.description}")
Source code in resources/custom_tools.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def list(self, project_id: str, limit: int = 100, offset: int = 0) -> List[CustomTool]:
    """Return the custom tools that belong to a project.

    Args:
        project_id: ID of the project whose tools are listed
        limit: Maximum number of items to return
        offset: Number of items to skip; sent as the ``skip`` query parameter

    Returns:
        List of CustomTool instances produced by ``self._paginate``

    Example:
        ```python
        tools = client.custom_tools.list(project_id)
        for tool in tools:
            print(f"{tool.name}: {tool.description}")
        ```
    """
    query = {"limit": limit, "skip": offset}
    return self._paginate(
        f"/projects/{project_id}/custom-tools",
        params=query,
        model_class=CustomTool,
    )

replace_files(project_id, tool_id, files)

Replace all files in a custom tool.

Parameters:

Name Type Description Default
project_id str

The project ID

required
tool_id str

The custom tool ID

required
files List[Union[str, Path, BinaryIO]]

New files to replace existing ones

required

Returns:

Type Description
CustomTool

The updated custom tool

Example
tool = client.custom_tools.replace_files(
    project_id, tool_id, 
    ["new_main.py", "new_utils.py"]
)
Source code in resources/custom_tools.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def replace_files(self, project_id: str, tool_id: str, 
                 files: List[Union[str, Path, BinaryIO]]) -> CustomTool:
    """Swap out the files of an existing custom tool.

    Thin wrapper around ``update`` that passes only ``files``; name and
    description are left at their ``update`` defaults.

    Args:
        project_id: ID of the project that owns the tool
        tool_id: ID of the custom tool whose files are replaced
        files: File paths or file objects to upload as the new files

    Returns:
        The updated custom tool, as returned by ``update``

    Example:
        ```python
        tool = client.custom_tools.replace_files(
            project_id, tool_id, 
            ["new_main.py", "new_utils.py"]
        )
        ```
    """
    updated_tool = self.update(project_id, tool_id, files=files)
    return updated_tool

update(project_id, tool_id, name=None, description=None, files=None)

Update a custom tool.

Parameters:

Name Type Description Default
project_id str

The project ID

required
tool_id str

The custom tool ID to update

required
name str

Optional new name

None
description str

Optional new description

None
files List[Union[str, Path, BinaryIO]]

Optional new files to replace existing ones

None

Returns:

Type Description
CustomTool

The updated custom tool

Example
# Update metadata only
tool = client.custom_tools.update(
    project_id, tool_id,
    name="Updated Tool Name",
    description="Updated description"
)

# Update files
tool = client.custom_tools.update(
    project_id, tool_id,
    files=["new_script.py", "updated_utils.py"]
)
Source code in resources/custom_tools.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def update(self, project_id: str, tool_id: str, name: str = None, 
          description: str = None, files: List[Union[str, Path, BinaryIO]] = None) -> CustomTool:
    """Update a custom tool.

    Only the fields that are provided are sent: ``name`` and ``description``
    are included when they are not ``None``, and files are included when
    ``files`` is a non-empty list. Files given as paths are opened for the
    request and always closed afterwards, including when a later path is
    missing or the request fails. File objects passed by the caller are never
    closed.

    Args:
        project_id: The project ID
        tool_id: The custom tool ID to update
        name: Optional new name
        description: Optional new description
        files: Optional new files to replace existing ones

    Returns:
        The updated custom tool

    Raises:
        ValidationError: If a file path does not exist

    Example:
        ```python
        # Update metadata only
        tool = client.custom_tools.update(
            project_id, tool_id,
            name="Updated Tool Name",
            description="Updated description"
        )

        # Update files
        tool = client.custom_tools.update(
            project_id, tool_id,
            files=["new_script.py", "updated_utils.py"]
        )
        ```
    """
    # Prepare multipart form data
    form_data = {}
    if name is not None:
        form_data["name"] = name
    if description is not None:
        form_data["description"] = description

    file_data = []
    opened_files = []  # Track files we opened so we can close them

    # The files are prepared inside the try block so that handles opened
    # before a failing entry (e.g. a missing path) are still released.
    try:
        if files:
            # Prepare files for upload
            for file_item in files:
                if isinstance(file_item, (str, Path)):
                    # File path - we need to open it
                    file_path = Path(file_item)
                    if not file_path.exists():
                        raise ValidationError(f"File not found: {file_path}")
                    file_obj = open(file_path, "rb")
                    opened_files.append(file_obj)  # Track for cleanup
                    file_data.append(("files", (file_path.name, file_obj, "text/plain")))
                else:
                    # File object - use as-is, don't close it
                    filename = getattr(file_item, 'name', 'script.py')
                    file_data.append(("files", (filename, file_item, "text/plain")))

        response = self._client.put_multipart(
            f"/projects/{project_id}/custom-tools/{tool_id}",
            data=form_data,
            files=file_data if file_data else None
        )
        return CustomTool(**response)
    finally:
        # Close only the files we opened
        for file_obj in opened_files:
            try:
                file_obj.close()
            except OSError:
                # Best-effort cleanup: do not mask the request's outcome.
                pass

update_metadata(project_id, tool_id, update_data)

Update only the metadata (name/description) of a custom tool.

Parameters:

Name Type Description Default
project_id str

The project ID

required
tool_id str

The custom tool ID

required
update_data CustomToolUpdate

The metadata updates

required

Returns:

Type Description
CustomTool

The updated custom tool

Example
update_data = CustomToolUpdate(
    name="New Tool Name",
    description="Updated description"
)
tool = client.custom_tools.update_metadata(project_id, tool_id, update_data)
Source code in resources/custom_tools.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def update_metadata(self, project_id: str, tool_id: str, 
                   update_data: CustomToolUpdate) -> CustomTool:
    """Change a custom tool's name and/or description without touching its files.

    Reads ``name`` and ``description`` from ``update_data`` and forwards them
    to ``update``; no files are passed.

    Args:
        project_id: ID of the project that owns the tool
        tool_id: ID of the custom tool to modify
        update_data: Object carrying the new ``name`` and ``description``

    Returns:
        The updated custom tool, as returned by ``update``

    Example:
        ```python
        update_data = CustomToolUpdate(
            name="New Tool Name",
            description="Updated description"
        )
        tool = client.custom_tools.update_metadata(project_id, tool_id, update_data)
        ```
    """
    new_name = update_data.name
    new_description = update_data.description
    return self.update(project_id, tool_id, name=new_name, description=new_description)