Skip to content
Snippets Groups Projects

Core sampler

Merged Daniel Lyons requested to merge core-sampler into main
1 unresolved thread
1 file
+ 22
15
Compare changes
  • Side-by-side
  • Inline
@@ -128,6 +128,19 @@ class Table:
:return: a RowSet with the row in it
"""
# 1. Determine the primary key columns
primary_key_columns = self.primary_key_columns()
# 2. Generate the WHERE clause
pkey = {column_name: primary_key[column_name] for column_name in primary_key_columns}
whereclause = " AND ".join(f"{name} = %({name})s" for name in pkey.keys())
# 3. Run the query
self.cursor.execute(f"SELECT * FROM {self.name} WHERE {whereclause}", pkey)
# 4. Manufacture the result
return RowSet(self.cursor, self, self.cursor.fetchall())
def primary_key_columns(self):
self.cursor.execute(
"""SELECT c.column_name
FROM information_schema.table_constraints tc
@@ -139,18 +152,8 @@ class Table:
tc.table_name = %(tablename)s""",
dict(tablename=self.name),
)
primary_key_columns = [r["column_name"] for r in self.cursor.fetchall()]
# 2. Generate the WHERE clause
pkey = {column_name: primary_key[column_name] for column_name in primary_key_columns}
whereclause = " AND ".join(f"{name} = %({name})s" for name in pkey.keys())
# 3. Run the query
self.cursor.execute(f"SELECT * FROM {self.name} WHERE {whereclause}", pkey)
# 4. Manufacture the result
return RowSet(self.cursor, self, self.cursor.fetchall())
return primary_key_columns
def fetch_according_to(self, rows: "RowSet", columns: list[str]):
"""
@@ -160,17 +163,21 @@ class Table:
:param columns: columns to consider in the generated WHERE clause
:return: RowSet for the rows in this table
"""
print(f"fetching from {self.name}")
# 1. generate the where clause
escaped = [self.escape(row[column]) for column in columns for row in rows if row[column] is not None]
# 1. Escape the WHERE clause entries. it's important to use the primary keys
# to retrieve the values from the previous resultset; the new columns will
# appear in the query below.
key_columns = rows.table.primary_key_columns()
escaped = [self.escape(row[column]) for column in key_columns for row in rows if row[column] is not None]
# If we have no escaped entries, then there won't be anything in the resultset and we can return now
if len(escaped) == 0:
return RowSet(self.cursor, self, [])
# 2. Build the WHERE clause
whereclause = f"({','.join(columns)}) IN ({','.join(escaped)})"
# 2. send the query off and make the result
# 3. Send the query off and make the result
self.cursor.execute(f"SELECT * FROM {self.name} WHERE {whereclause}")
return RowSet(self.cursor, self, self.cursor.fetchall())
Loading