理解API结构¶

在编写代码之前,让我们先了解将要使用的关键接口:

1. 获取所有职位¶

GET /api/v6.8/jobs

返回机器上所有作业的列表,包含基本信息如开始时间、结束时间和状态。

2. 获取具体职位详情¶

GET /api/v6.8/jobs/{jobId}

返回有关特定作业的详细信息,包括:

  • 时间开始当工作开始时
  • 时间结束: 当任务完成时(若仍在运行则为空)
  • 结果职位状态 (待定, 成功, 用户取消, 失败)
  • 持续时间曝光时间、重涂时间等的细分
  • 材料, 任务, 任务当前层级构建参数

3. 获取作业的用户消息¶

GET /api/v6.8/软件/用户消息

返回有关用户消息的详细信息,包括

返回消息列表,若不存在消息则返回空值。

  • 严重性用户消息的严重性
  • 职位ID:构建作业的唯一标识符,
  • 时间提升消息创建的时间戳,
  • 消息:消息本身,

该API采用OAuth2认证机制,并以JSON格式返回数据。所有时间戳均采用协调世界时(UTC)。

要全面了解 Web API 接口,请参阅附录Api 文档。

环境配置¶

首先,让我们安装所需的Python包并设置导入。

在[ ]中:
import 请求
import time
import json
来自 datetime import datetime, timezone
from typing import Dict, Optional, 元组
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dotenv import load_dotenv
import os
from datetime import timedelta

# 为演示目的,我们将禁用 SSL 警告
# 生产环境中请使用正规SSL证书!
import urllib3
urllib3.禁用警告(urllib3.异常.不安全的请求警告)

print("✓ 所有包加载成功!")

配置¶

让我们设置连接参数。在实际生产环境中,您应使用环境变量或安全的配置文件来存储凭据。

要访问机器接口,我们需要创建API凭证。这EOSCONNECT Core 便捷完成。在浏览器中打开打印机的网页——在本例中,该地址为 https://si16120019/.

image.png

在多数情况下,浏览器会显示网站不安全的警告——请勿担心,此警告出现是因为EOS机器默认配备自签名证书(另请参阅附录中的《理解自签名证书》)。

image-2.png

在授权设置菜单下,我们可以使用 (+) 按钮生成新的客户端 ID 和密钥对。

image-3.png

最近生成的API凭证存储在 .env 文件。

为什么使用一个 .env 文件?

将API密钥和密码等敏感凭证直接存储在代码中存在安全风险——尤其当您共享笔记本或将其提交至Git等版本控制系统时。 .env 该文件提供了一个简洁的解决方案:

  • 安全性:凭证与您的代码分开存储
  • 便捷性:无需修改代码即可轻松更新凭证
  • 灵活性不同环境(开发环境、生产环境)可以使用不同的 .env 文件
  • 最佳实践:遵循“12要素应用”方法论进行配置管理

设置变量¶

接下来,我们将确定要EOSCONNECT Core API版本。EOSCONNECT Core Web APIEOSCONNECT Core 向后兼容性——这意味着既可通过Web API查询当前软件版本的机器,也可查询旧版软件的机器。我们直接从机器读取最新版本——截至本文发布时,最新可用版本为 v6.8

在[ ]中:
# Load environment variables from .env file
load_dotenv()

# Load API credentials from environment variables
HOSTNAME = os.getenv("HOSTNAME")
API_VERSION_INFO_URL = f"https://{HOSTNAME}/api/supportedVersions"

# Fetch available API versions

# Check if API_VERSION is set in environment variables
API_VERSION = os.getenv("API_VERSION")

if API_VERSION:
    # Use the version from environment variable
    print(f"✓ Using API version from environment variable: {API_VERSION}")

else:
    # Fetch API versions from the endpoint
    print(f"\n🔍 Fetching available API versions from {API_VERSION_INFO_URL}...")
    response = requests.get(API_VERSION_INFO_URL, verify=False, timeout=10)
    versions_data = response.json()
    # Extract version strings from the response
    available_versions = [f"v{v['majorVersion']}.{v['minorVersion']}" for v in versions_data]

    if available_versions:
        # Sort versions to get the latest
        API_VERSION = sorted(available_versions, key=lambda v: [int(x) for x in v.lstrip('v').split('.')])[-1]
        print(f"✓ Available API versions: {', '.join(available_versions)}")
        print(f"✓ Using latest version: {API_VERSION}")
    else:
        # Fallback to default version
        API_VERSION = "v6.0"
        print(f"⚠️  No versions found in response, using default: {API_VERSION}")

