2026年1月20日火曜日

MCPサーバの実装

書いてみました。これは、スケジュールナースMCP TOOL実装になります。AIの力を借りて書きました。

スケジュールナースのシフト勤務表に関わるオブジェクトをLLMが自由に操作できるようにすることが目的です。タスクに関しては、シフト勤務表での知見が集積し、物になりそうなら、進めることとして、現時点では実装しません。

仮に期待に反し、LLMで全く動かなかったとしても、スケジュールナースをサーバとして使用できるようにするためのAPI機能を公開しているので、それなりに役に立つはずです。無駄にはなりません。

従来、サーバの設計者には、ソルバとのインタフェースのみを提供していました。しかし、これがあると、GUIで操作に加えて、プログラムでの操作も容易に出来るようになります。双方向の面から、ソルバインタフェースを操作出来るようになります。つまり、

■GUI⇒ソルバIF⇒ソルバ

■ソルバIF⇒GUI

これにより、ソルバインタフェースが、より分かり易くなる効果が期待できます。ソルバインタフェースを理解する学習教材としても機能するのではないか、と思います。



import asyncio
import json
from fastmcp import FastMCP
import asyncio



from fastmcp import Client # クライアントをインポート
import asyncio
import win32file
import win32pipe


import pywintypes

import uuid

from pydantic import BaseModel, Field
from datetime import datetime
from typing import List, Literal, Optional
from pydantic import BaseModel, Field, model_validator, field_validator

from typing import TypeVar, Type, Optional
from typing import Set
from pydantic import BaseModel, ValidationError

from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import Context

# 任意のBaseModelを指す型変数 T を定義
T = TypeVar("T", bound=BaseModel)


class PipeNotConnectedError(Exception):
    pass

class AsyncNamedPipeClient:
    def __init__(self, pipe_name: str):
        self.pipe_path = fr"\\.\pipe\{pipe_name}"
        self.handle = None

    async def connect(self, retry: int = 5, delay: float = 0.5):
        """パイプ接続。リトライ付き"""
        loop = asyncio.get_running_loop()

        def _connect():
            return win32file.CreateFile(
                self.pipe_path,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                0, None,
                win32file.OPEN_EXISTING,
                0, None
            )

        for attempt in range(retry):
            try:
                self.handle = await loop.run_in_executor(None, _connect)
                return
            except pywintypes.error as e:
                #print (e.winerror)
                # パイプが存在しない / 接続できない
                if e.winerror == 2:  # ERROR_FILE_NOT_FOUND
                    pass
                elif e.winerror == 231:  # ERROR_PIPE_BUSY
		    
                    pass
                else:
                    raise

            await asyncio.sleep(delay)

        raise PipeNotConnectedError(f"パイプ {self.pipe_path} に接続できませんでした。")

    async def write_line(self, text: str):
        if not self.handle:
            raise PipeNotConnectedError("パイプが接続されていません。")

        loop = asyncio.get_running_loop()
        data = (text + "\n").encode("utf-8")

        def _write():
            win32file.WriteFile(self.handle, data)

        try:
            await loop.run_in_executor(None, _write)
        except pywintypes.error as e:
            raise PipeNotConnectedError(f"パイプ書き込みエラー: {e}")

    async def read_line(self) -> str:
        if not self.handle:
            raise PipeNotConnectedError("パイプが接続されていません。")

        loop = asyncio.get_running_loop()

        def _read():
            return win32file.ReadFile(self.handle, 1024 * 1024 * 10)#20)#このサイズが小さいとBrokenPipeErrorが発生する Solutionの大きいInstanceのSolution読み込みに注意
        #await asyncio.sleep(1)
        while True:
            try:
                _, data = await loop.run_in_executor(None, _read)
            except pywintypes.error as e:
                if e.winerror == 109:  # Broken Pipe
                    print("Broken Pipe Detected. Retry from Connection")
                    #await asyncio.sleep(1)
                    #await self.connect()
                    #continue
                    raise PipeNotConnectedError(f"パイプ読み込みエラー: {e}")
                else:
                    raise PipeNotConnectedError(f"パイプ読み込みエラー: {e}")

            if not data:
                raise PipeNotConnectedError("パイプが切断されました。")

            return data.decode("utf-8").rstrip("\n")

    async def close(self):
        print("Closing Pipe")
        if self.handle:
            try:
                win32file.CloseHandle(self.handle)
            except Exception:
                pass
            self.handle = None

# date_objectの定義
class DateObject(BaseModel):
    name: str = Field(
        ...,
        description=(
            "オブジェクトの種別名。既定オブジェクトの場合は以下のいずれかを使用: "
            "'StartDate', 'StartDisplayDate', 'FinishDate', 'AllDays', 'ThisMonth', "
            "'SUN', 'MON', 'TUE', 'WED', 'THU', 'FRI', 'SAT', 'HOLIDAYS', 'SHOLIDAYS'。 "
            "自由な曜日定義を行う場合は 'DayDef' を使用してください。"
            """
	        "StartDate" 必須。 制約開始日となる。通常、'今月'の最初の日に設定する。通常先月部には制約せず'今月'だけ制約対象とする。ただし、連続勤務日数制約等で、先月部を参照する必要場合もある。それには、StartDisplayDateで対象日を設定する。つまり、StartDisplayDate<=StartDate<=FinishDateの関係となる。
	        "StartDisplayDate" 必須。 表示開始日、通常、StartDateの5日前に設定する。スケジュールナースが対象とする全てのDateの最初の日に設定する
	        "FinishDate" 必須。制約終了日、通常、'今月'の最後の日に設定する
	        "DayDef	ユーザ定義用
	        "AllDays" StartDateからFinishDateまでの全てのDayを表す曜日集合に自動設定される。ユーザはReadOnly
	        "ThisMonth" '今月'の定義となる。制約開始日から、制約終了日までに自動設定される。ユーザはReadOnly
	        "SUN" 日曜日ユーザはReadOnly
        	"MON" 月曜日ユーザはReadOnly
	        "TUE" 火曜日ユーザはReadOnly
	        "WED" 水曜日ユーザはReadOnly
	        "THU" 木曜日ユーザはReadOnly
	        "FRI" 金曜日ユーザはReadOnly
	        "SAT" 土曜日ユーザはReadOnly
	        "HOLIDAYS" 祝日ユーザはReadOnly
	        "SHOLIDAYS" 振替休日ユーザはReadOnly"""
        )
    )
    def_name: str = Field(
        ...,
        description="曜日名または定義名。空文字の場合は自動的に無効(use=false)として扱われます。"
    )
    dates: List[str]=Field(# datetime] = Field(
        default_factory=list,
        description="DateTimeのリスト。ISO8601形式(YYYY-MM-DD)で指定してください。"
    )
    use: bool = Field(
        default=True,
        description="有効(true)か無効(false)か。def_nameが空の場合は無効となります。"
    )
    predefined: bool = Field(
        default=False,
        description="既定オブジェクト(true)かユーザ定義(false)かを示すフラグ。trueの場合Readonlyとなる。"
    )
    val: int = Field(
        default=0,
        description="現在は使用されていません(互換性のために維持)。"
    )

    # C#のコンストラクタロジック(def_nameが空ならuse=false)を再現
    @field_validator("use", mode="before")
    @classmethod
    def validate_use_by_def_name(cls, v, info):
        # def_nameが提供されているか確認し、空なら強制的にFalseにする
        def_name = info.data.get("def_name", "")
        if not def_name or len(def_name.strip()) == 0:
            return False
        return v
