グラフ生成
# -*- coding:utf-8 -*-
from datetime import datetime, timedelta
from enum import Enum
from typing import Tuple
class TimeUnit(Enum):
"""
時間の単位を定義する列挙型。
Attributes:
SECOND: 秒単位。
MILLISECOND: ミリ秒単位。
"""
SECOND = 'second'
MILLISECOND = 'millisecond'
class RangeData:
"""
特定の時間範囲とその範囲内に含まれる要素数を保持するデータクラス。
Attributes:
start (datetime): 期間の開始日時 (この時刻を含む)。
end (datetime): 期間の終了日時 (この時刻を含まない)。
count (int): この期間内に含まれる要素の数。
"""
def __init__(self, start: datetime, end: datetime, count: int = 0):
self.start = start
self.end = end
self.count = count
def count_up(self) -> None:
"""カウンタを1増加させます。"""
self.count += 1
def __str__(self) -> str:
"""オブジェクトを文字列として表現します。デバッグ時に便利です。"""
return f"RangeData(start={self.start}, end={self.end}, count={self.count})"
class RangeCount:
"""
指定された期間を一定間隔のタイムスロットに分割し、各スロットのデータ数を計測するクラス。
計測期間全体を `timedelta` で区切った `RangeData` オブジェクトのリストを内部に保持し、
与えられた日時に基づいて対応するスロットのカウンタを増加させます。
"""
def __init__(self, measure_start: datetime, measure_end: datetime, time_unit: TimeUnit):
"""
RangeCountのインスタンスを初期化します。
Args:
measure_start (datetime): 計測全体の開始日時。
measure_end (datetime): 計測全体の終了日時。
time_unit (TimeUnit): 時間を分割する単位(秒またはミリ秒)。
"""
# 処理の粒度を揃えるため、マイクロ秒を0に丸めます。
sd = measure_start.replace(microsecond=0)
ed = measure_end.replace(microsecond=0)
# 指定された単位に基づいて、タイムスロットの間隔(delta)を設定します。
# 注意: 現在の実装では、どちらのTimeUnitを指定しても1秒間隔で分割されます。
# 1ミリ秒間隔を意図している場合、`timedelta(milliseconds=1)` に変更する必要があります。
delta = timedelta(seconds=1) if time_unit == TimeUnit.SECOND else timedelta(milliseconds=1)
# 指定された時間間隔でRangeDataオブジェクトを生成し、リストに格納します。
self.data_list = []
current_time = sd
while current_time < ed:
next_time = current_time + delta
self.data_list.append(RangeData(current_time, next_time))
current_time = next_time
def _binary_search(self, t: Tuple[datetime, datetime], range_count: bool) -> bool:
"""
【内部メソッド】二分探索を用いて、指定日時に対応する範囲を見つけてカウンタを増加させます。
このメソッドはソート済みの `data_list` に対して効率的に動作します。
Args:
t (Tuple[datetime, datetime]): 検索対象の期間 `(start_time, end_time)`。
range_count (bool): カウント方法を指定するフラグ。
- True: `t` の開始から終了までにまたがる全てのタイムスロットをカウントします。
- False: `t` の開始日時 `t[0]` が含まれる単一のタイムスロットのみをカウントします。
Returns:
bool: 対象のタイムスロットが見つかり、カウントアップに成功した場合は True、
見つからなかった場合は False を返します。
"""
low = 0
high = len(self.data_list) - 1
while low <= high:
mid = (low + high) // 2
current_range = self.data_list[mid]
# 検索対象の開始時刻 `t[0]` が、中央のタイムスロット内に存在するか判定
if current_range.start <= t[0] < current_range.end:
# `t[0]` を含むタイムスロットを発見
current_range.count_up()
# `range_count`がTrueの場合、後続のタイムスロットもチェック
if range_count:
# `t[0]`を含むスロットの次から、`t[1]`が終了するまでのスロットをカウントアップ
next_index = mid + 1
while (next_index < len(self.data_list)) and (self.data_list[next_index].start <= t[1]):
self.data_list[next_index].count_up()
next_index += 1
return True # 処理が完了したためループを抜ける
# `t[0]` が中央のタイムスロットより前にある場合、探索範囲を前半に絞る
elif t[0] < current_range.start:
high = mid - 1
# `t[0]` が中央のタイムスロットより後ろにある場合、探索範囲を後半に絞る
else: # t[0] >= current_range.end
low = mid + 1
return False
def find_and_increment(self, t: Tuple[datetime, datetime]) -> bool:
"""
指定されたタプルの開始日時 `t[0]` が含まれる「単一の」タイムスロットのカウンタを1増やします。
Args:
t (Tuple[datetime, datetime]): 検索対象の期間タプル `(start_time, end_time)`。
このメソッドでは主に `t[0]` が使用されます。
Returns:
bool: 対象が見つかり、カウントアップに成功した場合は True、見つからなかった場合は False。
"""
return self._binary_search(t, False)
def find_range_and_increment(self, t: Tuple[datetime, datetime]) -> bool:
"""
指定された期間 `t` (開始 `t[0]` から終了 `t[1]`) にまたがる「全ての」タイムスロットのカウンタを増やします。
Args:
t (Tuple[datetime, datetime]): 検索対象の期間タプル `(start_time, end_time)`。
Returns:
bool: 対象が見つかり、カウントアップに成功した場合は True、見つからなかった場合は False。
"""
return self._binary_search(t, True)
def display_data_list(self):
"""
保持している `data_list` の内容を整形して表示します。
デバッグや動作確認に利用できます。
"""
print("\n--- Data List Status ---")
if not self.data_list:
print("データリストは空です。")
print("------------------------")
return
# ヘッダーを表示し、各列の幅を固定して見やすくします。
print(f"{'Index':<7} | {'Start Time':<26} | {'End Time':<26} | {'Count'}")
print("-" * 70)
# 各データを表示します。
for i, data in enumerate(self.data_list):
start_str = data.start.strftime("%Y-%m-%d %H:%M:%S.%f")
end_str = data.end.strftime("%Y-%m-%d %H:%M:%S.%f")
print(f"{i:<7} | {start_str:<28} | {end_str:<28} | {data.count}")
print("------------------------\n")
ファイルユーティリティ
# -*- condig: utf-8 -*-
class DirectoryNotFoundError(OSError):
"""
指定されたパスにディレクトリが見つからない場合に送出される例外。
この例外は、パスが存在しない場合、またはパスがファイルであって
ディレクトリではない場合に発生することを意図しています。
"""
pass
# -*- condig: utf-8 -*-
class DirectoryNotFoundError(OSError):
"""
指定されたパスにディレクトリが見つからない場合に送出される例外。
この例外は、パスが存在しない場合、またはパスがファイルであって
ディレクトリではない場合に発生することを意図しています。
"""
pass
# -*- condig: utf-8 -*-
from pathlib import Path
from typing import Callable, Optional
import os
from directory_util import DirectoryNotFoundError
from file_filter import all
def check_directory_exists(directory_path: str) -> None:
"""
指定されたパスにディレクトリが存在することを保証します。
ディレクトリが存在しない場合、またはパスがファイルである場合は
`DirectoryNotFoundError` を送出します。
Args:
directory_path: 検証するディレクトリのパス。
Raises:
DirectoryNotFoundError: ディレクトリが存在しないか、パスがファイルの場合。
"""
path = Path(directory_path)
if not path.is_dir():
raise DirectoryNotFoundError(f"指定されたパスはディレクトリではありません: '{directory_path}'")
def get_file_paths(directory_path: str,filter_func: Optional[Callable[[str], bool]] = all) -> list[str]:
"""
指定されたディレクトリ内のファイルのフルパスをリストで取得します。
オプションのフィルター関数で結果を絞り込むことができます。
Args:
directory_path: 対象のディレクトリパス。
filter_func: オプション。ファイルパス(文字列)を引数に取り、
含める場合はTrue、除外する場合はFalseを返す関数。
指定しない場合は、全てのファイルが対象になります。
Returns:
条件に合致したファイルのフルパス文字列のリスト。
Raises:
DirectoryNotFoundError: 指定されたパスが存在しないか、ディレクトリではない場合。
"""
check_directory_exists(directory_path)
path = Path(directory_path)
# ファイルパスのリストを生成
file_paths = []
for p in path.iterdir():
# パスがファイルであることを確認
if p.is_file():
path_str = str(p.resolve())
# フィルター関数が指定されていない、または関数がTrueを返した場合に追加
if filter_func is None or filter_func(Path(path_str).name):
file_paths.append(path_str)
return file_paths
def get_file_paths_recur(directory_path: str,filter_func: Optional[Callable[[str], bool]] = all) -> list[str]:
"""
指定されたディレクトリから再帰的に全てのファイルのパスを取得します。
オプションのフィルター関数で結果を絞り込むことができます。
Args:
directory_path: 検索を開始するディレクトリのパス。
filter_func: オプション。ファイルパス(文字列)を引数に取り、
含める場合はTrue、除外する場合はFalseを返す関数。
指定しない場合は、全てのファイルが対象になります。
Returns:
条件に合致したファイルのフルパス文字列のリスト。
Raises:
DirectoryNotFoundError: 指定されたパスが存在しないか、ディレクトリではない場合。
"""
check_directory_exists(directory_path)
path = Path(directory_path)
# path.rglob('*') を使って再帰的に全てのパスを取得
all_paths = path.rglob('*')
# ファイルであり、かつフィルター条件を満たすものだけをリスト化
file_paths = [
str(p.resolve()) for p in all_paths if p.is_file() and (filter_func is None or filter_func(p.name))
]
return file_paths
# ----- 以下、使用例 -----
if __name__ == '__main__':
from file_filter import full_matcher,extension
all_files = get_file_paths_recur(r".",extension(".py"))
print("見つかった全てのファイル:")
for file in all_files:
print(f" - {file}")
print("-" * 30)
# ケース2: 拡張子が .txt のファイルのみを