加载 API凭据¶

现在我们从 .env 将文件加载到内存中

在[ ]中:
from dotenv import load_dotenv
import os
from datetime import timedelta

# Load environment variables from .env file
load_dotenv()

API_BASE_URL = f"https://{HOSTNAME}/api/{API_VERSION}"
API_CLIENT_ID = os.getenv("API_CLIENT_ID")
API_CLIENT_SECRET = os.getenv("API_CLIENT_SECRET")

print("✓ Environment variables loaded")
print(f"  - Client ID: {API_CLIENT_ID}")
print(f"  - Client Secret: {'*' * len(API_CLIENT_SECRET) if API_CLIENT_SECRET else 'Not set'}")
print(f"  - Base URL: {API_BASE_URL}")

获取访问令牌¶

要与EOSCONNECT Web API进行通信,我们需要一个访问令牌。该令牌作为数字密钥,具有以下作用:

  • 身份验证:确认我们有权访问该API
  • 授权:定义我们被允许执行的操作(基于客户端权限)
  • 安全性:保护机器免受未经授权的访问

令牌是通过OAuth2客户端凭证流程请求的——这是机器间通信的标准化流程。我们先前在EOSCONNECT Core 创建的API凭证(客户端ID和密钥)将被交换为有限时效的访问令牌。

重要提示:该令牌具有有限的有效期(通常为1小时)。对于运行时间较长的应用程序,必须在令牌过期前进行更新。