# C# の date_objects クラスに対応するモデル
class DateObjects(BaseModel):
    """
    date_object のリストを保持するコンテナクラスです。
    """
    members: List[DateObject] = Field(
        default_factory=list, 
        description="date_object のリスト"
    )
    
    days: int = Field(
        default=0, 
        description="現在使用していないフィールド(互換性のため維持)"
    )
    
    target_year: int = Field(
        default=0, 
        description="現在使用していないフィールド(互換性のため維持)"
    )
    
    target_month: int = Field(
        default=0, 
        description="現在使用していないフィールド(互換性のため維持)"
    )

class ShiftObject(BaseModel):
    """
    シフト定義を管理するクラスです。
    """
    # C# の public const string name に相当
    name: str = Field(default="ShiftDef", frozen=True)
    
    # 基本フィールド
    use: bool = Field(default=False, description="シフト定義が有効無効を示すフラグ")
    def_name: str = Field(default="", description="シフト名")
    auto_schedule: bool = Field(default=True, description="自動スケジュールのときtrue")
    color: str = Field(default="Gray", description="GUI色")
    label: str = Field(default="", description="ラベル名")
    
    # リスト型フィールド (default_another_labels = 2 を再現)
    another_labels: List[str] = Field(
        default_factory=lambda: [""] * 2, 
        description="別名ラベルのリスト"
    )
    another_colors: List[str] = Field(
        default_factory=lambda: ["Gray"] * 2, 
        description="ラベルの色リスト"
    )
    
    num_cnt: int = Field(default=0)
    work_start_time: str = Field(default="", description="勤務開始時間")
    work_hours: int = Field(default=-1, description="勤務時間定義(15分単位、-1で無定義)")

    @model_validator(mode='before')
    @classmethod
    def validate_logic(cls, data: dict):
        """
        C#のコンストラクタにあるロジックを再現します。
        - def_name が空なら use を False にする
        - 文字列のトリム処理
        """
        if isinstance(data, dict):
            # def_name のトリム処理
            def_name = data.get("def_name", "")
            if isinstance(def_name, str):
                def_name = def_name.strip()
                data["def_name"] = def_name
                
                # use の論理: def_name が空なら use は強制的に False
                if not def_name:
                    data["use"] = False

            # label のトリム処理
            label = data.get("label", "")
            if isinstance(label, str):
                data["label"] = label.strip()

        return data

    class ConfigDict:
        # C#の命名規則(スネークケース)を維持しつつ、
        # 柔軟な入力を許可する場合の設定
        populate_by_name = True

class ShiftObjects(BaseModel):
    """
    shift_objectのリストを管理するクラスです。
    """
    # C# の public const string name = "Shift Objects" に相当
    name: str = Field(
        default="Shift Objects", 
        frozen=True, 
        description="オブジェクト名"
    )

    # C# の public List members に相当
    members: List[ShiftObject] = Field(
        default_factory=list, 
        description="shift_objectのリスト"
    )

    class ConfigDict:
        # JSONのキーが "members" であれば、自動的に List[ShiftObject] にデシリアライズされます
        populate_by_name = True
# 1. C#のクラス構造をPydanticモデルで定義
class ShiftAggregate(BaseModel):
    use: bool = Field(
        default=False, 
        description="有効(true)か無効かを示すフラグ。無効ならソルバは参照しない"
    )
    # const扱いの名前。LLMが変更しないようdefault固定にする
    name: str = Field(
        default="ShiftAggregateDef", 
        frozen=True,
        description="オブジェクト名(固定値: ShiftAggregateDef)"
    )
    def_name: str = Field(
        default="", 
        description="定義名。シフトオブジェクトのORまたはNOTの集合体を表す一意識別子"
    )
    color: str = Field(
        default="Grey", 
        description="ラベル背景色(例: Blue, #FF0000)"
    )
    # Operatorをリテラルで制限
    Operator: Literal["OR", "NOT","OR_INCLUDE_NON_AUTO"] = Field(
        default="OR", 
        description="演算子。OR(いずれか)またはNOT(以外)のみ選択可能"
    )
    members: List[str] = Field(
        default_factory=list, 
        description="シフトオブジェクト名のリスト(例: ['日勤', '夜勤'])"
    )
    label: str = Field(
        default="", 
        description="GUI(予定入力・解上)での表示名"
    )
class ShiftAggregates(BaseModel):
    """
    ShiftAggregateのリストを管理するクラスです。
    """
    # C# の public const string name = "Shift Objects" に相当
    name: str = Field(
        default="ShiftAggregates", 
        frozen=True, 
        description="オブジェクト名"
    )

    # C# の public List members に相当
    members: List[ShiftAggregate] = Field(
        default_factory=list, 
        description="ShiftAggregateのリスト"
    )

    class ConfigDict:
        # JSONのキーが "members" であれば、自動的に List[ShiftObject] にデシリアライズされます
        populate_by_name = True

class DateAggregate(BaseModel):
    """
    date_objectまたは他のdate_aggregateを組み合わせて、新しい曜日集合を定義します。
    """
    
    # C#のconst string name = "DateAggregate" を再現
    name: str = Field(
        default="DateAggregate",
        frozen=True,
        description="オブジェクト名。常に 'DateAggregate' 固定です。"
    )

    Operator: str = Field(
        default="OR",
        description="演算子。AND(かつ)、OR(または)、NOT(以外)-10, -9,-8..-1 ,+1,+2,..+10,+1,Rep2,Rep3,Rep4,Rep7,Rep14,等のいずれかを指定します。"
    )

    def_name: str = Field(
        ...,
        description="この曜日集合の定義名(例: '平日', '特定日集合')。一意である必要があります。"
    )

    dates: List[str] = Field(
        default_factory=list,
        description=(
            "対象となる曜日メンバー名のリスト。date_object または "
            "他の date_aggregate の def_name を指定します。循環参照は禁止です。"
        )
    )

    use: bool = Field(
        default=False,
        description="有効(true)か無効(false)か。無効な場合、ソルバはこの定義を無視します。"
    )

# C# の date_aggregates クラスに対応するモデル
class DateAggregates(BaseModel):
    """
    複数の日付集合(DateAggregate)を管理するリストクラスです。
    """
    # C# の const string name = "DateAggregates" を再現
    name: str = Field(
        default="DateAggregates",
        frozen=True,
        description="オブジェクト名。常に 'DateAggregates' 固定です。"
    )

    # List members を再現
    members: List[DateAggregate] = Field(
        default_factory=list,
        description="DateAggregate オブジェクトのリスト"
    )
class GroupProperty(BaseModel):
    """
    スタッフの属性項目(例:夜勤属性)とその選択肢リストを定義します。
    """
    use: bool = Field(
        default=False, 
        description="有効(true)か無効かを示すフラグ。無効ならソルバは参照しません。"
    )
    
    # JAN112026 更新: オブジェクト名固定
    name: str = Field(
        default="GroupPropertyDef",
        frozen=True,
        description="オブジェクト名。常に 'GroupPropertyDef' 固定です。"
    )
    
    def_name: str = Field(
        default="",
        description="スタッフプロパティアイテム名(例:'夜勤属性'、'役職' など)。"
    )
    
    # C#コンストラクタの「空文字7つ」の初期化を再現
    members: List[str] = Field(
        default_factory=lambda: [""] * 7,
        description="属性の選択肢リスト(例:['夜勤可', '夜勤不可', '', ... ])。"
    )

