Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
<p align="center">
<img src="icon.png" width=50% height=50% alt="Bento">
</p>
[![godoc for warpstreamlabs/bento][godoc-badge]][godoc-url]
[![Build Status][actions-badge]][actions-url]
[![Docs site][website-badge]][website-url]
[Discord](https://console.warpstream.com/socials/discord)
[Slack](https://console.warpstream.com/socials/slack)
Bento is a high performance and resilient stream processor, able to connect various [sources][inputs] and [sinks][outputs] in a range of brokering patterns and perform [hydration, enrichments, transformations and filters][processors] on payloads.
It comes with a [powerful mapping language][bloblang-about], is easy to deploy and monitor, and ready to drop into your pipeline either as a static binary, docker image, or [serverless function][serverless], making it cloud native as heck.
Bento is declarative, with stream pipelines defined in as few as a single config file, allowing you to specify connectors and a list of processing stages:
```yaml
input:
  gcp_pubsub:
    project: foo
    subscription: bar

pipeline:
  processors:
    - mapping: |
        root.message = this
        root.meta.link_count = this.links.length()
        root.user.age = this.user.age.number()

output:
  redis_streams:
    url: tcp://TODO:6379
    stream: baz
    max_in_flight: 20
```
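To make the mapping in the config above more concrete, here is a rough Go sketch of the transformation it describes for a single JSON document. It is an illustration only, not how Bento evaluates its mapping language: `applyMapping` is a hypothetical helper, the sample document is invented, and the sketch assumes `links` is an array and `user.age` is a number or a numeric string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// applyMapping is a hypothetical helper that mimics the result of the mapping
// for one decoded JSON document. It is an illustration, not Bento code.
func applyMapping(this map[string]any) (map[string]any, error) {
	links, ok := this["links"].([]any)
	if !ok {
		return nil, fmt.Errorf("links: expected an array")
	}
	user, ok := this["user"].(map[string]any)
	if !ok {
		return nil, fmt.Errorf("user: expected an object")
	}

	// Accept either a JSON number or a numeric string for the age.
	var age float64
	switch v := user["age"].(type) {
	case float64:
		age = v
	case string:
		parsed, err := strconv.ParseFloat(v, 64)
		if err != nil {
			return nil, fmt.Errorf("user.age: %w", err)
		}
		age = parsed
	default:
		return nil, fmt.Errorf("user.age: expected a number or numeric string")
	}

	return map[string]any{
		"message": this, // the whole input document is nested under "message"
		"meta":    map[string]any{"link_count": len(links)},
		"user":    map[string]any{"age": age},
	}, nil
}

func main() {
	input := []byte(`{"links":["a","b"],"user":{"age":"30"}}`)

	var doc map[string]any
	if err := json.Unmarshal(input, &doc); err != nil {
		panic(err)
	}
	out, err := applyMapping(doc)
	if err != nil {
		panic(err)
	}
	encoded, err := json.Marshal(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(encoded))
}
```

The output document carries the original payload under `message`, a count of the links under `meta.link_count`, and the age as a number under `user.age`.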
### Delivery Guarantees
Delivery guarantees [can be a dodgy subject](https://youtu.be/QmpBOCvY8mY). Bento processes and acknowledges messages using an in-process transaction model with no need for any disk persisted state, so when connecting to at-least-once sources and sinks it's able to guarantee at-least-once delivery even in the event of crashes, disk corruption, or other unexpected server faults.
This behaviour is the default and free of caveats, which also makes deploying and scaling Bento much simpler.
## Supported Sources & Sinks
AWS (DynamoDB, Kinesis, S3, SQS, SNS), Azure (Blob storage, Queue storage, Table storage), GCP (Pub/Sub, Cloud Storage, BigQuery), Kafka, NATS (JetStream, Streaming), NSQ, MQTT, AMQP 0.9.1 (RabbitMQ), AMQP 1, Redis (streams, list, pubsub, hashes), Cassandra, Elasticsearch, HDFS, HTTP (server and client, including websockets), MongoDB, SQL (MySQL, PostgreSQL, ClickHouse, MSSQL), and [you know what just click here to see them all, they don't fit in a README][about-categories].
Connectors are being added constantly; if something you want is missing, [open an issue](https://github.com/warpstreamlabs/bento/issues/new).
## Documentation
If you want to dive fully into Bento then don't waste your time in this dump, check out the [documentation site][general-docs].
For guidance on how to configure more advanced stream processing concepts such as stream joins, enrichment workflows, etc, check out the [cookbooks section][cookbooks].
For guidance on building your own custom plugins in Go, check out [the public APIs][godoc-url].
## Install
We're working on the release process, but you can either compile from source or pull the docker image:
```shell
docker pull ghcr.io/warpstreamlabs/bento
```
For more information check out the [getting started guide][getting-started].
## Run
```shell
bento -c ./config.yaml
```
Or, with docker:
```shell
# Using a config file
docker run --rm -v /path/to/your/config.yaml:/bento.yaml ghcr.io/warpstreamlabs/bento

# Using a series of -s flags
docker run --rm -p 4195:4195 ghcr.io/warpstreamlabs/bento \
  -s "input.type=http_server" \
  -s "output.type=kafka" \
  -s "output.kafka.addresses=kafka-server:9092" \
  -s "output.kafka.topic=bento_topic"
```
## Monitoring
### Health Checks
Bento serves two HTTP endpoints for health checks:
- `/ping` can be used as a liveness probe as it always returns a 200.
- `/ready` can be used as a readiness probe as it serves a 200 only when both the input and output are connected, otherwise a 503 is returned.
### Metrics
Bento [exposes lots of metrics][metrics] to StatsD, Prometheus, a JSON HTTP endpoint, [and more][metrics].
### Tracing
Bento also [emits OpenTelemetry tracing events][tracers], which can be used to visualise the processors within a pipeline.
## Configuration
Bento provides lots of tools for making configuration discovery, debugging and organisation easy. You can [read about them here][config-doc].
## Build
Build with Go (any [currently supported version](https://go.dev/dl/)):
```shell
git clone git@github.com:warpstreamlabs/bento
cd bento
make
go build -o bento ./cmd/bento/main.go
```
## Lint
Bento uses [golangci-lint][golangci-lint] for linting, which you can install with:
```shell
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
```
And then run it with `make lint`.
## Plugins
It's pretty easy to write your own custom plugins for Bento in Go. For more information, check out [the API docs][godoc-url].
## Extra Plugins
By default Bento does not build with components that require linking to external libraries, such as the `zmq4` input and output. If you wish to build Bento locally with these dependencies, set the build tag `x_bento_extra`:
```shell
# With go
go install -tags "x_bento_extra" github.com/warpstreamlabs/bento/cmd/bento@latest
# Using make
make TAGS=x_bento_extra
```
Note that this tag may change or be broken out into granular tags for individual components outside of major version releases. If you attempt a build and these dependencies are not present you'll see error messages such as `ld: library not found for -lzmq`.
## Docker Builds
There's a multi-stage `Dockerfile` for creating a Bento docker image which results in a minimal image from scratch. You can build it with:
```shell
make docker
```
Then use the image:
```shell
docker run --rm \
  -v /path/to/your/bento.yaml:/config.yaml \
  -v /tmp/data:/data \
  -p 4195:4195 \
  bento -c /config.yaml
```
## Contributing
Contributions are welcome, please [read the guidelines](CONTRIBUTING.md), come and chat (links are on the [community page][community]), and watch your back.
[inputs]: https://warpstreamlabs.github.io/bento/docs/components/inputs/about
[about-categories]: https://warpstreamlabs.github.io/bento/docs/about#components
[processors]: https://warpstreamlabs.github.io/bento/docs/components/processors/about
[outputs]: https://warpstreamlabs.github.io/bento/docs/components/outputs/about
[metrics]: https://warpstreamlabs.github.io/bento/docs/components/metrics/about
[tracers]: https://warpstreamlabs.github.io/bento/docs/components/tracers/about
[config-interp]: https://warpstreamlabs.github.io/bento/docs/configuration/interpolation
[streams-api]: https://warpstreamlabs.github.io/bento/docs/guides/streams_mode/streams_api
[streams-mode]: https://warpstreamlabs.github.io/bento/docs/guides/streams_mode/about
[general-docs]: https://warpstreamlabs.github.io/bento/docs/about
[bloblang-about]: https://warpstreamlabs.github.io/bento/docs/guides/bloblang/about
[config-doc]: https://warpstreamlabs.github.io/bento/docs/configuration/about
[serverless]: https://warpstreamlabs.github.io/bento/docs/guides/serverless/about
[cookbooks]: https://warpstreamlabs.github.io/bento/cookbooks
[releases]: https://github.com/warpstreamlabs/bento/releases
[plugin-repo]: https://github.com/warpstreamlabs/bento-plugin-example
[getting-started]: https://warpstreamlabs.github.io/bento/docs/guides/getting_started
[godoc-badge]: https://pkg.go.dev/badge/github.com/warpstreamlabs/bento/public
[godoc-url]: https://pkg.go.dev/github.com/warpstreamlabs/bento/public
[actions-badge]: https://github.com/warpstreamlabs/bento/actions/workflows/test.yml/badge.svg
[actions-url]: https://github.com/warpstreamlabs/bento/actions/workflows/test.yml
[website-badge]: https://img.shields.io/badge/Docs-Learn%20more-ffc7c7
[website-url]: https://warpstreamlabs.github.io/bento/
[community]: https://warpstreamlabs.github.io/bento/community
[golangci-lint]: https://golangci-lint.run/
[jaeger]: https://www.jaegertracing.io/