-
Added
heartbeat
option to pull subscribers fetch API:
await sub.fetch(1, timeout=1, heartbeat=0.1)
It can be useful to help distinguish API timeouts from not receiving messages:
try:
    await sub.fetch(100, timeout=1, heartbeat=0.2)
except nats.js.errors.FetchTimeoutError:
    # timeout due to not receiving messages
except asyncio.TimeoutError:
    # unexpected timeout
-
Added
subject_transform
to add_stream:
await js.add_stream(
    name="TRANSFORMS",
    subjects=["test", "foo"],
    subject_transform=nats.js.api.SubjectTransform(
        src=">", dest="transformed.>"
    ),
)
-
Added
subject_transform
to sources as well:
transformed_source = nats.js.api.StreamSource(
    name="TRANSFORMS",
    # The source filters cannot overlap.
    subject_transforms=[
        nats.js.api.SubjectTransform(
            src="transformed.>", dest="fromtest.transformed.>"
        ),
        nats.js.api.SubjectTransform(
            src="foo.>", dest="fromtest.foo.>"
        ),
    ],
)
await js.add_stream(
    name="SOURCING",
    sources=[transformed_source],
)
-
Added
backoff
option to add_consumer:
await js.add_consumer(
    "events",
    durable_name="a",
    max_deliver=3,  # has to be greater than the length of the backoff array
    backoff=[1, 2],  # defined in seconds
    ack_wait=999999,  # ignored once using backoff
    max_ack_pending=3,
    filter_subject="events.>",
)
-
Added
compression
to add_stream:
await js.add_stream(
    name="COMPRESSION",
    subjects=["test", "foo"],
    compression="s2",
)
-
Added
metadata
to add_stream:
await js.add_stream(
    name="META",
    subjects=["test", "foo"],
    metadata={'foo': 'bar'},
)