愛記システムの基本設計:DApps側である愛記システム データの一元管理 データ統合 | 続・ティール組織 研究会のブログ

続・ティール組織 研究会のブログ

ティール組織が話題になっているが、具現化するにはどうしたらよいか?
その研究を続けるにあたり、さらに次の形態である、続・ティール組織なるものまで視野に入れ、具体的な施策・行動内容を研究・支援する会。

先までは、"愛記"についての記載で、どのようにブロックチェーンSNSに組み込んで実装していけばよいのか、概念的なところからアプローチ方法を記載していった。概念設計としてはひとまず終えた。次は、フェデレーションモデル全体の基本設計といえるところまで、基本設計書に着手できるようなところまで、概念を具体化していきたい。そして、それにつながるDApps側である「愛記システム」を、Pythonプログラムで開発していきたい。

 

愛の行動のPL,BSを決算書として、個人単位、市町村単位、で公表するような愛記システムというものを考えている。愛の行動のデータベースはブロックチェーンのプログラムであり、日々の愛の行動による愛貨の移動を決算書にまとめていきたい。なお、市町村のブロックチェーンのプログラムは以前にも記載している。その市町村のブロックチェーンのプログラムにつながる愛記システムを、DApps側であるPythonプログラムとして設計したい。その場合、基本設計をどのような手順で進めていけばよいか、詳しく見ていこう。

 

愛記システムを設計するための基本手順を以下に示す。このシステムは、Pythonを用いて市町村のブロックチェーンと連携し、個人および市町村単位での愛の行動のデータを収集、記録し、決算書(PL、BS)として公表するものである。

基本設計のステップ

  1. 要件定義
  2. アーキテクチャ設計
  3. データベース設計
  4. API設計
  5. ブロックチェーンインターフェース
  6. 決算書の生成
  7. フロントエンド開発
  8. テストとデプロイ

基本設計の各ステップを順番に進めることで、ブロックチェーンとDAppsとして繋がる「愛記システム」の詳細な設計が可能になる。各ステップでは、関係者との協議やレビューを通じて設計内容を確定していくことが重要である。

1.要件定義

まず、基本設計の最初のステップである要件定義をしていきたい。どのような機能が必要か、どのような問題を解決するのかを洗い出したい。要件定義はシステム設計の最初の重要なステップであり、システムが解決するべき問題と、必要な機能を明確に定義するプロセスである。以下に、愛記システムのプログラムに必要な機能と解決すべき問題を列挙してみよう。

機能要件

  1. 愛の行動の記録

  2. 愛貨の移動の記録

  3. 決算書の生成

  4. 個人および市町村単位でのデータの集約

  5. データのブロックチェーンへの記録と取得

  6. 愛貨の管理

  7. ユーザー管理

  8. 通知機能

  9. レポート機能

  10. ダッシュボード

非機能要件

  1. セキュリティ

  2. 可用性

  3. パフォーマンス

  4. スケーラビリティ

  5. ユーザビリティ

  6. コンプライアンス

解決すべき問題

  1. 透明性と信頼性の確保

  2. データの一元管理

  3. 愛の行動の促進

  4. 評価制度の確立

  5. データのセキュリティとプライバシーの保護

これらの要件を基に、愛記システムの基本設計を進めていくことが重要である。次のステップでは、これらの要件を具体的なアーキテクチャ設計に反映していくことになる。まずは、要件定義の解決すべき問題を一つずつクリアにしていきたい。

データの一元管理

愛記システムにおけるデータの一元管理は、データの収集、統合、保存、アクセス制御、およびバックアップを適切に行うことが重要である。以下に、これを実現するための設計を示す。

1. データ収集

・タイミング:

  • ユーザーが愛の行動を記録したとき

  • 愛貨の移動が発生したとき

  • 定期的な集計およびレポート生成時

