Connectors with Bento

FlowMQ integrates with Bento, a powerful connector framework that provides access to 300+ databases, cloud services, applications, and systems. This integration enables seamless data movement and real-time synchronization across your entire technology stack.

Overview

Bento provides a unified connector framework that allows FlowMQ to:

  • Connect to 300+ Systems: Databases, cloud services, APIs, and applications
  • Real-time Data Sync: Bidirectional data flow with change data capture
  • Protocol Translation: Convert between different data formats and protocols
  • Event Streaming: Capture and stream events from any connected system
  • Data Transformation: Transform data between different schemas and formats
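
A complete pipeline pairs a source with one or more sinks. As a minimal illustration, the sketch below uses the same configuration schema as the examples later on this page; the endpoint and topic names are placeholders.

yaml
# Minimal pipeline: poll an HTTP endpoint and publish each event to FlowMQ
source:
  type: http
  url: "https://api.example.com/events"   # placeholder endpoint
  polling_interval: 60s

sink:
  type: flowmq
  url: your-namespace.flowmq.io
  topic: "events/raw"                     # placeholder topic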

Connector Categories

Databases

Relational Databases

  • PostgreSQL: Full CDC support with logical replication
  • MySQL: Binary log replication and change tracking
  • Oracle: LogMiner and GoldenGate integration
  • SQL Server: Change Data Capture and Always On availability groups
  • MariaDB: Binary log streaming
  • CockroachDB: Distributed SQL with CDC
  • TiDB: MySQL-compatible distributed database

NoSQL Databases

  • MongoDB: Change streams and oplog replication
  • Cassandra: Commit log streaming
  • Redis: Pub/sub and stream processing
  • DynamoDB: DynamoDB Streams change capture
  • CouchDB: Changes feed and replication
  • InfluxDB: Time-series data streaming
  • TimescaleDB: Time-series PostgreSQL extension

Data Warehouses

  • Snowflake: Snowpipe and change tracking
  • BigQuery: Real-time streaming inserts
  • Redshift: Change data capture
  • Databricks: Delta Lake streaming
  • ClickHouse: High-performance analytics
  • Apache Druid: Real-time analytics

Cloud Services

AWS Services

  • S3: Object storage events and streaming
  • DynamoDB: Streams and change tracking
  • RDS: Database change capture
  • Lambda: Serverless function triggers
  • SQS/SNS: Message queuing and notifications
  • Kinesis: Real-time data streaming
  • EventBridge: Event routing and processing

Google Cloud

  • BigQuery: Real-time data analytics
  • Cloud Storage: Object storage events
  • Cloud SQL: Database change capture
  • Pub/Sub: Message queuing
  • Dataflow: Stream processing
  • Firestore: Document database changes

Azure Services

  • Azure SQL: Change data capture
  • Cosmos DB: Change feed streaming
  • Blob Storage: Storage events
  • Event Hubs: Real-time event streaming
  • Service Bus: Message queuing
  • Data Lake: Big data storage

Other Cloud Services

  • MongoDB Atlas: Managed MongoDB with streams
  • Redis Cloud: Managed Redis with pub/sub
  • Confluent Cloud: Managed Kafka platform
  • Elastic Cloud: Managed Elasticsearch
  • Databricks: Unified analytics platform

Applications

CRM & Marketing

  • Salesforce: Real-time platform events
  • HubSpot: Contact and deal changes
  • Pipedrive: Deal and contact updates
  • Zoho CRM: Customer data changes
  • Mailchimp: Email campaign events
  • Intercom: Customer messaging events

E-commerce

  • Shopify: Order and product updates
  • WooCommerce: WordPress e-commerce
  • Magento: E-commerce platform events
  • Stripe: Payment processing events
  • PayPal: Payment gateway events
  • Square: Point-of-sale data

Business Applications

  • Slack: Channel and message events
  • Microsoft Teams: Chat and meeting events
  • Zoom: Meeting and webinar events
  • Notion: Database and page changes
  • Airtable: Base and record updates
  • Monday.com: Project management events

Developer Tools

  • GitHub: Repository and issue events
  • GitLab: CI/CD and repository events
  • Jira: Issue and project updates
  • Confluence: Page and space changes
  • Trello: Board and card updates
  • Asana: Task and project changes

Systems & Infrastructure

Monitoring & Observability

  • Prometheus: Metrics collection
  • Grafana: Dashboard and alert events
  • Datadog: Monitoring and APM data
  • New Relic: Application performance data
  • Splunk: Log and event data
  • Elasticsearch: Search and analytics

DevOps & CI/CD

  • Jenkins: Build and deployment events
  • GitLab CI: Pipeline execution events
  • GitHub Actions: Workflow execution
  • CircleCI: Build and test events
  • Docker: Container lifecycle events
  • Kubernetes: Pod and service events

IoT & Edge

  • MQTT Brokers: IoT device data
  • OPC UA: Industrial automation data
  • Modbus: Industrial protocol data
  • LoRaWAN: Low-power wide-area networks
  • BLE: Bluetooth Low Energy devices
  • Zigbee: Wireless sensor networks

Integration Patterns

Database to Message Queue
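
Stream committed row changes from relational tables into FlowMQ topics, so downstream consumers receive events without polling the database: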

yaml
# PostgreSQL to FlowMQ
source:
  type: postgresql
  host: localhost
  port: 5432
  database: myapp
  username: user
  password: pass
  tables:
    - users
    - orders
    - products

sink:
  type: flowmq
  url: your-namespace.flowmq.io
  topics:
    users: "data/users"
    orders: "data/orders"
    products: "data/products"

Application to Database
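
Sync application data into a relational store, mapping each Salesforce object to its own table: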

yaml
# Salesforce to PostgreSQL
source:
  type: salesforce
  username: user@company.com
  password: pass
  security_token: token
  objects:
    - Account
    - Contact
    - Opportunity

sink:
  type: postgresql
  host: localhost
  port: 5432
  database: salesforce_sync
  tables:
    Account: accounts
    Contact: contacts
    Opportunity: opportunities

Real-time Event Streaming
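
A single source can fan out to several sinks at once; here Shopify webhook events flow to FlowMQ, PostgreSQL, and Elasticsearch: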

yaml
# Shopify to Multiple Destinations
source:
  type: shopify
  shop_name: mystore
  access_token: token
  webhooks:
    - orders/create
    - orders/updated
    - products/create

sinks:
  - type: flowmq
    url: your-namespace.flowmq.io
    topics:
      orders: "ecommerce/orders"
      products: "ecommerce/products"
  
  - type: postgresql
    host: localhost
    database: analytics
    tables:
      orders: shopify_orders
      products: shopify_products
  
  - type: elasticsearch
    hosts: ["localhost:9200"]
    index: shopify_events

Configuration Examples

Change Data Capture
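
CDC tails the database's replication log (the MySQL binary log below), so inserts, updates, and deletes arrive as events, including schema changes when enabled: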

yaml
# MySQL CDC Configuration
source:
  type: mysql
  host: localhost
  port: 3306
  username: user
  password: pass
  database: myapp
  cdc:
    enabled: true
    binlog_position: "mysql-bin.000001:1234"
    tables:
      - users
      - orders
    include_schema_changes: true

sink:
  type: flowmq
  url: your-namespace.flowmq.io
  topics:
    users: "cdc/users"
    orders: "cdc/orders"

API Integration
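
For APIs without webhooks, the HTTP source polls an endpoint on a fixed interval and follows cursor-based pagination: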

yaml
# REST API to FlowMQ
source:
  type: http
  url: "https://api.example.com/events"
  method: GET
  headers:
    Authorization: "Bearer token"
  polling_interval: 30s
  pagination:
    type: cursor
    next_page_field: "next_cursor"

sink:
  type: flowmq
  url: your-namespace.flowmq.io
  topic: "api/events"

File Processing
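
The S3 source picks up objects under the configured prefix, filtered to the listed file types, and publishes their contents as messages: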

yaml
# S3 to FlowMQ
source:
  type: s3
  bucket: my-data-bucket
  prefix: events/
  region: us-east-1
  credentials:
    access_key_id: key
    secret_access_key: secret
  file_types:
    - json
    - csv
    - parquet

sink:
  type: flowmq
  url: your-namespace.flowmq.io
  topic: "s3/events"

Data Transformation

Schema Mapping
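
Schema mapping renames fields, converts formats, drops internal fields, and stamps constant metadata onto every record: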

yaml
# Transform data between different schemas
transformations:
  - type: field_mapping
    source_field: "user_id"
    target_field: "id"
  
  - type: field_mapping
    source_field: "created_at"
    target_field: "timestamp"
    format: "unix_timestamp"
  
  - type: field_removal
    fields: ["internal_id", "temp_data"]
  
  - type: field_addition
    fields:
      source: "api"
      version: "1.0"

Data Enrichment
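
Enrichment joins each record against external data, either through a database lookup keyed on a message field or an HTTP call templated with a field value: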

yaml
# Enrich data with external sources
enrichment:
  - type: lookup
    source: postgresql
    table: user_profiles
    join_field: user_id      # key column in the user_profiles table
    target_field: user_id    # field on the incoming record to match
    add_fields:
      - name
      - email
      - subscription_tier
  
  - type: http_request
    url: "https://api.geolocation.com/ip/{ip_address}"
    method: GET
    target_field: ip_address
    add_fields:
      - country
      - city
      - timezone

Performance Optimization

Parallel Processing
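
Higher throughput usually comes from raising parallelism (concurrent workers), batch_size (records per write), and buffer_size (records buffered in memory):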

yaml
# Enable parallel processing
performance:
  parallelism: 4
  batch_size: 1000
  buffer_size: 10000
  timeout: 30s

Caching
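
Lookup results are cached in memory: entries expire after the TTL, and least-recently-used entries are evicted once max_size is reached: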

yaml
# Configure caching for lookups
caching:
  enabled: true
  ttl: 3600s
  max_size: 10000
  strategy: lru

Monitoring & Observability

Metrics Collection
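
Each connector can expose a metrics endpoint, emit structured JSON logs, and serve a separate health check endpoint: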

yaml
# Enable metrics and monitoring
monitoring:
  metrics:
    enabled: true
    port: 9090
    path: /metrics
  
  logging:
    level: info
    format: json
  
  health_check:
    enabled: true
    port: 8080
    path: /health

Error Handling
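
Failed deliveries are retried with exponential backoff; messages that exhaust their retries go to a dead letter queue, and the circuit breaker pauses delivery after repeated failures until the recovery timeout elapses: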

yaml
# Configure error handling and retries
error_handling:
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1s
  
  dead_letter_queue:
    enabled: true
    topic: "dlq/errors"
  
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    recovery_timeout: 60s

Best Practices

  1. Choose Appropriate Connectors

    • Use CDC for databases when possible
    • Use webhooks for real-time application events
    • Use polling for APIs without webhooks
  2. Optimize Performance

    • Configure appropriate batch sizes
    • Use parallel processing for high-volume data
    • Implement caching for frequent lookups
  3. Handle Data Quality

    • Validate data at source and destination
    • Implement error handling and retries
    • Use dead letter queues for failed messages
  4. Monitor and Alert

    • Set up metrics collection
    • Configure alerts for failures
    • Monitor data latency and throughput (a combined configuration sketch follows this list)
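
These practices compose. The sketch below assembles options already documented in the sections above into a single connector configuration; the values are illustrative starting points rather than tuned recommendations.

yaml
# Combined starting point; every option shown is documented earlier on
# this page, and the values are placeholders to tune for your workload
performance:
  parallelism: 4            # parallel workers for high-volume data
  batch_size: 1000          # records per write

error_handling:
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1s
  dead_letter_queue:
    enabled: true           # failed messages land here instead of being lost
    topic: "dlq/errors"

monitoring:
  metrics:
    enabled: true
    port: 9090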

Future Enhancements

Bento integration with FlowMQ will continue to evolve with:

  • More Connectors: Continued growth beyond the current 300+ integrations
  • Advanced Transformations: Complex data processing capabilities
  • Visual Configuration: GUI for connector setup
  • Enhanced Monitoring: Real-time dashboards and alerts
  • Auto-scaling: Dynamic resource allocation
  • Machine Learning: Intelligent data routing and processing