class GroupProperties(BaseModel):
    """
    複数のスタッフ属性定義(GroupProperty)を管理するリストクラスです。
    """
    name: str = Field(
        default="GroupProperties",
        frozen=True,
        description="オブジェクト名。常に 'GroupProperties' 固定です。"
    )
    
    members: List[GroupProperty] = Field(
        default_factory=list,
        description="GroupProperty オブジェクトのリストです。"
    )
class ShiftLevel(BaseModel):
    """
    予定セルにおけるシフトとその制約レベル、表示用インデックスを定義します。
    """
    shift: str = Field(
        default="",
        description="shift_object または shift_aggregate_class の定義名(def_name)を参照します。"
    )
    
    level: int = Field(
        default=0,
        description=(
            "ソフト制約の重みインデックス(通常1-7)。"
            "0以下の値はハード制約(絶対に守るべき制約)を意味します。"
        )
    )
    
    a_ix: int = Field(
        default=-1,
        description=(
            "another_labelを指すインデックス。デフォルトは-1。"
            "0以上のとき、該当するラベルのインデックスとして機能します。"
        )
    )
    
    locked: bool = Field(
        default=False,
        description="GUI上で予定セルの編集がロックされているかどうかを示すフラグ(2026年1月12日追加)。"
    )

class GroupAggregate(BaseModel):
    """
    スタッフ属性(GroupProperty)または他の集合(GroupAggregate)を組み合わせて、
    新しい属性集合を定義します。
    """
    use: bool = Field(
        default=False, 
        description="有効(true)か無効かを示すフラグ。無効ならソルバは参照しません。"
    )

    name: str = Field(
        default="GroupAggregateDef",
        frozen=True,
        description="オブジェクト名。常に 'GroupAggregateDef' 固定です。"
    )

    def_name: str = Field(
        default="",
        description="この属性集合の定義名(例: '夜勤リーダー候補', '新人以外' など)。"
    )

    Operator: Literal["AND", "OR", "NOT"] = Field(
        default="OR",
        description="演算子。AND(かつ)、OR(または)、NOT(以外)のいずれかを指定します。"
    )

    # C#コンストラクタの「空文字7つ」の初期化を再現
    members: List[str] = Field(
        default_factory=lambda: [""] * 7,
        description=(
            "対象となるメンバー名のリスト。group_property または "
            "他の group_aggregate の def_name を参照します。循環参照は禁止です。"
        )
    )

class GroupAggregates(BaseModel):
    """
    複数のスタッフ属性集合定義(GroupAggregate)を管理するリストクラスです。
    """
    name: str = Field(
        default="GroupAggregates",
        frozen=True,
        description="オブジェクト名。常に 'GroupAggregates' 固定です。"
    )
    
    members: List[GroupAggregate] = Field(
        default_factory=list,
        description="GroupAggregate オブジェクトのリストです。"
    )
class StaffProperty(BaseModel):
    """
    個々のスタッフの属性(可能なシフト、タスク、所属グループ等)を定義します。
    """
    use: bool = Field(
        default=True,
        description="有効(true)か無効かを示すフラグ。無効ならソルバは参照しません。"
    )

    name: str = Field(
        default="StaffPropertyDef",
        frozen=True,
        description="オブジェクト名。常に 'StaffPropertyDef' 固定です。"
    )

    def_name: str = Field(
        default="",
        description="スタッフの名前(一意の識別子)。"
    )

    shifts: Set[str] = Field(
        default_factory=list,
        description="このスタッフが担当可能なシフト名のリスト(C#のSortedSetに対応)。"
    )

    tasks: Set[str] = Field(
        default_factory=list,
        description="このスタッフが担当可能なタスク名のリスト(C#のSortedSetに対応)。"
    )

    properties: dict[str, str] = Field(
        default_factory=dict,
        description="グループプロパティ名と選択された属性値のマップ(例: {'夜勤属性': '夜勤可'})。"
    )

    comment: str = Field(
        default="",
        description="スタッフに関するメモ・コメント(2026年1月12日更新)。"
    )

class StaffProperties(BaseModel):
    """
    全スタッフの属性定義を一括管理するコンテナクラスです。
    """
    name: str = Field(
        default="StaffProperties",
        frozen=True,
        description="オブジェクト名。常に 'StaffProperties' 固定です。"
    )
    
    members: List[StaffProperty] = Field(
        default_factory=list,
        description="StaffProperty オブジェクトのリストです。"
    )

class ShiftSchedules(BaseModel):
    """
    全スタッフ・全日程のシフト割り当て(二次元リスト)を管理します。
    """
    # C# の const string name = "Schedules" を再現
    name: str = Field(
        default="Schedules",
        frozen=True,
        description="オブジェクト名。常に 'Schedules' 固定です。"
    )

    # List> members を再現
    # 外側のリストが「スタッフ」、内側のリストが「日付」に対応します
    members: list[list[ShiftLevel]] = Field(
        default_factory=list,
        description="シフト割り当ての二次元リスト。例: members[スタッフIndex][日付Index]"
    )

class Solution(BaseModel):
    """
    全スタッフ・全日程のシフト割り当て(二次元リスト)結果を格納するコンテナ。
    """
  
    # List> members を再現
    # 外側のリストが「スタッフ」、内側のリストが「日付」に対応します
    members: list[list[ShiftLevel]] = Field(
        default_factory=list,
        description="シフト割り当ての二次元リスト。例: members[スタッフIndex][日付Index]"
    )

class RowConstraint(BaseModel):
    """
    行制約(勤務パターンの禁止、回数、勤務時間など)を定義する詳細モデルです。
    """
    # 基本情報
    name: str = Field(default="RowConstraint", frozen=True)
    def_name: str = Field(..., description="行制約名(必須)。")
    use: bool = Field(default=True, description="有効フラグ。def_nameが空の場合はFalseになります。")
    phase_mode: bool = Field(default=False, description="フェーズモードフラグ。")
    equal_count_mode: bool = Field(default=False, description="同数カウントモードフラグ。")

    # 日付・スタッフ属性
    primary_day_type: str = Field(default="", description="date_object または date_aggregate の名称。必須")
    first_day_type: str = Field(default="", description="パターンの開始日条件。")
    last_day_type: str = Field(default="", description="パターンの終了日条件。")
    staff_property_type: str = Field(default="", description="対象となるスタッフ属性。")

    # パターン定義
    pattern: list[ShiftLevel] = Field(default_factory=list, description="シフトパターンのリスト。必須")
    inv_pattern: list[bool] = Field(default_factory=list, description="patternに対応するNOTリスト。必須")
    shift_pattern: list[ShiftLevel] = Field(default_factory=list, description="phase mode用シフトパターン。")
    shift_inv_pattern: list[bool] = Field(default_factory=list, description="phase mode用NOTリスト。")

    # 同数カウントモード用
    intercept: int = Field(default=0, description="数値オフセット。")
    A_daytypes: list[str] = Field(default_factory=list, description="Aグループの曜日タイプ。")
    B_daytypes: list[str] = Field(default_factory=list, description="Bグループの曜日タイプ。")

    # 制約の詳細設定
    constraint_type: int = Field(default=0, description="0:シフト禁止, 1:回数, 2:勤務時間, 3:Phase, 4:回数比較 必須")
    max_value: int = Field(default=-1)
    min_value: int = Field(default=-1)
    max_double: float = Field(default=-1.0)
    min_double: float = Field(default=-1.0)
    max_staff_property: str = Field(default="")
    min_staff_property: str = Field(default="")
    soft_constraint_level: int = Field(default=0, description="0:ハード, 1-7:ソフトレベル 必須")

    # バリデータ: C#コンストラクタのロジックを再現
    @field_validator("def_name", mode="before")
    @classmethod
    def trim_name(cls, v: str) -> str:
        return v.strip() if isinstance(v, str) else v

    @model_validator(mode="after")
    def apply_constructor_logic(self) -> "RowConstraint":
        # def_nameが空なら自動的に無効化
        if not self.def_name:
            self.use = False
        
        # equal_count_mode が True の場合の初期設定
        if self.equal_count_mode:
            self.constraint_type = 4
            self.intercept = 0
            if not self.A_daytypes:
                self.A_daytypes = [""]
            if not self.B_daytypes:
                self.B_daytypes = [""]
        
        return self

