EventStoreDB Documentation

Reading events

There are two options for reading events from EventStoreDB. You can either:

  1. Read from an individual stream, or
  2. Read from the $all stream, which returns all events in the store.

Each event in EventStoreDB belongs to an individual stream. When reading events, pick the name of the stream from which you want to read the events and choose whether to read the stream forwards or backwards.

All events have a StreamPosition and a Position. StreamPosition is a big int (unsigned 64-bit integer) and represents the place of the event in its stream. Position is the event's logical position in the transaction log and is represented by a CommitPosition and a PreparePosition. When reading events, you supply a different kind of position depending on whether you are reading from an individual stream (a StreamPosition) or from the $all stream (a Position).
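
To make the two kinds of position concrete, here is a minimal in-memory sketch in Python. It does not use the EventStoreDB client: `Event`, `append`, and the plain counter used as the global position are assumptions made for illustration only (a real Position is a commit/prepare pair, not a list index).

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Event:
    stream: str      # name of the stream the event belongs to
    revision: int    # place of the event within its own stream (0-based)
    position: int    # place of the event in the global log (illustrative counter)
    event_type: str


def append(log: List[Event], stream: str, event_type: str) -> Event:
    """Append to the toy global log, assigning both kinds of position."""
    revision = sum(1 for e in log if e.stream == stream)
    event = Event(stream, revision, len(log), event_type)
    log.append(event)
    return event


log: List[Event] = []
append(log, "order-1", "OrderPlaced")
append(log, "order-2", "OrderPlaced")
append(log, "order-1", "OrderShipped")

# Reading a single stream is addressed by revision ...
order_1 = [e for e in log if e.stream == "order-1" and e.revision >= 0]
# ... while reading across all streams is addressed by global position.
from_position_1 = [e for e in log if e.position >= 1]
```

In this sketch the second event of `order-1` has revision 1 within its stream but position 2 in the global log, which is why the two read styles need different starting values.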

TIP

See the Connecting to EventStoreDB instructions to learn how to configure and use the client SDK.

Reading from a stream

You can read all the events or a subset of the events from an individual stream, starting from any position in the stream, and can read either forwards or backwards. It is only possible to read events from a single stream at a time. To read across streams, read from the global event log instead, as described in the Reading from the $all stream section below.

Reading forwards

The simplest way to read a stream forwards is to supply a stream name, a read direction, and the revision from which to start. The revision can be either the stream position Start or a big int (unsigned 64-bit integer):

events = client.get_stream(
    stream_name=stream_name,
    stream_position=0,
    limit=100,
)
const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});
const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  maxCount: 10,
});
ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readStream("some-stream", options)
        .get();
var events = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start
);
options := esdb.ReadStreamOptions{
    From:      esdb.Start{},
    Direction: esdb.Forwards,
}
stream, err := db.ReadStream(context.Background(), "some-stream", options, 100)

if err != nil {
    panic(err)
}

defer stream.Close()
let options = ReadStreamOptions::default()
    .position(StreamPosition::Start)
    .forwards();
let mut stream = client.read_stream("some-stream", &options).await?;

This will return an enumerable that can be iterated on:

for event in events:
    # Doing something productive with the event
    print(f"Event: {event}")
for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
for (ResolvedEvent resolvedEvent : result.getEvents()) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}
await foreach (var @event in events) Console.WriteLine(Encoding.UTF8.GetString(@event.Event.Data.ToArray()));
for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}
while let Some(event) = stream.next().await? {
    let test_event = event.get_original_event().as_json::<TestEvent>()?;

    println!("Event> {:?}", test_event);
}

There are a number of additional arguments you can provide when reading a stream, listed below.

maxCount

Passing in the max count will limit the number of events returned.

resolveLinkTos

When using projections to create new events, you can set whether the generated events are pointers to existing events. Setting this value to true tells EventStoreDB to return the event as well as the event linking to it.
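
As a rough illustration of what resolving a link means, the following Python sketch models it in memory. It does not call EventStoreDB: the `streams` dictionary, the `read` function, and the `revision@stream` payload on the `$>` link event are assumptions chosen for this sketch.

```python
# Toy store: stream name -> list of (event_type, data), indexed by revision.
streams = {
    "order-1": [("OrderPlaced", '{"total": 10}')],
    # A projection-style stream whose events only point at other events.
    "orders-by-customer": [("$>", "0@order-1")],
}


def read(stream: str, revision: int, resolve_link_tos: bool = False):
    """Return (event, link) for the event stored at `revision`.

    Without resolution, the link event itself is returned as the event.
    With resolution, the linked-to event is returned together with the link.
    """
    event_type, data = streams[stream][revision]
    if not resolve_link_tos or event_type != "$>":
        return (event_type, data), None
    target_revision, target_stream = data.split("@", 1)
    target = streams[target_stream][int(target_revision)]
    return target, (event_type, data)


unresolved = read("orders-by-customer", 0)
resolved = read("orders-by-customer", 0, resolve_link_tos=True)
```

Here `unresolved` carries only the pointer, while `resolved` carries the original `OrderPlaced` event plus the link that led to it.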

configureOperationOptions

You can use the configureOperationOptions argument to provide a function that will customise settings for each operation.

userCredentials

The userCredentials argument is optional. It is used to override the default credentials specified when creating the client instance.

credentials = client.construct_call_credentials(
    username="admin",
    password="changeit",
)

events = client.get_stream(
    stream_name=stream_name,
    credentials=credentials,
)
const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  credentials,
  maxCount: 10,
});
const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: START,
  credentials,
  maxCount: 10,
});
ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromStart()
        .authenticated("admin", "changeit");

ReadResult result = client.readStream("some-stream", options)
        .get();
var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    StreamPosition.Start,
    userCredentials: new UserCredentials("admin", "changeit"),
    cancellationToken: cancellationToken
);
options := esdb.ReadStreamOptions{
    From: esdb.Start{},
    Authenticated: &esdb.Credentials{
        Login:    "admin",
        Password: "changeit",
    },
}
stream, err := db.ReadStream(context.Background(), "some-stream", options, 100)
let options = ReadStreamOptions::default()
    .position(StreamPosition::Start)
    .authenticated(Credentials::new("admin", "changeit"));

let stream = client.read_stream("some-stream", &options).await;

Reading from a revision

Instead of providing the StreamPosition you can also provide a specific stream revision as a big int (unsigned 64-bit integer).

events = client.get_stream(
    stream_name=stream_name,
    stream_position=10,
)
const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});
const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});
ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromRevision(10)
        .maxCount(20);

ReadResult result = client.readStream("some-stream", options)
        .get();
var events = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    10,
    20
);
ropts := esdb.ReadStreamOptions{
    From: esdb.Revision(10),
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 20)

if err != nil {
    panic(err)
}

defer stream.Close()
let options = ReadStreamOptions::default()
    .position(StreamPosition::Position(10))
    .max_count(20);
let mut stream = client.read_stream("some-stream", &options).await?;
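
Starting from a revision combines naturally with a maximum count to read a long stream in pages. The Python sketch below shows one way this could look. `read_in_pages` and `fake_read` are hypothetical helpers, and the `(stream_position, limit)` callable only mimics the parameter names used in the Python example above; no real client is involved.

```python
from typing import Callable, Iterator, List, Tuple

# An event is modelled as (revision, payload); a real client returns richer objects.
Event = Tuple[int, str]
ReadFn = Callable[[int, int], List[Event]]  # (stream_position, limit) -> events


def read_in_pages(read: ReadFn, page_size: int, start: int = 0) -> Iterator[List[Event]]:
    """Yield successive pages, resuming each read after the last revision seen."""
    position = start
    while True:
        page = read(position, page_size)
        if not page:
            return
        yield page
        if len(page) < page_size:
            return  # a short page means the end of the stream was reached
        position = page[-1][0] + 1  # next revision after the last one returned


# Stand-in for a stream holding 25 events (revisions 0..24).
stored = [(i, f"event-{i}") for i in range(25)]


def fake_read(stream_position: int, limit: int) -> List[Event]:
    return stored[stream_position:stream_position + limit]


pages = list(read_in_pages(fake_read, page_size=10, start=10))
```

With a real client, `fake_read` would be replaced by a small function that performs the actual read and maps each returned event to its revision and payload.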

Reading backwards

In addition to reading a stream forwards, streams can be read backwards. To read all the events backwards, set the stream position to the end:

events = client.get_stream(
    stream_name=stream_name,
    backwards=True,
)
const events = client.readStream("some-stream", {
  direction: BACKWARDS,
  fromRevision: END,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
const events = client.readStream<SomeEvent>("some-stream", {
  direction: BACKWARDS,
  fromRevision: END,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
ReadStreamOptions options = ReadStreamOptions.get()
        .backwards()
        .fromEnd();

ReadResult result = client.readStream("some-stream", options)
        .get();

for (ResolvedEvent resolvedEvent : result.getEvents()) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}
var events = client.ReadStreamAsync(
    Direction.Backwards,
    "some-stream",
    StreamPosition.End
);

await foreach (var e in events) Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
ropts := esdb.ReadStreamOptions{
    Direction: esdb.Backwards,
    From:      esdb.End{},
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 10)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}
let options = ReadStreamOptions::default()
    .position(StreamPosition::End)
    .backwards();
let mut stream = client.read_stream("some-stream", &options).await?;

while let Some(event) = stream.next().await? {
    let test_event = event.get_original_event().as_json::<TestEvent>()?;

    println!("Event> {:?}", test_event);
}

TIP

Read one event backwards to find the last position in the stream.
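
The tip above can be sketched in Python as follows. `last_revision` and `fake_read_backwards` are hypothetical names, and the callable stands in for a backwards read limited to a given number of events; it is an illustration of the idea rather than client code.

```python
from typing import Callable, List, Optional, Tuple

Event = Tuple[int, str]  # (revision, payload), a simplification
ReadBackwards = Callable[[int], List[Event]]  # limit -> events, newest first


def last_revision(read_backwards: ReadBackwards) -> Optional[int]:
    """Return the revision of the newest event, or None for an empty stream."""
    newest = read_backwards(1)  # a single event, read from the end
    return newest[0][0] if newest else None


stored = [(0, "created"), (1, "renamed"), (2, "archived")]


def fake_read_backwards(limit: int) -> List[Event]:
    return list(reversed(stored))[:limit]


latest = last_revision(fake_read_backwards)
```

Reading a single event keeps the call cheap regardless of how long the stream is.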

Checking if the stream exists

Reading a stream returns a ReadStreamResult, which contains a ReadState property. This property has the value StreamNotFound or Ok.

Check this value before iterating the result, because iterating the result of a read from a stream that does not exist throws an exception.

For example:

try:
    events = client.get_stream(
        stream_name=unknown_stream_name, limit=1
    )
except exceptions.NotFound:
    print("The stream was not found")
const events = client.readStream("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});

try {
  for await (const resolvedEvent of events) {
    console.log(resolvedEvent.event?.data);
  }
} catch (error) {
  if (error instanceof StreamNotFoundError) {
    return;
  }

  throw error;
}
const events = client.readStream<SomeEvent>("some-stream", {
  direction: FORWARDS,
  fromRevision: BigInt(10),
  maxCount: 20,
});

try {
  for await (const resolvedEvent of events) {
    console.log(resolvedEvent.event?.data);
  }
} catch (error) {
  if (error instanceof StreamNotFoundError) {
    return;
  }

  throw error;
}
ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromRevision(10)
        .maxCount(20);

ReadResult result = null;
try {
    result = client.readStream("some-stream", options)
            .get();
} catch (ExecutionException e) {
    Throwable innerException = e.getCause();

    if (innerException instanceof StreamNotFoundException) {
        return;
    }

    // Rethrow any other failure so that result is never null below.
    throw e;
}

for (ResolvedEvent resolvedEvent : result.getEvents()) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}
var result = client.ReadStreamAsync(
    Direction.Forwards,
    "some-stream",
    10,
    20
);

if (await result.ReadState == ReadState.StreamNotFound) return;

await foreach (var e in result) Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
ropts := esdb.ReadStreamOptions{
    From: esdb.Revision(10),
}

stream, err := db.ReadStream(context.Background(), "some-stream", ropts, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if err, ok := esdb.FromError(err); !ok {
        if err.Code() == esdb.ErrorCodeResourceNotFound {
            fmt.Print("Stream not found")
            return
        } else if errors.Is(err, io.EOF) {
            break
        } else {
            panic(err)
        }
    }

    fmt.Printf("Event> %v", event)
}
let options = ReadStreamOptions::default().position(StreamPosition::Position(10));

let mut stream = client.read_stream("some-stream", &options).await?;

while let Some(event) = stream.next().await? {
    let test_event = event.get_original_event().as_json::<TestEvent>()?;

    println!("Event> {:?}", test_event);
}

Reading from the $all stream

Reading from the $all stream is similar to reading from an individual stream, with some differences. One significant difference is that you need to provide admin user account credentials to read from the $all stream. Additionally, you need to provide a transaction log position instead of a stream revision.

Reading forwards

The simplest way to read the $all stream forwards is to supply a read direction and the transaction log position from which you want to start. The transaction log position can be either the position Start or a specific Position, made up of a commit position and a prepare position:

events = client.read_all(
    commit_position=0,
    limit=100,
)
const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});
const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});
ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readAll(options)
        .get();
var events = client.ReadAllAsync(Direction.Forwards, Position.Start);
options := esdb.ReadAllOptions{
    From:      esdb.Start{},
    Direction: esdb.Forwards,
}
stream, err := db.ReadAll(context.Background(), options, 100)

if err != nil {
    panic(err)
}

defer stream.Close()
let options = ReadAllOptions::default()
    .position(StreamPosition::Start)
    .forwards();
let mut stream = client.read_all(&options).await?;

You can iterate asynchronously through the result:

for event in events:
    print(f"Event: {event}")
for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
for await (const resolvedEvent of events) {
  console.log(resolvedEvent.event?.data);
}
for (ResolvedEvent resolvedEvent : result.getEvents()) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}
await foreach (var e in events) Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    fmt.Printf("Event> %v", event)
}
while let Some(event) = stream.next().await? {
    println!("Event> {:?}", event.get_original_event());
}
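
Because the $all stream is addressed by transaction log position, a reader can remember the position of the last event it handled and start a later read from there. The Python sketch below illustrates that idea in memory. `Position`, `read_all_from`, the ordering by commit and then prepare, and the inclusive start are all assumptions of this sketch, not statements about the client API.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True, order=True)
class Position:
    commit: int
    prepare: int


# Toy global log of (position, event_type), already in log order.
log: List[Tuple[Position, str]] = [
    (Position(100, 100), "OrderPlaced"),
    (Position(250, 250), "OrderShipped"),
    (Position(400, 400), "OrderPaid"),
]


def read_all_from(start: Position, limit: int) -> List[Tuple[Position, str]]:
    """Return up to `limit` events at or after `start`, in log order."""
    return [entry for entry in log if entry[0] >= start][:limit]


first = read_all_from(Position(0, 0), limit=2)
checkpoint = first[-1][0]                       # position of the last event handled
resumed = read_all_from(checkpoint, limit=10)   # starts at the checkpoint event itself
```

Since the start is inclusive in this sketch, a resuming reader would skip the first returned event if it has already processed the checkpoint event.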

There are a number of additional arguments you can provide when reading the $all stream.

maxCount

Passing in the max count allows you to limit the number of events that are returned.

resolveLinkTos

When using projections to create new events you can set whether the generated events are pointers to existing events. Setting this value to true will tell EventStoreDB to return the event as well as the event linking to it.

events = client.read_all(
    commit_position=0,
    limit=100,
    resolve_links=True,
)
const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  resolveLinkTos: true,
  maxCount: 10,
});
const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  resolveLinkTos: true,
  maxCount: 10,
});
ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart()
        .resolveLinkTos();

ReadResult result = client.readAll(options)
        .get();
var result = client.ReadAllAsync(
    Direction.Forwards,
    Position.Start,
    resolveLinkTos: true,
    cancellationToken: cancellationToken
);
ropts := esdb.ReadAllOptions{
    ResolveLinkTos: true,
}

stream, err := db.ReadAll(context.Background(), ropts, 100)
let options = ReadAllOptions::default().resolve_link_tos();
client.read_all(&options).await?;

configureOperationOptions

This argument is a generic settings class that can be applied to any operation executed against EventStoreDB.

userCredentials

The credentials used to read the data can be supplied as follows. They override the default credentials set on the connection.

credentials = client.construct_call_credentials(
    username="admin",
    password="changeit",
)

events = client.read_all(
    commit_position=0,
    limit=100,
    credentials=credentials,
)
const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  credentials,
  maxCount: 10,
});
const credentials = {
  username: "admin",
  password: "changeit",
};

const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  credentials,
  maxCount: 10,
});
ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart()
        .authenticated("admin", "changeit");

ReadResult result = client.readAll(options)
        .get();
var result = client.ReadAllAsync(
    Direction.Forwards,
    Position.Start,
    userCredentials: new UserCredentials("admin", "changeit"),
    cancellationToken: cancellationToken
);
ropts := esdb.ReadAllOptions{
    From: esdb.Start{},
    Authenticated: &esdb.Credentials{
        Login:    "admin",
        Password: "changeit",
    },
}
stream, err := db.ReadAll(context.Background(), ropts, 100)
let options = ReadAllOptions::default()
    .authenticated(Credentials::new("admin", "changeit"))
    .position(StreamPosition::Position(Position {
        commit: 1_110,
        prepare: 1_110,
    }));
let stream = client.read_all(&options).await;

Reading backwards

In addition to reading the $all stream forwards, it can be read backwards. To read all the events backwards, set the position to the end:

events = client.read_all(
    backwards=True,
    limit=100,
)
const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  maxCount: 10,
});
const events = client.readAll({
  direction: BACKWARDS,
  fromPosition: END,
  maxCount: 10,
});
ReadAllOptions options = ReadAllOptions.get()
        .backwards()
        .fromEnd();

ReadResult result = client.readAll(options)
        .get();
var events = client.ReadAllAsync(Direction.Backwards, Position.End);
ropts := esdb.ReadAllOptions{
    Direction: esdb.Backwards,
    From:      esdb.End{},
}

stream, err := db.ReadAll(context.Background(), ropts, 100)

if err != nil {
    panic(err)
}

defer stream.Close()
let options = ReadAllOptions::default()
    .position(StreamPosition::End)
    .backwards();

let mut stream = client.read_all(&options).await?;

TIP

Read one event backwards to find the last position in the $all stream.

Handling system events

EventStoreDB will also return system events when reading from the $all stream. In most cases you can ignore these events.

All system events begin with $ or $$ and can be easily ignored by checking the EventType property.

events = client.read_all(
    filter_exclude=[],  # an empty exclude list turns off the default filtering of system events
)

for event in events:
    if event.type.startswith("$"):
        continue

    print(f"Event: {event.type}")
const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  if (resolvedEvent.event?.type.startsWith("$")) {
    continue;
  }

  console.log(resolvedEvent.event?.type);
}
const events = client.readAll({
  direction: FORWARDS,
  fromPosition: START,
  maxCount: 10,
});

for await (const resolvedEvent of events) {
  if (resolvedEvent.event?.type.startsWith("$")) {
    continue;
  }

  console.log(resolvedEvent.event?.type);
}
ReadAllOptions options = ReadAllOptions.get()
        .forwards()
        .fromStart();

ReadResult result = client.readAll(options)
        .get();

for (ResolvedEvent resolvedEvent : result.getEvents()) {
    RecordedEvent recordedEvent = resolvedEvent.getOriginalEvent();
    if (recordedEvent.getEventType().startsWith("$")) {
        continue;
    }
    System.out.println(new ObjectMapper().writeValueAsString(recordedEvent.getEventData()));
}
var events = client.ReadAllAsync(Direction.Forwards, Position.Start);

await foreach (var e in events) {
    if (e.Event.EventType.StartsWith("$")) continue;

    Console.WriteLine(Encoding.UTF8.GetString(e.Event.Data.ToArray()));
}
stream, err := db.ReadAll(context.Background(), esdb.ReadAllOptions{}, 100)

if err != nil {
    panic(err)
}

defer stream.Close()

for {
    event, err := stream.Recv()

    if errors.Is(err, io.EOF) {
        break
    }

    if err != nil {
        panic(err)
    }

    if strings.HasPrefix(event.OriginalEvent().EventType, "$") {
        continue
    }

    fmt.Printf("Event> %v", event)
}
let mut stream = client.read_all(&Default::default()).await?;

while let Some(event) = stream.next().await? {
    if event.get_original_event().event_type.starts_with("$") {
        continue;
    }

    println!("Event> {:?}", event.get_original_event());
}
Contributors: Faheem Muhammad Ramjaun