・データの種類:

  • 愛の行動データ(行動の内容、日時、場所、関与者)

  • 愛貨の移動データ(送信者、受信者、金額、移動日時)

  • 決算データ(収支、資産、負債)

  • ユーザーデータ(ユーザーID、プロフィール、認証情報)

  • 市町村データ(市町村ID、名前、地理情報)

・方法:

  • 各種データを専用のAPIエンドポイントを通じて収集

  • フォーム入力、スキャン、GPSデータなどのユーザーインターフェースからの入力

2. データ統合

・タイミング:

  • リアルタイムでのデータ入力時

  • 定期的なバッチ処理(例: 毎日夜間)

・データの種類:

  • 各データソースから取得した生データを統合データベースに変換

・方法:

  • ETL(Extract, Transform, Load)プロセスを使用して、異なるフォーマットのデータを統合

  • リアルタイムデータのストリーム処理(例: Kafkaなどのデータストリーミングプラットフォーム)

3. データ保存

・タイミング:

  • データ収集時

  • データ統合後

・データの種類:

  • 統合データベース(例: MongoDB、PostgreSQL)

  • ブロックチェーンデータ

・方法:

  • 各データベースに適切にインデックスを設定し、効率的なデータ格納を実現

  • ブロックチェーンに重要データをハッシュ化して記録、ハッシュのみをオンチェーンに保存し、詳細データはオフチェーンに保存

4. アクセス制御

・タイミング:

  • データアクセスリクエスト時

・データの種類:

  • ユーザー認証情報

  • アクセス制御リスト(ACL)

・方法:

  • JWT(JSON Web Tokens)を使用してユーザー認証を行い、各APIリクエストの認証と認可を実施

  • ユーザーごとにアクセス権限を設定し、アクセス制御リストで管理

  • RBAC(Role-Based Access Control)を導入し、役割ごとにアクセス権限を定義

5. データのバックアップとリカバリ

・タイミング:

  • 定期的なバックアップ(例: 毎日、毎週)

  • システム障害発生時

・データの種類:

  • 全データベースのスナップショット

  • ブロックチェーンデータ

・方法:

  • 定期的なバックアップスケジュールを設定し、バックアップをクラウドストレージやオフサイトストレージに保存

  • 自動バックアップと手動バックアップの両方をサポート

  • データリカバリ計画を策定し、定期的にリカバリテストを実施
     

データ統合の設計

データ統合は、リアルタイムでのデータ入力時および定期的なバッチ処理によって行う。ETLプロセスとリアルタイムデータのストリーム処理を使用して、異なるフォーマットのデータを統合する。
 

リアルタイム処理:

リアルタイムでのデータ入力時には、データがDAppsに到達した際に即座に処理される。データは専用のAPIエンドポイントを通じて収集され、MongoDBに保存される。リアルタイムデータのストリーム処理は、データが発生した瞬間に即座に処理・分析・保存を行う方法である。この処理を実現するために、Kafkaなどのデータストリーミングプラットフォームを使用する。Kafkaは、データのパイプラインを構築し、大量のデータをリアルタイムで処理するための分散型メッセージングシステムである。
 

定期的なバッチ処理:

定期的なバッチ処理は、評価制度や市町村分析要素などの集計・分析に使用される。毎日夜間にバッチ処理を実行し、データを統合データベースに保存する。

1. データ収集
・タイミング

  • リアルタイム: ユーザーが愛の行動を記録したとき、または愛貨の移動が発生したときに即座にデータを収集する。
  • 定期バッチ: 毎日夜間に定期的なバッチ処理でデータを収集・集計する。

・データの種類

  • 愛の行動データ: 行動の内容、日時、場所、関与者
  • 愛貨の移動データ: 送信者、受信者、金額、移動日時
  • 決算データ: 収支、資産、負債
  • ユーザーデータ: ユーザーID、プロフィール、認証情報
  • 市町村データ: 市町村ID、名前、地理情報

・方法

  • リアルタイム処理: Kafkaを使用したデータストリーミング
  • バッチ処理: データベースから定期的にデータを抽出・変換・ロード(ETL)