class RowConstraintsGroup(BaseModel):
    """
    行制約のグループ定義です。
    C#のコンストラクタ row_constraints_group(def_name_, phase_mode_, equal_count_mode_) の仕様を反映しています。
    """
    def_name: str = Field(
        ..., 
        description="制約グループの定義名(必須)。"
    )
    
    phase_mode: bool = Field(
        default=False,
        description="フェーズモード。シフト勤務表では常にFalse、タスク勤務表ではGUI制御に使用します。"
    )
    
    equal_count_mode: bool = Field(
        default=False,
        description="回数比較モード(2023年3月追加)。オンにすると回数比較専用のGUIになります。"
    )

    name: str = Field(
        default="RowConstraintsGroup",
        description="オブジェクト名"
    )

    use: bool = Field(
        default=True,
        description="有効(true)か無効か。無効ならソルバは参照しません。"
    )

    members: list[RowConstraint] = Field(
        default_factory=list,
        description="このグループに属する個別の行制約(RowConstraint)のリスト"
    )

    @field_validator("def_name", mode="before")
    @classmethod
    def trim_def_name(cls, v: str) -> str:
        """C#のTrim()処理を再現"""
        if isinstance(v, str):
            return v.strip()
        return v

class RowConstraints(BaseModel):
    """
    全ての行制約グループを一括管理するクラスです。
    """
    name: str = Field(
        default="RowConstraints",
        frozen=True,
        description="オブジェクト名。常に 'RowConstraints' 固定です。"
    )

    members: list[RowConstraintsGroup] = Field(
        default_factory=list,
        description="行制約グループ (RowConstraintsGroup) のリスト"
    )

class ColumnConstraint(BaseModel):
    """
    列制約(日付ごとの必要人数、勤務時間、シフト禁止・強制など)を定義するモデルです。
    """
    # 基本情報
    name: str = Field(default="ColumnConstraint", frozen=True)
    def_name: str = Field(..., description="列制約名(必須)。前後の空白は自動的に削除されます。")
    use: bool = Field(default=True, description="有効フラグ。名前が空の場合は自動的にFalseになります。")

    # シフト・タスク・日付・スタッフ属性
    shift_type: str = Field(default="", description="shift_object または shift_aggregate の名称。")
    shift_at_phase: str = Field(default="", description="タスク勤務表フェーズ用シフト(2023年1月4日追加)。")
    task_type: str = Field(default="", description="タスクオブジェクトまたはタスク集合の名称(タスク勤務表用)。")
    primary_day_type: str = Field(default="", description="date_object または date_aggregate の名称。")
    staff_property_type: str = Field(default="", description="スタッフ属性(group_property 等)の名称。")

    # 制約タイプ
    constraint_type: int = Field(
        default=0, 
        description="0:シフト禁止, 1:max-min, 2:シフト強制, 3:勤務時間max-min, 4:整数計数"
    )

    # タスク勤務表用 Daily制約テーブルプロパティ
    dayphase_vec_max: str = Field(default="", description="Daily制約テーブル(Max)に対応するプロパティ。")
    dayphase_vec_min: str = Field(default="", description="Daily制約テーブル(Min)に対応するプロパティ。")

    # 数値制約(整数・実数)
    max_value: int = Field(default=-1, description="max-minの場合の最大値。設定しない場合は負の値。")
    min_value: int = Field(default=-1, description="max-minの場合の最小値。設定しない場合は負の値。")
    max_double: float = Field(default=-1.0, description="勤務時間max。設定しない場合は負の値。")
    min_double: float = Field(default=-1.0, description="勤務時間min。設定しない場合は負の値。")

    # ソフト制約レベル
    soft_constraint_level: int = Field(
        default=0, 
        description="ソフト制約レベル。0:ハード, 1:最弱, 7:最強。"
    )
    soft_constraint_level_max: int = Field(
        default=0, 
        description="Max専用のソフト制約レベル。min-maxでレベルを分けたい場合に使用。"
    )

    # バリデータ: C#コンストラクタのロジックを再現
    @field_validator("def_name", mode="before")
    @classmethod
    def trim_def_name(cls, v: str) -> str:
        """C#のTrim()を再現"""
        return v.strip() if isinstance(v, str) else v

    @model_validator(mode="after")
    def validate_use_state(self) -> "ColumnConstraint":
        """名前が空なら有効フラグをFalseにするロジックを再現"""
        if not self.def_name:
            self.use = False
        return self

class ColumnConstraintsGroup(BaseModel):
    """
    列制約のグループ定義です。
    C#のコンストラクタ column_constraints_group(def_name_, phase_mode_) の仕様を反映しています。
    """
    def_name: str = Field(
        ..., 
        description="制約グループの定義名(必須)。"
    )
    
    phase_mode: bool = Field(
        default=False,
        description="フェーズモード。シフト勤務表では常にFalse、タスク勤務表ではGUI制御に使用します。"
    )

    name: str = Field(
        default="ColumnConstraintsGroup",
        frozen=True,
        description="オブジェクト名"
    )

    use: bool = Field(
        default=True,
        description="有効(true)か無効か。def_nameが空文字の場合はFalseとして扱われます。"
    )

    members: list[ColumnConstraint] = Field(
        default_factory=list,
        description="このグループに属する個別の列制約(ColumnConstraint)のリスト"
    )

    # Color型はSystem.Text.Jsonで直接扱えないため、文字列(HTMLカラーコード)として定義
    #tab_color: str = Field(
    #    default="#FFFFFF", 
    #    description="GUIのタブカラー。HTMLカラーコード(例: #FFFFFF)または色名(White)で指定。"
    #)

    @field_validator("def_name", mode="before")
    @classmethod
    def validate_and_trim_def_name(cls, v: str) -> str:
        """C#のTrim()処理および空文字時のuse=falseロジックの再現"""
        if isinstance(v, str):
            trimmed = v.strip()
            return trimmed
        return v

    @field_validator("use", mode="after")
    @classmethod
    def set_use_by_def_name(cls, v: bool, info) -> bool:
        """def_nameが空なら強制的にuseをFalseにするロジックを再現"""
        if not info.data.get("def_name"):
            return False
        return v

