我正在尝试使用Docker、Postgres和Airflow构建一个最小化的数据管道。我的docker-compose.yaml
文件可以在这里找到,它是从Airflow官方文档这里扩展而来的。我在其中添加了单独的Postgres数据库(用于写入数据)和pgadmin实例(这些内容添加在文件底部)。
我已经确认通过运行docker compose up -d
时,所有服务都在运行并可访问,并且我可以登录到Airflow web UI查看我的DAGs。我创建了一个非常简单的DAG,用于每分钟将日期和时间插入表中。DAG代码如下所示:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2024, 1, 1),
}
def create_table():
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
conn = pg_hook.get_conn()
cursor = conn.cursor()
create_query = """
CREATE TABLE IF NOT EXISTS fact_datetime (
datetime TIMESTAMP
);
"""
cursor.execute(create_query)
conn.commit()
cursor.close()
conn.close()
def insert_datetime():
pg_hook = PostgresHook(postgres_conn_id='postgres_default')
conn = pg_hook.get_conn()
cursor = conn.cursor()
insert_query = """
INSERT INTO fact_datetime (datetime)
VALUES (NOW());
"""
cursor.execute(insert_query)
conn.commit()
cursor.close()
conn.close()
with DAG('insert_datetime_dag',
default_args=default_args,
description='DAG to insert current datetime every minute',
schedule_interval='*/1 * * * *',
catchup=False) as dag:
create_table_task = PythonOperator(
task_id='create_table',
python_callable=create_table,
)
insert_datetime_task = PythonOperator(
task_id='insert_datetime',
python_callable=insert_datetime,
)
create_table_task >> insert_datetime_task
在运行此DAG之前,我在Airflow web UI中添加了一个Postgres连接,以便能够使用PostgresHook
。
当我运行该DAG时,任务似乎在create_table
任务上卡住,日志如下:
...
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
...
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
Is the server running on that host and accepting TCP/IP connections?
...
如果我理解正确,这表明Airflow无法看到我的Postgres实例。这个问题可以通过将端口5432暴露给其中一个Airflow服务来解决。
我不确定应该向哪个服务暴露端口,也不清楚如何编辑我的docker-compose文件。请有人能帮助:
- 确认我对问题的理解是否正确;
- 提供对docker-compose文件的正确修改建议,以便我能成功运行我的DAG。