2. データ統合の流れ
・リアルタイム処理

  • データプロデューサー: ユーザーの愛の行動データをKafkaに送信
  • Kafkaトピック: データを保持し、コンシューマーに送信
  • データコンシューマー: Kafkaトピックからデータを受信し、MongoDBなどのデータベースに保存

・バッチ処理

  • データ抽出: MongoDBから愛の行動データ、愛貨の移動データ、決算データを抽出
  • データ変換: 必要なフォーマットに変換
  • データロード: データを統合データベースに保存

3. データ統合の設計とプログラム例
・Kafkaデータプロデューサー
○役割
Kafkaデータプロデューサーは、データを生成し、それをKafkaトピックに送信する役割を担う。具体的には、アプリケーションやデータソースからデータを収集し、Kafkaブローカーにデータを送信する。
○動作
1.データ生成:
アプリケーション内部で生成されたデータや外部データソースから取得されたデータを収集する。例えば、ユーザーの愛の行動データや愛貨の移動データなどが該当する。
2.データのシリアライズ:
収集したデータをJSON形式などのシリアライズされた形式に変換する。これにより、データがKafkaトピックに適した形式で送信される。
3.Kafkaトピックへのデータ送信:
シリアライズされたデータをKafkaトピックに送信する。送信するトピックは、例えばlove_action_dataやlove_currency_movementsなどである。
4.Kafkaデータプロデューサーのプログラム例:
以下に、具体的なPythonのコード例を示す。このコードでは、Kafkaプロデューサーを用いてデータを生成し、Kafkaトピックに送信する処理を行う。

# data_producer.py
from kafka import KafkaProducer
import json
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def send_love_action_data(action_type, details, location, user_id, municipality_id):
    data = {
        "action_type": action_type,
        "details": details,
        "location": location,
        "timestamp": datetime.utcnow().isoformat(),
        "user_id": user_id,
        "municipality_id": municipality_id
    }
    producer.send('love_action_data', value=data)
    producer.flush()

# データ送信例
send_love_action_data("Helping others", "Helped an elderly person cross the street", "City A", "user123", "municipalityA")


・Kafkaデータコンシューマー
Kafkaデータコンシューマーは、Kafkaトピックからデータを受信し、受信したデータをデータベースに保存する役割を果たす。以下に具体的な役割とその動作について説明する。
・Kafkaデータコンシューマーの役割と動作
○役割
データ受信: Kafkaトピック(例: love_action_data)からメッセージを継続的に受信する。
データ処理: 受信したデータを適切な形式に変換し、データベースに保存する。
データ保存: データベース(例: MongoDB)にデータを格納し、後での利用や分析に備える。
○動作
1.Kafkaコンシューマーのセットアップ:
Kafkaコンシューマーを初期化し、特定のトピック(例: love_action_data)にサブスクライブする。コンシューマーは、指定されたKafkaサーバーに接続し、トピックからメッセージを受信する準備を行う。
2.メッセージの受信:
Kafkaトピックに新しいメッセージが到着すると、コンシューマーはそのメッセージを受信する。メッセージはJSON形式でエンコードされており、デシリアライズ(JSONからPythonの辞書に変換)される。
3.データベースへの保存:
デシリアライズされたデータは、指定されたデータベース(例: MongoDB)の適切なコレクション(例: love_actions)に保存される。保存されたデータは後での利用や分析に使用できる。
4.Kafkaデータコンシューマーのプログラム例:
以下に、具体的なPythonのコード例を示す。このコードでは、Kafkaコンシューマーを用いてリアルタイムでデータを受信し、MongoDBに保存する処理を行う。

# data_consumer.py
from kafka import KafkaConsumer
import json
import pymongo