class ColumnConstraints(BaseModel):
    """
    全ての列制約グループを一括管理するクラスです。
    """
    name: str = Field(
        default="ColumnConstraints",
        frozen=True,
        description="オブジェクト名。常に 'ColumnConstraints' 固定です。"
    )

    members: list[ColumnConstraintsGroup] = Field(
        default_factory=list,
        description="列制約グループ (ColumnConstraintsGroup) のリスト"
    )
class PairConstraint(BaseModel):
    """
    2つの要素(AとB)の間の相関制約(禁止、ならば、比較など)を定義するモデルです。
    """
    name: str = Field(default="PairConstraint", frozen=True)
    def_name: str = Field(default="", description="制約名(一意)")
    use: bool = Field(default=True, description="有効フラグ")
    
    # モード設定
    phase_mode: bool = Field(default=False, description="タスク勤務表モードフラグ")
    equal_count_mode: bool = Field(default=False, description="リニア不等式(数値比較)モードフラグ")

    # 対象 A
    shift_typeA: str = Field(default="", description="Aのシフト/タスク集合名")
    staff_property_typeA: str = Field(default="", description="Aのスタッフ属性/集合名")
    operatorA: str = Field(default="", description="通常時: OR, AND等 / 数値比較時: A係数(数字またはBFULL)")

    # 対象 B
    shift_typeB: str = Field(default="", description="Bのシフト/タスク集合名")
    staff_property_typeB: str = Field(default="", description="Bのスタッフ属性/集合名")
    operatorB: str = Field(default="", description="通常時: OR, AND等 / 数値比較時: B係数(数字またはAFULL)")

    # 共通条件
    day_type: str = Field(default="", description="対象となる日付集合名")
    dayoffset: int = Field(default=0, description="Aに対するBの日付オフセット")

    # 制約の詳細
    constraint_type: int = Field(
        default=0, 
        description="0:!(A&&B)禁止, 1:AならばB, 2:A==B, 3:A>=B, 4:A<=B"
    )
    soft_constraint_level: int = Field(default=0, description="0:ハード, 1-7:ソフトレベル")

    # タスク・フェーズ関連 (2024年1月更新対応)
    phase_str: str = Field(default="", description="フェーズ識別子 (例: ph0)")
    shift_typeA_at_phase: str = Field(default="", description="フェーズ時のAシフト参照")
    shift_typeB_at_phase: str = Field(default="", description="フェーズ時のBシフト参照")
    
    # 数値オフセット関連
    numerical_offset: int = Field(default=0, description="数値比較モード時の定数オフセット")
    numerical_offset_string: str = Field(default="", description="数値オフセットの文字列表現 (2024年1月追加)")

class PairConstraintsGroup(BaseModel):
    """
    ペア制約のグループ。GUIのタブ表示単位に対応します。
    """
    name: str = Field(default="PairConstraintsGroup", frozen=True)
    def_name: str = Field(default="", description="グループ名")
    use: bool = Field(default=True)
    equal_count_mode: bool = Field(default=False)
    phase_mode: bool = Field(default=False)
    members: list[PairConstraint] = Field(default_factory=list)
    
    # System.Drawing.Colorの互換性のため文字列として保持
    #tab_color: str = Field(default="White", description="タブの背景色 (ColorTranslator形式)")

class PairConstraints(BaseModel):
    """
    全てのペア制約グループを管理するトップレベルクラスです。
    """
    name: str = Field(default="PairConstraints", frozen=True)
    members: list[PairConstraintsGroup] = Field(default_factory=list)

    @field_validator("members", mode="before")
    @classmethod
    def ensure_list(cls, v):
        return v if isinstance(v, list) else []

class SwInt(BaseModel):
    """
    ソルバの制約許容数や重み(ウェイト)を定義するモデルです。
    """
    use: bool = Field(
        default=False, 
        description="有効(true)か無効(false)かを示すフラグ。"
    )
    
    value: int = Field(
        default=0, 
        description="1制約あたりのCARDINALS許容エラー数(スラック変数)。"
    )
    
    total_max_errors: int = Field(
        default=0, 
        description="現在使用されていません(0: 最小化を意味します)。"
    )
    
    weight: int = Field(
        default=1, 
        description=(
            "ソフト制約の重み(自然数、1-10推奨)。"
            "※ソフトレベル(1-7のインデックス)とは異なる「実際の重み係数」です。"
        )
    )
 
class SolvingParameters(BaseModel):
    """
    ソルバの求解パラメータ、ソフト制約レベルごとの重み設定、
    および外部Python制約などを管理するクラスです。
    """
    name: str = Field(
        default="SolvingParameters",
        frozen=True,
        description="オブジェクト名。常に 'SolvingParameters' 固定です。"
    )

    # Dictionary> solving_map を再現
    # 外側のキー(int)はソフト制約レベル(1-7など)、内側のキー(str)は制約名
    solving_map: dict[int, dict[str, SwInt]] = Field(
        default_factory=dict,
        description="ソフトレベルおよび制約名ごとの詳細設定マップ。"
    )

    # Pythonによる制約記述
    external_constraint_python: str = Field(
        default="",
        description="Pythonスクリプトによる外部制約記述。"
    )

    # プロジェクトコメント
    comment: str = Field(
        default="",
        description="プロジェクトの変更履歴やメモ用コメント。"
    )

    # 求解パラメータ Dictionary
    parameters: dict[str, int] = Field(
        default_factory=dict,
        description="ソルバの各種数値パラメータ(イテレーション回数等)。"
    )

    # 現在使用されていない、または内部用のフィールド
    aws_parameters: dict[str, str] = Field(
        default_factory=dict,
        description="AWS関連パラメータ(現在未使用)。"
    )
    external_constraint: str = Field(
        default="",
        description="言語制約記述(現在未使用)。"
    )
    python_property_file: str = Field(
        default="",
        description="内部用プロパティファイルパス(編集不可)。"
    )
    python_property_file_post: str = Field(
        default="",
        description="内部用プロパティファイルパス(後処理、編集不可)。"
    )

class ApiResponse(BaseModel):
    """
    WinForms API からの実行結果レスポンスを定義するモデルです。
    """
    # status の選択肢を Literal で定義(complete, rejected, accepted, error, unknown_api)
    status: Literal["complete", "rejected", "accepted", "error", "unknown_api"] = Field(
        ..., 
        description="APIリクエストの処理ステータス。"
    )
    
    solver_status: Optional[str] = Field(
        default=None, 
        description="ソルバの内部状態(例: Optimal, Feasible, Timeout 等)。"
    )
    
    error_reason: Optional[str] = Field(
        default=None, 
        description="エラーが発生した場合の理由、または詳細メッセージ。"
    )

    value: Optional[str] = Field(
        default=None, 
        description="APIから返される主要な戻り値(JSON文字列や特定の識別子など)。"
    )

    progress: int = Field(
        default=0, 
        description="現在の処理進捗状況。"
    )

    total: int = Field(
        default=100, 
        description="全体の処理数。デフォルト値は100です。"
    )

    def get_value_as(self, model_class: Type[T]) -> Optional[T]:
        if not self.value:
            return None
        try:
            return model_class.model_validate_json(self.value)
        except ValidationError:
            return None

