Related questions and solutions (0)

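Cannot decode JSON object when calling a POST request in Flask (Python)

I wrote a simple RESTful web server in Python with Flask, following the steps in this tutorial, but I run into a problem when calling the POST request. The code is: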

@app.route('/todo/api/v1.0/tasks', methods=['POST'])
def create_task():
    if not request.json or 'title' not in request.json:
        abort(400)
    task = {
        'id': tasks[-1]['id'] + 1,
        'title': request.json['title'],
        'description': request.json.get('description', ""),
        'done': False
    }
    tasks.append(task)
    return jsonify({'task': task}), 201

I send the POST request with curl, using the example from that page:

curl -i -H "Content-Type: application/json" -X POST -d '{"title":"Read a book"}' http://127.0.0.1:5000/todo/api/v1.0/tasks

But I get this error:

HTTP/1.0 400 BAD REQUEST
Content-Type: text/html
Content-Length: 187
Server: Werkzeug/0.11.10 Python/2.7.9
Date: Mon, 30 May 2016 09:05:52 GMT

<!DOCTYPE HTML …
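The 400 response above is what `create_task()` returns whenever the body cannot be parsed as JSON or has no `title` key. The following is a minimal sketch, using only the standard library and Python 3, that mirrors that check on a raw request body so the cases can be compared side by side. The helper name `check_task_body` is hypothetical and not part of Flask. The second sample body is an assumption about what the server might receive if the shell does not treat single quotes as quoting characters; the post does not confirm that this is the cause.

```python
import json
from typing import Any, Dict, Optional, Tuple


def check_task_body(raw_body: bytes) -> Tuple[int, Optional[Dict[str, Any]]]:
    """Mirror the validation in create_task() on a raw request body.

    Returns (status_code, parsed_payload_or_None). Hypothetical helper,
    not a Flask API.
    """
    try:
        payload = json.loads(raw_body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # The body is not valid JSON at all.
        return 400, None
    if not isinstance(payload, dict) or "title" not in payload:
        # Valid JSON, but not an object with a "title" key.
        return 400, None
    return 201, payload


# Body exactly as written in the curl command.
ok_status, ok_payload = check_task_body(b'{"title":"Read a book"}')

# Assumed example of a body mangled by shell quoting (illustrative only).
mangled_status, _ = check_task_body(b"'{title:Read a book}'")

# Valid JSON that lacks the required key.
missing_status, _ = check_task_body(b'{"description":"no title"}')
```

If the first case succeeds locally while the real request still fails, logging the raw body on the server side (for example `request.get_data()` in Flask) can show whether the JSON arrived intact.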

python webserver json flask flask-restful

9 votes, 2 answers, 10k views

How do I convert an XComArg to a string value in Airflow 2.x?

Code:

from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator  # needed for the `numbers` task below
from airflow.utils.decorators import apply_defaults
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSUploadOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        bucket_name,
        target_file_name,
        data_as_str,
        gcp_conn_id="google_cloud_default",
        *args,
        **kwargs,
    ):
        super(GCSUploadOperator, self).__init__(*args, **kwargs)
        self.bucket_name = bucket_name
        self.data_as_str = data_as_str
        self.gcp_conn_id = gcp_conn_id
        self.target_file_name = target_file_name

    def execute(self, context):
        hook = GCSHook(self.gcp_conn_id)
        hook.upload(
            bucket_name=self.bucket_name,
            object_name=context["execution_date"].strftime(
                f"year=2022/month=%m/day=%d/{self.target_file_name}"
            ),
            data=self.data_as_str,
        )

numbers = PythonOperator(task_id="numbers", python_callable=lambda: "abcde")
gcs = GCSUploadOperator(
    task_id="upload_content_to_GCS",
    bucket_name=BUCKET_NAME,
    target_file_name=f"{STORE_KEY_CONTENT}.json",
    data_as_str=?????????,   # I need to pass a string result of previous task
)

What I tried for data_as_str …
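The difficulty in the question is that the upstream result does not exist when the operator is constructed, so the constructor can only receive a reference to it. In Airflow 2.x, the usual route is to list the attribute in the operator's `template_fields` and pass the upstream task's `.output` (an XComArg), which Airflow resolves before `execute()` runs. The sketch below is not Airflow code: it is a toy, standard-library-only model of that deferred resolution, and the names `LazyResult`, `ToyUploadOperator`, and `run_pipeline` are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class LazyResult:
    """Toy stand-in for an XComArg: a reference to another task's result."""

    task_id: str

    def resolve(self, results: Dict[str, Any]) -> Any:
        # Look up the value the upstream task produced at run time.
        return results[self.task_id]


class ToyUploadOperator:
    # Attributes named here are resolved just before execute(); this
    # mimics the role template_fields plays in Airflow (toy model only).
    template_fields: Tuple[str, ...] = ("data_as_str",)

    def __init__(self, task_id: str, data_as_str: Any) -> None:
        self.task_id = task_id
        self.data_as_str = data_as_str  # may still be a LazyResult here

    def render_template_fields(self, results: Dict[str, Any]) -> None:
        # Replace every lazy reference with its concrete value.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, LazyResult):
                setattr(self, name, value.resolve(results))

    def execute(self) -> str:
        return f"uploading {self.data_as_str!r}"


def run_pipeline() -> Tuple[str, str]:
    results = {"numbers": "abcde"}  # what the upstream task returned
    op = ToyUploadOperator("upload_content_to_GCS", LazyResult("numbers"))
    type_before = type(op.data_as_str).__name__
    op.render_template_fields(results)
    return type_before, op.execute()


type_before, message = run_pipeline()
```

In the toy run, the attribute holds a `LazyResult` at construction time and the plain string only after rendering, which is why reading `self.data_as_str` inside `__init__` cannot yield the upstream value.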

python airflow

7 votes, 1 answer, 8992 views

Tag statistics

python ×2

airflow ×1

flask ×1

flask-restful ×1

json ×1

webserver ×1