愛記システムの基本設計:DApps側である愛記システム データの一貫性 監査ログ | 続・ティール組織 研究会のブログ

続・ティール組織 研究会のブログ

ティール組織が話題になっているが、具現化するにはどうしたらよいか?
その研究を続けるにあたり、さらに次の形態である、続・ティール組織なるものまで視野に入れ、具体的な施策・行動内容を研究・支援する会。

先までは、"愛記"についての記載で、どのようにブロックチェーンSNSに組み込んで実装していけばよいのか、概念的なところからアプローチ方法を記載していった。概念設計としてはひとまず終えた。次は、フェデレーションモデル全体の基本設計といえるところまで、基本設計書に着手できるようなところまで、概念を具体化していきたい。そして、それにつながるDApps側である「愛記システム」を、Pythonプログラムで開発していきたい。

 

愛の行動のPL,BSを決算書として、個人単位、市町村単位、で公表するような愛記システムというものを考えている。愛の行動のデータベースはブロックチェーンのプログラムであり、日々の愛の行動による愛貨の移動を決算書にまとめていきたい。なお、市町村のブロックチェーンのプログラムは以前にも記載している。その市町村のブロックチェーンのプログラムにつながる愛記システムを、DApps側であるPythonプログラムとして設計したい。その場合、基本設計をどのような手順で進めていけばよいか、詳しく見ていこう。

 

愛記システムを設計するための基本手順を以下に示す。このシステムは、Pythonを用いて市町村のブロックチェーンと連携し、個人および市町村単位での愛の行動のデータを収集、記録し、決算書(PL、BS)として公表するものである。

基本設計のステップ

  1. 要件定義
  2. アーキテクチャ設計
  3. データベース設計
  4. API設計
  5. ブロックチェーンインターフェース
  6. 決算書の生成
  7. フロントエンド開発
  8. テストとデプロイ

基本設計の各ステップを順番に進めることで、ブロックチェーンとDAppsとして繋がる「愛記システム」の詳細な設計が可能になる。各ステップでは、関係者との協議やレビューを通じて設計内容を確定していくことが重要である。

1.要件定義

まず、基本設計の最初のステップである要件定義をしていきたい。どのような機能が必要か、どのような問題を解決するのかを洗い出したい。要件定義はシステム設計の最初の重要なステップであり、システムが解決するべき問題と、必要な機能を明確に定義するプロセスである。以下に、愛記システムのプログラムに必要な機能と解決すべき問題を列挙してみよう。

機能要件

  1. 愛の行動の記録

  2. 愛貨の移動の記録

  3. 決算書の生成

  4. 個人および市町村単位でのデータの集約

  5. データのブロックチェーンへの記録と取得

  6. 愛貨の管理

  7. ユーザー管理

  8. 通知機能

  9. レポート機能

  10. ダッシュボード

非機能要件

  1. セキュリティ

  2. 可用性

  3. パフォーマンス

  4. スケーラビリティ

  5. ユーザビリティ

  6. コンプライアンス

解決すべき問題

  1. 透明性と信頼性の確保

  2. データの一元管理

  3. 愛の行動の促進

  4. 評価制度の確立

  5. データのセキュリティとプライバシーの保護

これらの要件を基に、愛記システムの基本設計を進めていくことが重要である。次のステップでは、これらの要件を具体的なアーキテクチャ設計に反映していくことになる。まずは、要件定義の解決すべき問題を一つずつクリアにしていきたい。

透明性と信頼性の確保

1. データの完全性と一貫性の保証

  • データの完全性: すべての取引と愛の行動記録が正確かつ改ざんされていないことを保証する。

  • データの一貫性: システム全体でデータが統一されていることを確認する。

2. 取引の透明性の確保

  • 公開取引データ: 取引データを公開し、誰でも確認できるようにする。

  • 監査ログ: 取引や行動のすべての変更履歴を保持し、監査可能にする。

3. データのセキュリティとプライバシー保護

  • 暗号化: データの送受信時および保存時に暗号化を行い、データの安全性を確保。

  • アクセス制御: データへのアクセスを制御し、必要な権限を持つユーザーのみに限定。

