Orchestrating an AI Agent Prototype with Hex, Dagster, and LangChain 🤖

The need for automation in data analysis, especially around complex topics like climate change, has never been greater. In this post, I’ll walk you through how I’m developing an Investigative Report AI Agent prototype as part of my Climate Resilience Data Platform. This agent processes large-scale online conversations about climate change, automates research tasks, and generates investigative reports. The backbone of this workflow relies on Hex, Dagster, and LangChain to ensure a smooth orchestration of the AI’s tasks.

To get a better understanding of how this prototype works, here’s a video walkthrough where I demonstrate the end-to-end workflow, from orchestrating the agent to running it on a real conversation dataset:

The Challenge: Analyzing Climate Change Discourse at Scale

The Climate Resilience Data Platform is designed to track and analyze how climate change is discussed in media articles and social networks. These discussions span different geographic locations and evolve over time, presenting a complex and dynamic challenge for researchers. By collecting and processing this vast amount of data, the platform provides insights into public perception of climate change.

An AI-driven approach to data processing is at the heart of the platform. AI agents powered by LangChain enrich and transform the raw conversation data, performing tasks such as:

  • Conversation Classification: Determining whether conversations are related to climate change.
  • Narrative Association: Mapping conversations to specific climate discourse types (biophysical, critical, dismissive, integrative).
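
As a rough illustration of what these two tasks produce, the sketch below models a classification result as a typed record. The names `DiscourseType`, `ConversationLabel`, and `parse_label` are hypothetical and are not taken from the platform's code; only the four discourse types come from the list above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class DiscourseType(str, Enum):
    """The four climate discourse types named in the list above."""

    BIOPHYSICAL = "biophysical"
    CRITICAL = "critical"
    DISMISSIVE = "dismissive"
    INTEGRATIVE = "integrative"


@dataclass(frozen=True)
class ConversationLabel:
    """Hypothetical record combining both enrichment tasks."""

    conversation_id: str
    is_climate_related: bool
    discourse_type: Optional[DiscourseType]


def parse_label(
    conversation_id: str,
    is_climate_related: bool,
    raw_discourse: Optional[str],
) -> ConversationLabel:
    """Validate raw model output into a typed label."""
    # Conversations that are not about climate change get no discourse type.
    if not is_climate_related or raw_discourse is None:
        return ConversationLabel(conversation_id, is_climate_related, None)
    # Raises ValueError if the label is not one of the four known types.
    discourse = DiscourseType(raw_discourse.strip().lower())
    return ConversationLabel(conversation_id, True, discourse)
```

Validating the model's answer against a closed set of labels keeps unexpected values from flowing into downstream assets.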

With these capabilities, the platform goes beyond simple keyword analysis, offering a more nuanced understanding of climate change narratives. However, automating the collection, research, and generation of insights from these conversations requires a robust orchestration and execution framework, which is where Hex, Dagster, and LangChain come into play.

The critical problem is automating the generation of investigative reports from this wealth of online conversation data. With automation, it is possible to sift through, analyze, and generate meaningful insights from the vast array of climate-related content generated daily. The Investigative Report AI Agent fills this gap by pulling data, performing additional research as needed, and automatically generating a structured summary of its findings.

The AI Agent Orchestration: Design and Role Breakdown

To automate the process of analyzing and summarizing online climate change conversations, I’m using an orchestration system that combines several powerful tools: Dagster, Hex, LangChain, and LangSmith. Each of these tools plays a crucial role in building the overall workflow.

Orchestration of AI Agent
  • Dagster: Responsible for the orchestration layer, Dagster manages the entire process, from triggering the agent to handling downstream assets. It schedules tasks, manages dependencies, and logs results.
  • Hex: Hex is at the heart of the AI agent’s functionality in this prototype. It is where the core logic of the AI agent resides: Hex handles the data processing, hosts the agent operations, and runs the investigative research workflows on conversations.
  • LangChain: Provides the framework for interacting with LLMs. LangChain handles the logic for the AI agent’s decision-making process, determining whether there’s enough information to generate a report or whether more research is needed.
  • LangSmith: Used for monitoring and optimizing the agent’s performance. LangSmith tracks the agent’s runs, providing detailed metrics like token usage, response time, and costs associated with each interaction with the language models.

Together, these tools form a robust, scalable system for ingesting large datasets, conducting additional research, and generating well-structured investigative reports.
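
The decision step described for LangChain above (generate a report if there is enough information, otherwise research further) can be sketched as a simple loop. This is a minimal sketch in plain Python, not the agent's actual code: `AgentState`, `run_agent`, the three injected callables, and the cap of three research cycles are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class AgentState:
    """Hypothetical state carried between research cycles."""

    conversation: str
    findings: List[str] = field(default_factory=list)
    research_cycles: int = 0
    complete: bool = False


def run_agent(
    conversation: str,
    is_complete: Callable[[AgentState], bool],
    research: Callable[[AgentState], str],
    write_brief: Callable[[AgentState], str],
    max_cycles: int = 3,  # assumed cap, so the loop always terminates
) -> Tuple[str, AgentState]:
    state = AgentState(conversation=conversation)
    while True:
        # Ask whether the gathered information is enough for a report.
        state.complete = is_complete(state)
        if state.complete or state.research_cycles >= max_cycles:
            break
        # Otherwise run one more research cycle and record its finding.
        state.findings.append(research(state))
        state.research_cycles += 1
    # Write the brief from whatever was gathered, complete or not.
    return write_brief(state), state
```

In a real agent each callable would wrap an LLM or tool call; injecting them keeps the control flow testable without any model access.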

Orchestration and Implementation

Let’s examine the implementation details of the Investigative Report AI Agent. Below, I’ll walk you through the code connecting the tools and how they interact in the orchestration.

To integrate Hex with Dagster, I’m leveraging the dagster-hex library, which allows Dagster to call Hex projects. This resource handles the interaction with Hex’s API and makes it easy to run the agent.

import os
from typing import Optional

from dagster import ConfigurableResource
from dagster_hex.resources import HexResource
from pydantic import Field


class ConfigurableHexResource(ConfigurableResource):
    """Dagster resource that wraps the dagster-hex HexResource client."""

    # Falls back to the HEX_API_KEY environment variable when not set explicitly.
    hex_api_key: str = Field(default_factory=lambda: os.environ.get("HEX_API_KEY"))

    def create_client(self) -> HexResource:
        # Build the underlying Hex API client from the configured key.
        return HexResource(api_key=self.hex_api_key)

    def run_and_poll(
        self,
        project_id: str,
        inputs: dict,
        update_cache: bool,
        kill_on_timeout: bool,
        poll_interval: int,
        poll_timeout: Optional[int],
    ):
        # Start a run of the Hex project and block until it finishes.
        return self.create_client().run_and_poll(
            project_id=project_id,
            inputs=inputs,
            update_cache=update_cache,
            kill_on_timeout=kill_on_timeout,
            poll_interval=poll_interval,
            poll_timeout=poll_timeout,
        )

My wrapper around the HexResource class to make it a configurable resource

This ConfigurableHexResource class allows me to interact with the AI agent I have defined in a Hex project.
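
To make the `poll_interval`, `poll_timeout`, and `kill_on_timeout` parameters concrete, here is a minimal sketch of a generic run-and-poll loop. It is not dagster-hex's implementation: `poll_until_done`, the injected `get_status` and `cancel` callables, and the status strings are illustrative assumptions.

```python
import time
from typing import Callable, Optional

# Illustrative terminal states; the real API may report others.
TERMINAL_STATUSES = {"COMPLETED", "ERRORED", "KILLED"}


def poll_until_done(
    get_status: Callable[[], str],
    cancel: Callable[[], None],
    poll_interval: float = 1.0,
    poll_timeout: Optional[float] = None,
    kill_on_timeout: bool = True,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll a run's status until it reaches a terminal state."""
    start = clock()
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        # poll_timeout=None means wait indefinitely.
        if poll_timeout is not None and clock() - start > poll_timeout:
            if kill_on_timeout:
                # Stop the remote run so it does not keep consuming resources.
                cancel()
            raise TimeoutError("run did not finish within poll_timeout")
        sleep(poll_interval)
```

Passing `sleep` and `clock` as parameters is only a convenience for testing the loop without real waiting.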

Let's first initialize a hex resource:

hex_resource = ConfigurableHexResource(
    hex_api_key=os.environ["HEX_API_KEY"],
)

Initialization of hex resource

I can then use the Hex Resource within a Dagster asset to run the investigative report generation:

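from datetime import datetime
from typing import TypedDict

import pandas as pd
from dagster import (
    AssetIn,
    MetadataValue,
    Output,
    TimeWindowPartitionMapping,
    asset,
)
from dagster_hex.resources import DEFAULT_POLL_INTERVAL
from dagster_hex.types import HexOutput

# ConfigurableHexResource is the wrapper defined above;
# three_hour_partition_def is defined elsewhere in the project.
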
class ConversationBrief(TypedDict):
    conversation_id: str
    run_id: str
    run_url: str
    trace_id: str
    partition_time: datetime


@asset(
    name="investigative_reporter_ai_agent",
    description="Investigative Reporter AI Agent",
    io_manager_key="prototypes_io_manager",
    ins={
        "x_conversations": AssetIn(
            key=["bronze", "x_conversations"],
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-4, end_offset=-4
            ),
        ),
        "x_conversation_posts": AssetIn(
            key=["bronze", "x_conversation_posts"],
            partition_mapping=TimeWindowPartitionMapping(start_offset=-4, end_offset=0),
        ),
    },
    partitions_def=three_hour_partition_def,
    metadata={"partition_expr": "partition_time"},
    output_required=False,
    compute_kind="Hex",
)
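def investigative_reporter_ai_agent(
    context,
    hex_resource: ConfigurableHexResource,
    x_conversations,
    x_conversation_posts,
):
    # Assumption: each brief is stamped with the start of the partition's time window.
    partition_time = context.partition_time_window.start

    for _, conversation in x_conversations.iterrows():
        # conversation_markdown is the conversation's posts rendered as markdown;
        # the step that builds it from x_conversation_posts is not shown in this post.
        # Launch one Hex project run per conversation and wait for it to finish.
        hex_output: HexOutput = hex_resource.run_and_poll(
            project_id="00c977d2-e2c7-43a0-abfc-3d466dbad3c1",
            inputs={
                "conversation_id": conversation["tweet_conversation_id"],
                "conversation_markdown": conversation_markdown,
            },
            update_cache=False,
            kill_on_timeout=True,
            poll_interval=DEFAULT_POLL_INTERVAL,
            poll_timeout=None,
        )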

        conversation_brief_output = ConversationBrief(
            conversation_id=conversation["tweet_conversation_id"],
            run_id=hex_output.run_response["runId"],
            run_url=hex_output.run_response["runUrl"],
            trace_id=hex_output.run_response["traceId"],
            partition_time=partition_time,
        )

        yield Output(
            value=pd.DataFrame([conversation_brief_output]),
            metadata={
                "run_url": MetadataValue.url(hex_output.run_response["runUrl"]),
                "run_status_url": MetadataValue.url(
                    hex_output.run_response["runStatusUrl"]
                ),
                "trace_id": MetadataValue.text(
                    hex_output.run_response["traceId"]
                ),
                "run_id": MetadataValue.text(hex_output.run_response["runId"]),
                "elapsed_time": MetadataValue.int(
                    hex_output.status_response["elapsedTime"]
                ),
            },
        )

Dagster asset that launches agent runs on Hex

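For each conversation, this asset passes the conversation ID and its markdown rendering to the Hex project, waits for the run to finish, and records the run ID, run URL, and trace ID as a row, with the run's elapsed time attached as Dagster metadata. The detailed summary of the conversation is generated by the AI agent inside the Hex run.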
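
The two `TimeWindowPartitionMapping` offsets in the asset decide which upstream partitions are loaded for each run. The sketch below reproduces that arithmetic with the standard library only, assuming that the upstream assets share the same three-hour partitioning (suggested by the name `three_hour_partition_def` but not shown in the post). The helper `upstream_windows` is hypothetical and is not part of Dagster.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Assumed partition length, inferred from the name three_hour_partition_def.
PARTITION_LENGTH = timedelta(hours=3)


def upstream_windows(
    downstream_start: datetime,
    start_offset: int,
    end_offset: int,
) -> List[Tuple[datetime, datetime]]:
    """List the upstream (start, end) windows mapped to one downstream partition."""
    # Offsets shift the window by whole partitions relative to the downstream one.
    first = downstream_start + start_offset * PARTITION_LENGTH
    last = downstream_start + end_offset * PARTITION_LENGTH
    windows = []
    current = first
    while current <= last:
        windows.append((current, current + PARTITION_LENGTH))
        current += PARTITION_LENGTH
    return windows


run_start = datetime(2024, 1, 2, 12, 0)
# x_conversations: only the partition that started 12 hours earlier.
conversation_windows = upstream_windows(run_start, -4, -4)
# x_conversation_posts: every partition from 12 hours earlier up to the current one.
post_windows = upstream_windows(run_start, -4, 0)
```

Under these assumptions, each run picks up conversations that started twelve hours earlier together with all posts made in those conversations since then.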

Example Workflow and Execution

Now that we’ve discussed the orchestration and the code, let’s look at a complete example run illustrating how Dagster, Hex, LangChain, and LangSmith come together.

The Dagster DAG shows how the prototype agent is integrated with upstream and downstream assets. Dagster orchestrates the data flow and schedules the Hex project runs.

Dagster DAG

Here’s a look at the metadata from a successful Dagster run, showing the logs and asset materialization.

Dagster run's metadata

The AI agent processes the conversation and generates a brief using Hex. The output includes the research results and summary text.

## Completeness Assessment
False

## Research Cycles
3

## Research Findings
Research Task 1
Task: Investigate the specific study mentioned regarding the impact of wet ground on hurricanes.
Status: ✅ Completed
Tool used: tool_choice='DuckDuckGo'
Findings:
The research task investigated the impact of wet ground on hurricanes, focusing on urban flooding and its relationship with extreme weather events. Urban flooding is a significant hazard in cities, often caused by rainfall or storm surges that overwhelm drainage systems, leading to property damage and infrastructure challenges. Different types of urban flooding include pluvial, fluvial, and coastal flooding, exacerbated by impervious surfaces that increase runoff. Climate change is influencing urban flooding patterns, particularly in coastal areas due to sea level rise and increased rainfall intensity.
Additionally, the research highlighted the 2023 wildfires in Hawaii, which were influenced by dry conditions related to Hurricane Dora. The fires caused extensive damage and fatalities, demonstrating how weather patterns can interact with urban environments to create severe disasters.

The findings suggest that effective urban planning, including gray and green infrastructure, improved drainage systems, and integrated water management, can mitigate urban flooding impacts. Understanding these dynamics is crucial for addressing future challenges posed by climate change and extreme weather events.

Research Task 2
Task: Look for verified data or research articles that support the claim about hurricanes maintaining strength over wet ground.
Status: ✅ Completed
Tool used: tool_choice='DuckDuckGo'
Findings:
The research task aimed to gather verified data regarding the claim that hurricanes can maintain their strength over wet ground. The findings included a detailed account of the August 2023 wildfires in Hawaii, which were exacerbated by strong winds from Hurricane Dora. This indicates a relationship between hurricanes and their influence on ground conditions. Additionally, the research presented background information on Puerto Rico, highlighting its geographical and political context, but did not directly address the hurricane claim. Overall, while the findings suggest hurricanes may affect environmental conditions, more focused research is needed to definitively support the claim about hurricanes maintaining strength over wet ground.
...

## Brief
The conversation centered on the interaction between hurricanes and wet ground conditions, particularly how moisture levels can affect hurricane strength as they move over land. The discussion highlighted a potential link between urban flooding, exacerbated by prior rainfall, and the capacity of hurricanes to maintain their intensity. Urban flooding is identified as a significant hazard in cities, often leading to severe property damage and infrastructure challenges, especially as climate change alters weather patterns, including increased rainfall and rising sea levels.

Research findings revealed that while there is a general understanding of the dynamics between hurricanes and environmental conditions, specific studies verifying the claim that hurricanes can sustain strength over wet ground are limited. The investigations included an analysis of urban flooding types and their causes, with additional insights from historical data on hurricanes, particularly in Hawaii, and the impact of climate change on weather patterns. However, a definitive study supporting the hurricane wet ground interaction claim remains elusive, emphasizing the need for further focused research in this area.

Monitoring the agent in LangSmith makes it possible to tune the research cycles and to track token usage and cost per run, so that the agent stays efficient and avoids redundant tasks.
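
As a hedged illustration of the kind of check this monitoring enables, the sketch below totals token usage for one trace and flags research tasks that were issued more than once. It works on plain records and does not call the LangSmith API; `LlmCall` and `summarize_trace` are hypothetical names, and the records are assumed to have been exported from the monitoring tool beforehand.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class LlmCall:
    """Hypothetical record of one model call within a trace."""

    trace_id: str
    research_task: str
    prompt_tokens: int
    completion_tokens: int


def summarize_trace(calls: List[LlmCall]) -> Dict[str, object]:
    """Total the tokens used and list research tasks that were repeated."""
    total_tokens = sum(c.prompt_tokens + c.completion_tokens for c in calls)
    # Normalize task text so trivial differences do not hide repeats.
    task_counts = Counter(c.research_task.strip().lower() for c in calls)
    repeated = sorted(task for task, count in task_counts.items() if count > 1)
    return {"total_tokens": total_tokens, "repeated_tasks": repeated}
```

A repeated task is only a hint of redundancy; two cycles can legitimately revisit the same question with different tools.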

Agent monitoring in LangSmith

Conclusion

Using LangChain, Hex, and Dagster provides a powerful stack for prototyping AI agents. This combination allows you to efficiently build, orchestrate, and evaluate AI agents before moving them into full production. LangChain manages the decision-making and agent logic, Hex powers the core agent operations and handles the research tasks, and Dagster ties everything together by orchestrating the workflows and ensuring smooth execution across assets.

As I continue to develop this project, the focus will be on refining the agent’s research capabilities and performance metrics while working toward fully productionizing the entire workflow in Dagster.