在[ ]中:
def get_oauth_token(client_id: str, client_secret: str) -> Optional[Dict]:
    """
    Fetch OAuth2 token using client credentials flow.
    
    Args:
        client_id: OAuth2 client ID
        client_secret: OAuth2 client secret
        
    Returns:
        Dictionary containing token information or None if request fails
    """
    token_url = f"https://{HOSTNAME}/auth/connect/token"
    
    # Prepare the request data for client credentials grant
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    
    try:
        response = requests.post(token_url, data=data, headers=headers, verify=False, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        return token_data
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching OAuth token: {e}")
        return None


def refresh_token() -> bool:
    """
    Refresh the OAuth2 token and update global HEADERS.
    
    Returns:
        True if token refresh was successful, False otherwise
    """
    global API_TOKEN, HEADERS
    
    token_data = get_oauth_token(API_CLIENT_ID, API_CLIENT_SECRET)
    
    if token_data:
        API_TOKEN = token_data.get("access_token", "")
        token_type = token_data.get("token_type", "Bearer")
        
        HEADERS = {
            "Authorization": f"{token_type} {API_TOKEN}",
            "Accept": "application/json"
        }

        return True
    else:
        print("❌ Failed to refresh token")
        return False

# Fetch the token
token_response = get_oauth_token(API_CLIENT_ID, API_CLIENT_SECRET)

if token_response:
    # Extract token details
    API_TOKEN = token_response.get("access_token", "")
    token_type = token_response.get("token_type", "Bearer")
    expires_in = token_response.get("expires_in", 0)
    
    # Update the headers with the new token
    HEADERS = {
        "Authorization": f"{token_type} {API_TOKEN}",
        "Accept": "application/json"
    }
    
    # Calculate expiry time
    current_time = datetime.now(timezone.utc)
    expiry_time = current_time + timedelta(seconds=expires_in)
    
    # Display token information
    print("📋 Token Details:")
    print(f"  Token Type: {token_type}")
    print(f"  Expires In: {expires_in} seconds ({expires_in / 3600:.2f} hours)")
    print(f"  Current Time (UTC): {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"  Expiry Time (UTC): {expiry_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"  Token (first 50 chars): {API_TOKEN[:50]}...")
    print(f"\n✓ Authorization headers updated")
else:
    print("⚠️  Failed to retrieve token. Please check your credentials.")

步骤 1:连接到API¶

让我们创建函数,从API获取工作详情和用户消息。这将展示与EOSCONNECT接口交互是多么简单!

在[ ]中:
def get_last_job() -> Optional[Dict]:
    """
    Fetch the most recent job from the printer.
    
    Returns:
        Dictionary containing the last job's details or None if request fails
    """
    url = f"{API_BASE_URL}/jobs/last"
    try:
        response = requests.get(url, headers=HEADERS, verify=False, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching last job: {e}")
        return None
    
def get_user_message(jobId : str, limit: int = 20) -> Optional[Dict]:
    """
    Fetch the current user message displayed on the printer.
    
    Returns:
        Dictionary containing the user message or None if request fails
    """
    url = f"{API_BASE_URL}/software/usermessages"
    params = {"jobId": jobId, "limit":limit}
    try:
        response = requests.get(url, headers=HEADERS, params=params, verify=False, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching user message: {e}")
        return None

def get_all_jobs(limit: int = 10, from_date: Optional[str] = None, to_date: Optional[str] = None) -> Optional[list]:
    """
    Fetch a list of recent jobs.
    
    Args:
        limit: Maximum number of jobs to retrieve
        from_date: Filter jobs starting from this date (ISO 8601 format, e.g., '2024-01-01T00:00:00Z')
        to_date: Filter jobs up to this date (ISO 8601 format, e.g., '2024-12-31T23:59:59Z')
        
    Returns:
        List of job summaries or None if request fails
    """
    url = f"{API_BASE_URL}/jobs"
    params = {"take": limit}
    
    # Add optional date filters if provided
    if from_date:
        params["from"] = from_date
    if to_date:
        params["to"] = to_date
    
    try:
        response = requests.get(url, headers=HEADERS, params=params, verify=False, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching jobs list: {e}")
        return None


print("✓ API functions defined")

步骤 2:测试连接¶

让我们通过从打印机获取最近的作业来测试连接。这将帮助我们理解数据结构。

该 获取任务 该函数查询机器在指定时间范围内启动的所有作业。 起始日期 以及 截至今日我们使用先前定义的API函数 获取所有工作 为此目的。API函数的结果经过格式化后,在Gradio UI中显示。图形化呈现的代码EOSCONNECT Core 无关EOSCONNECT Core 已移至 ui.py 为清晰起见,请提交文件。

让我们看看职位详情:

  • 该 id 是该作业的唯一标识符。遗憾的是,它并未提供任何关于我们任务(配方)的信息,因为它仅仅是机器序列号与时间戳的组合。
  • 该 结果 这是一个我们感兴趣的属性。我们希望在任务完成时收到通知,并了解任务是否成功。
在[ ]中:
import pandas as pd
from datetime import datetime, timedelta
from ui import create_job_browser_ui

def fetch_jobs(from_date, to_date):
    """
    Fetch jobs within the specified date range and display them in a table.
    
    Args:
        from_date: Start date for filtering jobs
        to_date: End date for filtering jobs
        
    Returns:
        Pandas DataFrame with job information
    """
    # Convert dates to ISO 8601 format with time
    from_datetime = f"{from_date}T00:00:00.000Z" if from_date else None
    to_datetime = f"{to_date}T23:59:59.999Z" if to_date else None
    
    # Fetch jobs from API
    refresh_token()
    jobs = get_all_jobs(limit=500, from_date=from_datetime, to_date=to_datetime)
    
    if not jobs:
        return pd.DataFrame(columns=['id', 'result', 'timeStarted', 'timeEnded', 'task', 'material'])
    
    # Extract relevant fields
    job_data = []
    for job in jobs:
        job_data.append({
            'id': job.get('id', 'N/A'),
            'result': job.get('result', 'N/A'),
            'timeStarted': job.get('timeStarted', 'N/A'),
            'timeEnded': job.get('timeEnded', 'N/A'),
            'task': job.get('task', 'N/A'),
            'material': job.get('material', 'N/A')
        })
    
    # Create DataFrame
    df = pd.DataFrame(job_data)
    return df

# Launch the interface
demo = create_job_browser_ui(fetch_jobs)
demo.launch(share=False, inline=True)

示例:这是一个示例输出 image.png

我们现在看到的是最近创建的作业概览。根据作业结果,我们可以决定是否需要获取更多详细信息。在我们的场景中,只有当作业因取消或错误终止时,我们才需要查看额外细节——特别是用户消息。

步骤 3:任务监控逻辑¶

现在让我们实现核心监控逻辑。该函数检查最近构建的作业是否成功完成。若作业因错误而中止或终止,我们将检索用户消息以获取有关取消操作的相关信息。

在[ ]中:
def check_last_job_status(last_job_id: Optional[str] = None) -> Optional[Dict]:
    """
    Check the status of the last job.
    
    Args:
        last_job_id: Optional job ID to compare against. If provided and matches
                     the current last job, returns None (no change detected)
    
    Returns:
        Dictionary containing:
        - 'result': Job result status (Pending, Successful, Failed, UserCanceled)
        - 'jobId': The job ID
        - 'task': The task name
        - 'timeStarted': When the job started
        - 'timeEnded': When the job ended (None if still running)
        - 'userMessages': List of user messages (only for failed jobs)
        
        Returns None if last_job_id matches the current last job ID
    """
    refresh_token()
    # Fetch the last job
    last_job = get_last_job()
    
    if not last_job:
        return {
            'result': 'Error',
            'jobId': None,
            'task': None,
            'timeStarted': None,
            'timeEnded': None,
            'userMessages': None,
            'error': 'Failed to fetch last job'
        }
    
    job_id = last_job.get('id', 'Unknown')
    
    # If last_job_id is provided and matches current job, return None
    if last_job_id is not None and job_id == last_job_id:
        return None
    
    result = last_job.get('result', 'Unknown')
    task = last_job.get('task', 'N/A')
    time_started = last_job.get('timeStarted', 'N/A')
    time_ended = last_job.get('timeEnded', 'N/A')
    
    # Prepare base response
    response = {
        'result': result,
        'jobId': job_id,
        'task': task,
        'timeStarted': time_started,
        'timeEnded': time_ended,
        'userMessages': None
    }
    
    # If job failed or was canceled, fetch user messages
    if result in ['Failed', 'UserCanceled']:
        user_messages = get_user_message(job_id, limit=20)
        response['userMessages'] = user_messages if user_messages else []
    
    return response


print("✓ Last job status check function defined")
在[ ]中:
import pandas as pd
from ui import create_job_status_ui

# Track the last job ID
last_job_id_tracker = None

def fetch_job_status():
    """
    Fetch the last job status and format it for display.
    Returns a tuple of (job_info_df, user_messages_df)
    """
    global last_job_id_tracker
    
    result = check_last_job_status(last_job_id_tracker)
    
    if result is None:
        # No new job detected
        return (
            pd.DataFrame([{"Info": f"No new job detected or job status unchanged. Tracked ID: {last_job_id_tracker}"}]),
            pd.DataFrame()
        )
    
    # Update tracker
    last_job_id_tracker = result.get('jobId')
    
    # Create job info DataFrame
    job_info = {
        'Field': ['Job ID', 'Task', 'Result', 'Time Started', 'Time Ended'],
        'Value': [
            result.get('jobId', 'N/A'),
            result.get('task', 'N/A'),
            result.get('result', 'N/A'),
            result.get('timeStarted', 'N/A'),
            result.get('timeEnded', 'N/A')
        ]
    }
    job_info_df = pd.DataFrame(job_info)
    
    # Create user messages DataFrame
    user_messages = result.get('userMessages')
    if user_messages:
        messages_data = []
        for msg in user_messages:
            messages_data.append({
                'Severity': msg.get('severity', 'N/A'),
                'timeRaised': msg.get('timeRaised', 'N/A'),
                'message': msg.get('message', 'N/A'),
                'additionalInfo': msg.get('additionalInfo', 'N/A')
            })
        user_messages_df = pd.DataFrame(messages_data)
    else:
        user_messages_df = pd.DataFrame([{"Info": "No user messages available"}])
    
    return job_info_df, user_messages_df

def clear_last_job_id_tracker():
    global last_job_id_tracker 
    last_job_id_tracker = None
    return [],[]
    

# Create Gradio interface for job status
job_status_demo = create_job_status_ui(fetch_job_status, clear_last_job_id_tracker)

# Launch the interface
job_status_demo.launch(share=False, inline=True)

示例:这是一个示例输出 image.png

在此示例中,我们可以看到最近构建的作业未能成功完成。查看用户消息可知,该作业是被机器操作员中止的。

附录:¶

使用 OpenAPI编辑器查看EOSCONNECT Core 文档¶

您可以使用以下官方Swagger编辑器: https://editor.swagger.io/ 渲染 swagger.json 此文件为您EOSCONNECT Core提供所有可用API调用的概览。EOSCONNECT Core 直接提供了类似的界面,您可以在Web API/数据点部分找到它。

  1. 访问官方 Swagger 编辑器:https://editor.swagger.io/
  2. 点击文件→导入网址
  3. 选择 swagger.json 文件

理解自签名证书¶

当您连接到某个网站时,您的浏览器会检查该网站的安全证书是否由可信机构(如知名组织)签发。

自签名证书是由网站所有者自行创建的,而非由可信机构签发。这就像自己写身份证,而不是向政府申领一样。

您的浏览器显示警告是因为无法验证证书是否可信——它可能是合法的(如EOS打印机),也可能存在安全风险。对于EOS设备而言,由于您连接的是本地打印机而非互联网上的随机网站,因此是安全的。

不妨这样理解:打印机在说"相信我,我就是我所声称的那样",而无需第三方担保。