#PIPE_NAME =f'\\\\.\\pipe\\winforms_pipe'# r'\\.\pipe\winforms_pipe'
PIPE_NAME = "winforms_pipe"

async def call_winforms_api(api_name: str, payload: dict, ctx=None)->ApiResponse:
    pipe = AsyncNamedPipeClient(PIPE_NAME)

    # 1. パイプ接続
    try:
        await pipe.connect()
    except PipeNotConnectedError as e:
        return {"status": "pipe_error", "message": str(e)}

    # 2. JSON リクエスト送信
    request = json.dumps({"api": api_name, "payload": payload})
    try:
        print("Writing api_name=",api_name)
        await pipe.write_line(request)
    except PipeNotConnectedError as e:

        await pipe.close()
        return {"status": "write_error", "message": str(e)}

    # 3. 継続的な読み取りループ
    try:
        print("Reading api_name=",api_name)

        while True:
            response_line = await pipe.read_line()
            if not response_line:
                break

            try:
                data = json.loads(response_line)
            except json.JSONDecodeError:
                continue # 不正な行はスキップして次を待つ

            status = data.get("status")
            solver_status = data.get("solver_status")
            error_reason=data.get("error_reason")
            value=data.get("value")
            print(status)
            if solver_status == "Feasible_Solution":
                print("実行可能解が見つかりました!")
          
            elif solver_status == "Solving":
       # 進捗報告時に詳細ステータスを添える
                if ctx:
                    await ctx.report_progress(progress=data.get("progress"), total=100)
                    print(f"ステータス: {solver_status}")
            # A. 完了時の処理
            if status == "complete":
                if value is not None :
                    print("value len=",len(value))
                # 最後に100%を報告(任意)
                if ctx:
                    await ctx.report_progress(progress=100, total=100)
                # 辞書を ApiResponse モデルに変換して返す
                return ApiResponse.model_validate(data)

		

            # B. エラー時の処理
            if status in ["error", "rejected", "unknown_api"]:#if status == "error" or status=="rejected" :

                if status=="unknown_api":
                    print("サポートしていないAPIです。")
                if status=="rejected":
                    print("APIが拒否されました")
                if solver_status=="Infeasible_Solution":
                    print("解が見つかりませんでした(実行不能)。")
                elif solver_status=="ProjectFile_NOT_Specified":
                    print("プロジェクトファイルが保存されていません。スケジュールナース→ファイルメニュー→名前をを付けて保存してください。")
                elif solver_status=="Compile_Failed":
                    print("プロジェクトファイルのコンパイルに失敗しました。")
                elif solver_status=="Solving_Engine_Invoking_Failure":
                    print("求解エンジンの起動に失敗しました。")
                elif solver_status=="Solving_Killed_by_User":
                    print("求解がユーザにより中止させられました")
                return  ApiResponse.model_validate(data)#辞書を ApiResponse モデルに変換して返す

            # C. それ以外(進捗報告など)
            if ctx:
                # WinForms側のJSONに progress/total があればそれを使う
                # 無ければ現在のステップとして報告
                p = data.get("progress", 0)
                t = data.get("total", 100)
                
                # Contextを通じてMCPクライアントに通知
                await ctx.report_progress(progress=p, total=t)
                
                # ログ出力(任意)
                # print(f"進捗報告中: {p}/{t}")

    except PipeNotConnectedError as e:
        return {"status": "read_error", "message": str(e)}
    #finally:
    #    await pipe.close()

    return {"status": "unexpected_termination"}

# -------------------------------
# WinForms に API を発行する関数
# -------------------------------





# FastMCP サーバー作成
mcp = FastMCP("local-mcp")
#mcp = FastMCP(name="私の最初のMCPサーバー")


# -------------------------------
# MCP ツール → WinForms API 呼び出し
# -------------------------------



@mcp.tool()
async def get_shift_definitions()-> ShiftObjects| None:
    """
    スケジュールナースから現在の全てのシフト定義(名前、色、ラベル、サブラベル等)を取得します。
    """
    api_res= await call_winforms_api("get_shift_objects", {} )
    #print("api_res=",api_res)
    if api_res.status=="complete":
        SO=api_res.get_value_as(ShiftObjects)
        #print("SO=",SO)
        return SO
    else:
        return None


@mcp.tool()
async def update_shift_definitions(data:ShiftObjects)-> ApiResponse:
    """
    スケジュールナースから現在の全てのシフト定義(名前、色、ラベル、サブラベル等)を取得します。
    """
    text = data.model_dump_json()
    payload={"text":text}

    return  await call_winforms_api("update_shift_objects",payload )
    
@mcp.tool()
async def get_shift_aggregates_definitions()-> ShiftAggregates | None:
    """
    スケジュールナースから現在の全てのシフト集合定義(名前、色、ラベル、サブラベル等)を取得します。
    """
    api_res= await call_winforms_api("get_shift_aggreggates", {} )
    
    if api_res.status=="complete":
        SA=api_res.get_value_as(ShiftAggregates)
        #print("SO=",SO)
        return SA
    else:
        return None

@mcp.tool()
async def get_shift_solution()-> Solution | None:
    """
    スケジュールナースから現在の全てのシフト集合定義(名前、色、ラベル、サブラベル等)を取得します。戻り値は、Schedulesクラスと同じ2次元形式です。
    """
    api_res= await call_winforms_api("get_shift_solution", {} )
    if api_res.status=="complete":
        So=api_res.get_value_as(Solution)
        #print("SO=",SO)
        return So
    else:
        return None

@mcp.tool()
async def get_date_objects()-> DateObjects | None:
    """
    スケジュールナースから現在の全てのDateObjectを取得します。
    """
    api_res= await call_winforms_api("get_date_objects", {} )
    if api_res.status=="complete":
        So=api_res.get_value_as(DateObjects)
        #print("SO=",SO)
        return So
    else:
        return None

@mcp.tool()
async def get_date_aggregates()-> DateAggregates | None:
    """
    スケジュールナースから現在の全てのDateAggregateを取得します。
    """
    api_res=await call_winforms_api("get_date_aggregates", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res.value)
        So=api_res.get_value_as(DateAggregates)
        #print("SO=",SO)
        return So
    else:
        return None

@mcp.tool()
async def get_group_properties()-> GroupProperties | None:
    """
    スケジュールナースから現在の全てのGroupPropertyを取得します。
    """
    api_res=await call_winforms_api("get_group_properties", {} )
    if api_res.status=="complete":
        Gp=api_res.get_value_as(GroupProperties)
        return Gp
    else:
        return None