consumer = KafkaConsumer(
    'love_action_data',
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["love_currency_db"]

for message in consumer:
    love_action_data = message.value
    db.love_actions.insert_one(love_action_data)
    print(f"Received and stored data: {love_action_data}")


・バッチ処理
○役割
バッチ処理は、一定期間ごとに大量のデータをまとめて処理し、データの集計、分析、統合を行う役割を担う。リアルタイム処理では対処しきれない大規模なデータセットや複雑な処理を効率的に行うために使用される。
○動作
1.データ収集:
データベースやログファイルから一定期間分のデータを一括で抽出する。例: 前日分の愛の行動データや愛貨の移動データを収集。
2.データの変換と集計:
収集したデータを必要な形式に変換し、集計処理を行う。例: 各ユーザーごとの愛貨の移動履歴を集計し、月次の収支報告書を作成。
3.データの統合と保存:
集計・変換されたデータを統合データベースやデータウェアハウスに保存する。例: データウェアハウスに月次の愛貨の移動データを保存し、分析用に整備。
4.バッチ処理のプログラム例:
以下に、Pythonを用いたバッチ処理の具体的な例を示す。このコードは、MongoDBから前日分のデータを収集し、集計・変換してデータウェアハウスに保存する流れを示している。
# batch_processor.py
import pymongo
import pandas as pd

client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["love_currency_db"]

def extract_data():
    love_actions = list(db.love_actions.find())
    love_movements = list(db.love_movements.find())
    financial_statements = list(db.financial_statements.find())
    return love_actions, love_movements, financial_statements

def transform_data(love_actions, love_movements, financial_statements):
    # 必要なデータ変換処理を実施
    love_actions_df = pd.DataFrame(love_actions)
    love_movements_df = pd.DataFrame(love_movements)
    financial_statements_df = pd.DataFrame(financial_statements)
    return love_actions_df, love_movements_df, financial_statements_df

def load_data(love_actions_df, love_movements_df, financial_statements_df):
    db.integrated_data.love_actions.insert_many(love_actions_df.to_dict('records'))
    db.integrated_data.love_movements.insert_many(love_movements_df.to_dict('records'))
    db.integrated_data.financial_statements.insert_many(financial_statements_df.to_dict('records'))

def main():
    love_actions, love_movements, financial_statements = extract_data()
    love_actions_df, love_movements_df, financial_statements_df = transform_data(love_actions, love_movements, financial_statements)
    load_data(love_actions_df, love_movements_df, financial_statements_df)

if __name__ == "__main__":
    main()

4. DApps側での処理
・愛の行動データの記録
# dapps/app.py
from flask import Flask, request, jsonify
from data_producer import send_love_action_data

app = Flask(__name__)

@app.route("/record_action", methods=["POST"])
def record_action():
    data = request.json
    action_type = data.get("action_type")
    details = data.get("details")
    location = data.get("location")
    user_id = data.get("user_id")
    municipality_id = data.get("municipality_id")

    send_love_action_data(action_type, details, location, user_id, municipality_id)
    return jsonify({"message": "Action recorded successfully"}), 200

if __name__ == "__main__":
    app.run(debug=True)


・愛貨の移動データの記録
# dapps/app.py (continued)
@app.route("/record_movement", methods=["POST"])
def record_movement():
    data = request.json
    sender_id = data.get("sender_id")
    receiver_id = data.get("receiver_id")
    amount = data.get("amount")
    timestamp = datetime.utcnow().isoformat()
    movement_data = {
        "sender_id": sender_id,
        "receiver_id": receiver_id,
        "amount": amount,
        "timestamp": timestamp
    }

    db.love_movements.insert_one(movement_data)
    return jsonify({"message": "Movement recorded successfully"}), 200

・まとめ

以上が、リアルタイム処理とバッチ処理を組み合わせてデータの一元管理を実現するための設計と実装例である。リアルタイム処理ではKafkaを用いてデータを即座に収集・保存し、バッチ処理では定期的にデータを抽出・変換・統合して保存する。この設計により、愛の行動データ、ユーザーデータ、市町村データ、愛貨の移動データ、決算データが適切に管理される。
 

 

いかがであろうか、今回はデータ一元管理におけるデータ統合について記載した。データ収集と保存を即座に行う処理はとても利便性が高い。このように設計していくと効率的だろう。