4. 取引の正当性の確認

  • 署名の生成と検証: 取引データの署名を生成し、取引の正当性を検証。

  • 取引の検証: 各取引を検証し、不正行為や二重支出を防止。

これらの項目を詳細に決定し、実装することで、愛記システムの透明性と信頼性を確保することができる。各項目については、具体的な技術要件や設計仕様を定義し、システム開発の各フェーズで反映させることが重要である。

データの一貫性について

1. データの一貫性

システム全体でデータが統一されていることを確認するために、次の点に注意する。

  • トランザクションのアトミック性: トランザクションが完全に成功するか、まったく成功しないかを保証する。
  • データの整合性チェック: データが一貫性を保つための整合性チェックを設計する。
  • データの同期: 分散システム間でデータの同期を適切に行う。

2. データの完全性
データが損なわれないようにするために、次の点に注意する。

  • データの暗号化: データの機密性を確保するために、保存時および通信時にデータを暗号化する。
  • データのバリデーション: 入力データをバリデーションして、不正なデータがシステムに入らないようにする。

3. 監査ログ

すべての操作を監査ログに記録し、後から検証可能にする。では、以下に、データの完全性と一貫性を確保するための具体的なプログラム例を示す。

 

・監査ログ:

すべての操作を監査ログに記録し、後から検証可能にするというが、具体的にいつ、何を、どうしたらいいのか、設計案をプログラムで示したい。なお、監査ログを導入することで、すべての操作を記録し、後から検証可能にする。これにより、データの完全性と一貫性を確保できる。以下に、市町村のブロックチェーンプログラムに監査ログ機能を追加する具体的な設計案とプログラム例を示す。

・設計案

  1. 監査ログデータ構造の定義

    • 操作の種類(トランザクション受信、承認、ブロック生成など)
    • 操作の詳細(トランザクションID、承認者情報など)
    • タイムスタンプ
    • 操作結果(成功、失敗)
  2. 監査ログの記録タイミング

    • トランザクション受信時
    • トランザクション承認時
    • ブロック生成時
    • メインチェーンへの同期時
  3. 監査ログの記録方法

    • 各操作の後にログを記録する関数を呼び出す

・プログラム例