@mcp.tool()
async def get_group_aggregates()-> GroupAggregates| None:
    """
    スケジュールナースから現在の全てのGroupAggregateを取得します。
    """
    api_res= await call_winforms_api("get_group_aggregates", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Gp=api_res.get_value_as(GroupAggregates)
        return Gp
    else:
        return None

@mcp.tool()
async def get_staff_properties()-> StaffProperties:
    """
    スケジュールナースから現在の全てのStaffPropertyを取得します。
    """
    api_res= await call_winforms_api("get_staff_properties", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Sp=api_res.get_value_as(StaffProperties)
        return Sp
    else:
        return None

@mcp.tool()
async def get_shift_schedules()->ShiftSchedules| None:
    """
    スケジュールナースから現在のSchedulesを取得します。
    """
    api_res= await call_winforms_api("get_shift_schedules", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Sc=api_res.get_value_as(ShiftSchedules)
        return Sc
    else:
        return None

@mcp.tool()
async def get_row_constraints()-> RowConstraints| None:
    """
    スケジュールナースから現在の全てのRowConstraintを取得します。
    """
    api_res=await call_winforms_api("get_row_constraints", {} )
    #print("api_res=",api_res)
    
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Sc=api_res.get_value_as(RowConstraints)
        return Sc
    else:
        return None
   
@mcp.tool()
async def get_column_constraints()-> ColumnConstraints|None:
    """
    スケジュールナースから現在の全てのColumnConstraintを取得します。
    """
    api_res=await call_winforms_api("get_column_constraints", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Cc=api_res.get_value_as(ColumnConstraints)
        return Cc
    else:
        return None
    


@mcp.tool()
async def get_pair_constraints()-> PairConstraints|None:
    """
    スケジュールナースから現在の全てのColumnConstraintを取得します。
    """
    api_res= await call_winforms_api("get_pair_constraints", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Pc=api_res.get_value_as(PairConstraints)
        return Pc
    else:
        return None

@mcp.tool()
async def get_solving_parameters()-> SolvingParameters|None:
    """
    スケジュールナースから現在のSolvingParametersを取得します。
    """
    api_res= await call_winforms_api("get_solving_parameters", {} )
    if api_res.status=="complete":
        #print("api_res=",api_res)
        Spp=api_res.get_value_as(SolvingParameters)
        return Spp
    else:
        return None


@mcp.tool()
async def update_date_objects(data: DateObjects) -> ApiResponse:
    """
    複数の日付定義(DateObject)を一括で更新します。
    """
    # C#側へ送信するための辞書化
    payload = data.model_dump_json()
    
    # WinForms APIを呼び出してデータを送信
    return  await call_winforms_api("update_date_objects", payload)
    
   


@mcp.tool()
async def update_date_aggregates(data: DateAggregates) -> ApiResponse:
    """
    日付集合(DateAggregates)の全リストを更新します。
    """
    # Pydanticオブジェクトを辞書(JSON互換)に変換
    # これにより members 内の DateAggregate もすべて辞書化されます
    payload = data.model_dump_json()
    
    # WinForms APIの呼び出し(例)
    return await call_winforms_api("update_date_aggregates", payload)
   

 


@mcp.tool()
async def update_staff_group_properties(data: GroupProperties) -> str:
    """
    スタッフの全属性定義(GroupProperties)を一括更新します。
    """
    # 辞書形式に変換(ネストされた GroupProperty もすべて変換されます)
    payload = data.model_dump_json()
    
    # WinForms API への送信(例)
    return  await call_winforms_api("update_group_properties", payload)
    
    

@mcp.tool()
async def update_staff_group_aggregates(data: GroupAggregates) -> str:
    """
    スタッフ属性の集合演算定義(GroupAggregates)を一括更新します。
    """
    # 辞書形式に変換(WinForms API 送信用)
    payload = data.model_dump_json()
    
     
    return await call_winforms_api("update_group_aggregates", payload)
    
    



@mcp.tool()
async def update_staff_properties(data: StaffProperties) -> str:
    """
    全スタッフの属性情報(担当可能シフト、属性マップ等)を一括更新します。
    """
    # WinForms側へ送信するために辞書形式に変換
    payload = data.model_dump_json()
    
     
    return await call_winforms_api("update_staff_properties", payload)
    
    

@mcp.tool()
async def update_shift_schedules(data: ShiftSchedules) -> str:
    """
    勤務表の全予定データ(二次元リスト)を一括更新します。
    """
    # 再帰的に辞書形式に変換(ShiftLevel オブジェクトもすべて辞書化される)
    payload = data.model_dump_json()
    
    # WinForms API への送信
    return await call_winforms_api("update_schedules", payload)
    
    


@mcp.tool()
async def update_row_constraints(data: RowConstraints) -> str:
    """
    行制約(RowConstraints)の設定をすべて更新します。
    """
    # 辞書形式に変換(ネストされたリストやオブジェクトもすべて変換されます)
    payload = data.model_dump_json()
    
    
    return  await call_winforms_api("update_row_constraints", payload)
    
    

    

@mcp.tool()
async def update_column_constraints(data: ColumnConstraints) -> str:
    """
    列制約(ColumnConstraints)の設定をすべて一括更新します。
    """
    # model_dump_json() により C# 側が受信可能な JSON 形式の辞書に変換
    payload = data.model_dump_json()
    
    # 注意: C#側でColor.Whiteを復元できるよう、tab_colorには"White"などの文字列が入ります。
    
    return  await call_winforms_api("update_pair_constraints", payload)
    
    

@mcp.tool()
async def update_pair_constraints(data: PairConstraints) -> str:
    """
    ペア制約(PairConstraints)の設定をすべて一括更新します。
    """
    # model_dump_json() により C# 側が受信可能な JSON 形式の辞書に変換
    payload = data.model_dump_json()
    
    # 注意: C#側でColor.Whiteを復元できるよう、tab_colorには"White"などの文字列が入ります。
    
    return  await call_winforms_api("update_column_constraints", payload)

mcp.tool()
async def update_solving_parameters(data: SolvingParameters) -> str:
    """
    求解パラメータ(SolvingParameters)の設定をすべて一括更新します。
    """
    # model_dump_json() により C# 側が受信可能な JSON 形式の辞書に変換
    payload = data.model_dump_json()
    
    
    return  await call_winforms_api("update_solving_parameters", payload)

@mcp.tool()
async def load_project(filepath: str) ->ApiResponse:
    """スケジュールナースのプロジェクトファイルをスケジュールナースにロードする MCP ツール"""
    return await call_winforms_api("load_project", {"text": filepath})
  





@mcp.tool()
async def solve( payload: dict,ctx: Context= None) -> ApiResponse:
    if ctx==None:
        ctx = MockContext() 
    return await call_winforms_api("solve", payload, ctx)





# -------------------------------
# MCP サーバ起動
# -------------------------------
def start_server():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    loop.create_task(python_stream())
    mcp.run(port=5000)


# 1. MCPのContextを模倣するMockクラス
class MockContext:
    async def report_progress(self, progress: float, total: float):
        percentage = (progress / total) * 100
        print(f"【MCP進捗通知】 現在の進捗: {percentage:.1f}% ({progress}/{total})")


   
async def call_tool(client,command,payload):
    
    print("=== ",command," テスト開始 ===")
    result = await client.call_tool(command, payload)
    print("\n[Step 2] 最終結果を受信しました:")
    print(result.content[0].text)#.dumps(result, indent=2, ensure_ascii=False))
    print("=== ",command," テスト終了 ===\n")
    return result.content[0].text



async def api_test(api1,api2,obj):
    api_res= await call_winforms_api(api1, {} )
    if api_res.status!="complete":
        return False

    #print("api_res=",api_res)
    data=api_res.get_value_as(obj)
    #print("??data=",data)
    text = data.model_dump_json()# model_dump_json()
    #print("text=",text)
    payload={"text":text}
    api_res= await call_winforms_api(api2,payload)##{} )
    #print("api_res after update shift aggregates",api_res)
    api_res= await call_winforms_api(api1, {} )
    if api_res.status!="complete":
        print("Failed === api_test  "+api1+" "+api2+"テスト終了===\n")
        return False
    data2=api_res.get_value_as(obj)
    if data !=data2:
        print("Failed === api_test  "+api1+" "+api2+"テスト終了===\n")
        return False
    print("Success === api_test  "+api1+" "+api2+"テスト終了===\n")
    
           
    return True

async def test_server_locally():
    print("\n--- ローカルサーバーのテスト ---")
    # クライアントをサーバーオブジェクトに直接ポイントします
    client = Client(mcp) 
    ctx = MockContext()

    async with client:
       
        
        #filepath=r"C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\標準パターン\3交代深準標準パターン.nurse3"#'C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\プロジェクトサンプル\チュートリアル3A.nurse3'
        filepath=r"C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\プロジェクトサンプル\チュートリアル3A.nurse3"
        #filepath=r"C:\Users\sugaw\Documents\FA\sc3\release_check\2025.10_0930ver2.nurse3"
        payload={"filepath":filepath}
        
        await call_tool(client,"load_project",payload)
        
        # テスト用ペイロード ctxは、シリアライズできないので、そのまま載せられない
        payload = {
        "payload": {},
   
        }
        await call_tool(client,"solve",payload)

        #SO_str=await call_tool(client,"get_shift_definitions", payload)
        #deliv_from_dict = Delivery.model_validate(deliv_dict)
        #SO = ShiftObjects.model_validate_json(SO_str)
        #print("*SO=",SO)
        #text=SO.model_dump_json()
        #print("*S0_str=",text)
        #payload={"text":text}
        #api_res=await call_winforms_api("update_shift_objects",payload )
        #print(api_res)
        #SO_str2=await call_tool(client,"get_shift_definitions", payload)
        #compare_str(SO_str,SO_str2)
        #deliv_dict = deliv.model_dump_json()  # ◎
        # >> {'timestamp': datetime.datetime(2020, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc), 'products': [{'name': 'sushi', 'price': 1000}]}
        #deliv_json = deliv.model_dump_json()  #
        """
        SA=await call_tool(client,"get_shift_aggregates_definitions", payload)
        print("SA=",SA)
        So=await call_tool(client,"get_shift_solution", payload)
        print("Solution=",So)
        DO=await call_tool(client,"get_date_objects", payload)
        print("DateObjects=",DO)

        DA=await call_tool(client,"get_date_aggregates", payload)
        print("DateAggregates=",DA)

        GP=await call_tool(client,"get_group_properties", payload)
        print("Groupp Properties=",GP)

        GA=await call_tool(client,"get_group_aggregates", payload)
        print("Group Aggregates=",GA)

        SP=await call_tool(client,"get_staff_properties", payload)
        print("Staff Properties=",SP)

        SC=await call_tool(client,"get_shift_schedules", payload)
        print("Shit Schedules=",SC)
        RC=await call_tool(client,"get_row_constraints", payload)
        print("Row Constraints=",RC)
        CC=await call_tool(client,"get_column_constraints", payload)
        print("Column Constraints=",CC)
        PC=await call_tool(client,"get_pair_constraints", payload)
        print("Pair Constraints=",PC)
        SPP=await call_tool(client,"get_solving_parameters",payload)
        print("Solving Parameter=",SPP)

        print("update_shift_objects Test")
        api_res=  await call_winforms_api("update_shifts",SO.model_dump_json())"""
        
        
async def test_server_api():
        for i in range(1):
            print("****Loop=",i)    
            #filepath=r"C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\標準パターン\3交代深準標準パターン.nurse3"#'C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\プロジェクトサンプル\チュートリアル3A.nurse3'
            #filepath=r"C:\Users\sugaw\Documents\FA\sc3\プロジェクトサンプル\プロジェクトサンプル\チュートリアル3A.nurse3"
            #filepath=r"C:\Users\sugaw\Documents\FA\sc3\release_check\2025.10_0930ver2.nurse3"
            #filepath=r"C:\Users\sugaw\Documents\FA\sc3\寒河江\8月V1Color.nurse3"
            filepath=r"C:\Users\sugaw\Documents\FA\sc3\release_check3\神戸大0405_5_V4.nurse3"
            payload={"text":filepath}
        
        
            api_res= await call_winforms_api("load_project",payload )
        # テスト用ペイロード ctxは、シリアライズできないので、そのまま載せられない
            payload = {
            "payload": {},
   
            }
            api_res=await call_winforms_api("solve",payload)
            await api_test("get_shift_objects","update_shift_objects",ShiftObjects)
            await api_test("get_shift_aggregates","update_shift_aggregates",ShiftAggregates)
            await api_test("get_date_objects","update_date_objects",DateObjects)
            await api_test("get_date_aggregates","update_date_aggregates",DateAggregates)
            await api_test("get_group_properties","update_group_properties",GroupProperties)
            await api_test("get_group_aggregates","update_group_aggregates",GroupAggregates)
            await api_test("get_staff_properties","update_staff_properties",StaffProperties)
            await api_test("get_shift_schedules","update_shift_schedules",ShiftSchedules)
            await api_test("get_row_constraints","update_row_constraints",RowConstraints)
            await api_test("get_column_constraints","update_column_constraints",ColumnConstraints)
            await api_test("get_pair_constraints","update_pair_constraints",PairConstraints)
            await api_test("get_solving_parameters","update_solving_parameters",SolvingParameters)


async def test_solve_locally():
    ctx = MockContext()
    # 以前作成した継続読み取り版の call_winforms_api を呼び出し
    # ※ WinFormsアプリを起動した状態で実行してください
    filepath=r'C:\Users\sugaw\Documents\FA\sc3\Project_Samples\プロジェクトサンプル\チュートリアル3.nurse3'
    payload={"text":filepath}
    result = await call_winforms_api("load_project", payload, ctx=ctx)
    print("\n[Step 2] 最終結果を受信しました:")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    print("=== load_projectテスト終了 ===")


    
    print("\n[Step 1] WinFormsへ 'solve' 命令を送信します...")
    
    # 以前作成した継続読み取り版の call_winforms_api を呼び出し
    # ※ WinFormsアプリを起動した状態で実行してください

     
    # テスト用ペイロード
    payload = {
        "mode": "normal",
        "options": {"max_iterations": 1000}
    }
    result = await call_winforms_api("solve", payload, ctx=ctx)
    print("\n[Step 2] 最終結果を受信しました:")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    print("=== solveテスト終了 ===")

    result = await call_winforms_api("solve", payload, ctx=ctx)
    print("\n[Step 2] 最終結果を受信しました:")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    print("=== solveテスト終了 ===")
    
    
    
    print("=== Solve API ローカルテスト開始 ===")
    
    print("\n[Step 1] WinFormsへ 'get_shift_objects' 命令を送信します...")
    result=await call_winforms_api("get_shift_objects",{}, ctx=ctx);
    print("\n[Step 2] 最終結果を受信しました:")
    print(json.dumps(result, indent=2, ensure_ascii=False))
    print("=== get_shift_objectsテスト終了 ===")
    print("\n[Step 1] WinFormsへ 'load_project' 命令を送信します...")
    
    

if __name__ == "__main__":
    try:
        asyncio.run(test_server_api())#test_server_locally())#Test())#test_solve_locally())# エントリポイントは一つにする
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"エラーが発生しました: {e}")


0 件のコメント:

コメントを投稿