Added
- `pw.io.elasticsearch.read` reads an Elasticsearch index into Pathway. Since Elasticsearch has no change-data-capture API, the connector ingests by polling and reconciling the overlap between consecutive queries, so no row is missed or delivered twice. It is configured with `timestamp_column` (a numeric column it watermarks and orders by), `id_column` (a unique, sortable identifier used to deduplicate the overlap and as the Pathway row key), and `max_transaction_duration` (how late a row may still become visible). `mode="streaming"` (default) keeps polling at `poll_interval`; `mode="static"` reads the index once. The index is read in bounded pages of `read_batch_size` documents (default 10 000), each becoming one minibatch, and an idle index is detected and skipped without re-reading the overlap window. With persistence enabled, the connector resumes from the saved watermark, delivering only rows added since the last checkpoint. At startup it warns if `timestamp_column` or `id_column` is mapped in a way it cannot poll on (e.g. an `id_column` mapped as `text`, which Elasticsearch cannot sort by).
- `pw.io.clickhouse.write` writes a Pathway table to a ClickHouse table over the native protocol. Two output formats are available via `output_table_type`: the default `"stream_of_changes"` appends every change with `time`/`diff` columns, while `"snapshot"` maintains the current state in a `ReplacingMergeTree` keyed by the required `primary_key` (query it with `SELECT ... FINAL`). The `init_mode` parameter (`"default"`, `"create_if_not_exists"`, `"replace"`) controls whether the connector creates the destination table, and the destination is validated at start-up so a missing table, column, or incompatible type is reported immediately. Most scalar, `Optional`, `list`, `tuple`, and 1-D `np.ndarray` column types are supported (see the connector documentation for the full type mapping).
- `pw.io.iceberg.read` now decodes every Iceberg primitive type. The new arms are: `date` materializes as `DateTimeNaive` at midnight on the calendar day (Pathway has no date-only type); `time` materializes as `Duration` representing microseconds since midnight (same convention as the Postgres `TIME` mapping); `uuid` materializes as the canonical 8-4-4-4-12 hex string when the column is declared as `str` in the Pathway schema (or as 16 raw `bytes` when declared as `bytes`); `fixed(N)` materializes as `bytes`; `decimal(p, s)` materializes as either `float` (lossy, with a one-shot startup warning naming each affected column) or `str` (lossless decimal text; opt in by declaring the column as `str`).
- `pw.io.iceberg.write` now reconciles Pathway types against the destination table's existing schema and writes the narrower / alternatively-encoded representation when the target column already declares one: Pathway `int` into an existing `int` (32-bit) column with overflow detection, Pathway `float` into an existing `float` (32-bit) column (cast to 32-bit float; precision beyond ~7 significant decimal digits is lost), Pathway `str` into an existing `decimal(p, s)` column (parsed as decimal text) or `uuid` column (parsed as canonical UUID hex), Pathway `bytes` into an existing `fixed(N)` column (length-checked), and Pathway `Duration` into an existing `time` column (microseconds since midnight). When Pathway creates the destination table from a Pathway schema, the connector continues to emit the wide representations (`long` from Pathway `int`/`Duration`, `double` from `float`, `string` from `str`, `binary` from `bytes`); choosing a narrow / specialized type at create-time isn't exposed yet. Iceberg `date` is not supported on write at all: neither at create-time (Pathway has no date-only type to derive from) nor as an existing-column override. Iceberg `map<K, V>` remains unsupported on both sides.
- `pw.io.iceberg.read` and `pw.io.iceberg.write` now support Iceberg `struct<…>` columns through Pathway's positional `tuple[…]` type. Tuples are written with synthesized field names `[0], [1], …` (same convention `pw.io.deltalake.write` already uses). Reads ignore struct field names and bind tuple positions to struct field positions in the destination order; the mapping composes transitively, so `list[tuple[…]]` works as well. When writing into an existing table whose target column declares a struct with arbitrary field names, the writer adopts the destination's field names automatically, so the user's `tuple[…]` declaration only needs to align with the destination struct's field order. Pathway has no named-record type that would let a tuple bind to struct fields by name, so reordering the destination struct's fields out-of-band would silently misalign a Pathway pipeline declaring the column as `tuple[…]`.
- `pw.io.mongodb.read` now accepts four additional BSON types that were previously dropped at parse time. `ObjectId` and `Decimal128` map to `str` (the canonical 24-character hex form and the canonical decimal string respectively); `RegularExpression` maps to `str` formatted as `"/<pattern>/<options>"`; `Timestamp` maps to `int` carrying the seconds-since-epoch `time` component (the companion `increment` field is dropped). When such a value is written back to MongoDB it is stored as an ordinary string (or integer for `Timestamp`) field rather than under its original BSON type, so a write-then-read round-trip preserves the value but not the original BSON type of the column.
- `pw.io.postgres.write` now accepts a `schema_name` parameter for writing to tables in non-default PostgreSQL schemas.
- `pw.io.postgres.write` now supports pre-existing `INET`, `CIDR`, `MACADDR`, and `MACADDR8` columns from a `str` Pathway column, matching the reader round-trip.
- `pw.io.postgres.read` and `pw.io.postgres.write` now run extensive preflight validation that surfaces misconfigurations (PostgreSQL types that are not yet supported, array element type mismatches, nullability mismatches, `REPLICA IDENTITY NOTHING` on non-append-only streaming tables, etc.) as clear pipeline-start errors instead of silent row drops or opaque worker panics.
- `pw.io.mysql.read` reads a MySQL table into Pathway. In `mode="streaming"` (the default) it performs Change Data Capture by reading the MySQL binary log: it takes an initial snapshot and then continuously delivers inserts, updates, and deletes (requires `log_bin` on, `binlog_format=ROW`, `binlog_row_image=FULL`, and the `REPLICATION SLAVE`/`REPLICATION CLIENT` privileges). In `mode="static"` it reads the table once and terminates. The schema must declare at least one primary-key column. Every type produced by `pw.io.mysql.write` round-trips back, and common native MySQL types (`DECIMAL`, `DATE`, integer and text families, `JSON`, …) are parsed as well. Unlike PostgreSQL logical replication, the connector leaves no server-side state behind: there is no replication slot to retain logs and fill the disk, and binary-log retention is governed solely by the server's own settings. With persistence enabled, the streaming connector saves the binary-log coordinates and resumes from them on restart, raising a clear error if the needed binary logs have already been purged by the server's normal expiry.
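
The polling scheme described for `pw.io.elasticsearch.read` above (re-query a trailing window, then drop rows that were already delivered) can be illustrated with a small standalone model. This is only a sketch of the idea, not the connector's implementation: the `OverlapPoller` class, the `ts`/`id` field names, and the in-memory `index` list are all hypothetical stand-ins for `timestamp_column`, `id_column`, and the Elasticsearch index.

```python
from dataclasses import dataclass, field


@dataclass
class OverlapPoller:
    """Toy poller: re-reads a trailing window and skips rows already delivered."""

    max_transaction_duration: int  # how late a row may still become visible
    watermark: int = 0  # highest timestamp delivered so far
    seen_ids: set = field(default_factory=set)  # ids delivered inside the overlap

    def poll(self, index):
        # Re-read every row that could still have appeared late.
        lower = self.watermark - self.max_transaction_duration
        window = sorted(
            (doc for doc in index if doc["ts"] >= lower),
            key=lambda doc: (doc["ts"], doc["id"]),
        )
        # Deduplicate the overlap by id.
        fresh = [doc for doc in window if doc["id"] not in self.seen_ids]
        if window:
            self.watermark = max(self.watermark, window[-1]["ts"])
        # Only ids that can reappear in the next overlap need to be remembered.
        new_lower = self.watermark - self.max_transaction_duration
        self.seen_ids = {doc["id"] for doc in window if doc["ts"] >= new_lower}
        return fresh


poller = OverlapPoller(max_transaction_duration=5)
index = [{"id": "a", "ts": 10}, {"id": "b", "ts": 12}]
first = poller.poll(index)
# A late row (ts=9) and a new row (ts=15) become visible before the next poll.
index += [{"id": "c", "ts": 9}, {"id": "d", "ts": 15}]
second = poller.poll(index)
```

In this model the late row `c` is still picked up because its timestamp falls inside the re-read window, while `a` and `b` are not delivered a second time. A row that becomes visible later than `max_transaction_duration` behind the watermark would be missed, which is why that bound has to be chosen conservatively.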
Changed
- `pw.io.iceberg.read` and `pw.io.iceberg.write` now retry transient catalog errors automatically (e.g. concurrent-commit conflicts on write, transient REST/Glue catalog failures on read).
- `pw.io.postgres.write` now retries transient PostgreSQL errors automatically: SQLSTATE class 08 (connection exceptions), class 57 (admin / crash shutdown, `cannot_connect_now`), `serialization_failure` (40001), `deadlock_detected` (40P01), and any closed connection are retried up to three times with exponential backoff before the writer surfaces the error. Permanent failures (syntax errors, missing tables, constraint violations) still propagate on the first attempt.
- `pw.io.postgres.read` (streaming mode) no longer requires `user`, `password`, or `host` in `postgres_settings`. Missing components are omitted from the connection string and resolved by PostgreSQL's standard client defaults (OS user, `~/.pgpass`, UNIX socket), matching how static mode has always behaved. This unblocks deployments authenticated via `trust`, `peer`, `cert`, or other passwordless `pg_hba.conf` modes.
- `pw.io.postgres` connections now tag themselves in PostgreSQL as `application_name=pathway[:<name>]` (where `<name>` comes from the connector's `name` parameter), so operators can identify Pathway sessions in `pg_stat_activity`, `pg_stat_replication`, and server logs. The value is sanitized to printable ASCII and truncated to 63 bytes to match PostgreSQL's `NAMEDATALEN`. A user-supplied `application_name` in `postgres_settings` is left untouched.
- `pw.io.postgres` connections now default to TCP keepalives tuned for roughly five-minute dead-peer detection (`keepalives_idle=300`, `keepalives_interval=30`, `keepalives_count=3`, plus `tcp_user_timeout=300000`), so a SIGKILL'd Pathway process releases its temporary replication slot in minutes rather than the OS-inherited ~2 hour timeline. Each value is only applied when the user has not already set it in `postgres_settings`.
- `pw.io.mssql.read` and `pw.io.mssql.write` now validate configuration and schemas at call/init time, producing clear errors for cases that previously surfaced as opaque SQL Server failures partway through the run: invalid `primary_key` (passed in `stream_of_changes` mode, with duplicates, referring to a different table, or with `Optional` dtype), schema columns colliding with the auto-appended `time`/`diff` columns or differing only in letter case, non-existent source tables or columns, missing or incompatible destination columns (non-existent, IDENTITY, computed, or required `NOT NULL` columns absent from the Pathway schema), `Optional[T]` fields mapped to `NOT NULL` destination columns, and empty or NUL-containing `table_name`/`schema_name`.
- `pw.io.mssql.write` snapshot mode now supports `bytes`-typed primary keys, and verifies that the destination has a unique index covering exactly the configured `primary_key` columns; without it, the upsert could silently match the wrong rows.
- `pw.io.mssql.read` streaming/CDC mode now handles previously silent edge cases: it errors when more than one CDC capture instance is registered on the source table, recovers the persistence offset from CDC when no events have been observed yet (so subsequent runs resume from CDC instead of re-snapshotting), and raises a clear error if CDC cleanup advances retention past the connector's last read position.
- `pw.io.mssql.read` accepts SQL Server `NUMERIC(N, 0)` columns into a Pathway `int` schema and integer-family columns (`TINYINT`/`SMALLINT`/`INT`/`BIGINT`) into a Pathway `float` schema, matching the `int → float` tolerance of `pw.io.sqlite.read`.
- `pw.io.mssql.read` and `pw.io.mssql.write` now correctly handle identifiers containing `]`.
- `pw.io.kafka.read` now emits a `DeprecationWarning` (instead of a `SyntaxWarning`) when `topic` is passed as a list, and warns when an explicitly configured `auto.offset.reset` is overridden because `start_from_timestamp_ms` is set. It also logs a warning (previously an easy-to-miss info message) when `start_from_timestamp_ms` lands at or past the end of a partition, since no already-written data will be read from it.
- `pw.io.mysql.write` now retries only transient MySQL errors (connection drops, deadlocks, lock-wait timeouts, "too many connections", and "server is shutting down") with exponential backoff, and lets permanent failures (missing tables, syntax errors, constraint violations) propagate on the first attempt. Previously every error was retried up to three times, delaying permanent failures by several seconds before they surfaced.
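
The retry policy described for `pw.io.postgres.write` and `pw.io.mysql.write` above (retry transient errors with exponential backoff, surface permanent ones immediately) can be sketched in plain Python. This is an illustration under stated assumptions rather than the connectors' code: `DbError`, `is_transient`, `run_with_retries`, and the `base_delay` value are hypothetical, and the sketch assumes "up to three times" means three retries after the first attempt.

```python
import time

# SQLSTATE classes and codes named in the PostgreSQL entry above.
TRANSIENT_CLASSES = {"08", "57"}  # connection exceptions, admin / crash shutdown
TRANSIENT_CODES = {"40001", "40P01"}  # serialization_failure, deadlock_detected


class DbError(Exception):
    """Hypothetical error type carrying a five-character SQLSTATE."""

    def __init__(self, sqlstate):
        super().__init__(sqlstate)
        self.sqlstate = sqlstate


def is_transient(sqlstate):
    # The first two characters of a SQLSTATE identify its class.
    return sqlstate[:2] in TRANSIENT_CLASSES or sqlstate in TRANSIENT_CODES


def run_with_retries(operation, max_retries=3, base_delay=0.1, sleep=time.sleep):
    """Run `operation`, retrying only transient failures with doubling delays."""
    attempt = 0
    while True:
        try:
            return operation()
        except DbError as err:
            # Permanent errors, or an exhausted retry budget, propagate as-is.
            if not is_transient(err.sqlstate) or attempt == max_retries:
                raise
            sleep(base_delay * 2**attempt)
            attempt += 1
```

With this shape, a syntax error such as SQLSTATE `42601` is raised on the first attempt, while a deadlock (`40P01`) is retried after delays of `base_delay`, `2 * base_delay`, and `4 * base_delay`.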
Fixed
- `pw.io.mssql.read` in `mode="streaming"` no longer mistakes an unrelated table for a CDC-enabled one. SQL Server leaves a capture instance behind when a CDC-tracked table is dropped without `sp_cdc_disable_table`, and its dangling `source_object_id` can later be reused by a brand-new, non-CDC table; the CDC probe matched on that object id alone, so the reader would report the fresh table as CDC-enabled and then tail a stale change table forever instead of failing fast with a "CDC is not enabled on table" error. The probe now also requires the source table to currently exist and have `is_tracked_by_cdc = 1`.
- `pw.io.iceberg.read` in `mode="static"` no longer hangs on an Iceberg table that has no current snapshot (e.g. a table Pathway just created but never wrote data to). The reader treated the absence of a snapshot as "wait for one to appear" (which never returned in static mode) and now correctly reports zero rows and exits.
- `pw.io.iceberg.write`'s `min_commit_frequency` now actually rate-limits all commits over the lifetime of the run, not just the first one. Previously the last-commit timestamp was set at writer construction and never updated, so once the initial interval elapsed every subsequent minibatch was committed individually, producing one Iceberg snapshot per minibatch rather than at most one per `min_commit_frequency` window.
- `pw.io.iceberg.write` into an Iceberg table that was created outside Pathway (for example, a table pre-created via pyiceberg with a hand-rolled schema) now correctly writes each row's column data into the matching destination column. Previously the writer relied on column-position metadata that only happened to line up when Pathway had both created and written the table, so writes into externally-created tables either failed to load on read-back or silently bound data to the wrong destination columns.
- `pw.io.postgres.read` streaming mode now correctly parses negative time components of `INTERVAL` values in PostgreSQL's default text format.
- `pw.io.kafka.read`, `pw.io.kafka.write` and `pw.io.kafka.simple_read` now validate their arguments when the connector is created and raise a clear `ValueError`/`TypeError`, instead of deferring to an opaque error from the engine or librdkafka at run time. This covers, among others: an empty or missing topic name; a missing or empty `bootstrap.servers`; a missing or empty `group.id` on the reader; non-positive `max_backlog_size`, `parallel_readers` or `autocommit_duration_ms`; a negative `start_from_timestamp_ms`; a schema that declares the reserved `_metadata` column; `json_field_paths` entries that reference an unknown field or use an invalid JSON Pointer; and contradictory combinations such as `read_only_new=True` with `mode="static"`.
- `pw.io.kafka.write` now rejects configurations that would otherwise silently drop data or emit malformed messages: duplicate header names, header names colliding with the reserved `pathway_time`/`pathway_diff` headers, table columns named `time`/`diff` under `format="json"`, an empty `subject`, and a `subject`/`schema_registry_settings` pair where only one side is provided or the format is not `json`.
- `pw.io.kafka.write` now honors its documented `sort_by` parameter; previously the column reference was lost when the connector rebuilt its output table, raising an error at run time.
- `pw.io.kafka.read` with `mode="static"` and a `start_from_timestamp_ms` past the end of the topic now returns immediately, instead of stalling until the static-read polling budget is exhausted.
- `pw.io.kafka.SchemaRegistrySettings` and `pw.io.kafka.SchemaRegistryHeader` now validate their fields on construction (the `urls` list shape and non-emptiness, string-typed credentials, mutually exclusive authentication methods, and the `timeout` type and positivity), instead of failing later with an opaque error.
- `pw.io.mongodb.read` persistence: on restart, the replayed change-stream events are now delivered atomically, preventing an edge case where a crash partway through the replay could skip events that had been read from MongoDB but not yet processed downstream.
- `pw.io.mongodb.read` in `mode="streaming"` no longer loses change events that are committed around connector startup. The reader now anchors its change stream to a cluster operation time captured before the initial collection snapshot and keeps a single cursor for both catch-up and live tailing, instead of capturing oplog tokens with short-lived streams and then re-opening a separate live stream. The previous handoff left a gap in which an event arriving during startup (most easily triggered when several MongoDB readers start concurrently against the same server) could be skipped and never delivered.
- Passing a non-positive `max_batch_size` to any output connector that accepts it now raises a clear error (`max_batch_size must be a positive integer`). Previously the value was handled inconsistently: `0` was silently accepted and disabled size-based batching entirely, while a negative value surfaced an opaque `OverflowError`.
- `pw.io.mysql.write` now rejects, at construction, a schema column named `time` or `diff` (case-insensitive) in `stream_of_changes` mode, where it would collide with the `time`/`diff` metadata columns the connector appends. Previously the conflict surfaced mid-run as an opaque MySQL "Duplicate column name" error. Rename the column or switch to `output_table_type="snapshot"`, which does not append these columns.
- `pw.io.dynamodb.write` now correctly handles which column types may serve as a `partition_key`/`sort_key`. A `bytes` column (serialized as the DynamoDB `Binary` type) and a `pw.Json` column (serialized as `String`) are valid scalar key types but were previously rejected with a "can't be used in the index" error; they are now accepted. Conversely, a `bool` column was previously allowed as a key and the auto-created table declared it as `Binary`, so every write then failed with an opaque `Invalid attribute value type` error from DynamoDB; `bool` is now rejected up front with a clear message, since DynamoDB has no Boolean key type.
- `pw.io.dynamodb.write` no longer fails when a row is updated within a single minibatch. The update emits a retraction of the old row and an insertion of the new one under the same primary key; previously both were sent in the same `BatchWriteItem` request, which DynamoDB rejects with `Provided list of item keys contains duplicates`, so the write failed after exhausting its retries. The connector now reconciles the two operations per key within the batch (the insertion wins), so updates are applied as expected under the documented snapshot semantics.
- `pw.io.questdb.write` now validates its designated-timestamp arguments when the connector is created and raises a clear `ValueError`, instead of deferring to an opaque error from the engine at run time. This covers `designated_timestamp_policy="use_column"` passed without a `designated_timestamp` column, and a `designated_timestamp` column whose type is neither `DateTimeNaive` nor `DateTimeUtc`.
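
The per-key reconciliation described for `pw.io.dynamodb.write` above can be modeled in a few lines. This is a rough sketch under assumptions, not the connector's code: `reconcile_batch`, the `(key, row, diff)` event shape, and the `"put"`/`"delete"` labels are hypothetical, with `diff` taken to be `+1` for an insertion and `-1` for a retraction.

```python
def reconcile_batch(events):
    """Collapse a minibatch to at most one operation per primary key.

    `events` is a list of (key, row, diff) tuples. An insertion always wins
    over a retraction of the same key, regardless of their order in the batch,
    so the resulting request never contains the same key twice.
    """
    final = {}
    for key, row, diff in events:
        if diff > 0:
            # Insertion: overrides any retraction already recorded for the key.
            final[key] = ("put", row)
        elif key not in final:
            # Retraction: kept only if nothing else is recorded for the key.
            final[key] = ("delete", None)
    return final


# An update of k1 (retraction + insertion) and a plain deletion of k2.
batch = [
    ("k1", {"value": 1}, -1),
    ("k1", {"value": 2}, 1),
    ("k2", {"value": 9}, -1),
]
operations = reconcile_batch(batch)
```

Here the update of `k1` collapses into a single put of the new row, and `k2` stays a single delete, so each key appears at most once in the outgoing batch.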