以下に、監査ログ機能を含む市町村のブロックチェーンプログラムの例を示す。
use actix_web::{post, web, App, HttpResponse, HttpServer, Responder};
use pqcrypto_ntru::ntruhps4096821::*;
use serde::{Deserialize, Serialize};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// データ構造定義
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Polygon {
    coordinates: Vec<(f64, f64)>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Municipality {
    name: String,
    polygon: Polygon,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Transaction {
    transaction_id: String,
    sender: String,
    receiver: String,
    amount: f64,
    action_content: String,
    location: String,
    municipality: String,
    timestamp: String,
    signature: String,
    proof_of_place: Option<ProofOfPlace>,
    approver_info: Option<ApproverInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProofOfPlace {
    latitude: f64,
    longitude: f64,
    timestamp: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct ApproverInfo {
    user_id: String,
    public_key: String,
    approval_timestamp: String,
    signature: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Block {
    index: u64,
    timestamp: String,
    transactions: Vec<Transaction>,
    previous_hash: String,
    hash: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct AuditLog {
    operation: String,
    details: String,
    timestamp: String,
    result: String,
}

struct AppState {
    pending_transactions: Mutex<HashMap<String, Transaction>>,
    balances: Mutex<HashMap<String, f64>>,
    municipalities: Mutex<HashMap<String, Municipality>>,
    blocks: Mutex<Vec<Block>>,
    audit_logs: Mutex<Vec<AuditLog>>, // 監査ログを保持するためのベクター
}

// トランザクション受信
#[post("/transactions")]
async fn receive_transaction(data: web::Data<AppState>, req: web::Json<Transaction>) -> impl Responder {
    let transaction = req.into_inner();
    let mut pending_transactions = data.pending_transactions.lock().unwrap();

    // トランザクションの検証フェーズ
    if !verify_transaction(&transaction) {
        record_audit_log(data.clone(), "Transaction Received".to_string(), format!("{:?}", transaction), "Failed".to_string());
        return HttpResponse::BadRequest().json("Invalid transaction");
    }

    pending_transactions.insert(transaction.transaction_id.clone(), transaction.clone());
    record_audit_log(data.clone(), "Transaction Received".to_string(), format!("{:?}", transaction), "Success".to_string());
    HttpResponse::Ok().json("Transaction received")
}

// 承認処理
#[post("/approve")]
async fn approve_transaction(data: web::Data<AppState>, req: web::Json<(String, ApproverInfo)>) -> impl Responder {
    let (transaction_id, approver_info) = req.into_inner();
    let mut pending_transactions = data.pending_transactions.lock().unwrap();
    let mut balances = data.balances.lock().unwrap();

    if let Some(mut transaction) = pending_transactions.remove(&transaction_id) {
        if let Some(proof_of_place) = &transaction.proof_of_place {
            if verify_proof_of_place(&data, proof_of_place) {
                let amount = transaction.amount;
                *balances.entry(transaction.sender.clone()).or_insert(0.0) -= amount;
                *balances.entry(transaction.receiver.clone()).or_insert(0.0) += amount;

                transaction.approver_info = Some(approver_info.clone());

                // 承認時のPoPとPoHを記録
                transaction.proof_of_place = Some(proof_of_place.clone());

                // ブロック生成フェーズ
                if !add_transaction_to_block(data.clone(), transaction.clone()) {
                    record_audit_log(data.clone(), "Transaction Approved".to_string(), format!("{:?}", transaction), "Failed".to_string());
                    return HttpResponse::InternalServerError().json("Failed to add transaction to block");
                }

                record_audit_log(data.clone(), "Transaction Approved".to_string(), format!("{:?}", transaction), "Success".to_string());
                HttpResponse::Ok().json("Transaction approved")
            } else {
                record_audit_log(data.clone(), "Transaction Approved".to_string(), format!("{:?}", transaction), "Failed - Invalid Proof of Place".to_string());
                HttpResponse::BadRequest().json("Invalid Proof of Place")
            }
        } else {
            record_audit_log(data.clone(), "Transaction Approved".to_string(), format!("{:?}", transaction), "Failed - Proof of Place not found".to_string());
            HttpResponse::BadRequest().json("Proof of Place not found")
        }
    } else {
        record_audit_log(data.clone(), "Transaction Approved".to_string(), format!("{:?}", transaction), "Failed - Transaction not found".to_string());
        HttpResponse::BadRequest().json("Transaction not found")
    }
}

// PoP検証
fn verify_proof_of_place(data: &web::Data<AppState>, pop: &ProofOfPlace) -> bool {
    let municipalities = data.municipalities.lock().unwrap();
    for municipality in municipalities.values() {
        if is_inside_polygon(&municipality.polygon, pop.latitude, pop.longitude) {
            return true;
        }
    }
    false
}

// ポリゴン内確認
fn is_inside_polygon(polygon: &Polygon, latitude: f64, longitude: f64) -> bool {
    let mut is_inside = false;
    let mut j = polygon.coordinates.len() - 1;

    for i in 0..polygon.coordinates.len() {
        let (xi, yi) = polygon.coordinates[i];
        let (xj, yj) = polygon.coordinates[j];

        let intersect = ((yi > longitude) != (yj > longitude))
            && (latitude < (xj - xi) * (longitude - yi) / (yj - yi) + xi);

        if intersect {
            is_inside = !is_inside;
        }

        j = i;
    }

    is_inside
}

// トランザクションの検証
fn verify_transaction(transaction: &Transaction) -> bool {
    // トランザクションID、金額、署名の検証など
    if transaction.transaction_id.is_empty() || transaction.amount <= 0.0 || transaction.signature.is_empty() {
        return false;
    }
    true
}

// ブロック生成フェーズ
fn add_transaction_to_block(data: web::Data<AppState>, transaction: Transaction) -> bool {
    let mut blocks = data.blocks.lock().unwrap();
    let previous_block = blocks.last().cloned();

    let new_block = Block {
        index: previous_block.map_or(0, |b| b.index + 1),
        timestamp: Utc::now().to_string(),
        transactions: vec![transaction],
        previous_hash: previous_block.map_or(String::new(), |b| b.hash),
        hash: String::new(), // 一旦空で作成し、後でハッシュを計算する
    };

    if !verify_block(&new_block) {
        return false;
    }

    // ハッシュの計算
    let block_hash = calculate_block_hash(&new_block);
    let mut final_block = new_block.clone();
    final_block.hash = block_hash;

    blocks.push(final_block);
    true
}

// ブロックの検証
fn verify_block(block: &Block) -> bool {
    // データの整合性チェックなど
    !block.transactions.is_empty() && !block.timestamp.is_empty()
}

// ブロックのハッシュを計算
fn calculate_block_hash(block: &Block) -> String {
    let block_data = format!("{}{}{:?}{}", block.index, block.timestamp, block.transactions, block.previous_hash);
    let mut hasher = Sha256::new();
    hasher.update(block_data.as_bytes());
    let result = hasher.finalize();
    format!("{:x}", result)
}

// メインチェーンへの同期フェーズ
fn sync_to_main_chain(data: web::Data<AppState>) -> bool {
    let blocks = data.blocks.lock().unwrap();
    for block in blocks.iter() {
        if !verify_block(block) {
            return false;
        }
    }
    true
}

// 監査ログの記録
fn record_audit_log(data: web::Data<AppState>, operation: String, details: String, result: String) {
    let timestamp = Utc::now().to_string();
    let audit_log = AuditLog {
        operation,
        details,
        timestamp,
        result,
    };
    let mut audit_logs = data.audit_logs.lock().unwrap();
    audit_logs.push(audit_log);
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let app_state = web::Data::new(AppState {
        pending_transactions: Mutex::new(HashMap::new()),
        balances: Mutex::new(HashMap::new()),
        municipalities: Mutex::new(HashMap::new()),
        blocks: Mutex::new(Vec::new()),
        audit_logs: Mutex::new(Vec::new()),
    });

    HttpServer::new(move || {
        App::new()
            .app_data(app_state.clone())
            .service(receive_transaction)
            .service(approve_transaction)
    })
    .bind("0.0.0.0:8080")?
    .run()
    .await
}
 

・説明:

  1. トランザクションの受信

    • receive_transaction 関数でトランザクションを受信し、verify_transaction 関数でデータの形式と内容を検証する。
    • 検証が成功すればトランザクションを pending_transactions に追加し、監査ログを記録する。
  2. トランザクションの承認

    • approve_transaction 関数でトランザクションを承認する。
    • verify_proof_of_place 関数でPoPを検証し、承認が成功すればトランザクションをブロックに追加する。
    • 監査ログを記録する。
  3. ブロックの生成と検証

    • add_transaction_to_block 関数でトランザクションを新しいブロックに追加し、verify_block 関数でデータの一貫性を確認する。
    • ブロックのハッシュを計算してブロックチェーンに追加する。
  4. メインチェーンへの同期

    • sync_to_main_chain 関数でブロックチェーンをメインチェーンに同期させる。
    • ブロックのデータ整合性を再確認する。
  5. 監査ログの記録

    • record_audit_log 関数で操作の詳細と結果を監査ログに記録する。
       

○DApps側であるPythonプログラム

以下に、DApps側であるPythonプログラムに監査ログ機能を追加する具体的な設計案とプログラム例を上記同様に示す。

・設計案

  1. 監査ログデータ構造の定義

    • 操作の種類(トランザクション生成、送信、受信など)
    • 操作の詳細(トランザクションID、ユーザー情報など)
    • タイムスタンプ
    • 操作結果(成功、失敗)
  2. 監査ログの記録タイミング

    • トランザクション生成時
    • トランザクション送信時
    • トランザクション受信時
  3. 監査ログの記録方法

    • 各操作の後にログを記録する関数を呼び出す

・プログラム例

以下に、監査ログ機能を含むDApps側であるPythonプログラムの例を示す。
import hashlib
import datetime
import re
import json
from pymongo import MongoClient, errors

# MongoDBクライアントとデータベースのセットアップ
client = MongoClient('mongodb://localhost:27017/')
db = client['blockchain_db']

# データ構造定義
def validate_input_data(data):
    """ユーザー入力データのバリデーション"""
    if not re.match(r'^[a-zA-Z0-9]+$', data['transaction_id']):
        raise ValueError("Invalid Transaction ID")
    if not isinstance(data['amount'], (int, float)) or data['amount'] <= 0:
        raise ValueError("Invalid Amount")
    # 他のバリデーションチェックを追加

def verify_signature(transaction_data, public_key, signature):
    """署名を検証する疑似関数"""
    return hashlib.sha256(str(transaction_data + public_key).encode()).hexdigest() == signature

def check_transaction_consistency(transaction):
    """トランザクションの整合性をチェックする"""
    if not verify_signature(transaction['data'], transaction['public_key'], transaction['signature']):
        raise ValueError("Invalid Signature")
    if transaction['timestamp'] > datetime.datetime.now().timestamp():
        raise ValueError("Future timestamp")

def record_audit_log(operation, details, result):
    """監査ログを記録する"""
    timestamp = datetime.datetime.now().isoformat()
    audit_log = {
        'operation': operation,
        'details': details,
        'timestamp': timestamp,
        'result': result
    }
    try:
        db.audit_logs.insert_one(audit_log)
        print("Audit log recorded successfully.")
    except errors.OperationFailure as e:
        print(f"Failed to record audit log: {e}")

def add_transaction(transaction):
    """トランザクションを安全にデータベースに追加する"""
    try:
        validate_input_data(transaction)
        with client.start_session() as session:
            with session.start_transaction():
                check_transaction_consistency(transaction)
                db.transactions.insert_one(transaction, session=session)
                record_audit_log("Transaction Added", transaction, "Success")
                print("Transaction added successfully.")
    except ValueError as e:
        record_audit_log("Transaction Add Failed", transaction, f"Failed - {e}")
        print(f"Error adding transaction: {e}")

def receive_transaction(transaction_id):
    """トランザクションを受信し、承認をリクエストする"""
    try:
        transaction = db.transactions.find_one({'transaction_id': transaction_id})
        if not transaction:
            raise ValueError("Transaction not found")
        
        # トランザクションを市町村のブロックチェーンに送信
        # 送信ロジック(APIリクエストなど)をここに記述
        
        record_audit_log("Transaction Sent", transaction, "Success")
        print("Transaction sent successfully.")
    except ValueError as e:
        record_audit_log("Transaction Send Failed", {'transaction_id': transaction_id}, f"Failed - {e}")
        print(f"Error sending transaction: {e}")

# トランザクションの例
transaction = {
    'transaction_id': '123456',
    'data': 'some important data',
    'timestamp': datetime.datetime.now().timestamp(),
    'public_key': 'user123publickey',
    'signature': 'validsignature123',
    'amount': 100
}

# トランザクションの追加
add_transaction(transaction)

# トランザクションの受信と送信
receive_transaction('123456')
 

・説明:

  1. トランザクションの生成

    • validate_input_data 関数でユーザー入力データをバリデーションする。
    • check_transaction_consistency 関数でトランザクションの整合性を確認する。
    • トランザクションがデータベースに追加されるときに、record_audit_log 関数で監査ログを記録する。
  2. トランザクションの送信

    • receive_transaction 関数でトランザクションを受信し、市町村のブロックチェーンに送信する(送信ロジックは省略)。
    • トランザクションが送信されたときに、record_audit_log 関数で監査ログを記録する。
  3. 監査ログの記録

    • record_audit_log 関数で操作の詳細と結果を監査ログに記録する。

このように、DApps側でトランザクションの生成、送信、受信の際に監査ログを記録することで、データの完全性と一貫性を確保できる。

 

 

いかがであろうか、今回は監査ログについて記載した。しっかりとチェックをかけていくことが後々の正確な処理に繋がっていくのだからしっかりと監査ログを入れておきたいところだ。